xarray icon indicating copy to clipboard operation
xarray copied to clipboard

Weird interaction between aggregation and multiprocessing on DaskArrays

Open saschahofmann opened this issue 1 year ago • 8 comments

What happened?

When I try to run a modified version of the example from the dropna documentation (see below), it creates a never terminating process. To reproduce it I added a rolling operation before dropping nans and then run 4 processes using the standard library multiprocessing Pool class on DaskArrays. Running the rolling + dropna in a for loop finishes as expectedly in no time.

What did you expect to happen?

There is nothing obvious to me why this wouldn't just work unless there is a weird interaction between the Dask threads and the different processes. Using Xarray+Dask+Multiprocessing seems to work for me on other functions, it seems to be this particular combination that is problematic.

Minimal Complete Verifiable Example

import xarray as xr
import numpy as np
from multiprocessing import Pool

datasets = [xr.Dataset(
    {
        "temperature": (
            ["time", "location"],
            [[23.4, 24.1], [np.nan if i>1 else 23.4, 22.1 if i<2 else np.nan], [21.8 if i<3 else np.nan, 24.2], [20.5, 25.3]],
        )
    },
    coords={"time": [1, 2, 3, 4], "location": ["A", "B"]},
).chunk(time=2) for i in range(4)]


def process(dataset):
    return dataset.rolling(dim={'time':2}).sum().dropna(dim="time", how="all").compute()

# This works as expected
dropped = []
for dataset in datasets:
    dropped.append(process(dataset))

# This seems to never finish
with Pool(4) as p:
    dropped = p.map(process, datasets)

MVCE confirmation

  • [X] Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • [X] Complete example — the example is self-contained, including all data and the text of any traceback.
  • [X] Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • [X] New issue — a search of GitHub Issues suggests this is not a duplicate.
  • [ ] Recent environment — the issue occurs with the latest version of xarray and its dependencies.

Relevant log output

No response

Anything else we need to know?

I am still running on 2023.08.0 see below for more details about the environment

Environment

INSTALLED VERSIONS

commit: None python: 3.11.6 (main, Jan 25 2024, 20:42:03) [GCC 7.5.0] python-bits: 64 OS: Linux OS-release: 5.4.0-124-generic machine: x86_64 processor: x86_64 byteorder: little LC_ALL: None LANG: en_US.UTF-8 LOCALE: ('en_US', 'UTF-8') libhdf5: 1.12.2 libnetcdf: 4.9.3-development

xarray: 2023.8.0 pandas: 2.1.4 numpy: 1.26.3 scipy: 1.12.0 netCDF4: 1.6.5 pydap: None h5netcdf: None h5py: None Nio: None zarr: 2.16.1 cftime: 1.6.3 nc_time_axis: 1.4.1 PseudoNetCDF: None iris: None bottleneck: 1.3.7 dask: 2024.1.1 distributed: 2024.1.1 matplotlib: 3.8.2 cartopy: 0.22.0 seaborn: None numbagg: None fsspec: 2023.12.2 cupy: None pint: 0.23 sparse: None flox: 0.9.0 numpy_groupies: 0.10.2 setuptools: 69.0.3 pip: 23.2.1 conda: None pytest: 8.0.0 mypy: None IPython: 8.18.1 sphinx: None

saschahofmann avatar Feb 05 '24 11:02 saschahofmann

That is pretty curious @saschahofmann .

I'm trying to repro. I also get a process that doesn't finish, but a) I get lots of errors, b), I also get it with

         ...: def process(dataset):
         ...:     return dataset.compute()
         ...: with Pool(4) as p:
         ...:     dropped = p.map(process, datasets)
         ...:
Process SpawnPoolWorker-3:
Process SpawnPoolWorker-4:
Process SpawnPoolWorker-2:
Process SpawnPoolWorker-1:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  F  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
ile "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process' on <module '__main__' (built-in)>
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process' on <module '__main__' (built-in)>
Traceback (most recent call last):

...are we sure it's not just a Dask issue with running Dask operations within a Pool?

Can we try minimizing the example more, and creating an example with non-xarray Dask computations that does work?

max-sixty avatar Feb 06 '24 16:02 max-sixty

I think this error is actually multiprocessing not working at all. I just tried on a different machine and got the same code just running their basic example

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

I would bet its the same case for you, so in that case its completely independent from dask or xarray. I am trying to figure out whats causing this second error now.

saschahofmann avatar Feb 06 '24 22:02 saschahofmann

I think this caused by running the code in a Jupyter Notebook. I could run the minimum multiprocessing example from my last answer in the same environment in a normal python script with no problems. My first response was also running in a notebook but I ran it in VSCode. I am not familiar with the differences but maybe you could try and see whether you get right of at least the AttributeError @max-sixty ?

saschahofmann avatar Feb 06 '24 23:02 saschahofmann

Of course, my "reproducible" example now started working in the script as well.

saschahofmann avatar Feb 06 '24 23:02 saschahofmann

Thanks for investigating.

If you can generate a repro with xarray, which works without xarray, that would be great...

max-sixty avatar Feb 06 '24 23:02 max-sixty

Alright. It seems to be the version of Xarray or Dask. I pinned them to the ones mentioned in the original post and now it also doesn't finish in the environment where it worked before.

Will play around and see what causes it

saschahofmann avatar Feb 13 '24 08:02 saschahofmann

he example from the original post doesnt work even after I updated the packages but if I change the chunking e.g. like this chunk(location=1,time=2) it starts working. Weirdly enough it still wouldnt work with the older xarray version.

saschahofmann avatar Feb 13 '24 09:02 saschahofmann

I simplied the example now the "process" function is only

def process(dataset):
    return dataset.sum().compute()

It still fails.

An example using the same data but only using dask without xarray would look like this

from dask.dataframe import from_pandas

dds = [from_pandas(d.to_dataframe().reset_index(), chunksize=2) for d in datasets]


with Pool(4) as p:
    dropped = p.map(process, dds)

It works just fine although the chunking of the data is probably different than in the example which fails which might be the real problem.

saschahofmann avatar Feb 13 '24 09:02 saschahofmann

What is d is that example? Can we make an MCVE with vs without xarray?

max-sixty avatar Apr 28 '24 20:04 max-sixty

The example was using [d for d in datasets] with datasets from the original post. But I just ran my breaking example and it finished without no problem. I assume that something was fixed in the latest versions. Since then I upgraded xarray so if you want I close this issue!

saschahofmann avatar Apr 29 '24 14:04 saschahofmann