fix: Notification to waiting clients during drop

Open vaikzs opened this issue 1 year ago • 2 comments

Following up on @djc's intuition (re: https://github.com/djc/bb8/pull/194): I noticed that, after several put_back attempts, the final call to spawn_replenishing_approvals(approvals) returns early because approvals is empty, leaving 5 connections waiting. As a result, the call stack never reaches pool.notify.notify_one() at the end, and the pool.notify.notify_one() invocation at every connection polling (i.e.) also fails.

However, rather than adding pool.notify.notify_waiters() to put_back(), which would be redundant, replacing pool.notify.notify_one() in PoolInternals::put with pool.notify.notify_waiters() solves this (notify_waiters() does not store a permit for the next call to notified().await). I tested this with the test case provided by @xortive (shown below and already included in the main branch):

#[tokio::test]
async fn test_broken_connections_dont_starve_pool() {
    use std::sync::RwLock;
    use std::{convert::Infallible, time::Duration};

    #[derive(Default)]
    struct ConnectionManager {
        counter: RwLock<u16>,
    }
    #[derive(Debug)]
    struct Connection;

    #[async_trait::async_trait]
    impl bb8::ManageConnection for ConnectionManager {
        type Connection = Connection;
        type Error = Infallible;

        async fn connect(&self) -> Result<Self::Connection, Self::Error> {
            Ok(Connection)
        }

        async fn is_valid(&self, _: &mut Self::Connection) -> Result<(), Self::Error> {
            Ok(())
        }

        fn has_broken(&self, _: &mut Self::Connection) -> bool {
            let mut counter = self.counter.write().unwrap();
            let res = *counter < 5;
            *counter += 1;
            res
        }
    }

    let pool = bb8::Pool::builder()
        .max_size(5)
        .connection_timeout(Duration::from_secs(10))
        .build(ConnectionManager::default())
        .await
        .unwrap();

    let mut futures = Vec::new();

    for _ in 0..10 {
        let pool = pool.clone();
        futures.push(tokio::spawn(async move {
            let conn = pool.get().await.unwrap();
            drop(conn);
        }));
    }

    for future in futures {
        future.await.unwrap();
    }
}
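
For anyone comparing the two calls, here is a minimal single-threaded sketch of the wake-up rules that tokio documents for `Notify`: `notify_one()` wakes one queued waiter or, if none is queued, stores a single permit, while `notify_waiters()` wakes every currently queued waiter and stores nothing. `ToyNotify`, `wait`, and the task ids are hypothetical names made up for illustration. This is a model of the documented behaviour, not bb8 or tokio code.

```rust
use std::collections::VecDeque;

/// Toy model of the wake-up rules only; no real tasks or futures involved.
#[derive(Default)]
struct ToyNotify {
    /// Ids of tasks currently parked, in arrival order.
    waiters: VecDeque<u32>,
    /// At most one stored permit, as with `notify_one()` on an empty queue.
    permit: bool,
}

impl ToyNotify {
    /// Models `notified().await`: returns true if a stored permit lets the
    /// task proceed immediately, otherwise parks the task and returns false.
    fn wait(&mut self, id: u32) -> bool {
        if self.permit {
            self.permit = false;
            true
        } else {
            self.waiters.push_back(id);
            false
        }
    }

    /// Wakes the oldest parked task, or stores a permit if nobody is parked.
    fn notify_one(&mut self) -> Vec<u32> {
        match self.waiters.pop_front() {
            Some(id) => vec![id],
            None => {
                self.permit = true;
                Vec::new()
            }
        }
    }

    /// Wakes every task parked right now; never stores a permit.
    fn notify_waiters(&mut self) -> Vec<u32> {
        self.waiters.drain(..).collect()
    }
}

fn main() {
    // Five tasks park, mirroring the five waiting callers described above.
    let mut one = ToyNotify::default();
    for id in 0..5 {
        one.wait(id);
    }
    let woken = one.notify_one();
    println!("notify_one woke {:?}, {} still parked", woken, one.waiters.len());

    let mut all = ToyNotify::default();
    for id in 0..5 {
        all.wait(id);
    }
    let woken = all.notify_waiters();
    println!(
        "notify_waiters woke {:?}, {} still parked, permit stored: {}",
        woken,
        all.waiters.len(),
        all.permit
    );
}
```

In this model the four tasks left parked after `notify_one()` only make progress if something notifies again later, which is the step the description above says is not reached.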

vaikzs avatar Jun 03 '24 23:06 vaikzs

Codecov Report

All modified and coverable lines are covered by tests :white_check_mark:

Project coverage is 74.90%. Comparing base (3190c75) to head (8a1949c). Report is 29 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #201      +/-   ##
==========================================
- Coverage   74.95%   74.90%   -0.05%     
==========================================
  Files           6        6              
  Lines         519      518       -1     
==========================================
- Hits          389      388       -1     
  Misses        130      130              


codecov[bot] avatar Jun 04 '24 03:06 codecov[bot]

Sorry, I still don't completely understand why this is necessary. Why doesn't my call stack in https://github.com/djc/bb8/pull/194#issuecomment-2041180473 hold? If something is returning an empty approval iter in a state where that is not sufficient, why is that? Pointing at an existing test case doesn't help me because that test already passes today.

djc avatar Jun 04 '24 12:06 djc