Benchmarking (and debugging) sort on Dask
I’m interested in benchmarking shuffle performance on Dask and am hoping to get some feedback on my setup. The eventual goal is to run this benchmark in memory vs. out-of-core, as well as single-node vs. multi-node.
Right now, I’m having some trouble getting the benchmark to finish reliably in the single-node setting, especially for the out-of-core case. Any help is appreciated!
Here is the script that I’m using and an example of the output (dask_memlimit=-1 means auto; duration=“x” means the run did not finish). It uses Dask tasks to randomly generate single-column dataframes, then sorts them. I wanted the run time to include the full sort, but without having to read all of the data into the client. I think that this line should accomplish that:
print(df.set_index('a').head(10, npartitions=-1))
I’m starting a dask worker with the following command:
dask-worker localhost:8786 --nthreads 1 --nprocs <nprocs> --memory-limit=<mem> --local-directory /data
On a machine with 32 cores and 128GB RAM, I was able to get this to work reliably for 1-5GB, setting nprocs=32 and the memory limit to auto. On 10GB with 100 partitions of 100MB each, I started to get a lot of errors. I posted a snippet here. I killed this run after several minutes.
After that, I tried playing with nprocs and the memory limit. I was able to get the script to finish on 10GB with nprocs=8 and memory-limit=1e10 (10GB), although I still got “Event loop was unresponsive” errors. I also tried lowering the “target” and “memory” parameters documented here, but that didn’t seem to change anything.
Now I’m trying to get the script to finish on 100GB (with 100 1GB partitions), but I’m seeing similar errors and haven’t been able to run it successfully yet.
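To put the memory settings in perspective, here is a rough back-of-the-envelope sketch. It rests on assumptions that are not taken from my script: that "auto" gives each worker a share of total memory proportional to its threads over the machine's cores, that the thresholds are Dask's default fractions (target 0.6, spill 0.7, pause 0.8, terminate 0.95), and that 1GB means 10**9 bytes. The function names are made up for illustration.

```python
# Rough worker memory budget. All helper names here are hypothetical, and the
# "auto" formula and threshold fractions are assumptions about Dask defaults.
GB = 10**9

DEFAULT_FRACTIONS = {"target": 0.60, "spill": 0.70, "pause": 0.80, "terminate": 0.95}


def auto_memory_limit(total_memory, total_cores, nthreads=1):
    """Assumed 'auto' rule: memory share proportional to threads per worker."""
    return int(total_memory * min(1, nthreads / total_cores))


def thresholds(memory_limit, fractions=DEFAULT_FRACTIONS):
    """Byte value of each memory threshold for one worker."""
    return {name: round(memory_limit * frac) for name, frac in fractions.items()}


def partition_share(partition_bytes, memory_limit):
    """Fraction of one worker's memory limit taken by a single partition."""
    return partition_bytes / memory_limit


if __name__ == "__main__":
    limit = auto_memory_limit(128 * GB, total_cores=32, nthreads=1)
    print(f"per-worker limit: {limit / GB:.1f} GB")
    for name, nbytes in thresholds(limit).items():
        print(f"{name:>9}: {nbytes / GB:.2f} GB")
    for size in (0.1 * GB, 1 * GB):
        share = partition_share(size, limit)
        print(f"{size / GB:.1f} GB partition = {share:.1%} of the limit")
```

If these assumptions hold, a single 1GB partition takes a quarter of an auto-sized worker's limit before the shuffle makes any copies, which might be part of why the 100GB run is harder to finish than the 10GB one.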
I have a few questions:
- Sanity check: Is the script I wrote benchmarking a shuffle correctly? Do the results so far look about right?
- Any tips on getting the 100GB test to finish successfully?
- What is the meaning of the “Event loop was unresponsive” errors and the crashes that I saw?
Minimal Complete Verifiable Example: https://gist.github.com/stephanie-wang/f4d061ca237837f79069807cf88e6c14#file-test_sort-py
Anything else we need to know?:
Environment: AWS m5.8xlarge (32 vCPU, 128GB RAM)
- Dask version: 2.14.0
- Python version: 3.7.7
- Operating System: Ubuntu 18.04
- Install method (conda, pip, source): conda
@mrocklin suggested that I post here, but please let me know if I should post this somewhere else!
Here is an image of the task graph generated by this script (thanks @xcharleslin!), if it helps!

(This is for 10 partitions instead of 100, btw!)