Add support for customized future executor

Open gaoqiangz opened this issue 3 years ago • 4 comments

Please add support for customizing the runtime (executor) that bb8 uses when it spawns tasks, to solve the problems that arise when a pool is shared across runtimes. Scenario: a connection is established on runtime A and then used on runtime B. If A is destroyed before B while the connection is still in use, the operation fails with `IO driver has terminated`.
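As a rough illustration of what such a hook could look like, here is a std-only sketch. The `Executor` trait, the `DedicatedThread` type, and the `demo` function are hypothetical names made up for this example; bb8 does not expose this API. The idea is that the pool would hand its background futures to a user-supplied executor instead of spawning them on whatever runtime happens to be current, so their lifetime no longer depends on the caller's runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Wake, Waker};
use std::thread;

/// Boxed task type handed to the executor (hypothetical, not a bb8 type).
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Hypothetical hook: something the pool could call instead of spawning
/// on the current runtime. NOT part of bb8's API.
pub trait Executor: Send + Sync + 'static {
    fn spawn(&self, fut: BoxFuture);
}

/// Waker that unparks the thread driving the future.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal block_on: polls until the future is ready, parking in between.
fn block_on(mut fut: BoxFuture) {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    while fut.as_mut().poll(&mut cx).is_pending() {
        thread::park();
    }
}

/// Executor that runs every task on one dedicated thread, independent of
/// the runtime (if any) of the code that calls `spawn`.
pub struct DedicatedThread {
    tx: Mutex<mpsc::Sender<BoxFuture>>,
}

impl DedicatedThread {
    pub fn new() -> Self {
        let (tx, rx) = mpsc::channel::<BoxFuture>();
        thread::spawn(move || {
            // Tasks run one at a time; the loop ends once the sender is dropped.
            for fut in rx {
                block_on(fut);
            }
        });
        DedicatedThread { tx: Mutex::new(tx) }
    }
}

impl Executor for DedicatedThread {
    fn spawn(&self, fut: BoxFuture) {
        // Ignore the send error if the worker thread has already exited.
        let _ = self.tx.lock().unwrap().send(fut);
    }
}

/// Usage: the task runs on the executor's thread, not on the caller's.
fn demo() -> i32 {
    let exec = DedicatedThread::new();
    let (tx, rx) = mpsc::channel();
    exec.spawn(Box::pin(async move {
        let _ = tx.send(41 + 1);
    }));
    rx.recv().expect("worker thread dropped the task")
}
```

With Tokio, an implementation of the same trait would presumably forward to `tokio::runtime::Handle::spawn` on a runtime that outlives every caller, rather than driving futures on a plain thread as this sketch does.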

gaoqiangz avatar Dec 02 '22 13:12 gaoqiangz

What runtime are you using? Why do you want to use bb8 instead of another connection pool that already supports your runtime?

djc avatar Dec 06 '22 09:12 djc

Please see this example:

  • Cargo.toml
bb8 = "0.8.0"
bb8-tiberius = { version = "0.13.0", features = ["sql-browser"] }
  • main.rs
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let host = ".\\JQZ"; //local instance `JQZ`
    let conn_str = format!(
        "Server={host};Database=master;TrustServerCertificate=true;IntegratedSecurity=true;"
    );

    let mgr = bb8_tiberius::ConnectionManager::build(conn_str.as_str())?.using_named_connection();
    let pool = bb8::Pool::builder().max_size(1).build_unchecked(mgr);

    let (tx_connected, rx_connected) = tokio::sync::oneshot::channel();
    let thread_1 = std::thread::spawn({
        let pool = pool.clone();
        move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            rt.block_on(async {
                //connect in this runtime
                println!("thread_1 connecting");
                let conn = pool.get().await.unwrap();
                println!("thread_1 connected");
                let _ = tx_connected.send(());
                //make `thread_2` block at the `get` call
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                //release `thread_2`
                println!("thread_1 release");
                drop(conn);
                tokio::time::sleep(tokio::time::Duration::from_secs(0)).await;
            });
            println!("thread_1 runtime dropping");
            //dropping this runtime causes the error in `thread_2`
            drop(rt);
            println!("thread_1 runtime droped");
        }
    });

    let thread_2 = std::thread::spawn({
        let pool = pool.clone();
        move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            rt.block_on(async {
                //wait until `thread_1` holds the only connection
                let _ = rx_connected.await;
                //use here
                println!("thread_2 getting connection");
                let mut conn = pool.get().await.unwrap();
                println!("thread_2 querying");
                //sleep 10 seconds; should fail with the error `IO driver has terminated`
                let e = conn.simple_query("WAITFOR DELAY '00:00:10'").await.err();
                println!("thread_2 queried: {e:?}");
            });
        }
    });

    thread_1.join().unwrap();
    println!("thread_1 exited");
    thread_2.join().unwrap();
    println!("thread_2 exited");

    Ok(())
}
  • log
thread_1 connecting
thread_1 connected
thread_2 getting connection
thread_1 release
thread_2 querying
thread_1 runtime dropping
thread_2 queried: Some(Io { kind: Other, message: "IO driver has terminated" })
thread_1 runtime droped
thread_1 exited
thread_2 exited
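
One way to read this log: the connection's socket appears to stay tied to the I/O driver of the runtime that opened it (the one in thread_1), so it stops working once that runtime is dropped, even though the runtime in thread_2 is still alive. The following std-only toy model sketches that ownership relationship. `ToyDriver`, `ToyRuntime`, `ToyConn`, and `demo_cross_runtime` are invented for illustration and are not Tokio or bb8 types; the real machinery is considerably more involved.

```rust
use std::sync::{Arc, Weak};

/// Stand-in for a runtime's I/O driver (hypothetical, not a Tokio type).
struct ToyDriver;

/// Stand-in for a runtime: it owns its driver.
struct ToyRuntime {
    driver: Arc<ToyDriver>,
}

/// Stand-in for a pooled connection: it refers to the driver of the
/// runtime that created it, but does not keep that driver alive.
struct ToyConn {
    driver: Weak<ToyDriver>,
}

impl ToyRuntime {
    fn new() -> Self {
        ToyRuntime { driver: Arc::new(ToyDriver) }
    }

    /// "Connect" on this runtime: the connection is bound to this driver.
    fn connect(&self) -> ToyConn {
        ToyConn { driver: Arc::downgrade(&self.driver) }
    }
}

impl ToyConn {
    /// Any I/O needs the original driver, no matter who is calling.
    fn query(&self) -> Result<(), &'static str> {
        match self.driver.upgrade() {
            Some(_driver) => Ok(()),
            None => Err("IO driver has terminated"),
        }
    }
}

/// Connect on runtime A, then query before and after A is dropped,
/// while an unrelated runtime B stays alive the whole time.
fn demo_cross_runtime() -> (Result<(), &'static str>, Result<(), &'static str>) {
    let runtime_a = ToyRuntime::new();
    let _runtime_b = ToyRuntime::new();
    let conn = runtime_a.connect();
    let before = conn.query();
    drop(runtime_a);
    let after = conn.query();
    (before, after)
}
```

In this model the second query fails even though runtime B is untouched, which mirrors the sequence in the log: the query in thread_2 fails as soon as thread_1 drops its runtime.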

gaoqiangz avatar Dec 07 '22 02:12 gaoqiangz

Okay, so you want to use a per-thread single-threaded Tokio runtime. Could have just said so...

djc avatar Dec 07 '22 08:12 djc

Yes, our program has multiple runtimes, for example on independent timer threads.

gaoqiangz avatar Dec 07 '22 08:12 gaoqiangz