What is the type of rxgo.Serialize parameter?

Open dalianzhu opened this issue 5 years ago • 6 comments

I want to write a program that processes data concurrently (with rxgo.WithPool and rxgo.Serialize) but outputs the results sequentially, so I give each item an index. Code:


var s = []interface{}{4, 2, 1, 3, 5, 6}
count := 0
<-rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
	for _, item := range s {
		count++
		next <- rxgo.Of(map[string]int{
			"index": count,
			"val":   item.(int),
		})
	}
}}).
	Map(func(c context.Context, i interface{}) (interface{}, error) { // map 1
		fmt.Println("1:", i)
		tp := i.(map[string]int)
		index := tp["index"]
		val := tp["val"]

		// yes, I want to change the incoming value of the next step
		return []int{index, val}, nil
	}, rxgo.WithPool(10), rxgo.Serialize(func(i interface{}) int {
		// Is the type of i an array or a map?
		fmt.Println("serial:", i)
		tp := i.([]int)
		return tp[0]
	}), rxgo.WithBufferedChannel(1)).
	Map(func(c context.Context, i interface{}) (interface{}, error) { // map 2
		fmt.Println("2:", i)
		return i, nil
	}).Run()

outputs:

1: map[index:1 val:4]
serial: map[index:1 val:4]  //  why? Parameter of Serialize is not the return value of map1?
panic: interface conversion: interface {} is map[string]int, not []int

In this case, I found that the parameter of Serialize can be a map or an array. How can I fix it?

dalianzhu avatar Sep 23 '20 11:09 dalianzhu

Hi @dalianzhu, the Serialize option is applied before the operator (in your case, map 1), which is why you receive a map. Instead, you may want to use the Serialize operator, which runs after the map. You do need to provide a from index, which is the starting index of the observable: because elements can arrive in any order, we need the starting point to reorder them.

<-rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
		for _, item := range s {
			count++
			next <- rxgo.Of(map[string]int{
				"index": count,
				"val":   item.(int),
			})
		}
	}}).
		Map(func(c context.Context, i interface{}) (interface{}, error) { // map 1
			tp := i.(map[string]int)
			index := tp["index"]
			val := tp["val"]
			return []int{index, val}, nil
		}, rxgo.WithPool(10), rxgo.WithBufferedChannel(1)).
		Serialize(1, func(i interface{}) int {
			tp := i.([]int)
			return tp[0]
		}).
		Map(func(c context.Context, i interface{}) (interface{}, error) { // map 2
			return i, nil
		}).DoOnNext(func(i interface{}) {
			fmt.Printf("%v\n", i)
		})

Is it helpful?

teivah avatar Sep 30 '20 20:09 teivah
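
To illustrate why a starting index is needed for reordering, here is a minimal standard-library sketch. It is not RxGo's implementation; the `item` type and the `reorder` function are hypothetical names used only for this example. Items that arrive early are buffered until every lower index has been emitted, which only works if the first expected index (`from`) is known.

```go
package main

import "fmt"

// item pairs a sequence index with a payload (hypothetical type for this sketch).
type item struct {
	index int
	val   int
}

// reorder emits items in ascending index order starting at from,
// buffering any item that arrives before its predecessors.
func reorder(in []item, from int) []item {
	pending := make(map[int]item) // items that arrived early
	out := make([]item, 0, len(in))
	next := from
	for _, it := range in {
		pending[it.index] = it
		// Flush every consecutive item that is now available.
		for {
			p, ok := pending[next]
			if !ok {
				break
			}
			out = append(out, p)
			delete(pending, next)
			next++
		}
	}
	return out
}

func main() {
	// Arrival order as a worker pool might produce it.
	arrived := []item{{3, 1}, {1, 4}, {2, 2}, {5, 5}, {4, 3}, {6, 6}}
	for _, it := range reorder(arrived, 1) {
		fmt.Println(it.index, it.val)
	}
}
```

With a wrong starting point (for example `from` set to 0 while the indexes start at 1), this sketch never emits anything, because it keeps waiting for an index that does not exist.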

@teivah Thank you for your reply. It works. You said, "The Serialize option occurs before the operator". But I found that the function passed to the Serialize option can receive two different types of value. I modified my example, but it still runs incorrectly.

	var s = []interface{}{4, 2, 1, 3, 5, 6}
	count := 0

	<-rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
		for _, item := range s {
			count++
			next <- rxgo.Of(map[string]int{
				"index": count,
				"val":   item.(int),
			})
		}
	}}).
		Map(func(c context.Context, i interface{}) (interface{}, error) { // map 1
			fmt.Println("1:", i)
			tp := i.(map[string]int)
			index := tp["index"]
			val := tp["val"]

			// yes, I want to change the incoming value of the next step
			return []int{index, val}, nil
		}, rxgo.WithPool(10), rxgo.Serialize(func(i interface{}) int {
			fmt.Println("serial:", i)
			tp := i.(map[string]int) // set tp := i.([]int) will still cause panic.
			return tp["index"]
		}), rxgo.WithBufferedChannel(1)).
		Map(func(c context.Context, i interface{}) (interface{}, error) { // map 2
			fmt.Println("2:", i)
			return i, nil
		}).Run()

output:

1: map[index:1 val:4]
serial: map[index:1 val:4]   // yes ,it is a map
1: map[index:2 val:2]
1: map[index:3 val:1]
serial: [1 4]    // oh? 
1: map[index:4 val:3]
1: map[index:5 val:5]
panic: interface conversion: interface {} is []int, not map[string]int

dalianzhu avatar Oct 14 '20 07:10 dalianzhu

I tried the patch that @dalianzhu supplied with this test https://gist.github.com/maguro/223110ddf8b30df146fb366ce016be43 and got

=== RUN   TestMapWithPoolSerialize
  1: map[index:1 val:d]
    s: map[1: d] -> 3
  1: map[index:2 val:b]
    s: map[2: b] -> 1
  1: map[index:3 val:a]
    s: map[3: a] -> 0
  1: map[index:4 val:c]
  1: map[index:5 val:e]
  1: map[index:6 val:f]
    s: [1 d] -> 3
    s: [2 b] -> 1
    s: [3 a] -> 0
    s: [4 c] -> 2
    s: [5 e] -> 4
    s: [6 f] -> 5
  2: [1 d]
3: [1 d]
3: done
done

Note that the second stage map only gets one item.

maguro avatar Mar 27 '21 19:03 maguro

I'm very sorry, this patch does not work, and it has its own problems. I have deleted it and look forward to the author fixing it.

dalianzhu avatar Mar 29 '21 01:03 dalianzhu

No worries! :)

maguro avatar Mar 29 '21 17:03 maguro

I think I found the cause of the issue.

Serialize is called after map, except while waiting for the first item (which is used to create the from for serialize). In that phase, it calls the serialization function on the input item instead of the returned item. Also, runFirstItem did not stop after getting the first item from the channel. As a result:
=> All items handled by runFirstItem are first serialized using the input item and are sent to firstItemIDCh
=> All items handled by runParallel are serialized only using the item returned by map

You can fix it like this:

https://github.com/ReactiveX/RxGo/blob/a90456ba46066d4560566844f7c2d258b0c0f432/observable.go#L410-L416

if i.Error() {
  op.err(ctx, i, next, operator)
  i.SendContext(ctx, notif)
} else {
  tmpNext := make(chan Item, 1)
  op.next(ctx, i, tmpNext, operator)
  select {
    case i, _ := <-tmpNext:
      next <- i
      Of(f(i.V)).SendContext(ctx, notif)
  }
}
break loop

This way the serialize function is always called after map.
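
As a rough standard-library illustration of that invariant (not RxGo code; `input`, `output`, and `mapThenKey` are hypothetical names for this sketch), the key function below is typed so that it can only ever receive the mapped result, never the producer's item. For brevity it collects all results and sorts them by key instead of streaming them in order.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// input is what the producer emits.
type input struct{ index, val int }

// output is what the map step returns.
type output struct{ index, val int }

// mapThenKey runs mapFn in a pool of workers and applies keyFn only
// to the mapped output, then returns the outputs ordered by that key.
func mapThenKey(in []input, workers int, mapFn func(input) output, keyFn func(output) int) []output {
	jobs := make(chan input)
	results := make(chan output, len(in)) // buffered so workers never block
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- mapFn(j)
			}
		}()
	}
	for _, j := range in {
		jobs <- j
	}
	close(jobs)
	wg.Wait()
	close(results)

	var out []output
	for r := range results {
		out = append(out, r)
	}
	sort.Slice(out, func(a, b int) bool { return keyFn(out[a]) < keyFn(out[b]) })
	return out
}

func main() {
	in := []input{{1, 4}, {2, 2}, {3, 1}, {4, 3}, {5, 5}, {6, 6}}
	out := mapThenKey(in, 3,
		func(i input) output { return output{i.index, i.val * 10} },
		func(o output) int { return o.index },
	)
	for _, o := range out {
		fmt.Println(o.index, o.val)
	}
}
```

Giving the input and output distinct types makes the compiler reject any attempt to key on the wrong value, which is the mix-up the panic earlier in this thread exposed at runtime.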

Edit: Your lib is really cool! (I used to work in JS with RxJS, so it's a pleasure to have it in Go :) )

K4ST0R avatar Jan 31 '23 16:01 K4ST0R