
Task resources are deleted after a call to publish_dataset on the task future since 2023.4.0

Open sanderegg opened this issue 2 years ago • 13 comments

Describe the issue: After upgrading dask[distributed] from 2023.3.x to the latest 2023.5.0, I noticed in my tests that the behavior of task resources changed. In my system one can start a computational task, log out, and check the status of the computation later. To this end I rely heavily on the dask client's submit and publish_dataset methods. Since version 2023.4.0, publishing the dataset appears to have the side effect of completely removing the task's defined resources (e.g. CPU, RAM), which breaks my code.

Am I using a feature that was never supposed to work? I see in the changelog of 2023.4 that there were apparently big changes, with the dask scheduler now needing similar "software and hardware". Is this related? Thanks a lot for the great work with dask, it's an awesome platform!

Minimal Complete Verifiable Example:

# requirements.txt
dask[distributed]
pytest
pytest-asyncio

# test_publish_dataset.py
from typing import AsyncIterator, Callable, Coroutine

import pytest
from distributed import Client, Scheduler, SpecCluster, Worker, get_worker


@pytest.fixture
async def dask_spec_local_cluster(
    unused_tcp_port_factory: Callable,
) -> AsyncIterator[SpecCluster]:
    # in this mode we can precisely create a specific cluster
    workers = {
        "cpu-worker": {
            "cls": Worker,
            "options": {
                "nthreads": 2,
                "resources": {"CPU": 2, "RAM": 48e9},
            },
        },
        "gpu-worker": {
            "cls": Worker,
            "options": {
                "nthreads": 1,
                "resources": {
                    "CPU": 1,
                    "GPU": 1,
                    "RAM": 48e9,
                },
            },
        },
        "bigcpu-worker": {
            "cls": Worker,
            "options": {
                "nthreads": 1,
                "resources": {
                    "CPU": 8,
                    "RAM": 768e9,
                },
            },
        },
    }
    scheduler = {
        "cls": Scheduler,
        "options": {
            "port": unused_tcp_port_factory(),
            "dashboard_address": f":{unused_tcp_port_factory()}",
        },
    }

    async with SpecCluster(
        workers=workers, scheduler=scheduler, asynchronous=True, name="pytest_cluster"
    ) as cluster:
        yield cluster


@pytest.fixture
async def dask_client(dask_spec_local_cluster: SpecCluster) -> AsyncIterator[Client]:
    async with Client(
        dask_spec_local_cluster.scheduler_address, asynchronous=True
    ) as client:
        yield client


def _retrieve_annotations() -> dict | None:
    worker = get_worker()
    task = worker.state.tasks.get(worker.get_current_task())
    return task.annotations


RESOURCES = {"CPU": 1.0, "RAM": 123423}


async def test_submit_future_with_resources(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES}


async def test_submit_future_with_resources_and_published_dataset(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    await dask_client.publish_dataset(future, name="myfuture")
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES}

Run with the following commands:

pip install -r requirements.txt
pytest --asyncio-mode=auto -xv

Anything else we need to know?:

  • using pip install "dask[distributed]==2023.3" makes both tests pass, whereas pip install "dask[distributed]==2023.4" fails the second test (same with 2023.5)

Environment:

  • Dask version: 2023.3.x vs 2023.4.x,2023.5.x

  • Python version: 3.10.10

  • Operating System: ubuntu 22.04, 20.04, windows 11

  • Install method (conda, pip, source): pip

sanderegg avatar May 25 '23 07:05 sanderegg

I can reproduce, and I think I triaged the issue down to https://github.com/dask/distributed/pull/7564. I don't have a lot of time to figure out what's going on. I'm not very familiar with this area of the code, but shouldn't the tasks on the scheduler have the annotations, not the worker?

quasiben avatar May 25 '23 14:05 quasiben

> I'm not very familiar with this area of the code, but shouldn't the tasks on the scheduler have the annotations, not the worker?

Indeed. The relevant annotations are on the scheduler side. However, afaik we're just forwarding the scheduler annotations, so I assume something broke.

fjetter avatar May 25 '23 16:05 fjetter
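For readers following along: a scheduler-side lookup of the kind fjetter describes is possible via Client.run_on_scheduler. The helper below is a hypothetical sketch (the function name is invented here), not part of the thread or of any fix:

```python
# Hypothetical sketch: read a task's annotations from the scheduler's
# TaskState instead of from the worker. Functions passed to
# Client.run_on_scheduler receive the scheduler via the special
# `dask_scheduler` keyword argument.
def get_task_annotations(dask_scheduler, key):
    ts = dask_scheduler.tasks.get(key)  # scheduler-side TaskState, if the task exists
    return ts.annotations if ts is not None else None

# Usage (assuming a connected Client and a submitted future):
#   annotations = client.run_on_scheduler(get_task_annotations, key=future.key)
```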

@quasiben @fjetter thank you for your answers. So this is indeed a bug, it seems. Should I use another API to get the resources from inside the worker?

sanderegg avatar May 27 '23 07:05 sanderegg

Would using total_resources or available_resources work for you? I believe these are now in the state_machine of the worker:


In [6]: client.run(lambda dask_worker: dask_worker.state.total_resources)
Out[6]: {'tcp://127.0.0.1:57326': {'GPU': 2.0}}

In [7]: client.run(lambda dask_worker: dask_worker.state.available_resources)
Out[7]: {'tcp://127.0.0.1:57326': {'GPU': 2.0}}

quasiben avatar May 30 '23 19:05 quasiben

Thanks for your answer @quasiben. I tried what you propose, and the exact same issue arises: as soon as I use publish_dataset, available_resources == total_resources. Without calling publish_dataset it works.

sanderegg avatar May 30 '23 21:05 sanderegg

But then this is still an issue, and for us it means we cannot upgrade dask anymore, since we do run computations in the background.

sanderegg avatar Jun 01 '23 19:06 sanderegg

@fjetter , @quasiben : sorry to insist here but is there any news/plan on this issue?

sanderegg avatar Jun 27 '23 11:06 sanderegg

I was now testing with distributed==2023.7.0 and it got worse.

Now even the first test fails when the dask_client fixture is tearing down. The error I get is: ERROR test_publish_dataset.py::test_submit_future_with_resources - ValueError: <Token var=<ContextVar name='_current_client' default=None at 0x7f036fc2cb80> at 0x7f036f08e6c0> was created in a different Context

I have a feeling this might have to do with https://github.com/dask/distributed/pull/6527. Was this expected? Any info on this, @fjetter? How should I change the test to make it work again?

sanderegg avatar Jul 20 '23 13:07 sanderegg

Sorry for the slow response time. I'm struggling a bit to understand what your problem is about.

Is this about _retrieve_annotations not returning the correct annotations? A minimal reproducer (i.e. without the pytest stuff) would be helpful

fjetter avatar Aug 02 '23 08:08 fjetter

@fjetter sorry, I was off for some time and missed your answer. Yes, the problem is that the task annotations disappear from the worker as soon as I use publish_dataset. In the code above the difference is:

async def test_submit_future_with_resources(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES} # THIS WORKS


async def test_submit_future_with_resources_and_published_dataset(dask_client: Client):
    future = dask_client.submit(_retrieve_annotations, resources=RESOURCES)
    assert future
    await dask_client.publish_dataset(future, name="myfuture") # CALLING THIS MAKES THE ANNOTATIONS IN WORKER DISAPPEAR
    coro = future.result()
    assert isinstance(coro, Coroutine)
    assert await coro == {"resources": RESOURCES} # THIS DOES NOT WORK

Yes, _retrieve_annotations fails when I use publish_dataset.

After some thinking, I guess I can also pass the resources manually as an argument to the function instead; then I no longer rely on the worker code. But I guess that is not the intention of that code.

sanderegg avatar Aug 21 '23 09:08 sanderegg

Just for info, I upgraded today to version 2024.1.1 and the issue described here still happens, i.e. the resources assigned to a task are not retrievable from the worker once the task's future is published on the scheduler. Any news here? @fjetter, @quasiben?

sanderegg avatar Jan 31 '24 07:01 sanderegg

Here is a simplified reproducible code that shows the issue:

from distributed import Client, SpecCluster, Worker, get_worker


def _retrieve_annotations() -> dict | None:
    import time

    worker = get_worker()
    task = worker.state.tasks.get(worker.get_current_task())
    print(f"retrieved {task=}")
    print(f"{task.annotations=}")
    print("going to sleep 60 seconds")
    time.sleep(60)
    return task.annotations


TASK_RESOURCES = {"RAM": 123}
WORKER_RESOURCES = {"CPU": 2.0, "RAM": 223423}

if __name__ == "__main__":
    cluster = SpecCluster(
        workers={
            "cpu-worker": {
                "cls": Worker,
                "options": {
                    # "nthreads": 2,
                    "resources": WORKER_RESOURCES,
                },
            }
        }
    )
    dask_client = Client(cluster)
    print(dask_client.dashboard_link)
    future = dask_client.submit(_retrieve_annotations, resources=TASK_RESOURCES)
    assert future
    dask_client.publish_dataset(future, name="myfuture")
    result = future.result()

    assert result == {"resources": TASK_RESOURCES}

Looking at the dashboard, you can see that the Consumed Resources plot shows 0.

If you comment out dask_client.publish_dataset(future, name="myfuture"), then it works as it should and shows the correct consumed resources. So I think something is heavily broken in the resources assignment when combined with publishing a dataset.

For info, that is: Python 3.11.7, distributed 2024.1.1.

sanderegg avatar Jan 31 '24 08:01 sanderegg

Hey @fjetter, sorry to bother you again, but is there some way I can help solve this issue? My company and I have been stuck on 2023.3 because of it.

sanderegg avatar Mar 05 '24 21:03 sanderegg

@fjetter @quasiben I created a test inside of the distributed repository for you to demonstrate the issue:

# distributed/tests/test_publish_dataset_issue.py
from distributed.client import Client
from distributed.scheduler import Scheduler
from distributed.utils_test import gen_cluster

from distributed.worker import get_worker

def _retrieve_annotations() -> dict | None:
    print("starting task")
    worker = get_worker()
    task = worker.state.tasks.get(worker.get_current_task())
    print("finished task")
    return task.annotations

RESOURCES = {"CPU": 1.0, "RAM": 123423}


@gen_cluster(client=True, worker_kwargs={"resources": {"CPU": 2, "RAM": 48e9}})
async def test_submit(c: Client, s: Scheduler, a, b):
    future = c.submit(_retrieve_annotations, resources=RESOURCES, pure=False)
    
    await c.publish_dataset(future, name="thefailing")
    assert await c.list_datasets() == ("thefailing",)

    result = await future.result()
    assert result == {"resources": RESOURCES}

  1. git checkout 2023.3.2 then run the test --> all is fine
  2. git checkout 2023.4.0 then run the test --> fails
  3. git checkout main then run the test --> fails
  4. Interestingly, if I add await asyncio.sleep(2) before publishing then it works, but I am not sure how useful that is.

sanderegg avatar Mar 13 '24 10:03 sanderegg

Sorry for the silence. We've been busy with the latest release, since it changed a lot. The reproducer was quite helpful and I was able to track the issue down and fix it; see https://github.com/dask/distributed/pull/8577

It's a subtle race condition. As you already noticed yourself, if you sleep briefly before the publish, everything works as expected.

fjetter avatar Mar 13 '24 11:03 fjetter
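Until the fix lands in a release, the sleep-before-publish workaround noted in the thread could be wrapped like this. It is a hedged sketch only; the helper name and the delay value are arbitrary:

```python
import asyncio

# Hypothetical helper: delay the publish slightly so the scheduler has
# settled before publish_dataset runs, working around the race condition
# described above on affected versions.
async def publish_after_delay(client, future, name: str, delay: float = 2.0):
    await asyncio.sleep(delay)  # give the scheduler a moment first
    await client.publish_dataset(future, name=name)
```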

@fjetter thank you! That is great, and I'm looking forward to seeing this in the next release and testing it!

sanderegg avatar Mar 13 '24 15:03 sanderegg