GraphScope
GraphScope copied to clipboard
[BUG] pegasus dataflow panics when a worker that is not in the peers broadcasts
The following dataflow panics when a worker that is not in the peers broadcasts a batch:
// Reproduction: run a pegasus job with two workers in which every record is
// repartitioned onto a subset of the workers and then broadcast.
// NOTE(review): `conf` is created outside this snippet — presumably a pegasus
// job configuration; confirm against the full reproduction.
conf.set_workers(2);
let mut results = pegasus::run(conf, || {
|input, output| {
let worker_id = input.get_worker_index();
// Total number of peers, widened to u64 so it can be used in the routing key below.
let workers = pegasus::get_current_worker().total_peers() as u64;
// Each worker feeds exactly one record into the dataflow: its own index.
let stream = input.input_from(vec![worker_id as u64])?;
stream
// Identity step: every id is re-emitted unchanged as a one-element iterator.
.flat_map(|id| {
Ok(Some(id).into_iter())
})?
// Routing key is `id % (workers / 2)`. If total_peers() is 2 (as
// set_workers(2) suggests — TODO confirm), the divisor is 1 and every
// record gets key 0, so presumably only one worker receives data here.
.repartition(move |id| Ok(*id % (workers/2)))
// Identity step: every record is kept as-is.
.filter_map(|source| {
Ok(Some(source))
})?
// NOTE(review): per the issue report, the panic is triggered at this
// broadcast; the exact failing worker is not determinable from this snippet.
.broadcast()
.sink_into(output)
}
}).expect("run job fail;");
// Drain the job output; `unwrap` panics on the first error result, otherwise
// each value is printed on its own line.
while let Some(next) = results.next() {
let n = next.unwrap();
println!("{}", n);
}