
JobManager: create & start in parallel

Open jdries opened this issue 11 months ago • 19 comments

Creating and starting a job takes some time, which means there's an interval when the jobmanager is creating new jobs and resources are potentially unused. If we can start jobs in parallel, we can decrease this. Do note that we typically have rate-limiting in place on backends, so we have to be resilient there.

jdries avatar Jan 31 '25 07:01 jdries

@jdries @soxofaan could we run the job starts across multiple threads, e.g. like this:

    from threading import Thread

    threads = []
    for backend_name in self.backends:
        backend_load = per_backend.get(backend_name, 0)
        # hypothetical capacity logic: free slots before this backend's parallel-job limit
        available_slots = max(0, self.backends[backend_name].parallel_jobs - backend_load)
        for i in not_started.index[:available_slots]:
            # args are illustrative; pass whatever _launch_job needs for row i
            thread = Thread(target=self._launch_job, args=(start_job, df, i, backend_name))
            thread.start()
            threads.append((thread, i))

    for thread, i in threads:
        thread.join()

another option might be to look into asyncio, which would run a single thread but jump between the various job creations/starts without waiting. asyncio might be the more scalable solution in case we want to start 100s of jobs at once.

HansVRP avatar Jan 31 '25 10:01 HansVRP

Threading won't work I'm afraid, because in Python only one thread can be active at a time and the current "start_job" requests are blocking. So the execution would not be parallel in reality.

We would have to use a non-blocking request library like https://www.python-httpx.org, or use multiprocessing to have effective parallelism.

multiprocessing might be the easiest route for now (I'm not so sure how easy it will be to switch to httpx from our classic "requests"-based implementation).
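For illustration, a minimal sketch of the non-blocking option with httpx and asyncio; the backend URL, job ids and token are hypothetical placeholders, and this is not the client's current requests-based implementation:

    import asyncio
    import httpx

    async def start_job(client: httpx.AsyncClient, backend_url: str, job_id: str, token: str):
        # POST /jobs/{job_id}/results starts a previously created batch job (openEO API)
        resp = await client.post(
            f"{backend_url}/jobs/{job_id}/results",
            headers={"Authorization": f"Bearer {token}"},
        )
        resp.raise_for_status()
        return job_id

    async def start_all(backend_url: str, job_ids: list[str], token: str):
        async with httpx.AsyncClient() as client:
            # fire all start requests concurrently from a single thread
            return await asyncio.gather(
                *(start_job(client, backend_url, j, token) for j in job_ids)
            )

    # asyncio.run(start_all("https://openeo.example.org", ["j-123", "j-456"], "my-token"))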

soxofaan avatar Jan 31 '25 10:01 soxofaan

another option might be to look into asyncio, which would run a single thread but jump between the various job creations/starts without waiting. asyncio might be the more scalable solution in case we want to start 100s of jobs at once

Indeed, that would probably be a more modern approach, but it's not trivial to migrate everything (or at least a well-chosen subset of) what we already have to this new paradigm.

soxofaan avatar Jan 31 '25 10:01 soxofaan

Reading a bit deeper into it: if we indeed want full performance, we need to make sure that all network requests, database queries, etc. can run asynchronously.

This might make the code overly complex, since by default we only support 2 parallel jobs...

HansVRP avatar Jan 31 '25 10:01 HansVRP

Threading won't work I'm afraid, because in Python only one thread can be active at a time and the current "start_job" requests are blocking. So the execution would not be parallel in reality

Ok, I did some testing with requests in threads, and apparently it does work to do requests in parallel that way (blocking network I/O releases the GIL). I was probably confusing it with another threading problem I had before. So yes, basic threading is probably the easiest solution here.

soxofaan avatar Jan 31 '25 11:01 soxofaan

Do we know what the upper limit is on the number of threads we could use? Being able to start 20 jobs in parallel would already make a big difference. LCFM would probably prefer 100 at once.

HansVRP avatar Jan 31 '25 12:01 HansVRP

Note that it might be counter-productive to do too much in parallel as well: flooding/saturating the openeo workers and resources, triggering rate limit middleware errors that are not retried by the client (because they don't use openeo conventions), etc.

I would default to something like 5, and maybe scale up a bit if you know what you are doing.

Threading tutorials typically point to thread pools (with a fixed size limit) to solve this easily; see the sketch below.
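A minimal sketch of that thread-pool approach, assuming a hypothetical start_single_job(job_id) callable that creates and starts one job (not the actual job manager code):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    MAX_PARALLEL_STARTS = 5  # conservative default to avoid hammering the backend

    def start_jobs_in_pool(job_ids, start_single_job):
        """Start jobs concurrently with a fixed-size thread pool.

        `start_single_job` is a hypothetical callable taking one job id;
        failures are recorded per job instead of aborting the whole batch.
        """
        results = {}
        with ThreadPoolExecutor(max_workers=MAX_PARALLEL_STARTS) as pool:
            futures = {pool.submit(start_single_job, job_id): job_id for job_id in job_ids}
            for future in as_completed(futures):
                job_id = futures[future]
                try:
                    results[job_id] = future.result()
                except Exception as exc:
                    results[job_id] = exc  # record the failure, keep starting the rest
        return results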

soxofaan avatar Jan 31 '25 16:01 soxofaan

before we needed 2 minutes and 10 seconds to start all jobs:

Image

now it is reduced to 22 seconds. Notice how initially 5 jobs were started together (equal to the thread pool's maximum size):

Image

HansVRP avatar Feb 03 '25 12:02 HansVRP

Just to be sure, this is with the STAC-based implementation of the job manager?

Pandas dataframes are not thread-safe, so unless you explicitly add locks, this might run awry at scale.

JorisCod avatar Feb 03 '25 19:02 JorisCod

Same concern here (and in PR #723) about pandas. While pandas as a kind of database API was handy in the proof-of-concept phase of the job manager, I have the feeling it is now actually making progress harder than it could be. We had various struggles in the past when implementing new job manager features, and it is now making threading-based features quite challenging. I think we should try to get/keep pandas out of the code paths that we want to run in threads.

soxofaan avatar Feb 04 '25 08:02 soxofaan

So there are a couple of options which we could explore,

Instead of pandas we could take a look at Dask or Modin dataframes, which both support parallelism in a more suitable way.

Nevertheless I am uncertain whether these would allow parallel writing.

Another option is to explore the use of a defaultdict, just as it is being used for the stats...

Lastly, we could consider only supporting threading for the STAC-based job manager?

HansVRP avatar Feb 06 '25 09:02 HansVRP

I would avoid complexity around shared data structures and all the consistency challenges that come with them: just pass the bare essentials to a worker thread to do its task (start a job): the URL to request (a string) and an access token for authentication. All the rest (updating/managing/persisting dataframes, tracking stats, printing progress) can be left to the main thread.
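A minimal sketch of that split, with hypothetical helpers and column names: the worker thread only performs the blocking HTTP call from a plain URL and token, and the main thread applies the outcomes to the (non-thread-safe) dataframe afterwards:

    import threading
    import requests

    def start_job_worker(start_url: str, token: str, results: dict, job_id: str, lock: threading.Lock):
        """Worker: only the blocking HTTP call, no shared dataframe access."""
        try:
            resp = requests.post(start_url, headers={"Authorization": f"Bearer {token}"}, timeout=60)
            resp.raise_for_status()
            outcome = "queued"
        except Exception as exc:
            outcome = f"start_failed: {exc}"
        with lock:  # only this tiny results dict is shared, guarded by a lock
            results[job_id] = outcome

    def start_jobs(df, token):
        # Main thread: launch workers, join, then update the dataframe in one place.
        # `df` with "id", "start_url" and "status" columns is hypothetical.
        results, lock, threads = {}, threading.Lock(), []
        for job_id, url in zip(df["id"], df["start_url"]):
            t = threading.Thread(target=start_job_worker, args=(url, token, results, job_id, lock))
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
        for job_id, outcome in results.items():
            df.loc[df["id"] == job_id, "status"] = outcome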

soxofaan avatar Feb 06 '25 17:02 soxofaan

I see, so making launch_job focus solely on launching the job, and pulling the status update out of it.

That way we can concurrently launch the jobs without needing threaded access to the dataframe.

HansVRP avatar Feb 06 '25 20:02 HansVRP

With the latest simplification update:

Image

HansVRP avatar Feb 06 '25 21:02 HansVRP

Image

HansVRP avatar Feb 24 '25 12:02 HansVRP

The first stress test looked promising; I was able to see the queued_for_start state, and the jobs got created within 10s.

@VictorVerhaert I believe it would make sense to perform a larger stress test

HansVRP avatar Feb 24 '25 12:02 HansVRP

and the jobs got created within 10s.

Creation is typically not the problem, right? It's the job starting that can take long with the geopyspark-driver backend

soxofaan avatar Feb 24 '25 16:02 soxofaan

Creation is typically not the problem, right? It's the job starting that can take long with the geopyspark-driver backend

Did a quick test on the LCFM jobs: creation takes on average 0.8 seconds; starting the job takes on average 14 seconds.

Threading the whole dataframe handling thus does not really seem that urgent, as multithreading the starting of the jobs should already provide the most important improvement.

VictorVerhaert avatar Mar 03 '25 12:03 VictorVerhaert

@JorisCod we are in final review mode; I already ran some small extractions and noticed roughly 20% more efficient launching of jobs:

Any LCFM-related test you could propose here as part of the definition of done?

HansVRP avatar Apr 25 '25 06:04 HansVRP

https://github.com/Open-EO/openeo-python-client/pull/736 is not merged yet, let alone validated by real use cases, so I would keep this ticket open.

soxofaan avatar Aug 21 '25 09:08 soxofaan

my bad! should we shift the assignees to @Pratichhya and @VictorVerhaert for testing purposes?

HansVRP avatar Aug 21 '25 12:08 HansVRP

created here for lcfm: https://github.com/VITO-RS-Vegetation/lcfm-production/issues/279

VictorVerhaert avatar Aug 21 '25 12:08 VictorVerhaert

also created for cropsar: https://github.com/orgs/Open-EO/projects/6/views/15?pane=issue&itemId=125333006

Pratichhya avatar Aug 21 '25 13:08 Pratichhya

Test result(s) here: https://github.com/VITO-RS-Vegetation/lcfm-production/issues/279

Copying here: the error that occurs roughly every 10 job submissions:

2025-09-02 13:24:11.342 | INFO     | lcfm_sentinel1.pipeline:start_yearly_job:271 - Starting Job: j-2509021124114865ab8beec41324c60b 
for tile 14VLP
/data/users/Private/lcfm.vm/openeo-python-client/openeo/rest/connection.py:1206: UserWarning: Property filtering with unsupported properties according to collection/STAC metadata: {'productType'} (supported: dict_keys(['bands', 'eo:bands'])).
  return DataCube.load_collection(
Failed to parse API error response: [429] 'Too Many Requests' (headers: {'Retry-After': '4', 'X-Retry-In': '3.218020038s', 'Date': 'Tue, 02 Sep 2025 11:24:12 GMT', 'Content-Length': '17'})
Failed to start job 'j-2509021124114865ab8beec41324c60b': OpenEoApiPlainError('[429] Too Many Requests')
2025-09-02 13:24:13.192 | INFO     | lcfm_sentinel1.pipeline:start_yearly_job:271 - Starting Job: j-2509021124134772af9b198697eb61c3 

This leads to a start_failed state for a significant number of jobs overall.

Still, I do get to a high number of parallel running jobs (~120 up till now)

JorisCod avatar Sep 02 '25 11:09 JorisCod

Would this be covered by the try/except approach discussed in:

https://github.com/Open-EO/openeo-python-client/issues/802?

@soxofaan ?

HansVRP avatar Sep 02 '25 11:09 HansVRP

Test result(s) here: https://github.com/VITO-RS-Vegetation/lcfm-production/issues/279

I have no access to that project

Failed to parse API error response: [429] 'Too Many Requests' (headers: {'Retry-After': '4', 'X-Retry-In': '3.218020038s', 'Date': 'Tue, 02 Sep 2025 11:24:12 GMT', 'Content-Length': '17'})

Would this be covered by the try/except approach discussed in: https://github.com/Open-EO/openeo-python-client/issues/802?

I think it's unrelated, as #802 is about crashing the whole run with an uncaught exception, while here it's clearly a caught/handled exception.

About the [429] 'Too Many Requests' issue however: what version of the client is being used here? Because since version 0.42.0 that failed request should be retried automatically.

soxofaan avatar Sep 02 '25 15:09 soxofaan

Ok, I managed to reproduce the [429] 'Too Many Requests' when I ran the job manager with enough "rows" to create jobs for.

The problem is that the [429] 'Too Many Requests' happens on a POST request (POST /jobs/{job_id}/results), which isn't retried yet (only GET requests are possibly retried).
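For context, a sketch of how a requests session can be configured to also retry POSTs on HTTP 429 via urllib3's Retry; this is illustrative and not necessarily how the openeo client implements its retry handling:

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    def session_with_post_retries() -> requests.Session:
        """Session that retries 429 responses on POST as well as GET."""
        retry = Retry(
            total=5,
            backoff_factor=1,                   # exponential backoff between attempts
            status_forcelist=[429, 502, 503],   # retry on rate limiting and gateway errors
            allowed_methods=["GET", "POST"],    # POST is not retried by default
            respect_retry_after_header=True,    # honour the Retry-After header seen in the logs
        )
        session = requests.Session()
        adapter = HTTPAdapter(max_retries=retry)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session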

soxofaan avatar Sep 08 '25 11:09 soxofaan

I pushed an update to the feature branch (hv_issue719-job-manager-threaded-job-start) to also support retrying of job starting, which should resolve the Failed to start job ... [429] Too Many Requests problem.

writing unit tests for this situation is unfortunately very hard (because of conflict between request mocking and retry handling), so any real-world testing would be greatly appreciated

soxofaan avatar Sep 08 '25 13:09 soxofaan

Ok, after some more testing, I decided to merge #736 (through #806) in 41710047e0f02ba546de14390f1d390b0e2f6a5b so this ticket can be closed 🎈

soxofaan avatar Sep 09 '25 14:09 soxofaan