Task resources are deleted after a call to publish_dataset on the task future since 2023.4.0
Describe the issue:
After upgrading dask[distributed] from 2023.3.x to the latest 2023.5.0, I noticed in my tests that the behavior of task resources changed.
In my system one can start a computational task, log out, and check the status of the computation later. To this end I rely heavily on the dask client's `submit` and `publish_dataset` methods. Since version 2023.4.0, publishing the dataset seems to have the side effect of completely removing the resources defined for the task (e.g. CPU, RAM, etc.), which breaks my code.
Am I using a feature that was not supposed to work? I see in the changelog of 2023.4 that there were, it seems, big changes, with the dask scheduler needing similar "software and hardware". Is there a link with this sentence? Thanks a lot for the great work with dask, it's an awesome platform!
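For context, the submit-then-publish pattern I describe above looks roughly like this (a minimal, self-contained sketch using an in-process `LocalCluster` and an illustrative task, not my production setup):

```python
from distributed import Client, LocalCluster


def my_task(x):
    return x + 1


# In production this is a long-running external scheduler; an in-process
# cluster keeps the sketch self-contained.
cluster = LocalCluster(n_workers=1, processes=False)
client = Client(cluster)

future = client.submit(my_task, 41)
# publish_dataset keeps a reference on the scheduler, so the result
# survives this client going away.
client.publish_dataset(future, name="my-computation")
client.close()

# Later, from a fresh client session:
client2 = Client(cluster)
result = client2.get_dataset("my-computation").result()
print(result)  # 42
client2.close()
cluster.close()
```

The point is that the published future outlives the original client session, which is why the resource annotations on the task matter to me after publishing.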
Minimal Complete Verifiable Example:
```text
# requirements.txt
dask[distributed]
pytest
pytest-asyncio
```
```python
from typing import AsyncIterator, Callable, Coroutine

import pytest
from distributed import Client, Scheduler, SpecCluster, Worker, get_worker


@pytest.fixture
async def dask_spec_local_cluster(
    unused_tcp_port_factory: Callable,
) -> AsyncIterator[SpecCluster]:
    # in this mode we can precisely create a specific cluster
    workers = {
        "cpu-worker": {
            "cls": Worker,
            "options": {
                "nthreads": 2,
                "resources": {"CPU": 2, "RAM": 48e9},
            },
        },
        "gpu-worker": {
            "cls": Worker,
            "options": {
                "nthreads": 1,
                "resources": {
                    "CPU": 1,
                    "GPU": 1,
                    "RAM": 48e9,
                },
            },
        },
        "bigcpu-worker": {
            "cls": Worker,
            "options": {
                "nthreads": 1,
                "resources": {
                    "CPU": 8,
                    "RAM": 768e9,
                },
            },
        },
    }
    scheduler = {
        "cls": Scheduler,
        "options": {
            "port": unused_tcp_port_factory(),
            "dashboard_address": f":{unused_tcp_port_factory()}",
        },
    }
    async with SpecCluster(
        workers=workers, scheduler=scheduler, asynchronous=True, name="pytest_cluster"
    ) as cluster:
        yield cluster


@pytest.fixture
async def dask_client(dask_spec_local_cluster: SpecCluster) -> AsyncIterator[Client]:
    async with Client(
        dask_spec_local_cluster.scheduler_address, asynchronous=True
    ) as client:
        yield client


def _retrieve_annotations() -> dict | None:
    worker = get_worker()
    task = worker.state.tasks.get(worker.get_current_task())
    return task.annotations


RESOURCES = {"CPU": 1.0, "RAM": 123423}


async def test_submit_future_with_resources(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES}


async def test_submit_future_with_resources_and_published_dataset(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    await dask_client.publish_dataset(future, name="myfuture")
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES}
```
Run with:
```shell
pip install -r requirements.txt
pytest --asyncio-mode=auto -xv
```
Anything else we need to know?:
Using `pip install "dask[distributed]==2023.3"` will make the tests pass, whereas `pip install "dask[distributed]==2023.4"` will fail the second test (same with 2023.5).
Environment:
- Dask version: 2023.3.x vs 2023.4.x, 2023.5.x
- Python version: 3.10.10
- Operating System: Ubuntu 22.04, 20.04, Windows 11
- Install method (conda, pip, source): pip
I can reproduce this, and I think I triaged the issue down to https://github.com/dask/distributed/pull/7564. I don't have a lot of time to figure out what's going on, and I'm not very familiar with this area of the code, but shouldn't the tasks on the scheduler have the annotations, not the worker?
Indeed. The relevant annotations are on the scheduler side. However, afaik we're just forwarding the scheduler annotations, so I assume something broke.
@quasiben @fjetter thank you for your answers. So this does indeed seem to be a bug. Should I use another API to get the resources from inside the worker?
Would using `total_resources` or `available_resources` work for you? I believe these are now in the state machine of the worker:
```python
In [6]: client.run(lambda dask_worker: dask_worker.state.total_resources)
Out[6]: {'tcp://127.0.0.1:57326': {'GPU': 2.0}}

In [7]: client.run(lambda dask_worker: dask_worker.state.available_resources)
Out[7]: {'tcp://127.0.0.1:57326': {'GPU': 2.0}}
```
Thanks for your answer @quasiben. I tried what you propose, and the exact same issue arises: as soon as I use `publish_dataset`, `available_resources == total_resources`. Without calling `publish_dataset`, it works.
So this is still an issue, and for us it means we cannot upgrade dask anymore, since we do run computations in the background.
@fjetter, @quasiben: sorry to insist here, but is there any news or plan on this issue?
I was now testing with distributed==2023.7.0 and it got worse: now even the first test fails, when the `dask_client` fixture is torn down. The error I get is:
```
ERROR test_publish_dataset.py::test_submit_future_with_resources - ValueError: <Token var=<ContextVar name='_current_client' default=None at 0x7f036fc2cb80> at 0x7f036f08e6c0> was created in a different Context
```
I have a feeling this might have to do with https://github.com/dask/distributed/pull/6527. Was this expected? Any info on this @fjetter? How should I change the test to make it work again?
Sorry for the slow response time. I'm struggling a bit to understand what your problem is about.
Is this about `_retrieve_annotations` not returning the correct annotations? A minimal reproducer (i.e. without the pytest stuff) would be helpful.
@fjetter sorry, I was off for some time and missed your answer. Yes, the problem is that the task annotations disappear from the worker as soon as I use `publish_dataset`.
In the code above, the differences are:
```python
async def test_submit_future_with_resources(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES}  # THIS WORKS


async def test_submit_future_with_resources_and_published_dataset(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    # CALLING THIS MAKES THE ANNOTATIONS IN THE WORKER DISAPPEAR
    await dask_client.publish_dataset(future, name="myfuture")
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES}  # THIS DOES NOT WORK
```
Yes, `_retrieve_annotations` fails when I use `publish_dataset`.
After some thinking, I guess I can also manually pass the resources as an argument to the function instead; then I no longer rely on the worker code at all. But I guess that is not the intention of that code.
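A minimal sketch of that workaround, assuming the resource dict is simply repeated as a plain function argument (the function name is illustrative, not my production code):

```python
from distributed import Client

RESOURCES = {"CPU": 1.0, "RAM": 123423}


def task_with_explicit_resources(resources: dict) -> dict:
    # The resources arrive as a plain argument, so nothing here depends on
    # the worker-side task annotations that publish_dataset clobbers.
    return {"resources": resources}


with Client(processes=False, n_workers=1) as client:
    # In real code I would also pass resources=RESOURCES to submit() so the
    # scheduler still enforces the placement constraint.
    future = client.submit(task_with_explicit_resources, RESOURCES)
    result = future.result()
    print(result)  # {'resources': {'CPU': 1.0, 'RAM': 123423}}
```

This sidesteps the bug, but it duplicates the resource information between the submit call and the task body.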
Just for info, I upgraded today to version 2024.1.1 and the issue described here still happens, i.e. the resources assigned to a task are not retrievable from the worker once the future is published on the scheduler. Any news here? @fjetter, @quasiben?
Here is simplified, reproducible code that shows the issue:
```python
from distributed import Client, SpecCluster, Worker, get_worker


def _retrieve_annotations() -> dict | None:
    import time

    worker = get_worker()
    task = worker.state.tasks.get(worker.get_current_task())
    print(f"retrieved {task=}")
    print(f"{task.annotations=}")
    print("going to sleep 60 seconds")
    time.sleep(60)
    return task.annotations


TASK_RESOURCES = {"RAM": 123}
WORKER_RESOURCES = {"CPU": 2.0, "RAM": 223423}

if __name__ == "__main__":
    cluster = SpecCluster(
        workers={
            "cpu-worker": {
                "cls": Worker,
                "options": {
                    # "nthreads": 2,
                    "resources": WORKER_RESOURCES,
                },
            }
        }
    )
    dask_client = Client(cluster)
    print(dask_client.dashboard_link)
    future = dask_client.submit(_retrieve_annotations, resources=TASK_RESOURCES)
    assert future
    dask_client.publish_dataset(future, name="myfuture")
    result = future.result()
    assert result == {"resources": TASK_RESOURCES}
```
Looking at the dashboard, you can see that the Consumed Resources show 0.
If you comment out `dask_client.publish_dataset(future, name="myfuture")`, then it works as it should and shows the correct consumed resources. So I think something is heavily broken in the resource accounting when combined with publishing a dataset.
For info, that is: Python 3.11.7, distributed 2024.1.1.
Hey @fjetter, sorry to bother you again, but is there some way I can help in solving this issue? My company and I have been unable to upgrade dask past 2023.3 because of it.
@fjetter @quasiben I created a test inside the distributed repository to demonstrate the issue for you:
```python
# distributed/tests/test_publish_dataset_issue.py
from distributed.client import Client
from distributed.scheduler import Scheduler
from distributed.utils_test import gen_cluster
from distributed.worker import get_worker


def _retrieve_annotations() -> dict | None:
    print("starting task")
    worker = get_worker()
    task = worker.state.tasks.get(worker.get_current_task())
    print("finished task")
    return task.annotations


RESOURCES = {"CPU": 1.0, "RAM": 123423}


@gen_cluster(client=True, worker_kwargs={"resources": {"CPU": 2, "RAM": 48e9}})
async def test_submit(c: Client, s: Scheduler, a, b):
    future = c.submit(_retrieve_annotations, resources=RESOURCES, pure=False)
    await c.publish_dataset(future, name="thefailing")
    assert await c.list_datasets() == ("thefailing",)
    result = await future.result()
    assert result == {"resources": RESOURCES}
```
- `git checkout 2023.3.2`, then run the test --> all is fine
- `git checkout 2023.4.0`, then run the test --> it fails
- `git checkout main`, then run the test --> it fails

Interestingly, if I add `await asyncio.sleep(2)` before publishing, then it works, but I am not sure if that is useful.
Sorry for the silence. We've been busy with the latest release, since it changed a lot. The reproducer was quite helpful and I was able to track the issue down and fix it, see https://github.com/dask/distributed/pull/8577
It's a subtle race condition. As you already noticed yourself, if you sleep briefly before publishing, everything works as expected.
@fjetter thank you! That is great, and I'm looking forward to seeing this in the next release and testing it!