
[Ray] Add elasticity and fault tolerance features to jobs launched on ray cluster

[Open] ntlm1686 opened this issue 3 years ago • 7 comments

This PR adds two features for Elastic Distributed Training to jobs launched by TorchX on a Ray cluster:

  1. Fault Tolerance - a node failure throws a RayActorError, which can be caught (see the sketch below the list). Placement groups have built-in fault tolerance and can recover from node failures automatically.
  2. Elasticity - placement group requests become pending tasks that the GCS schedules as resources become available.
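
As an illustration of the fault-tolerance flow, here is a minimal sketch (not the actual ray_driver.py code; wait_for_workers and reschedule are hypothetical names):

import ray
from ray.exceptions import RayActorError

def wait_for_workers(active_refs, reschedule):
    """Drain worker results, rescheduling any command actor whose node died."""
    while active_refs:
        # ray.wait splits refs into those that are ready (or failed) and the rest
        done, active_refs = ray.wait(active_refs)
        for ref in done:
            try:
                ray.get(ref)  # a dead node surfaces here as RayActorError
            except RayActorError:
                # The placement group holding this worker's resources is
                # rescheduled by the GCS, so the job can recover in place.
                active_refs.append(reschedule(ref))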

The logic of the new ray_driver.py is shown in the diagram below:

[Excalidraw diagram, 2022-08-02: ray_driver.py control flow]

Test plan:

[Note]: This PR is related to the previous PR #559. All future changes will be submitted to this PR.

ntlm1686 avatar Aug 02 '22 19:08 ntlm1686

@ljjsalt @d4l3k

I have been thinking more about this and I am wondering if ray.util.queue is a better way of implementing this.

You basically create 2 actors, a PlacementGroupManager actor and a CommandManager actor, and exchange information between them using a Ray queue.

The PlacementGroupManager actor is responsible for creating the placement group and then putting a message in the queue for the CommandManager actor to process, which can then create the command actor. This helps us keep the knowledge of pg creation and command actor creation compartmentalized in these actors.

Additionally, we can @remote these fns so both of them can run in parallel.

https://docs.ray.io/en/releases-1.2.0/advanced.html#message-passing-using-ray-queue
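
For reference, a minimal sketch of that two-actor / ray.util.queue pattern (actor and method names here are illustrative only, not a proposed API):

import ray
from ray.util.queue import Queue

@ray.remote
class Producer:
    def run(self, q: Queue) -> None:
        for i in range(3):
            q.put(f"pg-{i} ready")  # e.g. announce that a placement group is up

@ray.remote
class Consumer:
    def run(self, q: Queue, n: int) -> None:
        for _ in range(n):
            msg = q.get(block=True)  # e.g. create a command actor in response
            print(msg)

ray.init()
q = Queue()
producer, consumer = Producer.remote(), Consumer.remote()
# both actors run concurrently and only meet through the queue
ray.get([producer.run.remote(q), consumer.run.remote(q, 3)])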

atinsood avatar Aug 03 '22 03:08 atinsood


@atinsood

The reason we should use Placement Groups is: if we only use Ray actors here, we can rerun a failed actor after it throws a RayActorError, but the job could lose the compute resources it used to have and then fail even after the node recovers.

For example, say there are 3 nodes with 1 CPU each. Job 1 requires a minimum of 3 nodes and is running on all 3; job 2 is launched later, requires 1 CPU, and becomes a pending task. Once a node fails, the actor we have to rerun becomes a pending task queued behind job 2, so job 2 takes the recovered node and job 1 fails because there aren't enough nodes left to restart it.

Since placement group rescheduling has the highest priority, this situation won't happen.
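
One way to express this with Ray's placement group API (a sketch only; the strategy and exact option names depend on the Ray version):

import ray
from ray.util.placement_group import placement_group

ray.init(address="auto")

# Reserve three 1-CPU bundles up front. The GCS holds these resources for the
# job and re-acquires them with priority if a node fails and later recovers.
pg = placement_group([{"CPU": 1}] * 3, strategy="SPREAD")
ray.get(pg.ready())  # blocks until every bundle is scheduled

@ray.remote(num_cpus=1)
class CommandActor:
    def exec_module(self) -> None:
        ...  # run the training script

# Workers created inside the group cannot be starved by jobs submitted later.
workers = [
    CommandActor.options(placement_group=pg, placement_group_bundle_index=i).remote()
    for i in range(3)
]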

I didn't get why we should use a queue here, since we aren't doing any inter-process communication.

I understand your concern about using ray.wait here, so I wrapped the return values of the command (remote) actors in a RayResult class. If anything unexpected happens it throws an error, so we will know right away. But based on how ray.wait is supposed to work, I think it's right to use it here.

Besides, each command actor can be viewed as a finite state machine with four states (SCHEDULING, FAILING, RUNNING, TASK_COMPLETED). Each command actor starts in the SCHEDULING state, and actor.schedule.remote() acts as an asynchronous step function: from SCHEDULING the next state is RUNNING; from RUNNING the next state is TASK_COMPLETED or FAILING; from FAILING the next state is SCHEDULING.
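
That state machine could be sketched as follows (illustrative only, not the exact ray_driver.py implementation):

from enum import Enum, auto

class ActorState(Enum):
    SCHEDULING = auto()
    RUNNING = auto()
    FAILING = auto()
    TASK_COMPLETED = auto()

def step(state: ActorState, failed: bool = False) -> ActorState:
    """One asynchronous step of a command actor's lifecycle."""
    if state is ActorState.SCHEDULING:
        return ActorState.RUNNING            # actor.schedule.remote() completed
    if state is ActorState.RUNNING:
        return ActorState.FAILING if failed else ActorState.TASK_COMPLETED
    if state is ActorState.FAILING:
        return ActorState.SCHEDULING         # reschedule the command actor
    return state                             # TASK_COMPLETED is terminal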

ntlm1686 avatar Aug 03 '22 04:08 ntlm1686

The reason we should use Placement Groups is

Yeah, not disagreeing on that. I am just thinking about how we manage the interaction between pg creation and command actor creation.

import ray
from typing import List
from ray.util.queue import Queue
from ray.util.placement_group import PlacementGroup

# RayActor, load_actor_json and _logger come from the existing ray_driver module.


@ray.remote
class PlacementGroupManager(object):
    def __init__(self, min_nodes, max_nodes, queue):
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.queue = queue

    def run(self) -> None:
        # step 1. create a PG with min replicas
        initial_pg: PlacementGroup = ...  # placeholder for the actual PG creation

        # put the initial pg in the queue
        self.queue.put(initial_pg)

        # step 2. while loop to keep creating the rest of the PGs incrementally

        # go through the rest of the pgs one by one and keep adding them to the queue
        # we can deal with the logic of pg failure here, or when to stop pg creation if needed


@ray.remote
class CommandActorManager(object):
    def __init__(self, queue):
        self.queue = queue
        self.active_workers = []

    def run(self):
        # some logic on when to stop the while loop, either a poison pill or a signal actor
        while True:
            pg = self.queue.get()  # get notified that the pg was created
            # logic to create the command actor goes here
            command_actor = ...  # placeholder for creating the command actor on pg

            self.active_workers.append(command_actor.exec_module.remote())  # or maybe use another queue, pretty sure there is a better way to deal with this

            # once you are done reading all the PGs from the queue or if the queue ...


def main() -> None:  # pragma: no cover
    actors: List[RayActor] = load_actor_json("actors.json")
    # pyre-fixme[16]: Module `worker` has no attribute `init`.
    ray.init(address="auto", namespace="torchx-ray")
    q = Queue()  # we can set the size of the queue upfront
    pg_manager = PlacementGroupManager.remote(2, 4, q)  # min, max and queue
    pg_manager.run.remote()
    cmd_actor_manager = CommandActorManager.remote(q)
    cmd_actor_manager.run.remote()
    # in real code the driver would obtain active_workers from CommandActorManager
    active_workers: List = []
    # Await return result of remote ray function
    while len(active_workers) > 0:
        _logger.info(f"running ray.wait on {active_workers}")

        # pyre-fixme[16]: Module `worker` has no attribute `wait`.
        completed_workers, active_workers = ray.wait(active_workers)
        # If a failure occurs the ObjectRef will be marked as completed.
        # Calling ray.get will expose the failure as a RayActorError.
        for object_ref in completed_workers:
            ray.get(object_ref)


I was thinking of something like this. It is not correct code, but more of a thought process on how the logic that creates placement groups and the logic that creates command actors could interact while staying compartmentalized.

atinsood avatar Aug 03 '22 04:08 atinsood


        # step 2. while loop to keep creating the rest of the PGs incrementally

        # go through the rest of the pgs one by one and keep adding them to the queue
        # we can deal with the logic of pg failure here, or when to stop pg creation if needed

Actually, this step is not necessary. First, adding placement groups one by one can be a problem once the number of nodes is large. [Screenshot, 2022-08-03 12:31 AM] Please read the description of the X axis: incremental creation only lengthens the time before a PG creation event is added to the GCS pending queue.

The second change: instead of creating a command actor only after its placement group has been scheduled, we create all the command actors at the beginning too (in the SCHEDULING state).
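
A rough sketch of those two changes together (names are illustrative, not the final ray_driver.py code; assumes ray.init() was already called by the driver):

from ray.util.placement_group import placement_group

def create_all(replicas: int):
    # Request every placement group at once; the GCS queues them all as
    # pending and schedules each one when resources become available.
    pgs = [placement_group([{"CPU": 1}]) for _ in range(replicas)]
    # Create every command-actor record up front, each starting in SCHEDULING.
    command_actors = [{"state": "SCHEDULING", "pg": pg} for pg in pgs]
    return pgs, command_actors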

ntlm1686 avatar Aug 03 '22 04:08 ntlm1686

Codecov Report

Merging #572 (3228b9c) into main (b051e3f) will increase coverage by 0.15%. The diff coverage is 98.41%.

@@            Coverage Diff             @@
##             main     #572      +/-   ##
==========================================
+ Coverage   94.85%   95.00%   +0.15%     
==========================================
  Files          66       66              
  Lines        4042     4144     +102     
==========================================
+ Hits         3834     3937     +103     
+ Misses        208      207       -1     
Impacted Files                           Coverage Δ
torchx/schedulers/ray_scheduler.py       95.26% <ø> (ø)
torchx/schedulers/ray/ray_driver.py      97.10% <98.07%> (+1.26%) ↑
torchx/components/dist.py                96.42% <100.00%> (+7.06%) ↑
torchx/schedulers/ray/ray_common.py      100.00% <100.00%> (ø)
torchx/specs/api.py                      98.40% <100.00%> (+<0.01%) ↑
torchx/runner/api.py                     96.85% <0.00%> (+0.01%) ↑


codecov[bot] avatar Aug 03 '22 15:08 codecov[bot]

If we did want to separate the concerns here between scheduling and retries, we could use the built-in task max_retries config, though that has some other implications.

I'm not sure that using a queue with two actors would simplify this logic -- state machines that you can step through are nice from a testing perspective
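
For reference, a minimal sketch of Ray's built-in task retry mentioned above (run_script is an illustrative task name, not the actor method from this PR):

import ray

@ray.remote(max_retries=3)  # Ray re-executes the task up to 3 times if its worker or node dies
def run_script(script: str) -> int:
    # launch the user script; a crash of the hosting process triggers a retry
    return 0

ray.init()
ray.get(run_script.remote("train.py"))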

d4l3k avatar Aug 03 '22 18:08 d4l3k


But we are just running schedule (which cannot go wrong) and exec_module functions (which cannot go wrong unless the script fails), so is it necessary to use max_retries here? Of course, the feature can easily be added.

ntlm1686 avatar Aug 04 '22 15:08 ntlm1686