Terminate function does not work when a stream is cancelled prematurely inside an anyio task group
- asyncpg version: 0.29.0
- PostgreSQL version: 14.10
- Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?: Yes, I can reproduce it locally
- Python version: 3.12
- Platform: linux and Mac OS
- Do you use pgbouncer?: No
- Did you install asyncpg with pip?: Yes
- If you built asyncpg locally, which version of Cython did you use?: n/a
- Can the issue be reproduced under both asyncio and uvloop?: No, the issue is not reproducible with plain asyncio, only with anyio
Hi, I have a FastAPI application that uses SQLAlchemy and asyncpg. I opened a discussion on the SQLAlchemy page here (full discussion) reporting a major problem that affected my application after an update they released.
To summarize the problem and what we concluded in that discussion:
I asynchronously stream data from my API to a client using the starlette StreamingResponse class, which uses anyio under the hood (a rough sketch of the pattern is shown below). When the anyio task group gets cancelled before the stream is finished, the asyncpg terminate function does not terminate the connection if the close function was called first, leaving the session stuck in an "idle in transaction" state. This causes connections to pile up, eventually preventing other applications from creating new connections.
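For context, here is a rough sketch (not my actual code) of the kind of endpoint involved, assuming FastAPI with a SQLAlchemy async engine; the DSN, route name, and query are placeholders:
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

app = FastAPI()
# Placeholder DSN; the real application has its own credentials and pooling
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/defaultdb")


@app.get("/rows")
async def rows():
    async def stream_rows():
        # This generator runs inside the anyio task group that
        # StreamingResponse manages; a client disconnect cancels it
        # in the middle of iteration
        async with engine.connect() as conn:
            result = await conn.stream(text("SELECT generate_series(1, 500)"))
            async for row in result:
                yield f"{row[0]}\n"

    return StreamingResponse(stream_rows(), media_type="text/plain")
When the client disconnects partway through, the cancellation interrupts the connection cleanup, and the underlying asyncpg connection is never terminated on the server side.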
With some help from the SQLAlchemy folks, and after some time working through the problem, we were able to put together a small example that reproduces my problem using only the asyncpg and anyio libraries here (Example of problem)
Any help resolving this issue would be greatly appreciated! If you need any more info from me, please do not hesitate to message back
Thanks in advance!
What happens is that anyio cancels await conn.close() before it has a chance to send the termination message to Postgres. The connection does get closed from the client side, but Postgres never receives the memo. I need to think about whether shielding Protocol.close() makes sense, but you can use the idle_in_transaction_session_timeout server setting as a workaround for now to kill inactive sessions.
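For example (a minimal sketch with placeholder connection parameters and an arbitrary 30-second timeout; adjust to your workload):
import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect(
        user="doadmin",
        password="bookem",
        host="localhost",
        database="defaultdb",
        # Ask the server to kill sessions that stay idle inside a transaction
        # for more than 30 seconds (value is in milliseconds)
        server_settings={"idle_in_transaction_session_timeout": "30000"},
    )
    try:
        print(await conn.fetchval("SHOW idle_in_transaction_session_timeout"))
    finally:
        await conn.close()


asyncio.run(main())
The same setting can also be applied server-wide with ALTER SYSTEM, or per database/role, so leaked sessions get reaped even if the client never sets it.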
@elprans Thanks for the insight! I'll experiment with idle_in_transaction_session_timeout in the meantime, though I will add that the connections can run up quickly on us. Keep me posted on any updates, or let me know if I can help provide any more info/context on the situation.
Thanks for looking into this issue!
@elprans Hi!
I am hoping for a small update on this, as it's still an ongoing issue and is forcing me to hold back from updating the version of SQLAlchemy I'm using
Thanks!
I think I hit the same issue as this and put together a self-contained reproducer in https://github.com/brittlesoft/repro-starlette-sa-conn-leak
There's also a similar issue that seems to only happen with direct_tls=true connections, as shown in the direct_tls_leak.py script from the repo above.
I'm not sure if it is the same issue or not, so I wanted to post here before opening a possibly duplicate issue.
@elprans have you given shielding Protocol.close() any more thought?
I don't believe the idle_in_transaction_session_timeout helps if a connection isn't in the middle of a transaction.
Monkey patching the close function appears to work in the example copied from https://github.com/sqlalchemy/sqlalchemy/discussions/11128#discussioncomment-9539469
import asyncio
import anyio
import asyncpg
from asyncpg import Connection


def monkey_patch():
    original_close = Connection.close

    # shield the close method from being cancelled
    async def close(self, *, timeout=None):
        await asyncio.shield(original_close(self, timeout=timeout))

    Connection.close = close


# With monkey_patch() applied the connection closes cleanly;
# comment it out to reproduce the bug
monkey_patch()
async def partitions(conn, cursor, size):
    try:
        while True:
            partition = await cursor.fetch(size)
            if partition:
                yield partition
            else:
                break
    except BaseException as e:
        # The task group cancellation lands here; try a graceful close first
        # and fall back to terminate() if that fails
        print(f"Got exception {e}; attempting to close")
        try:
            await conn.close(timeout=2)
        except (
            asyncio.CancelledError,
            asyncio.TimeoutError,
            OSError,
        ) as e2:
            print(
                f"Got a second exception {e2} while trying to close, trying to terminate"
            )
            conn.terminate()
            print(
                "Conn terminate succeeded! is the connection closed? no? then that's a bug in asyncpg"
            )
async def run():
    async def go_raw_anyio():
        conn = await asyncpg.connect(
            user="doadmin",
            password="bookem",
            host="localhost",
            database="defaultdb",
            server_settings={"application_name": "bug"},
        )
        _transaction = conn.transaction()
        await _transaction.start()
        cursor = await conn.cursor(
            "SELECT anon_1 FROM "
            "generate_series(1::INTEGER, 500::INTEGER) AS anon_1"
        )
        async for _ in partitions(conn, cursor, 10):
            # Cancel the task group after the first batch, simulating a client
            # disconnecting mid-stream
            tg.cancel_scope.cancel()

    # Without the shield, each iteration leaves one "idle in transaction"
    # session behind on the server
    for _ in range(5):
        async with anyio.create_task_group() as tg:
            tg.start_soon(go_raw_anyio)

    await asyncio.sleep(5)
    print("Check the number of connections in the database:")
    print(
        "SELECT * FROM pg_stat_activity WHERE datname = 'defaultdb' and application_name = 'bug';"
    )
    input("Press Enter to continue...")


if __name__ == "__main__":
    asyncio.run(run())