database-stream-processor icon indicating copy to clipboard operation
database-stream-processor copied to clipboard

Unexpected CyclicCircuit error

Open all-is-to-be-dared opened this issue 3 years ago • 2 comments

Ran into the following error (CyclicCircuit) trying to build a circuit; I'm fairly certain this should work.

Minimal reproducible example:

use dbsp::Circuit;

fn main() {
    let (_, _) = Circuit::build(|circuit| {
        let (action_stream, action_handle) = circuit.add_input_stream();

        let _ = circuit.iterate(|child| {
            let _ = action_stream
                .apply_owned(|action: i32| action)
                .delta0(child);
            Ok((
                move || Ok(true),
                ()
            ))
        }).unwrap();

        action_handle
    }).unwrap();
}

Errors with the following on the current git (dbsp = { git = "https://github.com/vmware/database-stream-processor" }):

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: CyclicCircuit { node_id: GlobalNodeId([NodeId(1)]) }', crates/pact-server/src/main.rs:24:8
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

all-is-to-be-dared avatar Sep 12 '22 16:09 all-is-to-be-dared

Thanks for reporting! This error message isn't very helpful in this case. It's not currently enforced by the type system, but the correct way to use a stream from an outer scope inside a nested scope (created by iterate) is to import it into the nested scope using delta0 (or another ImportOperator, but delta0 is the only one we have today). A nested circuit runs on its own logical clock, which can tick multiple times for each parent clock cycle. An ImportOperator bridges the two clock domains, reading the value from the parent stream once per parent clock tick and yielding a value to its output stream for each child clock tick.

The following doesn't crash (I placed apply_owned after delta0).

use dbsp::Circuit;

fn main() {
    let (_, _) = Circuit::build(|circuit| {
        let (action_stream, action_handle) = circuit.add_input_stream();

        let _ = circuit.iterate(|child| {
            let _ = action_stream
                .delta0(child)
                .apply_owned(|action: i32| action);
            Ok((
                move || Ok(true),
                ()
            ))
        }).unwrap();

        action_handle
    }).unwrap();
}

ryzhyk avatar Sep 12 '22 17:09 ryzhyk

Yeah, I also noticed that doing

let action_stream_2 = action_stream.apply_owned(|action: i32| action);
let _ = circuit.iterate(|child| {
  let _ = action_stream_2.delta0(child);
  /* ... */

seems to work as well.

all-is-to-be-dared avatar Sep 12 '22 17:09 all-is-to-be-dared