Limit number of jobs in queue or running with dask-jobqueue
Original problem raised by @jmichel-otb in https://github.com/dask/dask-jobqueue/issues/196#issuecomment-548412595.
I am currently hitting the limit on the maximum number of jobs in the queue. Our maximum number of jobs in queue is 100, which is not much.
I understand that using job arrays is off the table, but maybe we could make dask-jobqueue aware of those limits, and tell it that it needs to start, say, 200 jobs while ensuring that there are never more than 100 queuing?
I am not familiar with jobqueue code but I expect this to be a much smaller patch than enabling job arrays.
Several solutions to this:
- Submit jobs that span one complete node at once: https://github.com/dask/dask-jobqueue/issues/196#issuecomment-440036622.
- Implement job arrays in dask-jobqueue, #196,
- Add something else that prevents dask-jobqueue from having too many jobs in a queued state.
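As a rough illustration of the third option, here is a minimal sketch of the bookkeeping such a throttle would need. It is not part of dask-jobqueue: the function name `jobs_to_submit` and its parameters are hypothetical, and it assumes the site limit applies to queued (not yet running) jobs only.

```python
def jobs_to_submit(target_jobs: int, running: int, queued: int, max_queued: int) -> int:
    """Return how many new jobs may be submitted right now.

    Hypothetical helper, not a dask-jobqueue API. Assumes the limit
    `max_queued` counts only jobs waiting in the queue.
    """
    # Jobs we still want but have not submitted yet.
    still_needed = target_jobs - (running + queued)
    # Room left in the queue before hitting the site limit.
    free_queue_slots = max_queued - queued
    # Never submit a negative number of jobs.
    return max(0, min(still_needed, free_queue_slots))


# Example with the numbers from this thread: 200 jobs wanted, at most 100 queued.
print(jobs_to_submit(target_jobs=200, running=0, queued=0, max_queued=100))
print(jobs_to_submit(target_jobs=200, running=60, queued=40, max_queued=100))
```

A caller would have to poll the batch system for the current `running` and `queued` counts and call this periodically, which is where most of the real complexity would live.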
My recommendation was:
Be aware that there may not be an easy solution to this, and I'll have limited time to investigate. I also stick to my suggestion of making individual jobs bigger, and if that does not solve your problem, I'd be curious, like @lesteve, to know why.
@jmichel-otb answered with:
@guillaumeeb the answer is that I can not get it to work with more workers per PBS job.
My current setup is 1 worker per job, with 1 process, 8 threads and 40 GB of memory. The maximum processing power I can reach within the cluster's limit is 200*8 threads (which is a lot already), but as you may know multithreaded code never reaches 100% efficiency, and parts of the code are not multithreaded.
I have been trying to change this setup to jobs with 8 single-threaded worker processes, 5 GB each, for the past week. This never reaches the end of the processing, with various errors ranging from workers restarted by the nanny to tornado errors and jobs killed by PBS.
Part of the issue might be that dask does not order tasks in an optimal way for our graph (it keeps many intermediate results alive instead of processing leaf nodes quickly so as to release resources). I posted an issue about this here: dask/dask#5584
I must say that it is really hard to figure out what is going on when computation fails with this software stack (PBS, dask-jobqueue, dask). Logs are filled with uninformative entries such as tornado errors, and it seems that the only option is trial and error (and a bit of prayer).
Briefly chiming in here: I just had a chat with an HPC sysadmin who said that job arrays would not help solve the problem that forces him to limit the number of queued jobs per user.
They are enforcing these limits because the complexity of the scheduling problem ("Under the given policy, which jobs need to be started next and where to make everyone as happy as possible?") is growing with the number of jobs queued. From this point of view, job arrays are just a number of independently prioritised jobs and that would not reduce the load of the scheduler.
So my guess would be that implementing job arrays here would only "help" until the admins notice that our huge job arrays overload the scheduler.
My current setup is 1 worker per job, with 1 process, 8 threads and 40 GB of memory.
So if you use:
PBSCluster(processes=1, cores=8, memory='40GB')
then the code
PBSCluster(processes=3, cores=24, memory='120GB')
will have the same effect on the dask side, but will lead to three times fewer jobs. Your individual jobs will probably take a little longer to start.
I have been trying to change this setup to jobs with 8 single-threaded worker processes, 5 GB each, for the past week.
Here again,
PBSCluster(processes=1, cores=1, memory='5GB')
PBSCluster(processes=8, cores=8, memory='40GB')
PBSCluster(processes=24, cores=24, memory='120GB')
will give identical results on the dask side for the same total number of workers, but with big differences in terms of launched jobs.
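To make the equivalence concrete, the sketch below works out what each worker gets and how many jobs are needed for a given number of workers. It relies on `cores` and `memory` being per-job totals that are split across `processes` worker processes, as the examples above imply. The helper `job_layout` is hypothetical and does not call dask-jobqueue.

```python
import math


def job_layout(total_workers, processes, cores, memory_gb):
    """Describe the per-worker resources and job count for one configuration.

    Hypothetical helper: `processes`, `cores` and `memory_gb` mirror the
    per-job keyword arguments passed to PBSCluster in this thread.
    """
    return {
        "threads_per_worker": cores // processes,
        "memory_gb_per_worker": memory_gb / processes,
        "jobs_needed": math.ceil(total_workers / processes),
    }


# 24 single-threaded workers with 5 GB each, packed three different ways.
for processes, cores, memory_gb in [(1, 1, 5), (8, 8, 40), (24, 24, 120)]:
    layout = job_layout(24, processes, cores, memory_gb)
    print(f"processes={processes:2d} cores={cores:2d} memory={memory_gb:3d}GB -> {layout}")
```

Only `jobs_needed` changes between the three configurations (24, 3 and 1 jobs), which is what matters for the queue limit.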
All this is a good way to limit the number of queued or running jobs. But it seems you're having other issues as well and you probably already understand that.
This never reaches the end of the processing, with various errors ranging from workers restarted by the nanny to tornado errors and jobs killed by PBS.
Indeed, with several processes, each process has less memory, so the worker will be more subject to memory errors. In a private conversation, if I understood correctly, you told me that in the processes=1, cores=8 case, you didn't launch 8 tasks in parallel, but one task was consuming the 8 cores at once (hence your sentence about multithreaded code and efficiency). In the processes=1, cores=1 case, each individual task generates as much data as in the previous one, but each worker has much less memory to handle the load. That probably explains the problem you observe, and your best hope for this is the issue you opened: https://github.com/dask/dask/issues/5584.
I must say that it is really hard to figure out what is going on when computation fails with this software stack (PBS, dask-jobqueue, dask). Logs are filled with uninformative entries such as tornado errors, and it seems that the only option is trial and error (and a bit of prayer).
This is a major problem that must be addressed somehow. What would you need to improve your experience in this area? Better documentation? More profiling tools? More explicit error messages? Some examples? We really need detailed input to improve this stack. cc @mrocklin for this last point.
So my guess would be that implementing job arrays here would only "help" until the admins notice that our huge job arrays overload the scheduler.
I'm not an expert in batch scheduler internals, but I know job arrays ease the burden of scheduling jobs a little. They're not a magic solution either: some users already overload our system with job arrays that are too big.
I apologize because this thread is serving many different purposes... Is there somewhere we can discuss dask usage on clusters in general?
Indeed, with several processes, each process has less memory, so the worker will be more subject to memory errors. In a private conversation, if I understood correctly, you told me that in the processes=1, cores=8 case, you didn't launch 8 tasks in parallel, but one task was consuming the 8 cores at once (hence your sentence about multithreaded code and efficiency). In the processes=1, cores=1 case, each individual task generates as much data as in the previous one, but each worker has much less memory to handle the load. That probably explains the problem you observe, and your best hope for this is the issue you opened: dask/dask#5584.
Yes, I am aware that using PBSCluster(processes=8, cores=8, memory='40GB') means that each worker only has 5 GB to process and store results. But I think it is still possible to complete the full computation. I have many A tasks (let's say 10,000), each producing 12 MB of data in 25 s, and many B tasks (also 10,000, which are what I really want to compute), each producing 18 MB of data in 25 s and each depending on at most 16 (5 on average) A tasks. Moreover, those tasks are strongly organized (on a 2D grid): neighboring B tasks will have A tasks in common, whereas far-apart B tasks will not. What I need is to lure dask into finishing as many B tasks as possible so as to free A task results (which is not the behavior I get from graph optimization and task allocation to workers). This is the purpose of dask/dask#5584.
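To illustrate why the execution order matters so much for memory here, the toy model below counts how many A results must be held at once for two different orders of B tasks. It is only a sketch under simplifying assumptions: a small 20x20 grid, each B task depending on the 3x3 block of A tasks around it (the real graph has up to 16 dependencies), a single worker, and A results freed as soon as no remaining B task needs them. It does not reproduce dask's actual ordering or scheduling.

```python
import random
from collections import defaultdict


def neighbours(i, j, n):
    """A tasks needed by B task (i, j): the 3x3 block around it, clipped to the grid."""
    return [(a, b)
            for a in range(max(0, i - 1), min(n, i + 2))
            for b in range(max(0, j - 1), min(n, j + 2))]


def peak_live_a_results(order, n):
    """Peak number of A results held at once when B tasks run in `order`."""
    # Count how many B tasks still need each A result.
    remaining = defaultdict(int)
    for i, j in order:
        for a in neighbours(i, j, n):
            remaining[a] += 1

    live, peak = set(), 0
    for i, j in order:
        deps = neighbours(i, j, n)
        live.update(deps)              # compute any A inputs not yet in memory
        peak = max(peak, len(live))
        for a in deps:                 # release A results nobody needs any more
            remaining[a] -= 1
            if remaining[a] == 0:
                live.discard(a)
    return peak


n = 20
row_major = [(i, j) for i in range(n) for j in range(n)]
shuffled = row_major[:]
random.Random(0).shuffle(shuffled)

peak_ordered = peak_live_a_results(row_major, n)
peak_shuffled = peak_live_a_results(shuffled, n)
print(f"row-major order: peak of {peak_ordered} live A results out of {n * n}")
print(f"shuffled order:  peak of {peak_shuffled} live A results out of {n * n}")
```

In this model, sweeping the grid row by row only ever keeps a few rows of A results alive, while an order that ignores locality ends up holding a large share of all A results at the same time. Multiplying the peak by 12 MB per A result gives a rough idea of the memory pressure in each case.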
I can stick with PBSCluster(processes=1, cores=8, memory='40GB'), which is working for now, but it wastes resources because parts of the code are not multi-threaded, and multi-threading is less efficient. And if I want to reduce the use of multi-threading and compensate with more workers, I run into both the memory problem and the jobs in queue limit.
This is a major problem that must be addressed somehow. What would you need to improve your experience in this area? Better documentation? More profiling tools? More explicit error messages? Some examples? We really need detailed input to improve this stack. cc @mrocklin for this last point.
For me the problem here is twofold.
First, logs are full of entries such as:
OSError: Timed out trying to connect to 'tcp://xxx:xxx:xxx:xxx' after 10 s: connect() didn't finish in time
(Each of those entries comes with a long exception traceback.)
distributed.core - INFO - Event loop was unresponsive in Worker for 4.30s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
If I understand correctly, this is because my code does not release the GIL, but my process spends a lot of time in the C++ part (most of the time, in fact), and I do not know how to release the GIL in that case.
This is not an issue in itself, but it makes spotting real problems in the logs very difficult.
Second, most of the time there is a single root cause (or a few) that makes the cluster start to collapse, but the collapse itself fills the logs with timeout errors, exception stacks and so on, spread across the many log files (one per PBS job). It is therefore very difficult to identify those root causes. Maybe I am doing something wrong, but I would like to see those root causes pop out in the main console log, and to be able to access the full worker state where the problem occurred.
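Until the stack surfaces root causes by itself, one workaround is to filter the known noise out of the per-job log files before reading them. The sketch below is only an illustration: the two noise patterns are taken from the log entries quoted above, while the helper names, the `*.o*` file pattern and the keywords used to flag suspicious lines are assumptions to adapt to your site.

```python
import re
from pathlib import Path

# Messages quoted in this thread that tend to drown out the real problem.
NOISE_PATTERNS = [
    re.compile(r"Timed out trying to connect to"),
    re.compile(r"Event loop was unresponsive"),
]


def drop_known_noise(lines):
    """Keep only the lines that match none of the known noise patterns."""
    return [line for line in lines
            if not any(p.search(line) for p in NOISE_PATTERNS)]


def suspicious_lines(log_dir, glob="*.o*"):
    """Yield (file name, line) for remaining lines that look like errors.

    Assumption: PBS writes one output file per job into `log_dir` and the
    files match `glob`; both are site-specific.
    """
    for path in sorted(Path(log_dir).glob(glob)):
        lines = path.read_text(errors="replace").splitlines()
        for line in drop_known_noise(lines):
            if "Error" in line or "Traceback" in line or "killed" in line.lower():
                yield path.name, line
```

Calling `list(suspicious_lines("logs"))` on a hypothetical `logs` directory would give a much shorter list to read through, although it cannot tell which of the remaining entries came first across jobs unless the lines carry timestamps.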
This is (potentially) a pretty complex topic. There are a couple of approaches not mentioned to date that could remedy this:
- A hybrid of dask_jobqueue and dask-mpi so that the dask workers come in larger bundles. This is not easy to implement since you would need to become aware of MPI launchers as well as resource managers.
- There's a new resource manager being developed at LLNL called flux, which is being designed to handle large numbers of jobs. It is designed so that it can (also) operate as a scheduler within a scheduler, which means sites could have a flux scheduler running on a dynamically changing number of nodes. This might be a nice model in this case, but is probably a research project on its own.
We have already tackled some of the first issue in a library we've built that runs on top of jobqueue to handle tasks that use multiple nodes but it is very alpha, not meant for this purpose, and not integrated enough to handle scalability in a transparent way if it were used this way (a ton of workers would just report when an MPI dask job is launched).
@jmichel-otb
I apologize because this thread is serving many different purposes... Is there somewhere we can discuss dask usage on clusters in general?
I think this issue tracker is a good place for that.
Just to clarify for everyone, you should be using
from dask_jobqueue import PBSCluster
# One single-threaded dask worker per job, while PBS still reserves 8 CPUs for it
cluster = PBSCluster(cores=1, processes=1, memory='40GB',
                     resource_spec='select=1:ncpus=8:mem=40GB')
because you run one task at a time that uses 8 cores (using OpenMP if I'm not mistaken?). The cores=8, processes=1 kwargs you're using only work because your tasks don't release the GIL.
See https://github.com/dask/dask-jobqueue/issues/231 for more details and other links.
And because the OpenMP (or anything else) parallelization of a task is not perfect, you want to disable it and parallelize more over the data, so more individual tasks at once.
if I want to reduce the use of multi-threading and compensate with more workers, I run into both the memory problem and the jobs in queue limit.
PBSCluster(processes=8, cores=8, memory='40GB')
PBSCluster(processes=24, cores=24, memory='40GB')
should at least solve the jobs in queue limit problem. Have you tried with intermediate values? (I know, this is the trial/error option you don't like)
PBSCluster(processes=4, cores=8, memory='40GB')  # OMP_NUM_THREADS=2
PBSCluster(processes=2, cores=8, memory='40GB')  # OMP_NUM_THREADS=4
For the memory problem, we need to wait for dask/dask#5584.
For the logs and the root causes of problems, we need to find a way to document things better and, if possible, to improve the messages and exceptions in the code. In your example, did you identify the root cause in the end (some memory problem?), or not?