Consistent deadlock with `shuffle="p2p"` when merging dataframes with many partitions

Open wence- opened this issue 3 years ago • 6 comments

What happened:

The code below (which needs typer in addition to the usual dask/distributed/pandas/numpy) fairly consistently hangs after a worker AssertionError when using the p2p shuffle option, provided I have both many workers and many partitions per worker. In particular, on a 40-physical-core Broadwell machine with plentiful (1TB) RAM, the following execution nearly always crashes and then hangs:

$ python p2p-shuffle-hang.py --num-workers 40 --rows-per-worker 5_000_000 --partitions-per-worker 100 --shuffle-type p2p
...
2022-08-31 06:34:33,674 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-unpack-737f7325f3b9ae355d9fcea2be0ab659', 2068)
Function:  shuffle_unpack
args:      ('737f7325f3b9ae355d9fcea2be0ab659', 2068, None)
kwargs:    {}
Exception: 'AssertionError()'

2022-08-31 06:34:37,712 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-unpack-737f7325f3b9ae355d9fcea2be0ab659', 264)
Function:  shuffle_unpack
args:      ('737f7325f3b9ae355d9fcea2be0ab659', 264, None)
kwargs:    {}
Exception: 'AssertionError()'

At this point the dashboard shows that no tasks are processing (presumably because they are waiting for these now-failed tasks). A cluster dump is attached below.

On the same system I could also reproduce with --num-workers 4 --partitions-per-worker 1000, though I was not able to on a different system (which has a faster disk and RAM).
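
For reference, the partition arithmetic from the reproducer's main() can be worked through for both configurations. This is only a sketch: `shuffle_layout` is a hypothetical helper that is not part of the reproducer, and the second configuration assumes --rows-per-worker was left at its default of 5_000_000.

```python
def shuffle_layout(num_workers, rows_per_worker, partitions_per_worker):
    """Mirror the arithmetic in the reproducer's main() (hypothetical helper)."""
    rows_per_chunk = rows_per_worker // partitions_per_worker
    npartition = partitions_per_worker * num_workers
    return {
        "rows_per_chunk": rows_per_chunk,
        "npartition": npartition,
        "total_rows": rows_per_chunk * npartition,
    }


# 40 workers x 100 partitions per worker
big = shuffle_layout(40, 5_000_000, 100)
# 4 workers x 1000 partitions per worker (rows-per-worker assumed to be the default)
small = shuffle_layout(4, 5_000_000, 1000)

print(big)
print(small)
```

Both configurations give 4000 partitions per dataframe, while the total row count differs by a factor of ten. That might hint that the partition count matters more than the data volume, but this is a guess and not something measured here.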

Minimal Complete Verifiable Example:

Reproducer
import math
from enum import Enum, IntEnum, auto
from itertools import repeat
from typing import cast

import typer
import numpy as np
import pandas as pd
from dask.base import tokenize
from dask.dataframe.core import DataFrame
from distributed import Client, LocalCluster
from distributed.client import _wait, ALL_COMPLETED

class Type(IntEnum):
    LEFT = auto()
    RIGHT = auto()


def make_chunk(chunk_index, size, npartition, typ, match_fraction):
    if typ == Type.LEFT:
        start = size * chunk_index
        stop = start + size
        key = np.arange(start, stop, dtype=np.int64)
        value = np.random.randint(0, 2000, size=size, dtype=np.int64)
        return pd.DataFrame({"key": key, "value": value})
    elif typ == Type.RIGHT:
        sub_size = size // npartition
        to_use = max(math.ceil(sub_size * match_fraction), 1)
        arrays = []
        for i in range(npartition):
            start = size * i + (sub_size * chunk_index)
            end = start + sub_size
            arrays.append(
                np.random.permutation(np.arange(start, end, dtype=np.int64)[:to_use])
            )
        key_match = np.concatenate(arrays, axis=0)
        (got,) = key_match.shape
        missing = size - got
        start = size * npartition + size * chunk_index
        end = start + missing
        key_no_match = np.arange(start, end, dtype=np.int64)
        key = np.concatenate([key_match, key_no_match], axis=0)
        value = np.random.randint(0, 2000, size=size, dtype=np.int64)
        return pd.DataFrame({"key": key, "value": value})
    else:
        raise ValueError(f"Unknown dataframe type {typ}")


def make_ddf(chunk_size, npartition, match_fraction, typ):
    meta = pd.DataFrame(
        {"key": np.empty(0, dtype=np.int64), "value": np.empty(0, dtype=np.int64)}
    )
    divisions = list(repeat(None, npartition + 1))

    name = "generate-data-" + tokenize(chunk_size, npartition, match_fraction, typ)

    dsk = {
        (name, i): (make_chunk, i, chunk_size, npartition, typ, match_fraction)
        for i in range(npartition)
    }

    return DataFrame(dsk, name, meta, divisions)


class ShuffleType(str, Enum):
    P2P = "p2p"
    DEFAULT = "default"


def main(
    num_workers: int = typer.Option(
        1, help="Number of workers"
    ),
    rows_per_worker: int = typer.Option(
        5_000_000, help="Total dataframe rows per worker"
    ),
    partitions_per_worker: int = typer.Option(
        1, help="Number of partitions per worker"
    ),
    shuffle_type: ShuffleType = typer.Option(
        None, help="Dask shuffle implementation"
    )
):
    cluster = LocalCluster(n_workers=num_workers, threads_per_worker=1)
    client = Client(cluster, set_as_default=False)

    rows_per_chunk = rows_per_worker // partitions_per_worker
    npartition = partitions_per_worker * num_workers
    left = make_ddf(rows_per_chunk, npartition, 0.3, Type.LEFT)
    right = make_ddf(rows_per_chunk, npartition, 0.3, Type.RIGHT)
    left = cast(DataFrame, client.persist(left))
    right = cast(DataFrame, client.persist(right))
    _ = client.sync(_wait, left, timeout=None, return_when=ALL_COMPLETED)
    _ = client.sync(_wait, right, timeout=None, return_when=ALL_COMPLETED)

    shuffle = {ShuffleType.DEFAULT: None}.get(shuffle_type, shuffle_type)
    merged = left.merge(right, on=["key"], how="inner", shuffle=shuffle)
    merged = client.persist(merged)
    _ = client.sync(_wait, merged, timeout=None, return_when=ALL_COMPLETED)
    del cluster
    client.close()
    client.shutdown()
    del client


if __name__ == "__main__":
    typer.run(main)

Environment:

  • Dask version: 2022.8.1+7.g19a51474c
  • Distributed version: 2022.8.1+29.ga5d68657
  • Python version: 3.9.13 | packaged by conda-forge | (main, May 27 2022, 16:56:21) [GCC 10.3.0]
  • Operating System: Ubuntu 18.04
  • Install method (conda, pip, source): conda (dask/label/dev channel)
Cluster Dump State:

cluster-dump.msgpack.gz

wence- avatar Aug 31 '22 13:08 wence-

@wence- do you have any idea what line number that assertion error is coming from?

gjoseph92 avatar Aug 31 '22 22:08 gjoseph92

@wence- do you have any idea what line number that assertion error is coming from?

I don't :(, I will try and find out.

wence- avatar Sep 01 '22 08:09 wence-

It's https://github.com/dask/distributed/blob/acf607832c7191cc496a9b4a81760170de85062c/distributed/shuffle/multi_file.py#L259

2022-09-02 06:41:47,467 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-unpack-737f7325f3b9ae355d9fcea2be0ab659', 40)
Function:  shuffle_unpack
args:      ('737f7325f3b9ae355d9fcea2be0ab659', 40, None)
kwargs:    {}
Exception: "AssertionError('Total size is 83544')"
Traceback:   File ".../distributed/shuffle/shuffle.py", line 48, in shuffle_unpack
    return get_ext().get_output_partition(id, output_partition)
  File ".../distributed/shuffle/shuffle_extension.py", line 323, in get_output_partition
    output = shuffle.get_output_partition(output_partition)
  File ".../distributed/shuffle/shuffle_extension.py", line 201, in get_output_partition
    sync(self.worker.loop, self.multi_file.flush)
  File ".../distributed/utils.py", line 405, in sync
    raise exc.with_traceback(tb)
  File ".../distributed/utils.py", line 378, in f
    result = yield future
  File ".../tornado/gen.py", line 762, in run
    value = future.result()
  File ".../distributed/shuffle/multi_file.py", line 259, in flush
    assert not self.total_size, f"Total size is {self.total_size}"
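
One way an assertion of this shape can fail is if something adds data to the buffer while flush is suspended at an await. The following is a minimal asyncio sketch of that general race, not a diagnosis of the actual code in multi_file.py: `TinyBuffer`, `put`, `_write_one` and `demo` are hypothetical names made up for illustration.

```python
import asyncio


class TinyBuffer:
    """Hypothetical buffered writer; NOT distributed's actual implementation."""

    def __init__(self):
        self.shards = {}
        self.total_size = 0

    async def put(self, key, data):
        self.shards.setdefault(key, []).append(data)
        self.total_size += len(data)

    async def _write_one(self, key):
        chunks = self.shards.pop(key)
        await asyncio.sleep(0)  # stand-in for disk I/O; yields to other tasks
        self.total_size -= sum(len(c) for c in chunks)

    async def flush(self):
        # Snapshot of the keys: anything added later is not written out.
        for key in list(self.shards):
            await self._write_one(key)
        # Only holds if nobody called put() while we were awaiting above.
        assert not self.total_size, f"Total size is {self.total_size}"


async def demo(late_put):
    buf = TinyBuffer()
    await buf.put("a", b"x" * 10)
    tasks = [asyncio.create_task(buf.flush())]
    if late_put:
        # Scheduled after flush has started, runs while flush is suspended.
        tasks.append(asyncio.create_task(buf.put("b", b"y" * 7)))
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results[0]  # outcome of flush(): None or the AssertionError


print(repr(asyncio.run(demo(late_put=False))))
print(repr(asyncio.run(demo(late_put=True))))
```

Without the late put, flush finishes cleanly. With it, the bytes added mid-flush are still counted in total_size when the assertion runs, so flush raises.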

wence- avatar Sep 02 '22 13:09 wence-

Ah, thanks. Probably another concurrency bug, I'd guess. The p2p shuffle code hasn't been touched in a while, and likely won't be touched for a while, so I don't expect anyone will try to fix this. Ok if I close?

gjoseph92 avatar Sep 02 '22 19:09 gjoseph92

Probably another concurrency bug, I'd guess.

Do you mean a concurrency bug in distributed, or in "external" libraries?

The p2p shuffle code hasn't been touched in a while, and likely won't be touched for a while, so I don't expect anyone will try to fix this. Ok if I close?

I suppose this is OK, if the intention is to replace p2p shuffle code with something else. Otherwise, if this is just "low priority, but we would in theory like this to work", I would be +epsilon on leaving open (or I can schedule a reminder to check again in 3 months...)

wence- avatar Sep 05 '22 12:09 wence-

This seems like a valid bug. I don't think that it makes sense to close the issue because one person or one team chooses not to work on it. Others besides Gabe and the group around him can still jump in.

mrocklin avatar Sep 06 '22 17:09 mrocklin

@wence- in https://github.com/dask/distributed/pull/7195 I fixed a couple of deadlocks that are connected to swallowed exceptions. In that PR we should see the exceptions, if that's the problem
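
As a general illustration of how a swallowed exception can turn into a hang, here is a minimal asyncio sketch. It is not the code changed in that PR; `background` and `run_once` are hypothetical names, and the timeout exists only so the demo terminates.

```python
import asyncio


async def background(done, fail, swallow):
    """Hypothetical background step that should signal completion via `done`."""
    try:
        if fail:
            raise RuntimeError("flush failed")
        done.set()
    except RuntimeError:
        if not swallow:
            raise
        # Swallowed: `done` is never set and the error is lost.


async def run_once(fail, swallow, timeout=0.05):
    done = asyncio.Event()
    task = asyncio.create_task(background(done, fail, swallow))
    waiter = asyncio.create_task(done.wait())
    finished, pending = await asyncio.wait(
        {task, waiter}, timeout=timeout, return_when=asyncio.FIRST_EXCEPTION
    )
    # Clean up whatever is still waiting so the event loop can shut down.
    for p in pending:
        p.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    if task in finished and task.exception() is not None:
        return f"error: {task.exception()}"
    if waiter in finished:
        return "ok"
    return "hang"  # only the timeout rescued us


for fail, swallow in [(False, False), (True, False), (True, True)]:
    print(fail, swallow, asyncio.run(run_once(fail, swallow)))
```

When the failure propagates, whoever supervises the task sees the error right away. When it is swallowed, the waiter has nothing to wake it up, which looks like a deadlock from the outside.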

fjetter avatar Oct 28 '22 15:10 fjetter

@wence- in #7195 I fixed a couple of deadlocks that are connected to swallowed exceptions. In that PR we should see the exceptions, if that's the problem

Running on that branch I'm unable to reproduce the original error and (after a couple of repeats) have yet to see any hangs.

wence- avatar Oct 31 '22 17:10 wence-

Nice.

mrocklin avatar Oct 31 '22 17:10 mrocklin

Should be closed after https://github.com/dask/distributed/pull/7268. Please reopen if the issue is not resolved.

fjetter avatar Nov 11 '22 15:11 fjetter