Weird interaction between aggregation and multiprocessing on DaskArrays
What happened?
When I try to run a modified version of the example from the dropna documentation (see below), it creates a never-terminating process. To reproduce it, I added a rolling operation before dropping NaNs and then ran 4 processes on DaskArrays using the standard-library multiprocessing Pool class. Running the rolling + dropna in a for loop finishes as expected in no time.
What did you expect to happen?
There is no obvious reason to me why this wouldn't just work, unless there is a weird interaction between the Dask threads and the different processes. Using xarray + Dask + multiprocessing works for me with other functions; it seems to be this particular combination that is problematic.
Minimal Complete Verifiable Example
```python
import xarray as xr
import numpy as np
from multiprocessing import Pool

datasets = [
    xr.Dataset(
        {
            "temperature": (
                ["time", "location"],
                [
                    [23.4, 24.1],
                    [np.nan if i > 1 else 23.4, 22.1 if i < 2 else np.nan],
                    [21.8 if i < 3 else np.nan, 24.2],
                    [20.5, 25.3],
                ],
            )
        },
        coords={"time": [1, 2, 3, 4], "location": ["A", "B"]},
    ).chunk(time=2)
    for i in range(4)
]

def process(dataset):
    return dataset.rolling(dim={"time": 2}).sum().dropna(dim="time", how="all").compute()

# This works as expected
dropped = []
for dataset in datasets:
    dropped.append(process(dataset))

# This seems to never finish
with Pool(4) as p:
    dropped = p.map(process, datasets)
```
MVCE confirmation
- [X] Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
- [X] Complete example — the example is self-contained, including all data and the text of any traceback.
- [X] Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
- [X] New issue — a search of GitHub Issues suggests this is not a duplicate.
- [ ] Recent environment — the issue occurs with the latest version of xarray and its dependencies.
Relevant log output
No response
Anything else we need to know?
I am still running xarray 2023.08.0; see below for more details about the environment.
Environment
INSTALLED VERSIONS
commit: None python: 3.11.6 (main, Jan 25 2024, 20:42:03) [GCC 7.5.0] python-bits: 64 OS: Linux OS-release: 5.4.0-124-generic machine: x86_64 processor: x86_64 byteorder: little LC_ALL: None LANG: en_US.UTF-8 LOCALE: ('en_US', 'UTF-8') libhdf5: 1.12.2 libnetcdf: 4.9.3-development
xarray: 2023.8.0 pandas: 2.1.4 numpy: 1.26.3 scipy: 1.12.0 netCDF4: 1.6.5 pydap: None h5netcdf: None h5py: None Nio: None zarr: 2.16.1 cftime: 1.6.3 nc_time_axis: 1.4.1 PseudoNetCDF: None iris: None bottleneck: 1.3.7 dask: 2024.1.1 distributed: 2024.1.1 matplotlib: 3.8.2 cartopy: 0.22.0 seaborn: None numbagg: None fsspec: 2023.12.2 cupy: None pint: 0.23 sparse: None flox: 0.9.0 numpy_groupies: 0.10.2 setuptools: 69.0.3 pip: 23.2.1 conda: None pytest: 8.0.0 mypy: None IPython: 8.18.1 sphinx: None
That is pretty curious @saschahofmann .
I'm trying to repro. I also get a process that doesn't finish, but a) I get lots of errors, and b) I also get it with
```python
def process(dataset):
    return dataset.compute()

with Pool(4) as p:
    dropped = p.map(process, datasets)
```
```
Process SpawnPoolWorker-3:
Process SpawnPoolWorker-4:
Process SpawnPoolWorker-2:
Process SpawnPoolWorker-1:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process' on <module '__main__' (built-in)>
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process' on <module '__main__' (built-in)>
Traceback (most recent call last):
```
...are we sure it's not just a Dask issue with running Dask operations within a Pool?
Can we try minimizing the example more, and creating an example with non-xarray Dask computations that does work?
I think this error is actually multiprocessing not working at all. I just tried on a different machine and got the same errors just running the basic example from the multiprocessing documentation:
```python
from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
```
I would bet it's the same for you, in which case it's completely independent of dask or xarray. I am trying to figure out what's causing this second error now.
I think this is caused by running the code in a Jupyter notebook. I could run the minimal multiprocessing example from my last answer in the same environment as a normal Python script with no problems. My first response was also run in a notebook, but in VSCode. I am not familiar with the differences, but maybe you could try and see whether you can at least get rid of the AttributeError, @max-sixty?
Of course, my "reproducible" example now started working in the script as well.
Thanks for investigating.
If you can generate a repro with xarray, which works without xarray, that would be great...
Alright. It seems to be the version of Xarray or Dask. I pinned them to the ones mentioned in the original post and now it also doesn't finish in the environment where it worked before.
Will play around and see what causes it
The example from the original post doesn't work even after I updated the packages, but if I change the chunking, e.g. to chunk(location=1, time=2), it starts working. Weirdly enough, it still wouldn't work with the older xarray version.
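To make the chunking difference concrete, here's a sketch using one of the datasets from the original post (the `i = 0` case); the printed chunk tuples are what the two calls produce:

```python
import numpy as np
import xarray as xr

ds = xr.Dataset(
    {
        "temperature": (
            ["time", "location"],
            [[23.4, 24.1], [23.4, 22.1], [21.8, 24.2], [20.5, 25.3]],
        )
    },
    coords={"time": [1, 2, 3, 4], "location": ["A", "B"]},
)

# Original chunking: "location" stays a single chunk.
print(ds.chunk(time=2).temperature.chunks)               # ((2, 2), (2,))
# Modified chunking: "location" is split into two chunks.
print(ds.chunk(location=1, time=2).temperature.chunks)   # ((2, 2), (1, 1))
```

So the change that makes the example start working also doubles the number of chunks, which might matter if the hang depends on graph size or shape.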
I simplified the example; now the process function is only

```python
def process(dataset):
    return dataset.sum().compute()
```

It still fails.
An example using the same data but only dask, without xarray, would look like this:

```python
from dask.dataframe import from_pandas

dds = [from_pandas(d.to_dataframe().reset_index(), chunksize=2) for d in datasets]

with Pool(4) as p:
    dropped = p.map(process, dds)
```
It works just fine, although the chunking of the data is probably different from the failing example, which might be the real problem.
What is d in that example? Can we make an MCVE with vs. without xarray?
The example was using [d for d in datasets] with datasets from the original post.
But I just ran my breaking example and it finished with no problem. I assume something was fixed in the latest versions. I have since upgraded xarray, so if you want I can close this issue!