What is the type of rxgo.Serialize parameter?
I want to write a program that processes data concurrently(with rxgo.WithPool and rxgo.Serialize), but outputs sequentially. So I give each data an index. code:
var s = []interface{}{4, 2, 1, 3, 5, 6}
count := 0
<-rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
for _, item := range s {
count++
next <- rxgo.Of(map[string]int{
"index": count,
"val": item.(int),
})
}
}}).
Map(func(c context.Context, i interface{}) (interface{}, error) { // map 1
fmt.Println("1:", i)
tp := i.(map[string]int)
index := tp["index"]
val := tp["val"]
// yes, I want to change the incoming value of the next step
return []int{index, val}, nil
}, rxgo.WithPool(10), rxgo.Serialize(func(i interface{}) int {
// Is the type of i an array or a map?
fmt.Println("serial:", i)
tp := i.([]int)
return tp[0]
}), rxgo.WithBufferedChannel(1)).
Map(func(c context.Context, i interface{}) (interface{}, error) { // map 2
fmt.Println("2:", i)
return i, nil
}).Run()
outputs:
1: map[index:1 val:4]
serial: map[index:1 val:4] // why? Parameter of Serialize is not the return value of map1?
panic: interface conversion: interface {} is map[string]int, not []int
In this case, I found that the parameter of Serialize can be a map or an array. How can I fix it?
Hi @dalianzhu,
The Serialize option occurs before the operator (in your case map 1, that's why you catch a map.
Instead, you may want to use the Serialize operator that will occur after the map. Yet, you need to provide a from index which is the starting index of the observable. It is necessary as elements can arrive in whatever order, we need the starting point to reorder them.
<-rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
for _, item := range s {
count++
next <- rxgo.Of(map[string]int{
"index": count,
"val": item.(int),
})
}
}}).
Map(func(c context.Context, i interface{}) (interface{}, error) { // map 1
tp := i.(map[string]int)
index := tp["index"]
val := tp["val"]
return []int{index, val}, nil
}, rxgo.WithPool(10), rxgo.WithBufferedChannel(1)).
Serialize(1, func(i interface{}) int {
tp := i.([]int)
return tp[0]
}).
Map(func(c context.Context, i interface{}) (interface{}, error) { // map 2
return i, nil
}).DoOnNext(func(i interface{}) {
fmt.Printf("%v\n", i)
})
Is it helpful?
@teivah Thank you for your reply. It works. You said, "The Serialize option occurs before the operator". But I found that the Serialize option may have two types of parameters. I modified my example, but it still runs incorrectly.
var s = []interface{}{4, 2, 1, 3, 5, 6}
count := 0
<-rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
for _, item := range s {
count++
next <- rxgo.Of(map[string]int{
"index": count,
"val": item.(int),
})
}
}}).
Map(func(c context.Context, i interface{}) (interface{}, error) { // map 1
fmt.Println("1:", i)
tp := i.(map[string]int)
index := tp["index"]
val := tp["val"]
// yes, I want to change the incoming value of the next step
return []int{index, val}, nil
}, rxgo.WithPool(10), rxgo.Serialize(func(i interface{}) int {
fmt.Println("serial:", i)
tp := i.(map[string]int) // set tp := i.([]int) will still cause panic.
return tp["index"]
}), rxgo.WithBufferedChannel(1)).
Map(func(c context.Context, i interface{}) (interface{}, error) { // map 2
fmt.Println("2:", i)
return i, nil
}).Run()
output:
1: map[index:1 val:4]
serial: map[index:1 val:4] // yes ,it is a map
1: map[index:2 val:2]
1: map[index:3 val:1]
serial: [1 4] // oh?
1: map[index:4 val:3]
1: map[index:5 val:5]
panic: interface conversion: interface {} is []int, not map[string]int
I tried the patch that @dalianzhu supplied with this test https://gist.github.com/maguro/223110ddf8b30df146fb366ce016be43 and got
=== RUN TestMapWithPoolSerialize
1: map[index:1 val:d]
s: map[1: d] -> 3
1: map[index:2 val:b]
s: map[2: b] -> 1
1: map[index:3 val:a]
s: map[3: a] -> 0
1: map[index:4 val:c]
1: map[index:5 val:e]
1: map[index:6 val:f]
s: [1 d] -> 3
s: [2 b] -> 1
s: [3 a] -> 0
s: [4 c] -> 2
s: [5 e] -> 4
s: [6 f] -> 5
2: [1 d]
3: [1 d]
3: done
done
Note that the second stage map only gets one item.
I tried the patch that @dalianzhu supplied with this test https://gist.github.com/maguro/223110ddf8b30df146fb366ce016be43 and got
=== RUN TestMapWithPoolSerialize 1: map[index:1 val:d] s: map[1: d] -> 3 1: map[index:2 val:b] s: map[2: b] -> 1 1: map[index:3 val:a] s: map[3: a] -> 0 1: map[index:4 val:c] 1: map[index:5 val:e] 1: map[index:6 val:f] s: [1 d] -> 3 s: [2 b] -> 1 s: [3 a] -> 0 s: [4 c] -> 2 s: [5 e] -> 4 s: [6 f] -> 5 2: [1 d] 3: [1 d] 3: done doneNote that the second stage map only gets one item.
I'm very sorry, this patch does not work, and it has its own problems. I have deleted it and look forward to the author to fix it
I'm very sorry, this patch does not work, and it has its own problems. I have deleted it and look forward to the author to fix it
No worries! :)
I think find the cause of the issue.
Serialize is called after map except when waiting for the first item (used to create a from for serialize). It calls the serialization function using the input item instead of the return item.
Also runFirstItem did not stop after getting the first item in the channel :
=> All items treat with it will use the item in the input for serialization in a first time and will be send to firstItemIDCh
=> All items treat with runParallel will be only serialized using the item returns by map
You can fix it like this :
https://github.com/ReactiveX/RxGo/blob/a90456ba46066d4560566844f7c2d258b0c0f432/observable.go#L410-L416
if i.Error() {
op.err(ctx, i, next, operator)
i.SendContext(ctx, notif)
} else {
tmpNext := make(chan Item, 1)
op.next(ctx, i, tmpNext, operator)
select {
case i, _ := <-tmpNext:
next <- i
Of(f(i.V)).SendContext(ctx, notif)
}
}
break loop
This way the serialize function is always called after map.
Edit : Your lib is really cool! (Used to work in JS with RxJS, a pleasure to have it in golang :) )