Is there a way to fail a test if an explicitly created background task fails after yielding from the fixture that created it?
I'll start with an example to provide some context:
import asyncio
import contextlib
import itertools
import pytest
class Consumer:
    def __init__(self, queue: asyncio.Queue):
        self._queue = queue

    async def run(self):
        for i in itertools.count(0):
            item = await self._queue.get()
            print(item)
            if i == 3:
                raise ValueError('oups')
            self._queue.task_done()

@pytest.fixture
async def queue() -> asyncio.Queue:
    return asyncio.Queue()
@pytest.fixture
async def consumer_service(queue):
    consumer = Consumer(queue)
    background_task = asyncio.create_task(consumer.run())
    yield
    background_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await background_task
@pytest.mark.asyncio
@pytest.mark.usefixtures('consumer_service')
async def test_consumer_consumes(queue: asyncio.Queue):
    queue.put_nowait('I')
    queue.put_nowait('Will')
    queue.put_nowait('Not')
    queue.put_nowait('Hang')
    await queue.join()
Here, the consumer_service fixture creates a background task that runs the consumer's loop. When the last item in the queue is processed, a ValueError is raised and the background task finishes with an exception before marking that item as done. In this state the test hangs and there is no indication that something went wrong.
Ideally, there would be some way to react to the background task failing by canceling and failing the test, but I have not found one. The closest I got to a solution was to attach a done callback to the background task and check for an exception there, but I haven't come across an API that would let me recover from this situation or signal the error (without it being captured by pytest).
I get it: queue.join blocks until all queued tasks are marked as done, but the last task is never marked done due to the ValueError.
But this behaviour is totally unrelated to pytest-asyncio, right? Are you proposing a new feature to detect those hangs?
You are right, the behavior itself is not directly related to pytest-asyncio.
What I want to point out, however, is that, at least to my knowledge, there is currently no graceful way to handle these situations while in the middle of running a test. The detection itself is trivial to implement; the question is how to recover gracefully once the condition has been detected. For now I've resorted to the following hack which, while not pretty, will at least print a stack trace and abort the process :sweat_smile::
import asyncio
import contextlib
import itertools
import traceback
from os import abort
import pytest
class Consumer:
    def __init__(self, queue: asyncio.Queue):
        self._queue = queue

    async def run(self):
        for i in itertools.count(0):
            item = await self._queue.get()
            print(item)
            if i == 3:
                raise ValueError('oups')
            self._queue.task_done()

@pytest.fixture
async def queue() -> asyncio.Queue:
    return asyncio.Queue()
@pytest.fixture
async def consumer(queue, capsys):
    consumer = Consumer(queue)
    run_task = asyncio.create_task(consumer.run())

    def panic_crash_clbk(task: asyncio.Task):
        # The task is cancelled during teardown; exception() would raise
        # CancelledError in that case, so bail out early.
        if task.cancelled():
            return
        # This will crash the whole process but it's better than hanging
        # indefinitely with no indication of what went wrong.
        exception = task.exception()
        if exception is not None:
            with capsys.disabled():
                traceback.print_exception(
                    None, exception, exception.__traceback__
                )
            abort()

    run_task.add_done_callback(panic_crash_clbk)
    yield
    run_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await run_task
@pytest.mark.asyncio
@pytest.mark.usefixtures('consumer')
async def test_consumer_consumes(queue: asyncio.Queue):
    queue.put_nowait('I')
    queue.put_nowait('Will')
    queue.put_nowait('Not')
    queue.put_nowait('Hang')
    await queue.join()
(Some other things I looked at were calling pytest.exit and pytest.fail inside the callback, but both of them just raise an exception that appears to get lost.)
In general I think it would be pretty good to have a way to recover from these kinds of situations gracefully (e.g. fail the test, tear down fixtures, carry on, and report the error at the end).
Now that I think about it, if at all feasible, should this kind of functionality be provided by pytest itself instead?
This is a tough cookie. I don't think you should use a fixture for this. If it can fail a test, it should be part of the test itself.
Also I would put a timeout on the join and let that fail the test too.
Mhm, good point about the timeout on join, it would work in this case. Adding a per-test timeout would actually solve this issue in general, though arguably it can become finicky to figure out the right amount of time to wait for each individual test.
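Something along these lines, I suppose (reusing the fixtures from the first example; the timeout value is arbitrary):

@pytest.mark.asyncio
@pytest.mark.usefixtures('consumer_service')
async def test_consumer_consumes(queue: asyncio.Queue):
    for item in ('I', 'Will', 'Not', 'Hang'):
        queue.put_nowait(item)
    # If the consumer dies before draining the queue, this raises
    # asyncio.TimeoutError and fails the test instead of hanging forever.
    await asyncio.wait_for(queue.join(), timeout=5)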
This makes me wonder though... asyncio.wait_for raises an asyncio.TimeoutError in the context of the task that is running the test function. This reminds me that, in a completion callback attached to a task, we do have access to all tasks. So, actually, a better approach would be to cancel all tasks instead of aborting the program:
def cancel_tasks_on_failure(task: asyncio.Task):
    def maybe_cancel_clbk(t: asyncio.Task):
        exception = t.exception()
        if exception is None:
            return
        for task in asyncio.all_tasks():
            task.cancel()

    task.add_done_callback(maybe_cancel_clbk)
Then I would just call cancel_tasks_on_failure for every background task that must not fail during test execution. This is definitely an improvement over the previous approach, but I feel that I should really be canceling only the test and not all tasks. Actually, I think I figured it out:
@pytest.fixture
def task_watchdog(request):
    def cancel_test_on_exception(task: asyncio.Task):
        def maybe_cancel_clbk(t: asyncio.Task):
            # The watched task gets cancelled during fixture teardown;
            # exception() would raise CancelledError in that case.
            if t.cancelled():
                return
            exception = t.exception()
            if exception is None:
                return
            for task in asyncio.all_tasks():
                coro = task.get_coro()
                if coro.__qualname__ == request.function.__qualname__:
                    task.cancel()
                    return

        task.add_done_callback(maybe_cancel_clbk)

    return cancel_test_on_exception
Then I can just change the consumer fixture as follows:
@pytest.fixture
async def consumer(queue, task_watchdog):
    consumer = Consumer(queue)
    run_task = asyncio.create_task(consumer.run())
    task_watchdog(run_task)
    yield
    run_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await run_task
Apart from the fixture name (for which I'll have to come up with something better), I'm actually pretty happy with how this works. Would it be something worth adding to pytest-asyncio? If not, then I think we can close the issue.
Also, @Tinche, I got eager and overlooked this part of your comment:
I don't think you should use a fixture for this. If it can fail a test, it should be part of the test itself.
I agree, but in my experience it tends to get ugly to handle this directly in the test. In my particular case, I have tests which depend on having a couple of control loops up and running. The alternative I see right now would be to start the loops inside the tests and, every time I await something, also wait on the loops (and check for exceptions when I regain control).
It works okay for one or two tests, but it quickly devolves into a DRY problem if you have a lot of tests with similar requirements.
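Roughly, what I have in mind is a small helper like the one below (await_with_watchdog is just a made-up name for illustration, not an existing pytest-asyncio API):

async def await_with_watchdog(awaitable, background_tasks):
    # Wait for `awaitable`, but regain control early if any of the
    # background control loops finishes first (usually because it crashed).
    main = asyncio.ensure_future(awaitable)
    done, _ = await asyncio.wait(
        {main, *background_tasks}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in done:
        if task is main or task.cancelled():
            continue
        if task.exception() is not None:
            main.cancel()
            raise task.exception()
    return await main

Every await in the test then turns into something like await await_with_watchdog(queue.join(), {loop_task}), which is exactly the kind of boilerplate that gets old quickly.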
I'm not the pytest police, you can do whatever works for you :) It's just been my experience that pytest fixtures aren't really suited to failing tests, and I don't mean only in an asyncio context.
I'm running into a similar problem trying to test code which uses asyncio.start_server: exceptions in the connection callbacks aren't actually propagated up to the main .serve_forever coroutine, so failing tests will just hang forever. This probably shouldn't be fixed by this plugin, as you say; I feel like there is something missing from the asyncio API, especially in this case.
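For reference, a minimal illustration of the behaviour I mean (simplified, not my actual test code):

import asyncio

async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    # This exception ends up in the event loop's exception handler (it is
    # merely logged); serve_forever() below never sees it.
    raise RuntimeError('a test awaiting the server will never notice this')

async def main():
    server = await asyncio.start_server(handle_client, '127.0.0.1', 0)
    async with server:
        # Keeps serving despite the failing callback, so a test awaiting
        # this coroutine hangs instead of failing.
        await server.serve_forever()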
@zoopp you should run the consumer and queue.join() in a background task (future) within the test, and register a callback that cancels the queue.join() future when the consumer fails.
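A rough sketch of that idea, reusing the Consumer class and queue fixture from above (untested, just to illustrate the shape):

import asyncio
import contextlib
import pytest

@pytest.mark.asyncio
async def test_consumer_consumes(queue: asyncio.Queue):
    consumer = Consumer(queue)
    consumer_task = asyncio.create_task(consumer.run())
    join_task = asyncio.create_task(queue.join())

    # If the consumer finishes (e.g. crashes) before the queue is drained,
    # cancel the join so the test cannot hang on it.
    consumer_task.add_done_callback(lambda _: join_task.cancel())

    for item in ('I', 'Will', 'Not', 'Hang'):
        queue.put_nowait(item)

    try:
        await join_task
    finally:
        consumer_task.cancel()
        # Awaiting the consumer re-raises its exception (if any), so the
        # test fails with the real error instead of a bare cancellation.
        with contextlib.suppress(asyncio.CancelledError):
            await consumer_task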