asyncpg

Connection pool __aenter__ takes a connection from the queue but does not return it to the caller

Open matemax opened this issue 3 years ago • 0 comments

  • asyncpg version: 0.26.0
  • PostgreSQL version: 12
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?: no, local installation
  • Python version: 3.9
  • Platform: linux, centos8
  • Do you use pgbouncer?: no
  • Did you install asyncpg with pip?: we use poetry
  • If you built asyncpg locally, which version of Cython did you use?: no
  • Can the issue be reproduced under both asyncio and uvloop?: didn't try, problem with reproducing, we use uvloop

We use a connection pool. After a while the pool runs out of connections and our app hangs (every new response has status code 500 and the error "can not get connection from pool"). We observe this behavior when Postgres spends a long time processing several queries simultaneously and we try to get a new connection from the pool with a timeout. It is not reliably reproducible and happens about once per week.

While debugging we noticed the following oddity:

Our code

ctx: PoolAcquireContext = self.pool.acquire(timeout=timeoutBudget)
try:
    connection = await ctx.__aenter__()
    logger.debug("connection was received")
except asyncio.TimeoutError:
    ...

asyncpg code (with our added log line)

    async def _acquire(self, timeout):
        async def _acquire_impl():
            ch = await self._queue.get()  # type: PoolConnectionHolder
            try:
                proxy = await ch.acquire()  # type: PoolConnectionProxy
            except (Exception, asyncio.CancelledError):
                self._queue.put_nowait(ch)
                raise
            else:
                # Record the timeout, as we will apply it by default
                # in release().
                ch._timeout = timeout
                logger.debug("connection was gotten from queue")
                return proxy

        if self._closing:
            raise exceptions.InterfaceError('pool is closing')
        self._check_init()

        if timeout is None:
            return await _acquire_impl()
        else:
            return await compat.wait_for(
                _acquire_impl(), timeout=timeout)

After the app hung, we counted the "connection was received" messages (call it A) and the "connection was gotten from queue" messages (call it B). B - A = connection pool size, i.e. every connection in the pool was taken from the queue without ever reaching our code.
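The count above can be reproduced with a few lines of Python. This is only a sketch: `leaked_connections` is a hypothetical helper, and it assumes the two debug messages appear verbatim, one per log line.

```python
from typing import Iterable

RECEIVED = "connection was received"              # logged by our code (A)
FROM_QUEUE = "connection was gotten from queue"   # logged inside asyncpg (B)


def leaked_connections(log_lines: Iterable[str]) -> int:
    """Return B - A: connections taken from the queue that never reached the caller."""
    received = 0
    from_queue = 0
    for line in log_lines:
        if RECEIVED in line:
            received += 1
        elif FROM_QUEUE in line:
            from_queue += 1
    return from_queue - received
```

If the result equals the pool size, every connection holder has left the queue without being handed to application code.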

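Our hypothesis is that the problem is here, in the compat.wait_for call. The inner future completes, but the timeout occurs before compat.wait_for returns the result, so the acquired connection is never delivered to the caller and never put back into the queue.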

We tried the following code instead

ctx: PoolAcquireContext = self.pool.acquire()
connection = None
try:
    async with async_timeout.timeout(timeoutBudget):
        connection = await ctx.__aenter__()
except aTimeoutError as e:
    if connection:
        # The connection is valid, but the pool returned it too late: the exception
        # was raised in the timeout's __aexit__. Return the connection to the pool.
        await ctx.__aexit__()

[async-timeout](https://github.com/aio-libs/async-timeout) does not create an additional task for the ctx.__aenter__() coroutine. This code works for us (but maybe we're just lucky).
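For discussion, here is a minimal sketch of another way to make a timed acquire that cannot lose an item. It is not asyncpg code: a plain `asyncio.Queue` stands in for the pool's internal queue, and `acquire_with_timeout` is a hypothetical helper. The idea is that whenever the caller gives up (timeout or cancellation), a done-callback puts back anything the pending `get()` still manages to produce.

```python
import asyncio


async def acquire_with_timeout(queue: asyncio.Queue, timeout: float):
    """Take an item from `queue`; raise asyncio.TimeoutError without losing an item."""
    task = asyncio.create_task(queue.get())

    def _give_back(finished: asyncio.Task) -> None:
        # Runs once the task is finished. If it produced an item that
        # nobody is going to receive, put the item back into the queue.
        if not finished.cancelled() and finished.exception() is None:
            queue.put_nowait(finished.result())

    try:
        done, _pending = await asyncio.wait({task}, timeout=timeout)
    except asyncio.CancelledError:
        # The caller itself was cancelled: abandon the task safely.
        task.cancel()
        task.add_done_callback(_give_back)
        raise

    if done:
        return task.result()

    # Timed out: cancel the pending get(), but keep the safety net in
    # case it completes before the cancellation is processed.
    task.cancel()
    task.add_done_callback(_give_back)
    raise asyncio.TimeoutError()
```

Unlike the snippet above, the result is never carried across a second cancellation point, so there is no window in which a finished acquire can be dropped. Whether the same approach fits asyncpg's `_acquire` is an open question, since the real code also awaits `ch.acquire()` after `self._queue.get()`.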

matemax · Sep 19 '22 15:09