
Terminate function does not work when a stream is cancelled prematurely when in an anyio task group

Open JordanZimmitti opened this issue 1 year ago • 7 comments

  • asyncpg version: 0.29.0
  • PostgreSQL version: 14.10
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?: Yes, I can reproduce it locally
  • Python version: 3.12
  • Platform: linux and Mac OS
  • Do you use pgbouncer?: No
  • Did you install asyncpg with pip?: Yes
  • If you built asyncpg locally, which version of Cython did you use?: n/a
  • Can the issue be reproduced under both asyncio and uvloop?: No, the issue is not reproducible with plain asyncio, only when anyio is involved

Hi, I have a FastAPI application that uses SQLAlchemy and asyncpg. I opened a discussion on the SQLAlchemy repository here (full discussion) reporting a major problem that affected my application after an update they released.

To summarize the problem and what we concluded in that discussion:

I asynchronously stream data from my API to a client using the starlette StreamingResponse class, which takes advantage of anyio under the hood. For some reason, when the anyio task group gets cancelled before the stream is finished, the asyncpg terminate function does not terminate the connection when the close function is used first, leaving the connection stuck in an idle in transaction state. This causes connections to run up, eventually stopping other applications from creating new connections.
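
For context, the shape of the endpoint looks roughly like this; the DSN, route name, and query are illustrative stand-ins, not my actual code:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

app = FastAPI()

# Hypothetical DSN; any asyncpg-backed database reproduces the shape
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/defaultdb")


@app.get("/rows")
async def rows():
    async def gen():
        # Server-side streaming holds a transaction open on the connection
        async with engine.connect() as conn:
            result = await conn.stream(text("SELECT generate_series(1, 500)"))
            async for row in result:
                yield f"{row[0]}\n"

    # starlette drives the generator inside an anyio task group; a client
    # disconnect cancels that group mid-stream
    return StreamingResponse(gen(), media_type="text/plain")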

With some help from the SQLAlchemy folks, after some time working through the problem, we were able to reduce it to a small example using only the asyncpg and anyio libraries here (Example of problem).

Any help resolving this issue would be greatly appreciated! If you need any more info from me, please do not hesitate to message back.

Thanks in advance!

JordanZimmitti avatar May 23 '24 20:05 JordanZimmitti

What happens is that anyio cancels await conn.close() before it has a chance to send the termination message to Postgres. The connection does get closed on the client side, but Postgres never receives the memo. I need to think about whether shielding Protocol.close() makes sense, but for now you can use the idle_in_transaction_session_timeout server setting as a workaround to kill inactive sessions.
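
For example, you can apply it per-connection via server_settings; the 30-second value is illustrative, tune it to your workload:

import asyncio

import asyncpg


async def main():
    conn = await asyncpg.connect(
        user="doadmin",
        password="bookem",
        host="localhost",
        database="defaultdb",
        server_settings={
            # The server kills sessions stuck "idle in transaction" after
            # this long, even if the client never sends Terminate
            "idle_in_transaction_session_timeout": "30s",
        },
    )
    try:
        print(await conn.fetchval("SHOW idle_in_transaction_session_timeout"))
    finally:
        await conn.close()


if __name__ == "__main__":
    asyncio.run(main())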

elprans avatar Jul 17 '24 21:07 elprans

@elprans Thanks for the insight! I'll experiment with idle_in_transaction_session_timeout in the meantime, though I will add that the connections can run up quickly on us. Keep me posted on any updates, or let me know if I can help by providing any more info/context on the situation.

Thanks for looking into this issue!

JordanZimmitti avatar Jul 31 '24 18:07 JordanZimmitti

@elprans Hi!

I am hoping for a small update on this, as it's still an ongoing issue and is forcing me to hold back the version of SQLAlchemy I'm using.

Thanks!

JordanZimmitti avatar Oct 21 '24 18:10 JordanZimmitti

I think I hit the same issue as this and put together a self-contained reproducer in https://github.com/brittlesoft/repro-starlette-sa-conn-leak

There's also a similar issue that seems to happen only with direct_tls=true connections, as shown in the direct_tls_leak.py script from the repo above.

I'm not sure if it is the same issue or not, so I wanted to post here before opening a possibly duplicate issue.

jraby avatar Nov 14 '24 18:11 jraby

@elprans have you given shielding Protocol.close() any more thought?

circlingthesun avatar Dec 02 '24 16:12 circlingthesun

I don't believe the idle_in_transaction_session_timeout helps if a connection isn't in the middle of a transaction.
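
For sessions left idle outside a transaction, PostgreSQL 14+ does have idle_session_timeout, which could be set alongside it. A sketch applying both at the role level; the role name and timeout values are illustrative, and this needs appropriate privileges:

import asyncio

import asyncpg


async def main():
    # Assumes an admin-capable connection; this is one-time setup that the
    # server persists for future sessions of the role
    admin = await asyncpg.connect(user="postgres", host="localhost")
    try:
        # Kills sessions stuck inside an open transaction
        await admin.execute(
            "ALTER ROLE doadmin SET idle_in_transaction_session_timeout = '30s'"
        )
        # Kills sessions idle outside any transaction (PostgreSQL 14+)
        await admin.execute(
            "ALTER ROLE doadmin SET idle_session_timeout = '60s'"
        )
    finally:
        await admin.close()


if __name__ == "__main__":
    asyncio.run(main())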

circlingthesun avatar Dec 03 '24 07:12 circlingthesun

Monkey-patching the close function appears to work in the example copied from https://github.com/sqlalchemy/sqlalchemy/discussions/11128#discussioncomment-9539469:

import asyncio

import anyio
import asyncpg
from asyncpg import Connection


def monkey_patch():
    original_close = Connection.close

    # shield the close method from being cancelled
    async def close(self, *, timeout=None):
        await asyncio.shield(original_close(self, timeout=timeout))

    Connection.close = close


# Comment out this call to reproduce the bug
monkey_patch()


# Stream the cursor in fixed-size chunks; on cancellation, attempt a
# graceful close() and fall back to terminate()
async def partitions(conn, cursor, size):
    try:
        while True:
            partition = await cursor.fetch(size)
            if partition:
                yield partition
            else:
                break
    except BaseException as e:
        print(f"Got exception {e}; attempting to close")
        try:
            await conn.close(timeout=2)
        except (
            asyncio.CancelledError,
            asyncio.TimeoutError,
            OSError,
        ) as e2:
            print(
                f"Got a second exception {e2} while trying to close, trying to terminate"
            )
            conn.terminate()
            print(
                "Conn terminate succeeded!   is the connection closed?  no?  then that's a bug in asyncpg"
            )


async def run():
    async def go_raw_anyio():
        conn = await asyncpg.connect(
            user="doadmin",
            password="bookem",
            host="localhost",
            database="defaultdb",
            server_settings={"application_name": "bug"},
        )
        # Start an explicit transaction so a leaked connection is left
        # "idle in transaction" on the server
        _transaction = conn.transaction()
        await _transaction.start()

        cursor = await conn.cursor(
            "SELECT anon_1 FROM "
            "generate_series(1::INTEGER, 500::INTEGER) AS anon_1"
        )

        async for _ in partitions(conn, cursor, 10):
            # Cancel the task group after the first chunk, before the
            # stream is exhausted
            tg.cancel_scope.cancel()

    for _ in range(5):
        async with anyio.create_task_group() as tg:
            tg.start_soon(go_raw_anyio)

    await asyncio.sleep(5)

    print("Check the number of connections in the database:")
    print(
        "SELECT * FROM pg_stat_activity WHERE datname = 'defaultdb' and application_name = 'bug';"
    )
    input("Press Enter to continue...")


if __name__ == "__main__":
    asyncio.run(run())
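
For what it's worth on why the shield helps: asyncio.shield wraps the inner close() in its own task, so when anyio cancels the outer await, the caller sees CancelledError immediately while the inner task keeps running and can still send the termination message to Postgres. The flip side is that the caller resumes before the close has actually finished, and if the event loop shuts down right away the shielded task may never get to complete.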

circlingthesun avatar Dec 03 '24 09:12 circlingthesun