
Task was destroyed but it is pending: WebsocketImplProtocol.async_data_received (Sentry-wrapped)

Open cnicodeme opened this issue 2 months ago • 9 comments

How do you use Sentry?

Sentry Saas (sentry.io)

Version

2.43.0

Steps to Reproduce

Hi team!

I'm having quite a few errors like these popping up in my Sentry account. At first, I thought it was an issue with my WebSocket implementation, but looking at the stack trace, some of the errors come from a standard GET request, such as this one: https://cnicodeme.sentry.io/share/issue/7f8aa28baa194938bd13f44953014819/ (For the Sentry team: feel free to access this ticket and all the others currently open in my organization that have the "Task was destroyed but it is pending" title.)

I suspect that the Sentry task doesn't have time to send the data to Sentry.io and is somehow killed before it can. Unfortunately, the error doesn't contain the original one, making it harder to trace where it originated and/or fix it.

Expected Result

It shouldn't report "Task was destroyed but it is pending" but the actual underlying error instead. I suspect the task responsible for sending that error to Sentry is killed before it has a chance to deliver the payload.

Actual Result

I'm using Sanic 24.6.

Happy to share anything more that is needed

cnicodeme avatar Nov 06 '25 07:11 cnicodeme

PY-1966

linear[bot] avatar Nov 06 '25 07:11 linear[bot]

Hi @cnicodeme ,

Thanks for the link!

In general, the errors you are seeing have the form

Task was destroyed but it is pending!
task: <Task pending name='...' coro=<...> wait_for=<Future pending cb=[...]>>

The Sentry AsyncioIntegration

  • modifies the task name by appending a (Sentry-wrapped) suffix; and
  • wraps the coroutine, so some of the metadata is lost. Ideally this information should not be erased; we'll look into this (see the illustration just below).
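As a minimal, self-contained illustration of the second point (placeholder names, not Sentry's actual wrapping code), awaiting a coroutine from inside another coroutine is enough to change what the task repr reports:

import asyncio

async def my_coroutine():
    await asyncio.sleep(3600)

async def wrapper(coro):
    # Awaiting the original coroutine from inside a wrapper means the task
    # repr now names wrapper() instead of my_coroutine().
    return await coro

async def main():
    direct = asyncio.ensure_future(my_coroutine())
    wrapped = asyncio.ensure_future(wrapper(my_coroutine()))
    print(direct)   # ... coro=<my_coroutine() ...>
    print(wrapped)  # ... coro=<wrapper() ...>
    direct.cancel()
    wrapped.cancel()

asyncio.run(main())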

I would be reluctant to think that the exception originates from the sentry_sdk event flushing mechanism, as events are sent in non-async code paths up to and including version 2.43.0 of sentry_sdk.

Could you provide a reproduction where the exception is raised? It would be useful to know

  • how you are running the Sanic server; in particular whether you pass loop or protocol arguments to Sanic.run(); and
  • if you are creating your own tasks, how you schedule them, and what you do with coroutines on application shutdown, or generally before the event loop is closed. For example, do you cancel coroutines on shutdown? (A sketch of one possible shutdown pattern follows below.)
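For reference, here is a minimal sketch of the kind of shutdown handling meant in the last point, assuming a Sanic listener; the application name and the blanket cancellation are only illustrative, not a production recommendation:

import asyncio
from sanic import Sanic

app = Sanic("example")  # placeholder application for illustration

@app.listener("before_server_stop")
async def cancel_pending_tasks(app, loop):
    # Cancel every task except the one running this listener, then give the
    # cancelled tasks a chance to unwind before the event loop is closed.
    current = asyncio.current_task()
    pending = [task for task in asyncio.all_tasks(loop) if task is not current]
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

Tasks that are cancelled and awaited like this are finished by the time the loop closes, so they cannot trigger "Task was destroyed but it is pending!".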

alexander-alderman-webb avatar Nov 07 '25 13:11 alexander-alderman-webb

Hi @alexander-alderman-webb , thank you for your response (and sorry for my late reply, I was out for a long weekend and couldn't reply properly).

It's hard for me to provide a reproducible case since the "Task was destroyed but it is pending!" issue doesn't give me a stack trace of where this is actually happening. I do know that these errors have been increasing a lot on my account.

All of the issues I have logged at Sentry have the same 4 last (recent) stack items:

__init__.py in handle at line 948
__init__.py in handle at line 1595
__init__.py in _log at line 1585
__init__.py in error at line 1471

Then the (probably) actual file that threw (or logged) the error.

Not all, but many seem to be coming from database-related issues, though those might be problems on my end that I need to resolve.

If I could modify the Sentry code to get more debugging output about the situation, even by writing to /tmp while we debug this and track down the original issue, that would be very helpful to me.
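For example, maybe something like the sketch below would already be enough, assuming sentry_sdk's before_send hook; the file path and the event handling are just illustrative:

import json
import sentry_sdk

def dump_event(event, hint):
    # Append every outgoing event to a local file so we can inspect exactly
    # what is being reported while we debug this.
    with open('/tmp/sentry-events.jsonl', 'a') as fp:
        fp.write(json.dumps(event, default=str) + '\n')
    return event  # returning the event keeps sending it to Sentry

sentry_sdk.init(
    dsn='https://examplePublicKey@o0.ingest.sentry.io/0',  # placeholder DSN
    before_send=dump_event,
)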

Let me know what else I can do.

Thanks!

cnicodeme avatar Nov 12 '25 07:11 cnicodeme

I'm still getting more and more issues created in Sentry, and I previously forgot to explain how I create and handle tasks in the WebSocket environment I run. So here's the (simplified) code I have; hopefully this will help:

# -*- coding:utf-8 -*-
from sanic.server.websockets.impl import WebsocketImplProtocol, State
from sanic.exceptions import WebsocketClosed
from sanic.log import logger
from surge.database import db, get_model
from auth.models import Session
from utils import camelize
from utils.event_handler import EventHandler
from utils.stream import StreamProvider, StreamCacheResponse
from extensions import redis as redis_client
from utils.appversion import get_version
import json, asyncio, time


# "app" is the project's Sanic application instance, created elsewhere (simplified out of this snippet)
# @see https://gist.github.com/ahopkins/5b6d380560d8e9d49e25281ff964ed81
@app.websocket('')
async def streamable(request, ws: WebsocketImplProtocol):
    session_id = request.args.get('session', None)
    try:
        assert session_id != 'undefined'
        assert session_id != 'null'
        assert len(session_id) == 36
        assert session_id.count('-') == 4
        assert session_id.replace('-', '').isalnum()
    except AssertionError:
        # We clear it
        session_id = None

    session = await Session.find_by_token(session_id)
    if not session:
        # Calling "fail connection" instead of close causes an exception and makes the server freeze
        return await ws.close(code=1011, reason=json.dumps({'code': 401, 'error': 'Please authenticate yourself.'}))

    agent = await session.get_agent()
    request.ctx.agent = agent

    receiver_name = 'ws-receiver:{}:{}'.format(agent.id, session_id or request.id)
    if session_id:
        # We kill a previous existing one if it exists
        await request.app.cancel_task(receiver_name, raise_exception=False)

    channel_name = redis_client.get_channel(agent)
    request.app.add_task(ws_receiver(ws, channel_name, session_id), name=receiver_name)

    client = redis_client.get_client()
    try:
        """
        Receives messages from the client
        """
        while True:
            await asyncio.sleep(0)  # IMPORTANT otherwise the scheduler will never be able to cancel that task!
            try:
                message = await ws.recv(timeout=5)
                if not message:
                    continue

                event = json.loads(message)
                action = event.get('action')
                document = event.get('value', None)

                await EventHandler.dispatch(action, document, agent_id=agent.id, organization_id=agent.organization_id)
            except json.decoder.JSONDecodeError:
                # Invalid data read, we ignore
                continue
    except Exception as e:
        logger.info('ERROR in streamable:')
        logger.info(e.__class__)
        logger.info(e)
        logger.info('')
    finally:
        await request.app.cancel_task(receiver_name, raise_exception=False)
        request.app.purge_tasks()


async def ws_receiver(ws, channel_name, session_id) -> None:
    """
    Receives messages from the PubSub system at Redis
    Responsible to send them back to the client
    """
    pubsub = None
    last_event = time.perf_counter()
    try:
        pubsub = redis_client.get_client().pubsub()
        await pubsub.subscribe(channel_name)

        while True:
            try:
                if ws.ws_proto.state == State.CLOSED:
                    break

                now = time.perf_counter()
                if now - last_event >= 30.0:
                    await ws.send('PONG')
                    last_event = now

                raw = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
                if not raw:
                    continue

                data = raw['data'].decode()
                await ws.send(data)
                last_event = now  # No need to send a PONG if we just sent data
            except asyncio.CancelledError:
                break
    except WebsocketClosed:
        pass
    except Exception as e:
        logger.info('ERROR in ws_receiver (root):')
        logger.info(e.__class__)
        logger.info(e)
        logger.info('')
    finally:
        if pubsub:
            await pubsub.unsubscribe(channel_name)
            await pubsub.reset()

cnicodeme avatar Nov 14 '25 07:11 cnicodeme

Hi @cnicodeme,

Thank you for the snippet. The most recent stack frame lines

__init__.py in handle at line 948
__init__.py in handle at line 1595
__init__.py in _log at line 1585
__init__.py in error at line 1471

refer to code in the standard library logging module. As per our logging documentation, logs with error severity show up as errors in Sentry.
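For example, with a default sentry_sdk.init() (the DSN below is a placeholder), a plain ERROR-level log record is captured as an error event:

import logging
import sentry_sdk

sentry_sdk.init(dsn="https://examplePublicKey@o0.ingest.sentry.io/0")  # placeholder DSN

logger = logging.getLogger(__name__)

try:
    1 / 0
except ZeroDivisionError:
    # With the default logging integration, ERROR-level records become error
    # events in Sentry; logger.exception() also attaches the active exception.
    logger.exception("Database query failed")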

Do you know what changed before you started receiving more errors of this form in Sentry? For example, was

  • the version of sentry-sdk updated;
  • another dependency version updated; or
  • the Python version changed?

alexander-alderman-webb avatar Nov 14 '25 09:11 alexander-alderman-webb

Hi @alexander-alderman-webb ,

sentry-sdk definitely got updated on October 30th, from 2.14.0 to 2.43.0.

Some other packages were upgraded, but they have no relation to the issue at all (they're third-party libraries unrelated to these errors).

The Python version was not updated.

I remain available if you need any other details.

Thank you!

cnicodeme avatar Nov 14 '25 09:11 cnicodeme

Hi @cnicodeme,

I've taken a closer look at the errors in your Sentry.

The frustrating part is that the stack trace lines below the four most recent ones you highlighted are seemingly unrelated: there is a wide range of different lines below the four logging frames, so there is no pattern I can identify across events.

I am curious how the destroyed task messages are passed to logging and therefore end up in Sentry. With a minimal reproduction of a destroyed task, the messages seem to be routed to stderr directly. I'll investigate further next week and follow up then.
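In the meantime, one way to gather more context locally is asyncio's debug mode, which records where each task was created, combined with a custom loop exception handler. A minimal, self-contained sketch, not specific to Sanic or sentry-sdk:

import asyncio
import gc
import traceback

def report_destroyed_tasks(loop, context):
    # For "Task was destroyed but it is pending!" the context dict carries the
    # task object and, in debug mode, a source_traceback showing where the
    # task was created.
    print("asyncio reported:", context.get("message"))
    print("task:", context.get("task"))
    source = context.get("source_traceback")
    if source:
        print("task created at:")
        print("".join(traceback.format_list(source)))
    loop.default_exception_handler(context)

async def forgotten():
    await asyncio.sleep(3600)

loop = asyncio.new_event_loop()
loop.set_debug(True)                       # same effect as PYTHONASYNCIODEBUG=1
loop.set_exception_handler(report_destroyed_tasks)
task = loop.create_task(forgotten())
loop.run_until_complete(asyncio.sleep(0))  # let the task start, then stop the loop
loop.close()

del task                                   # drop the last reference...
gc.collect()                               # ...so the still-pending task is destroyed

When debug mode is enabled, even the stock warning includes the creation traceback, so the log line is far more informative without any custom handler.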

For anyone else landing here, there is a related earlier issue: https://github.com/getsentry/sentry-python/issues/2908.

alexander-alderman-webb avatar Nov 14 '25 10:11 alexander-alderman-webb

Hi @alexander-alderman-webb,

That's what I found too. I suspect an underlying SQL issue, as it often boils down to that, but I'm not entirely sure, and the lack of detail in the Sentry issues makes the situation harder to debug.

For now, I've reverted to 2.14.0 as of Fri Nov 14 13:12:51 UTC 2025, so if you stop receiving this issue from our account, this is why.

Please also note that I've merged all the "Task was destroyed" issues together, but their origins are all different, so feel free to separate them again if that helps identify the issue.

I remain available if you need any other details.

cnicodeme avatar Nov 14 '25 13:11 cnicodeme

Hi @cnicodeme,

We've released https://github.com/getsentry/sentry-python/commit/ca19d6300f53178e77e77ded477a91338ad9be09 as part of version 2.46.0 of sentry-sdk. If coro_name() is the coroutine that is pending when the event loop is closed, then the error message in the Sentry issue will include the name, and therefore have the form

Task was destroyed but it is pending!
task: <Task pending name='...' coro=<coro_name()> wait_for=<Future pending cb=[...]>>

Older versions of sentry-sdk replaced coro_name() with patch_asyncio.<locals>._sentry_task_factory.<locals>._task_with_sentry_span_creation(), resulting in something like

Task was destroyed but it is pending!
task: <Task pending name='...' coro=<patch_asyncio.<locals>._sentry_task_factory.<locals>._task_with_sentry_span_creation()> wait_for=<Future pending cb=[...]>>
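For illustration only (this is not the actual sentry-sdk patch), one generic way a wrapper can keep the original coroutine visible in the task repr is to copy __name__ and __qualname__ onto the wrapper's coroutine object:

import asyncio

async def my_coroutine():
    await asyncio.sleep(3600)

def wrap_preserving_name(coro):
    # Wrap a coroutine object but keep the original name visible in the
    # task repr by copying __name__/__qualname__ onto the wrapper coroutine.
    async def _wrapper():
        return await coro
    wrapped = _wrapper()
    wrapped.__name__ = coro.__name__
    wrapped.__qualname__ = coro.__qualname__
    return wrapped

async def main():
    task = asyncio.ensure_future(wrap_preserving_name(my_coroutine()))
    print(task)  # coro=<my_coroutine() ...> rather than the wrapper's name
    task.cancel()

asyncio.run(main())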

Let us know if you are able to try out sentry-sdk version 2.46.0. It likely won't reduce the volume of new issues in your Sentry account compared with 2.43.0, but the errors will include crucial information for tracking down their root cause!

Once we know which coroutine is not handled properly when the event loop is closed, we can fix the cause if it is related to sentry-sdk.

alexander-alderman-webb avatar Nov 25 '25 12:11 alexander-alderman-webb