[core] Perf: Fair dispatching of the tasks for different scheduling_class to reduce the intermediate data storage
Why are these changes needed?
This PR proposes a change in the task dispatching logic within the LocalTaskManager of Ray. The current implementation dispatches tasks based on their arrival order without considering the size of the task arguments. This can lead to suboptimal memory usage and increased task latency, especially when larger tasks remain in the queue for longer due to their late arrival.
By changing the dispatching order to prioritize tasks based on the size of their arguments, we can ensure that tasks which consume large intermediate results are processed earlier. This helps in reducing the memory footprint as large data objects are likely to be consumed and released sooner.
@nemo9cby @liuxsh9 @Superskyyy I think this can help our application to reduce the intermediate data store and reduce the object spilling.
Benchmark Results
Consider a script that first generates data and then consumes it:
import os
import time

# Show every worker log line instead of a deduplicated summary.
os.environ["RAY_DEDUP_LOGS"] = "0"

import ray

ray.init(address="10.193.182.83:6274")

t1 = time.time()


@ray.remote
def generate_data(x):
    print(f"Generate data: {x}")
    return [x] * (1024 * 10024)


@ray.remote
def aggregate_data(x, y, z, a, b):
    print(f"Consumer data: [{x[0]} , {y[0]} , {z[0]}, {a[0]}, {b[0]}]")
    return sum(x) + sum(y) + sum(z) + sum(a) + sum(b)


data = [generate_data.remote(i) for i in range(300)]
for i, d in enumerate(data):
    print(i, d)

res = []
# Each aggregate task consumes five generated objects, so stop when fewer than five remain.
while len(data) >= 5:
    res.append(aggregate_data.remote(data.pop(0), data.pop(0), data.pop(0), data.pop(0), data.pop(0)))
a = ray.get(res)
print(f"time use: {time.time() - t1}")
Because the execution order is not stable, Ray sometimes (in fact, most of the time) finishes all the generate_data tasks before it executes any aggregate_data task. Memory usage then grows to 300 × (size of one generated object), all of which has to be kept in memory. The log below shows this: every task that generates data finishes before the tasks that consume the data start.
(generate_data pid=367605) Generate data: 0
(generate_data pid=367610) Generate data: 2
(generate_data pid=367606) Generate data: 1
(generate_data pid=367609) Generate data: 5
(generate_data pid=367613) Generate data: 3
(generate_data pid=367611) Generate data: 8
(generate_data pid=367608) Generate data: 4
(generate_data pid=367612) Generate data: 7
(generate_data pid=367607) Generate data: 6
(generate_data pid=367605) Generate data: 10
(generate_data pid=367614) Generate data: 9
(generate_data pid=367610) Generate data: 12
(generate_data pid=367609) Generate data: 11
(generate_data pid=367613) Generate data: 13
(generate_data pid=367606) Generate data: 14
(generate_data pid=367605) Generate data: 15
(generate_data pid=367611) Generate data: 20
(generate_data pid=367608) Generate data: 16
(generate_data pid=367612) Generate data: 17
(generate_data pid=367614) Generate data: 18
(generate_data pid=367607) Generate data: 19
(generate_data pid=367610) Generate data: 21
(generate_data pid=367609) Generate data: 24
(generate_data pid=367613) Generate data: 25
(generate_data pid=367612) Generate data: 22
(generate_data pid=367614) Generate data: 23
(generate_data pid=367610) Generate data: 29
(generate_data pid=367606) Generate data: 28
(generate_data pid=367605) Generate data: 26
(generate_data pid=367611) Generate data: 30
(generate_data pid=367608) Generate data: 27
(generate_data pid=367612) Generate data: 32
(generate_data pid=367607) Generate data: 31
(generate_data pid=367605) Generate data: 37
(generate_data pid=367609) Generate data: 36
(generate_data pid=367613) Generate data: 33
(generate_data pid=367608) Generate data: 35
(generate_data pid=367614) Generate data: 34
(generate_data pid=367610) Generate data: 39
(generate_data pid=367606) Generate data: 38
(generate_data pid=367612) Generate data: 40
(generate_data pid=367605) Generate data: 43
(generate_data pid=367611) Generate data: 42
(generate_data pid=367607) Generate data: 41
(generate_data pid=367609) Generate data: 44
(generate_data pid=367613) Generate data: 46
(generate_data pid=367608) Generate data: 47
(generate_data pid=367612) Generate data: 45
(generate_data pid=367610) Generate data: 48
(generate_data pid=367606) Generate data: 50
(generate_data pid=367614) Generate data: 49
(generate_data pid=367605) Generate data: 51
(generate_data pid=367609) Generate data: 52
(generate_data pid=367611) Generate data: 53
(generate_data pid=367612) Generate data: 54
(generate_data pid=367607) Generate data: 55
(generate_data pid=367606) Generate data: 57
(generate_data pid=367613) Generate data: 56
(generate_data pid=367608) Generate data: 58
(generate_data pid=367610) Generate data: 59
(generate_data pid=367611) Generate data: 61
(generate_data pid=367614) Generate data: 60
(generate_data pid=367605) Generate data: 65
(generate_data pid=367609) Generate data: 64
(generate_data pid=367612) Generate data: 63
(generate_data pid=367607) Generate data: 62
(generate_data pid=367606) Generate data: 67
(generate_data pid=367613) Generate data: 66
(generate_data pid=367611) Generate data: 69
(generate_data pid=367608) Generate data: 68
(generate_data pid=367614) Generate data: 70
(generate_data pid=367610) Generate data: 71
(generate_data pid=367609) Generate data: 72
(generate_data pid=367612) Generate data: 73
(generate_data pid=367607) Generate data: 74
(generate_data pid=367605) Generate data: 75
(generate_data pid=367606) Generate data: 79
(generate_data pid=367609) Generate data: 78
(generate_data pid=367613) Generate data: 80
(generate_data pid=367611) Generate data: 77
(generate_data pid=367608) Generate data: 76
(generate_data pid=367612) Generate data: 82
(generate_data pid=367607) Generate data: 81
(generate_data pid=367610) Generate data: 84
(generate_data pid=367605) Generate data: 85
(generate_data pid=367614) Generate data: 83
(generate_data pid=367606) Generate data: 86
(generate_data pid=367613) Generate data: 88
(generate_data pid=367612) Generate data: 89
and it ends by consuming all the data at the very end.
(generate_data pid=369340) Generate data: 292
(generate_data pid=369427) Generate data: 293
(generate_data pid=369339) Generate data: 297
(generate_data pid=369337) Generate data: 296
(generate_data pid=369334) Generate data: 295
(generate_data pid=369335) Generate data: 298
(generate_data pid=369342) Generate data: 299
(aggregate_data pid=369341) Consumer data: [10 , 11 , 12, 13, 14]
(aggregate_data pid=369427) Consumer data: [20 , 21 , 22, 23, 24]
(aggregate_data pid=369339) Consumer data: [35 , 36 , 37, 38, 39]
(aggregate_data pid=369338) Consumer data: [15 , 16 , 17, 18, 19]
(aggregate_data pid=369337) Consumer data: [25 , 26 , 27, 28, 29]
(aggregate_data pid=369336) Consumer data: [0 , 1 , 2, 3, 4]
(aggregate_data pid=369340) Consumer data: [5 , 6 , 7, 8, 9]
(aggregate_data pid=369334) Consumer data: [30 , 31 , 32, 33, 34]
(aggregate_data pid=369335) Consumer data: [40 , 41 , 42, 43, 44]
(aggregate_data pid=369342) Consumer data: [45 , 46 , 47, 48, 49]
(aggregate_data pid=369339) Consumer data: [65 , 66 , 67, 68, 69]
(aggregate_data pid=369341) Consumer data: [55 , 56 , 57, 58, 59]
(aggregate_data pid=369337) Consumer data: [60 , 61 , 62, 63, 64]
(aggregate_data pid=369427) Consumer data: [50 , 51 , 52, 53, 54]
(aggregate_data pid=369338) Consumer data: [75 , 76 , 77, 78, 79]
(aggregate_data pid=369336) Consumer data: [70 , 71 , 72, 73, 74]
(aggregate_data pid=369340) Consumer data: [85 , 86 , 87, 88, 89]
(aggregate_data pid=369334) Consumer data: [80 , 81 , 82, 83, 84]
(aggregate_data pid=369335) Consumer data: [90 , 91 , 92, 93, 94]
(aggregate_data pid=369342) Consumer data: [95 , 96 , 97, 98, 99]
(aggregate_data pid=369341) Consumer data: [100 , 101 , 102, 103, 104]
(aggregate_data pid=369339) Consumer data: [105 , 106 , 107, 108, 109]
(aggregate_data pid=369338) Consumer data: [120 , 121 , 122, 123, 124]
(aggregate_data pid=369337) Consumer data: [110 , 111 , 112, 113, 114]
(aggregate_data pid=369427) Consumer data: [115 , 116 , 117, 118, 119]
(aggregate_data pid=369340) Consumer data: [125 , 126 , 127, 128, 129]
(aggregate_data pid=369336) Consumer data: [130 , 131 , 132, 133, 134]
After this improvement, the log looks as follows: data is consumed while generate_data tasks are still being dispatched.
(generate_data pid=663564) Generate data: 6
(generate_data pid=663565) Generate data: 8
(generate_data pid=663566) Generate data: 7
(generate_data pid=663570) Generate data: 9
(generate_data pid=663568) Generate data: 2
(generate_data pid=663572) Generate data: 4
(generate_data pid=663569) Generate data: 5
(generate_data pid=663567) Generate data: 3
(generate_data pid=663571) Generate data: 0
(generate_data pid=663573) Generate data: 1
(generate_data pid=663568) Generate data: 11
(generate_data pid=663572) Generate data: 12
(generate_data pid=663569) Generate data: 13
(generate_data pid=663571) Generate data: 10
(generate_data pid=663573) Generate data: 14
(generate_data pid=663564) Generate data: 16
(generate_data pid=663565) Generate data: 15
(generate_data pid=663567) Generate data: 17
(generate_data pid=663566) Generate data: 19
(generate_data pid=663570) Generate data: 18
(generate_data pid=663565) Generate data: 22
(generate_data pid=663572) Generate data: 20
(generate_data pid=663569) Generate data: 23
(generate_data pid=663573) Generate data: 21
(generate_data pid=663564) Generate data: 25
(generate_data pid=663566) Generate data: 24
(generate_data pid=663565) Generate data: 27
(generate_data pid=663567) Generate data: 26
(generate_data pid=663573) Generate data: 28
(generate_data pid=663564) Generate data: 31
(generate_data pid=663566) Generate data: 32
(aggregate_data pid=663568) Consumer data: [5 , 6 , 7, 8, 9]
(generate_data pid=663572) Generate data: 29
(generate_data pid=663569) Generate data: 30
(aggregate_data pid=663571) Consumer data: [0 , 1 , 2, 3, 4]
(aggregate_data pid=663570) Consumer data: [10 , 11 , 12, 13, 14]
(generate_data pid=663565) Generate data: 35
(generate_data pid=663569) Generate data: 33
(generate_data pid=663567) Generate data: 34
(generate_data pid=663564) Generate data: 37
(generate_data pid=663566) Generate data: 38
(generate_data pid=663571) Generate data: 36
(generate_data pid=663570) Generate data: 39
(generate_data pid=663564) Generate data: 42
(generate_data pid=663565) Generate data: 41
(aggregate_data pid=663572) Consumer data: [15 , 16 , 17, 18, 19]
(generate_data pid=663569) Generate data: 40
(generate_data pid=663571) Generate data: 43
(aggregate_data pid=663568) Consumer data: [25 , 26 , 27, 28, 29]
(generate_data pid=663567) Generate data: 44
(aggregate_data pid=663573) Consumer data: [20 , 21 , 22, 23, 24]
(generate_data pid=663564) Generate data: 48
(generate_data pid=663566) Generate data: 45
(generate_data pid=663570) Generate data: 47
(generate_data pid=663569) Generate data: 46
(generate_data pid=663564) Generate data: 53
(generate_data pid=663570) Generate data: 52
(generate_data pid=663572) Generate data: 50
(generate_data pid=663569) Generate data: 51
(generate_data pid=663567) Generate data: 54
(generate_data pid=663571) Generate data: 49
(generate_data pid=663566) Generate data: 55
(generate_data pid=663564) Generate data: 57
(generate_data pid=663570) Generate data: 58
(generate_data pid=663572) Generate data: 56
(aggregate_data pid=663573) Consumer data: [30 , 31 , 32, 33, 34]
(aggregate_data pid=663565) Consumer data: [35 , 36 , 37, 38, 39]
(generate_data pid=663566) Generate data: 61
(generate_data pid=663569) Generate data: 60
(generate_data pid=663571) Generate data: 59
(generate_data pid=663564) Generate data: 62
(generate_data pid=663572) Generate data: 63
(generate_data pid=663570) Generate data: 64
(aggregate_data pid=663568) Consumer data: [40 , 41 , 42, 43, 44]
(generate_data pid=663565) Generate data: 65
(generate_data pid=663566) Generate data: 66
(generate_data pid=663569) Generate data: 68
(aggregate_data pid=663567) Consumer data: [45 , 46 , 47, 48, 49]
(generate_data pid=663571) Generate data: 67
(generate_data pid=663570) Generate data: 70
(generate_data pid=663568) Generate data: 69
(aggregate_data pid=663573) Consumer data: [50 , 51 , 52, 53, 54]
(generate_data pid=663566) Generate data: 71
(generate_data pid=663572) Generate data: 72
(generate_data pid=663571) Generate data: 73
(aggregate_data pid=663564) Consumer data: [55 , 56 , 57, 58, 59]
(generate_data pid=663565) Generate data: 75
(generate_data pid=663569) Generate data: 74
(generate_data pid=663566) Generate data: 76
Here is the timeline.
Green tasks generate the data and red tasks consume it.
Default (all the green tasks run first to generate the data, then all the red tasks consume it)
Improved (once a red task's dependencies are resolved, it is dispatched first, right after the green tasks it depends on)
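The difference between the two timelines can be illustrated by counting how many generated objects are alive at once. The sketch below is an idealized, sequential model, not a measurement: `peak_live_objects` is a hypothetical helper, "P" stands for a generate_data task, "C" for an aggregate_data task that releases five objects, and worker parallelism is ignored.

```python
from typing import Iterable


def peak_live_objects(order: Iterable[str], fan_in: int = 5) -> int:
    """Return the largest number of generated objects alive at any point.

    Each "P" creates one object; each "C" consumes (and frees) `fan_in` objects.
    """
    live = 0
    peak = 0
    for task in order:
        if task == "P":
            live += 1
        else:
            live -= fan_in
        peak = max(peak, live)
    return peak


# Default behaviour in the log above: all 300 producers, then 60 consumers.
producers_first = ["P"] * 300 + ["C"] * 60
# Idealized improved behaviour: a consumer runs as soon as its 5 inputs exist.
interleaved = (["P"] * 5 + ["C"]) * 60

peak_default = peak_live_objects(producers_first)
peak_interleaved = peak_live_objects(interleaved)
print(peak_default)      # 300
print(peak_interleaved)  # 5
```

A real run lands somewhere between these two bounds, as the improved log shows, because several producers run in parallel before the first consumer is dispatched.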
Related issue number
Checks
- [√] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [√] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [√] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [√] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(
@jjyao I think this PR can help reduce object store memory usage by dispatching tasks with large args first. But I want to check whether the task execution order matters, or whether we should keep the dispatching order within the same scheduling class. I checked several tests and, as far as I can tell, the output order is preserved.
Hi @Bye-legumes,
You are absolutely right that Ray is currently dumb about selecting which task to run first in order to achieve a globally optimal outcome. When we change the strategy, we should think about the possible impacts on different workloads. For example, one risk of this PR is that it might cause starvation.
While we are thinking about it, what you can do now is implement some application logic to work around the Ray core limitations (this is exactly what Ray Data does): instead of submitting all producer and consumer tasks at once, you can submit some producer tasks -> submit some consumer tasks -> producer tasks -> consumer tasks -> repeat...
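The interleaved submission pattern suggested here can be sketched at the application level. This is a minimal illustration, not Ray Data's actual implementation: `submit_interleaved`, `produce`, and `consume` are hypothetical names, and the stand-in submitters below only record the submission order. In a Ray program one would presumably pass `generate_data.remote` and `aggregate_data.remote` instead.

```python
from typing import Any, Callable, List


def submit_interleaved(
    produce: Callable[[int], Any],
    consume: Callable[..., Any],
    num_items: int,
    fan_in: int = 5,
) -> List[Any]:
    """Submit `fan_in` producers, then the consumer that needs them, and repeat.

    `produce` and `consume` are assumed to return a handle immediately.
    Producers left over when `num_items` is not a multiple of `fan_in`
    are submitted but never consumed in this sketch.
    """
    results = []
    pending = []
    for i in range(num_items):
        pending.append(produce(i))
        if len(pending) == fan_in:
            # Queue the consumer right behind the producers it depends on.
            results.append(consume(*pending))
            pending = []
    return results


# Stand-in submitters that only record the order of submission.
order: List[str] = []


def fake_produce(i: int) -> int:
    order.append(f"P{i}")
    return i


def fake_consume(*handles: int) -> int:
    order.append("C")
    return sum(handles)


totals = submit_interleaved(fake_produce, fake_consume, num_items=10)
print(order)
print(totals)
```

Note that this only controls the order in which tasks are submitted; it does not by itself decide the order in which the raylet dispatches them.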
That's true, but the current problem is that even if we use Ray Data, which submits tasks this way and is optimized by streaming, the producer tasks and consumer tasks may still be in the queue at the same time. When Ray core chooses a task to dispatch, it will still tend to dispatch the producer tasks first, because they mostly have the smaller scheduling class in the flat_hash_map (although it is an unordered map), and that increases the memory peak. I think we need modifications in both Ray Data and Ray core so that the backpressure improvements in Ray Data actually take effect in Ray core.
My motivation for implementing it this way is that memory is unpredictable, so compared with starvation, keeping memory safe is the trade-off I find acceptable. Another option is to implement a method similar to Ray Data's, where we choose the op based on the object store memory it produces.
@jjyao I think another way is to loop over the tasks in the queue based on a ratio. Here I use P1, P2, ... for the producer tasks and C1, C2, ... for the consumer tasks.
If the tasks in the queue are P1, P2, P3, P4, P5, P6 and C1, C2, we would loop in the order P1, P2, P3, C1, P4, P5, P6, C2, i.e. loop over each scheduling class in proportion to its number of queued tasks. But I worry that after P1 is dispatched, a new task P7 may be put into the queue, so the ratio is unchanged for the next dispatch. Alternatively, we could randomly choose which scheduling class to loop over, weighted by the number of tasks, i.e. a scheduling class with more tasks has a higher probability of being selected first. Either way, this is better than choosing one scheduling class and looping over all the tasks inside it.
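The ratio-based ordering in the example above can be sketched as follows. This is one possible way to get that order, not code from the PR: `interleave_by_ratio` is a hypothetical helper that works on a static snapshot of the queue, so it does not address the concern about new tasks such as P7 arriving between dispatches.

```python
from fractions import Fraction
from typing import Dict, List


def interleave_by_ratio(queues: Dict[str, List[str]]) -> List[str]:
    """Order tasks so each class advances in proportion to its queue length.

    The k-th of n tasks in a class gets the virtual position k/n; tasks are
    then sorted by position, with ties broken by the class's insertion order.
    """
    keyed = []
    for class_index, tasks in enumerate(queues.values()):
        n = len(tasks)
        for k, task in enumerate(tasks, start=1):
            # Fractions keep positions such as 3/6 and 1/2 exactly equal.
            keyed.append((Fraction(k, n), class_index, task))
    keyed.sort(key=lambda item: (item[0], item[1]))
    return [task for _, _, task in keyed]


dispatch_order = interleave_by_ratio(
    {
        "producer": ["P1", "P2", "P3", "P4", "P5", "P6"],
        "consumer": ["C1", "C2"],
    }
)
print(dispatch_order)
# ['P1', 'P2', 'P3', 'C1', 'P4', 'P5', 'P6', 'C2']
```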
Another direction is that we can implement priorities for tasks and actors so that applications can set higher priority for consumer than producer.
If we implement it this way, we should add a new option to @remote to specify that it's a consumer task that should be dispatched with priority, right? That argument could also be used for map_batches tasks.
cc @stephanie-wang
Hmm this actually looks like it may be a regression. Last I remembered, Ray's LocalTaskManager should have a fairness policy based on each task's scheduling class. So if there are both generate and consume tasks queued at a raylet, the raylet will try to equalize the number of running tasks from each class.
1. Currently there is no `fairness`, even though a flat_hash_map (an unordered map) is used. Most of the time it prioritizes scheduling classes by their number, from small to large.
2. I saw that test; sched_cls_cap_enabled_ and sched_cls_info.capacity are used to control that. See [ray-project/ray/python/ray/tests/test_basic_4.py](https://github.com/ray-project/ray/blob/63cd1aca146f3178d9f231db9e9889c10324e59a/python/ray/tests/test_basic_4.py#L217)
Yes, what I meant is that this was the previous behavior. So if the scheduling policy is no longer doing that then it is a regression. Perhaps just reverting to the previous approximately fair policy will also fix the issue, without needing to consider task args size, which as @jjyao pointed out can create other issues.
I noticed that ever since local_task_manager was introduced (maybe 2 years ago?) https://github.com/ray-project/ray/blob/ab53848dfcbee53ccb7996aba9268a0de711ee85/src/ray/raylet/scheduling/local_task_manager.cc#L103, dispatching has not been fair: it picks the scheduling classes in a fixed order in this loop, so mostly the tasks with the smallest scheduling class are dispatched first. Maybe there were other versions more than 2 years ago?
Sure, starvation is an issue that may be created, but this approach solves the memory accumulation. Compared with memory accumulation, starvation is more acceptable if it keeps the node memory safe.
Hmm I'm having trouble finding the code reference now. Maybe it did not exist before...Will try to look into it more when I get some time.
In the meantime, can you prototype a fairness policy based on a task's SchedulingClass? A simple way for now is just to equalize the number of running tasks per SchedulingClass if there are more tasks than can run at the same time. I believe this should bring all of the benefit of the task args approach, and is also preferable for other cases too:
- multiplexing for tasks with no args
- for tasks with args, keeps the pipeline full
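The policy proposed here, equalizing the number of running tasks per SchedulingClass, can be sketched as a small simulation. This is only an illustration of the idea, not the LocalTaskManager code: `pick_next_class` and `fill_slots` are hypothetical helpers, and scheduling classes are represented by plain strings.

```python
from typing import Dict, List, Optional


def pick_next_class(queued: Dict[str, int], running: Dict[str, int]) -> Optional[str]:
    """Return the class with queued work that has the fewest running tasks."""
    candidates = [cls for cls, count in queued.items() if count > 0]
    if not candidates:
        return None
    # On a tie, min() keeps the first candidate in insertion order.
    return min(candidates, key=lambda cls: running.get(cls, 0))


def fill_slots(queued: Dict[str, int], running: Dict[str, int], free_slots: int) -> List[str]:
    """Dispatch tasks one at a time until slots or queued work run out."""
    dispatched = []
    while free_slots > 0:
        cls = pick_next_class(queued, running)
        if cls is None:
            break
        queued[cls] -= 1
        running[cls] = running.get(cls, 0) + 1
        free_slots -= 1
        dispatched.append(cls)
    return dispatched


queued = {"generate": 6, "aggregate": 2}
running: Dict[str, int] = {}
dispatched = fill_slots(queued, running, free_slots=4)
print(dispatched)  # ['generate', 'aggregate', 'generate', 'aggregate']
print(running)     # {'generate': 2, 'aggregate': 2}
```

Because a class is only passed over while another class with queued work has fewer running tasks, free slots are never left idle when any task is waiting.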
Sure. I will try to prototype a fairness policy.
Hi @stephanie-wang, I now have a prototype of a fairness-based dispatching policy, inserted between the scheduling class loop and the worker loop: we skip a scheduling class when the number of running tasks of that class is above its fair share. (I considered adding the check inside the work loop, but I think that would cost more computation, since we would check every work item every time. I also think that even placed between the scheduling class loop and the worker loop it reaches an approximate balance of fair share: in the long run, when all workers are busy, usually only one task can be dispatched at a time, so the shares even out. Another implementation is here: https://github.com/ray-project/ray/pull/45228.)
That is my current idea. Currently it achieves fair dispatching and reduces memory usage. What do you think?
Ah nice, thanks! Just to confirm, this current diff is able to fix the original issue you posted?
I think the general structure looks good, but let's make sure it's work-conserving. One way is to only impose the fairness cap if the total requests in tasks_to_dispatch_ is over the node's total resource capacity. We can make this more efficient by tracking the total resources requested as tasks are added/removed from tasks_to_dispatch_.
It would also be great to add some C++ tests for this. Thanks!
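The work-conserving condition described in this comment can be sketched as a toy model. It is an assumption-laden illustration rather than the raylet's data structures: `DispatchQueue` and all of its methods are hypothetical, resources are reduced to a single CPU count, and the fair share is taken to be an even split among classes that have queued or running tasks.

```python
from collections import defaultdict, deque


class DispatchQueue:
    """Toy model of a dispatch queue with a work-conserving fairness cap."""

    def __init__(self, node_cpus: float) -> None:
        self.node_cpus = node_cpus
        self.queued = defaultdict(deque)        # class -> queued CPU requests
        self.running_cpus = defaultdict(float)  # class -> CPUs currently in use
        self.total_requested = 0.0              # kept up to date on add/dispatch

    def add(self, cls: str, cpus: float) -> None:
        self.queued[cls].append(cpus)
        self.total_requested += cpus

    def cap_active(self) -> bool:
        # Restrict classes only when the queue asks for more than the node has.
        return self.total_requested > self.node_cpus

    def fair_share(self) -> float:
        active = {c for c, q in self.queued.items() if q}
        active |= {c for c, used in self.running_cpus.items() if used > 0}
        return self.node_cpus / max(len(active), 1)

    def may_dispatch(self, cls: str) -> bool:
        if not self.queued[cls]:
            return False
        if not self.cap_active():
            return True
        return self.running_cpus[cls] < self.fair_share()

    def dispatch(self, cls: str) -> None:
        cpus = self.queued[cls].popleft()
        self.total_requested -= cpus
        self.running_cpus[cls] += cpus


q = DispatchQueue(node_cpus=4)
q.add("generate", 1)
q.add("generate", 1)
light_load = q.may_dispatch("generate")  # 2 CPUs requested on a 4-CPU node

for _ in range(6):
    q.add("generate", 1)
for _ in range(2):
    q.add("aggregate", 1)
q.dispatch("generate")
q.dispatch("generate")

generate_allowed = q.may_dispatch("generate")    # already at its share of 2 CPUs
aggregate_allowed = q.may_dispatch("aggregate")  # still below its share
print(light_load, generate_allowed, aggregate_allowed)  # True False True
```

Updating `total_requested` inside `add` and `dispatch` avoids rescanning the whole queue on every dispatch pass.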
Thanks for the good idea! Yes, currently this fixes the issue I mentioned. Sure, I will check the total resource capacity and add tests!
Thanks! Also, just remembered - it can get a bit complicated if there are multiple resource types to consider. I believe fairness is most critical for CPU right now, so let's apply the cap just for tasks requesting CPUs. For example, if we have N foo tasks that each need 1 CPU and then 1 train task that needs a GPU, we should still give the foo tasks all CPUs.
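The CPU-only rule from this comment can be sketched as follows. The helper `cpu_fair_shares` and the dictionary layout for resource requests are hypothetical; the point is only that a class which requests no CPU does not reduce the CPU share of the others.

```python
from typing import Dict


def cpu_fair_shares(
    node_cpus: float, class_requests: Dict[str, Dict[str, float]]
) -> Dict[str, float]:
    """Split the node's CPUs evenly among classes that request CPU at all.

    Classes missing from the result request no CPU and are not capped.
    """
    cpu_classes = [
        cls for cls, request in class_requests.items() if request.get("CPU", 0) > 0
    ]
    if not cpu_classes:
        return {}
    share = node_cpus / len(cpu_classes)
    return {cls: share for cls in cpu_classes}


# N `foo` tasks need 1 CPU each; the single `train` task needs only a GPU.
shares = cpu_fair_shares(8, {"foo": {"CPU": 1}, "train": {"GPU": 1}})
print(shares)  # {'foo': 8.0}
```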
I just added CPU tracking whenever we add/remove a task from tasks_to_dispatch_. I also added a Python test, which I think is simpler since there are no tests for the local_task_manager.
Hmm for this test it would be better to add a C++ test for local_task_manager since the Python test is going to be dependent on timing. Not sure if we can write a Python test that will not flake.
I see, I will add a cpp test and fix what you mentioned! Thanks!!
Hi @stephanie-wang, I just added tests for the dispatching policy and I think it's ready now. Could you review it?
Awesome, thanks! I think @rynewang is going to take over reviewing this PR from here so I'll leave it to him. I can take a last look once it's ready to merge.
Reviewed - @rynewang will take it over the line.
Approved. @stephanie-wang would you mind merging it after CI? Thanks
Sorry about that, I missed the notification. Kicked off pre-merge and will merge once it's done. Thanks for the contribution, looks great!
Hi @stephanie-wang, I just updated the branch with master and it passes all CI now. Auto merge is disabled, so maybe it needs to be merged manually? Thanks!