Consistent deadlock with `shuffle="p2p"` when merging dataframes with many partitions
What happened:
The code below (which needs typer in addition to the usual dask/distributed/pandas/numpy) fairly consistently hangs after a worker AssertionError when using the p2p shuffle option, provided I have both many workers and many partitions per worker. In particular, on a 40-physical-core Broadwell machine with plentiful (1TB) RAM, the following invocation nearly always crashes and then hangs:
$ python p2p-shuffle-hang.py --num-workers 40 --rows-per-worker 5_000_000 --partitions-per-worker 100 --shuffle-type p2p
...
2022-08-31 06:34:33,674 - distributed.worker - WARNING - Compute Failed
Key: ('shuffle-unpack-737f7325f3b9ae355d9fcea2be0ab659', 2068)
Function: shuffle_unpack
args: ('737f7325f3b9ae355d9fcea2be0ab659', 2068, None)
kwargs: {}
Exception: 'AssertionError()'
2022-08-31 06:34:37,712 - distributed.worker - WARNING - Compute Failed
Key: ('shuffle-unpack-737f7325f3b9ae355d9fcea2be0ab659', 264)
Function: shuffle_unpack
args: ('737f7325f3b9ae355d9fcea2be0ab659', 264, None)
kwargs: {}
Exception: 'AssertionError()'
At which point the dashboard shows that no tasks are processing (presumably because they are waiting for these now-failed tasks); a cluster dump is attached below.
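For reference, a dump like the one attached can be captured with Client.dump_cluster_state; a minimal sketch (the scheduler address and output filename here are placeholders):
from distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address
# Serialises scheduler and worker state to a msgpack file for post-mortem debugging.
client.dump_cluster_state("p2p-shuffle-hang-dump")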
On the same system I could also reproduce with --num-workers 4 --partitions-per-worker 1000, though I was not able to on a different system (which has a faster disk and RAM).
Minimal Complete Verifiable Example:
Reproducer
import math
from enum import Enum, IntEnum, auto
from itertools import repeat
from typing import cast
import typer
import numpy as np
import pandas as pd
from dask.base import tokenize
from dask.dataframe.core import DataFrame
from distributed import Client, LocalCluster
from distributed.client import _wait, ALL_COMPLETED
class Type(IntEnum):
LEFT = auto()
RIGHT = auto()
def make_chunk(chunk_index, size, npartition, typ, match_fraction):
if typ == Type.LEFT:
start = size * chunk_index
stop = start + size
key = np.arange(start, stop, dtype=np.int64)
value = np.random.randint(0, 2000, size=size, dtype=np.int64)
return pd.DataFrame({"key": key, "value": value})
elif typ == Type.RIGHT:
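        # Draw a slice from each left chunk's key range so that roughly
        # ``match_fraction`` of the right keys join with the left dataframe.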
sub_size = size // npartition
to_use = max(math.ceil(sub_size * match_fraction), 1)
arrays = []
for i in range(npartition):
start = size * i + (sub_size * chunk_index)
end = start + sub_size
arrays.append(
np.random.permutation(np.arange(start, end, dtype=np.int64)[:to_use])
)
key_match = np.concatenate(arrays, axis=0)
(got,) = key_match.shape
missing = size - got
start = size * npartition + size * chunk_index
end = start + missing
key_no_match = np.arange(start, end, dtype=np.int64)
key = np.concatenate([key_match, key_no_match], axis=0)
value = np.random.randint(0, 2000, size=size, dtype=np.int64)
return pd.DataFrame({"key": key, "value": value})
else:
raise ValueError(f"Unknown dataframe type {typ}")
def make_ddf(chunk_size, npartition, match_fraction, typ):
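    # Build a dask DataFrame directly from a hand-written task graph; divisions
    # are all None because the generated chunks are not globally sorted.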
meta = pd.DataFrame(
{"key": np.empty(0, dtype=np.int64), "value": np.empty(0, dtype=np.int64)}
)
divisions = list(repeat(None, npartition + 1))
name = "generate-data-" + tokenize(chunk_size, npartition, match_fraction, typ)
dsk = {
(name, i): (make_chunk, i, chunk_size, npartition, typ, match_fraction)
for i in range(npartition)
}
return DataFrame(dsk, name, meta, divisions)
class ShuffleType(str, Enum):
P2P = "p2p"
DEFAULT = "default"
def main(
num_workers: int = typer.Option(
1, help="Number of workers"
),
rows_per_worker: int = typer.Option(
5_000_000, help="Total dataframe rows per worker"
),
partitions_per_worker: int = typer.Option(
1, help="Number of partitions per worker"
),
shuffle_type: ShuffleType = typer.Option(
None, help="Dask shuffle implementation"
)
):
cluster = LocalCluster(n_workers=num_workers, threads_per_worker=1)
client = Client(cluster, set_as_default=False)
rows_per_chunk = rows_per_worker // partitions_per_worker
npartition = partitions_per_worker * num_workers
left = make_ddf(rows_per_chunk, npartition, 0.3, Type.LEFT)
right = make_ddf(rows_per_chunk, npartition, 0.3, Type.RIGHT)
left = cast(DataFrame, client.persist(left))
right = cast(DataFrame, client.persist(right))
_ = client.sync(_wait, left, timeout=None, return_when=ALL_COMPLETED)
_ = client.sync(_wait, right, timeout=None, return_when=ALL_COMPLETED)
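    # Map the "default" choice to None so dask picks its standard shuffle;
    # "p2p" is passed through to merge below.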
shuffle = {ShuffleType.DEFAULT: None}.get(shuffle_type, shuffle_type)
merged = left.merge(right, on=["key"], how="inner", shuffle=shuffle)
merged = client.persist(merged)
_ = client.sync(_wait, merged, timeout=None, return_when=ALL_COMPLETED)
del cluster
client.close()
client.shutdown()
del client
if __name__ == "__main__":
typer.run(main)
Environment:
- Dask version: 2022.8.1+7.g19a51474c
- Distributed version: 2022.8.1+29.ga5d68657
- Python version: 3.9.13 | packaged by conda-forge | (main, May 27 2022, 16:56:21) [GCC 10.3.0]
- Operating System: Ubuntu 18.04
- Install method (conda, pip, source): conda (dask/label/dev channel)
Cluster Dump State:
@wence- do you have any idea what line number that assertion error is coming from?
I don't :(, but I will try to find out.
It's https://github.com/dask/distributed/blob/acf607832c7191cc496a9b4a81760170de85062c/distributed/shuffle/multi_file.py#L259
2022-09-02 06:41:47,467 - distributed.worker - WARNING - Compute Failed
Key: ('shuffle-unpack-737f7325f3b9ae355d9fcea2be0ab659', 40)
Function: shuffle_unpack
args: ('737f7325f3b9ae355d9fcea2be0ab659', 40, None)
kwargs: {}
Exception: "AssertionError('Total size is 83544')"
Traceback: File ".../distributed/shuffle/shuffle.py", line 48, in shuffle_unpack
return get_ext().get_output_partition(id, output_partition)
File ".../distributed/shuffle/shuffle_extension.py", line 323, in get_output_partition
output = shuffle.get_output_partition(output_partition)
File ".../distributed/shuffle/shuffle_extension.py", line 201, in get_output_partition
sync(self.worker.loop, self.multi_file.flush)
File ".../distributed/utils.py", line 405, in sync
raise exc.with_traceback(tb)
File ".../distributed/utils.py", line 378, in f
result = yield future
File ".../tornado/gen.py", line 762, in run
value = future.result()
File ".../distributed/shuffle/multi_file.py", line 259, in flush
assert not self.total_size, f"Total size is {self.total_size}"
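For what it's worth, here is a minimal, hypothetical sketch (not the actual multi_file.py code) of how a flush that yields to the event loop mid-write can race with a concurrent put and trip exactly this kind of assertion:
import asyncio

class RacyBuffer:
    """Toy stand-in for a buffered shard writer with a shared size counter."""
    def __init__(self):
        self.shards = []
        self.total_size = 0

    async def put(self, data: bytes):
        self.shards.append(data)
        self.total_size += len(data)

    async def _write(self):
        shards, self.shards = self.shards, []
        await asyncio.sleep(0)  # yield, as real code would while awaiting disk I/O
        for shard in shards:
            self.total_size -= len(shard)

    async def flush(self):
        await self._write()
        # A put that ran while _write was suspended leaves total_size nonzero:
        assert not self.total_size, f"Total size is {self.total_size}"

async def main():
    buf = RacyBuffer()
    await buf.put(b"x" * 100)
    asyncio.ensure_future(buf.put(b"y" * 44))  # sneaks in during flush's "I/O"
    await buf.flush()  # raises AssertionError('Total size is 44')

asyncio.run(main())
Whether this is what actually happens in the shuffle extension is speculation on my part; if it is, flush would need to be serialised against concurrent puts (for example with a lock).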
Ah, thanks. Probably another concurrency bug, I'd guess. The p2p shuffle code hasn't been touched in a while, and likely won't be touched for a while, so I don't expect anyone will try to fix this. Ok if I close?
> Probably another concurrency bug, I'd guess.
Do you mean a concurrency bug in distributed, or in "external" libraries?
> The p2p shuffle code hasn't been touched in a while, and likely won't be touched for a while, so I don't expect anyone will try to fix this. Ok if I close?
I suppose this is OK if the intention is to replace the p2p shuffle code with something else. Otherwise, if this is just "low priority, but we would in theory like this to work", I would be +epsilon on leaving it open (or I can schedule a reminder to check again in three months...).
This seems like a valid bug. I don't think that it makes sense to close the issue because one person or one team chooses not to work on it. Others besides Gabe and the group around him can still jump in.
@wence- in https://github.com/dask/distributed/pull/7195 I fixed a couple of deadlocks that are connected to swallowed exceptions. In that PR we should see the exceptions, if that's the problem.
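As a hypothetical sketch of that failure mode (not the code the PR touches): if a background task swallows its exception, a caller waiting on its completion hangs instead of seeing the error:
import asyncio

async def flush_to_disk(done: asyncio.Event):
    try:
        raise OSError("disk write failed")  # simulate an I/O error
        done.set()  # never reached, so the waiter is never woken
    except OSError:
        pass  # the bug: the exception is swallowed instead of re-raised

async def main():
    done = asyncio.Event()
    asyncio.ensure_future(flush_to_disk(done))
    try:
        await asyncio.wait_for(done.wait(), timeout=1)
    except asyncio.TimeoutError:
        print("deadlock: the failure was swallowed and the waiter hangs")

asyncio.run(main())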
Running on that branch I'm unable to reproduce the original error and (after a couple of repeats) have yet to see any hangs.
Nice.
Should be closed after https://github.com/dask/distributed/pull/7268. Please reopen if the issue is not resolved.