[Ray] Add elasticity and fault tolerance to jobs launched on a Ray cluster
This PR adds two Elastic Distributed Training features to jobs launched by TorchX on a Ray cluster:
- Fault tolerance - a node failure raises a RayActorError, which can be caught. Placement groups have built-in fault tolerance and can recover from node failures automatically.
- Elasticity - placement groups are created as pending tasks that the GCS schedules when resources become available.
The logic of the new ray_driver.py is shown in the plot below:

Test plan:
[Note]: This PR is related to the previous PR #559. All future changes will be submitted to this PR.
@ljjsalt @d4l3k
I have been thinking more about this, and I am wondering if ray.util.queue is a better way of implementing it.
You basically create 2 actors, a PlacementGroupManager actor and a CommandManager actor, and exchange information between them using a Ray queue.
The PlacementGroupManager actor is responsible for creating the placement group and then putting a message in the queue for the CommandManager actor to process, which can then create the command actor. This keeps the knowledge of PG creation and command actor creation compartmentalized in these two actors.
Additionally, we can decorate these functions with @ray.remote so both of them can run in parallel.
https://docs.ray.io/en/releases-1.2.0/advanced.html#message-passing-using-ray-queue
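For illustration, here is a Ray-free sketch of that pattern, using a stdlib queue and two threads in place of ray.util.queue.Queue and the two actors (all names here are made up for the example, not from the PR):

```python
import queue
import threading

q = queue.Queue()
workers = []

def pg_manager(n_pgs: int) -> None:
    # Stand-in for the PlacementGroupManager actor: announce each
    # "placement group ready" event through the queue.
    for i in range(n_pgs):
        q.put(f"pg-{i}")
    q.put(None)  # poison pill: no more placement groups

def command_manager() -> None:
    # Stand-in for the CommandManager actor: consume "pg ready"
    # messages and create one worker per placement group.
    while True:
        msg = q.get()
        if msg is None:  # poison pill received, stop consuming
            break
        workers.append(f"worker-for-{msg}")

t1 = threading.Thread(target=pg_manager, args=(3,))
t2 = threading.Thread(target=command_manager)
t1.start(); t2.start()
t1.join(); t2.join()
assert workers == ["worker-for-pg-0", "worker-for-pg-1", "worker-for-pg-2"]
```

The design point is the same as with Ray actors: the producer knows only about placement groups, the consumer knows only about command actors, and the queue is the sole channel between them.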
@atinsood
The reason we should use placement groups: if we only use Ray actors here, we can rerun a failed actor after it throws a RayActorError, but the job could lose the compute resources it used to have and then fail even after the node recovers.
For example, say there are 3 nodes, each with 1 CPU. Job 1 requires a minimum of 3 nodes and is running on all 3; job 2, launched later, requires 1 CPU and becomes a pending task. Once a node failure happens, we have to rerun the failed actor, which becomes a pending task behind job 2. Job 2 will take the recovered node, and job 1 will fail since there aren't enough nodes left to restart it.
Since placement group rescheduling has the highest priority, this situation won't happen.
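A toy, Ray-free model of that scenario (the FIFO pending queue and the names are illustrative assumptions for this example, not Ray's actual scheduler internals):

```python
from collections import deque

def recover_after_failure(pending: deque, free_nodes: int, pg_priority: bool) -> str:
    """Return which request gets the recovered node.

    pending: FIFO queue of (job, cpus) requests waiting on the scheduler.
    pg_priority: if True, the rescheduled worker jumps to the front of
    the queue (modeling placement group rescheduling priority); if False,
    it goes to the back (modeling a plain actor restart).
    """
    reschedule = ("job1-restart", 1)
    if pg_priority:
        pending.appendleft(reschedule)
    else:
        pending.append(reschedule)
    job, cpus = pending.popleft()
    assert cpus <= free_nodes  # the recovered node can serve this request
    return job

# Without priority, job 2 grabs the recovered node and job 1 can never
# get back to 3 workers; with priority, job 1's worker is restored first.
assert recover_after_failure(deque([("job2", 1)]), 1, pg_priority=False) == "job2"
assert recover_after_failure(deque([("job2", 1)]), 1, pg_priority=True) == "job1-restart"
```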
I don't see why we need a queue here, since we aren't doing any inter-process communication.
I understand your concern about using ray.wait here, so I wrapped the return values of the command (remote) actors in a RayResult class. If anything unexpected happens, it throws an error and we know right away. But given the way ray.wait is supposed to work, I think it's right to use it here.
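Hypothetically, the wrapper could look something like this (RayResult is named in the comment above, but this sketch is an illustration, not the actual TorchX class):

```python
class RayResult:
    """Wrap a worker's return value so a failure surfaces on access."""

    def __init__(self, value=None, error: Exception = None):
        self._value = value
        self._error = error

    def get(self):
        if self._error is not None:
            raise self._error  # expose the failure right away
        return self._value

ok = RayResult(value=42)
assert ok.get() == 42

bad = RayResult(error=RuntimeError("worker failed"))
try:
    bad.get()
except RuntimeError:
    pass  # the unexpected failure is visible immediately
```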
Besides, each command actor is a finite state machine with four states (SCHEDULING, FAILING, RUNNING, TASK_COMPLETED).
Each command actor starts in the SCHEDULING state, and actor.schedule.remote() acts as an asynchronous step function. From SCHEDULING the next state is RUNNING; from RUNNING the next state is TASK_COMPLETED or FAILING; from FAILING the next state is SCHEDULING.
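That transition table can be sketched as a plain-Python step function (illustrative only; in the actual driver the transitions are driven by remote calls):

```python
from enum import Enum, auto

class State(Enum):
    SCHEDULING = auto()
    RUNNING = auto()
    FAILING = auto()
    TASK_COMPLETED = auto()

def step(state: State, *, failed: bool = False) -> State:
    """One step of the machine, mirroring the asynchronous schedule call."""
    if state is State.SCHEDULING:
        return State.RUNNING
    if state is State.RUNNING:
        return State.FAILING if failed else State.TASK_COMPLETED
    if state is State.FAILING:
        return State.SCHEDULING  # reschedule the failed actor
    return state  # TASK_COMPLETED is terminal

# A failed actor cycles back to SCHEDULING:
s = State.SCHEDULING
s = step(s)               # RUNNING
s = step(s, failed=True)  # FAILING
s = step(s)               # SCHEDULING again
assert s is State.SCHEDULING
```

Being able to step the machine one transition at a time is what makes this structure easy to unit-test.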
> The reason that we should use Placement Group is

Yeah, not disagreeing on that. I am just thinking about how we manage the interaction between PG creation and command actor creation.
```python
from typing import List

import ray
from ray.util.queue import Queue

@ray.remote
class PlacementGroupManager(object):
    def __init__(self, min_nodes, max_nodes, queue):
        self.queue = queue

    def run(self) -> None:
        # step 1. create a PG with min replicas
        # put the initial pg in the queue
        self.queue.put(ready)
        # step 2. while loop to keep creating the rest of the PGs incrementally,
        # go through the rest of the pgs one by one and keep adding them to the queue
        # we can deal with the logic of pg failure here, or when to stop pg creation if needed

@ray.remote
class CommandActorManager(object):
    def __init__(self, queue, active_workers):
        self.queue = queue
        self.active_workers = []

    def run(self):
        # some logic on when to stop the while loop, either a poison pill or a signal actor
        while True:
            self.queue.get()  # get notified that the pg was created
            # logic to create the command actor goes here
            # or maybe use another queue; pretty sure there is a better way to deal with this
            self.active_workers.append(command_actor.exec_module.remote())
        # once you are done reading all the PGs from the queue or if the queue

def main() -> None:  # pragma: no cover
    actors: List[RayActor] = load_actor_json("actors.json")
    # pyre-fixme[16]: Module `worker` has no attribute `init`.
    ray.init(address="auto", namespace="torchx-ray")
    q = Queue()  # we can set the size of the queue upfront
    pg_manager = PlacementGroupManager.remote(2, 4, q)  # min, max, and the queue
    pg_manager.run.remote()
    cmd_actor_manager = CommandActorManager.remote(q, active_workers)
    cmd_actor_manager.run.remote()
    # Await return result of remote ray function
    while len(active_workers) > 0:
        _logger.info(f"running ray.wait on {active_workers}")
        # pyre-fixme[16]: Module `worker` has no attribute `wait`.
        completed_workers, active_workers = ray.wait(active_workers)
        # If a failure occurs the ObjectRef will be marked as completed.
        # Calling ray.get will expose the failure as a RayActorError.
        for object_ref in completed_workers:
            ray.get(object_ref)
```

I was thinking something like this. This is not the correct code, but more of a thought process on how to handle the interaction between the logic that creates placement groups and the logic that creates command actors, and how to keep them compartmentalized.
> # step 2. while loop to keep creating the rest of the PGs incrementally,
> # go through the rest of the pgs one by one and keep adding them to the queue
> # we can deal with the logic of pg failure here or when to stop pg creation if needed

Actually, this step is not necessary. First, increasing placement groups one by one can be a problem once the number of nodes is large.
Please read the description of the X axis; it only lengthens the time before a PG creation event is added to the GCS pending queue.
The second change: instead of creating each command actor after its placement group has been scheduled, we also create all the command actors at the beginning (in the SCHEDULING state).
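A minimal illustration of that change (hypothetical names, not the actual ray_driver.py code): every command actor record exists from the start in the SCHEDULING state, even while its placement group is still pending in the GCS queue.

```python
from dataclasses import dataclass

@dataclass
class CommandActorState:
    """Per-replica bookkeeping; "SCHEDULING" here means its placement
    group has not been scheduled yet."""
    replica_id: int
    state: str = "SCHEDULING"

def create_all_actors(max_nodes: int) -> list:
    # Create every command actor up front, rather than waiting for each
    # placement group to become ready first.
    return [CommandActorState(i) for i in range(max_nodes)]

actors = create_all_actors(4)
assert len(actors) == 4
assert all(a.state == "SCHEDULING" for a in actors)
```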
Codecov Report
Merging #572 (3228b9c) into main (b051e3f) will increase coverage by 0.15%. The diff coverage is 98.41%.

```diff
@@            Coverage Diff             @@
##             main     #572      +/-   ##
==========================================
+ Coverage   94.85%   95.00%   +0.15%
==========================================
  Files          66       66
  Lines        4042     4144     +102
==========================================
+ Hits         3834     3937     +103
+ Misses        208      207       -1
```

| Impacted Files | Coverage Δ |
|---|---|
| torchx/schedulers/ray_scheduler.py | 95.26% <ø> (ø) |
| torchx/schedulers/ray/ray_driver.py | 97.10% <98.07%> (+1.26%) :arrow_up: |
| torchx/components/dist.py | 96.42% <100.00%> (+7.06%) :arrow_up: |
| torchx/schedulers/ray/ray_common.py | 100.00% <100.00%> (ø) |
| torchx/specs/api.py | 98.40% <100.00%> (+<0.01%) :arrow_up: |
| torchx/runner/api.py | 96.85% <0.00%> (+0.01%) :arrow_up: |
If we did want to separate the concerns here with scheduling vs retries we could use the builtin task max_retries config though that has some other implications.
I'm not sure that using a queue with two actors would simplify this logic -- state machines that you can step through are nice from a testing perspective
> If we did want to separate the concerns here with scheduling vs retries we could use the builtin task max_retries config though that has some other implications.
> I'm not sure that using a queue with two actors would simplify this logic -- state machines that you can step through are nice from a testing perspective
But we are just running schedule (which cannot go wrong) and exec_module (which cannot go wrong unless the script fails); is it necessary to use max_retries here? Of course, the feature can easily be added.