GraphScope icon indicating copy to clipboard operation
GraphScope copied to clipboard

[BUG] pegasus dataflow panic when worker not in peers broadcast

Open lnfjpt opened this issue 3 years ago • 0 comments

Following dataflow panic when a worker which is not in peers broadcast a batch

    conf.set_workers(2);
    let mut results = pegasus::run(conf, || {
        |input, output| {
            let worker_id = input.get_worker_index();
            let workers = pegasus::get_current_worker().total_peers() as u64;
            let stream = input.input_from(vec![worker_id as u64])?;
            stream
                .flat_map(|id| {
                    Ok(Some(id).into_iter())
                })?
                .repartition(move |id| Ok(*id % (workers/2)))
                .filter_map(|source| {
                    Ok(Some(source))
                })?
                .broadcast()
                .sink_into(output)
        }

    }).expect("run job fail;");

    while let Some(next) = results.next() {
        let n = next.unwrap();
        println!("{}", n);
    }

lnfjpt avatar Aug 10 '22 02:08 lnfjpt