
Request for Examples of Pipeline Parallelism with Multiple Machines in PiPPy

Open · littlefatfat opened this issue 2 years ago · 1 comment

I would like to use PiPPy for distributed inference across multiple machines with multiple GPUs. However, most of the test cases in the repository target a single machine. Could you add examples of pipeline parallelism across multiple machines?

I tried modifying the example.py file to support multiple machines, using two Docker containers and four V100 GPUs in total. Each container uses two GPUs: Container 1 uses GPUs 0 and 1, while Container 2 uses GPUs 2 and 3.

The docker run command was:

docker run -itd --runtime=nvidia --shm-size=2gb --name $containername --network host pytorch/pytorch:latest
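
Each container then runs the script below with its own rank; roughly like this (a sketch of my launch commands, with the master address replaced by a placeholder):

# inside Container 1 (GPUs 0 and 1)
python example.py --rank 0 --world_size 2 --master_addr <master-ip> --master_port 29500

# inside Container 2 (GPUs 2 and 3)
python example.py --rank 1 --world_size 2 --master_addr <master-ip> --master_port 29500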

I used pippy.all_compile to compile across multiple GPUs. However, I am not sure how to map devices across multiple machines. Based on my understanding, I set the rpc_backend_options on each worker as follows:

rpc_bk_options.set_device_map(f"worker0", {0: 0})
rpc_bk_options.set_device_map(f"worker1", {0: 1})

and

rpc_bk_options.set_device_map(f"worker0", {1: 0})
rpc_bk_options.set_device_map(f"worker1", {1: 1})
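
To spell out what I think these maps mean: set_device_map(to, device_map) takes a {local_device: remote_device} dict, and because CUDA_VISIBLE_DEVICES restricts each container to two GPUs, both ranks see their GPUs locally as cuda:0 and cuda:1. Below is a minimal sketch of the mapping I would expect to need if stage 0 lives on worker0's cuda:0 and stage 1 on worker1's cuda:1 (this may well be where my understanding goes wrong):

import os
import torch.distributed.rpc as rpc

rank = int(os.getenv("RANK", 0))  # in the full script below this comes from argparse

rpc_bk_options = rpc.TensorPipeRpcBackendOptions(rpc_timeout=10.0, num_worker_threads=256)

if rank == 0:
    # tensors leaving worker0's cuda:0 should land on worker1's cuda:1
    rpc_bk_options.set_device_map("worker1", {0: 1})
elif rank == 1:
    # tensors leaving worker1's cuda:1 should land on worker0's cuda:0
    rpc_bk_options.set_device_map("worker0", {1: 0})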

The full code is included below. However, I received errors on both nodes. Could you please help with this issue and provide examples of pipeline parallelism across multiple machines?

# Copyright (c) Meta Platforms, Inc. and affiliates
import sys
sys.path.append("/host/code/pytorch/PiPPy/")
from typing import Any
import torch
import torch.distributed.rpc as rpc

import argparse
import os
import pippy
from pippy import split_into_equal_size
from pippy.microbatch import TensorChunkSpec

import logging
logging.getLogger().setLevel(logging.DEBUG)
logging.basicConfig(format='%(asctime)s %(levelname)s %(filename)s %(funcName)s %(lineno)d: %(message)s', level=logging.DEBUG)

os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"

class MyNetworkBlock(torch.nn.Module):
    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.lin = torch.nn.Linear(in_dim, out_dim)

    def forward(self, x):
        x = self.lin(x)
        x = torch.relu(x)
        return x


class MyNetwork(torch.nn.Module):
    def __init__(self, in_dim, layer_dims):
        super().__init__()

        prev_dim = in_dim
        for i, dim in enumerate(layer_dims):
            setattr(self, f"layer{i}", MyNetworkBlock(prev_dim, dim))
            prev_dim = dim

        self.num_layers = len(layer_dims)
        self.output_proj = torch.nn.Linear(layer_dims[-1], 10)

    def forward(self, x):
        for i in range(self.num_layers):
            x = getattr(self, f"layer{i}")(x)

        return self.output_proj(x)

def main(args):

    world_size = args.world_size
    rank = args.rank
    device = args.rank

    os.environ['MASTER_ADDR'] = args.master_addr
    os.environ['MASTER_PORT'] = args.master_port

    split_policy = split_into_equal_size(world_size)

    rpc_bk_options = rpc.TensorPipeRpcBackendOptions(rpc_timeout=10.0, num_worker_threads=256)

    if rank == 0:
        os.environ["CUDA_VISIBLE_DEVICES"] = '0,1'
        rpc_bk_options.set_device_map(f"worker0", {0: 0})
        rpc_bk_options.set_device_map(f"worker1", {0: 1})
        
    elif rank == 1:
        os.environ["CUDA_VISIBLE_DEVICES"] = '2,3'
        rpc_bk_options.set_device_map(f"worker0", {1: 0})
        rpc_bk_options.set_device_map(f"worker1", {1: 1})

    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size,
                rpc_backend_options=rpc_bk_options)

    print(f"rpc init success for worker{rank}.")

    args_chunk_spec: Any = (TensorChunkSpec(0),)
    kwargs_chunk_spec: Any = {}
    output_chunk_spec: Any = TensorChunkSpec(0)
    mn = MyNetwork(512, [512, 1024, 256]).to(device)
    mn.eval()
    pipe_driver, stage_mod = pippy.all_compile(
        mn,
        num_ranks=world_size,
        num_chunks=2,
        split_policy=split_policy,
        schedule='FillDrain',
        args_chunk_spec=args_chunk_spec,
        kwargs_chunk_spec=kwargs_chunk_spec,
        output_chunk_spec=output_chunk_spec,
    )

    x = torch.randn(512, 512).to(device)

    if args.rank == 0:

        output = pipe_driver(x)
        print("output.")

        reference_output = mn(x)
        print("reference_output.")

        torch.testing.assert_close(output, reference_output)

        print(" Pipeline parallel model ran successfully! ".center(80, "*"))
        
    rpc.shutdown()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--world_size", type=int, default=int(os.getenv("WORLD_SIZE", 2))
    )
    parser.add_argument("--rank", type=int, default=int(os.getenv("RANK", 0)))
    parser.add_argument(
        "--master_addr", type=str, default=os.getenv("MASTER_ADDR", "xx.xx.xx.xx")
    )
    parser.add_argument(
        "--master_port", type=str, default=os.getenv("MASTER_PORT", "29500")
    )

    args = parser.parse_args()
    main(args)

Error on rank 0:

Traceback (most recent call last):
  File "/host/code/pytorch_dp/pytorch_tutorials/pipeline_parallel/pippy_test/example_gpu/issue/example.py", line 136, in <module>
    main(args)
  File "/host/code/pytorch_dp/pytorch_tutorials/pipeline_parallel/pippy_test/example_gpu/issue/example.py", line 110, in main
    output = pipe_driver(x)
  File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1194, in _call_impl
    return forward_call(*input, **kwargs)
  File "/host/code/pytorch/PiPPy/pippy/PipelineDriver.py", line 2199, in forward
    local_results_and_last_grads = self._retrieve_output_values(
  File "/host/code/pytorch/PiPPy/pippy/PipelineDriver.py", line 1657, in _retrieve_output_values
    return pippy.fx.node.map_aggregate(
  File "/host/code/pytorch/PiPPy/pippy/fx/node.py", line 620, in map_aggregate
    return immutable_list(map_aggregate(elem, fn, should_traverse_fn) for elem in a)
  File "/host/code/pytorch/PiPPy/pippy/fx/node.py", line 620, in <genexpr>
    return immutable_list(map_aggregate(elem, fn, should_traverse_fn) for elem in a)
  File "/host/code/pytorch/PiPPy/pippy/fx/node.py", line 627, in map_aggregate
    return fn(a)
  File "/host/code/pytorch/PiPPy/pippy/PipelineDriver.py", line 1659, in <lambda>
    lambda a: a.wait() if isinstance(a, torch._C.Future) else a,
  File "/opt/conda/lib/python3.10/site-packages/torch/futures/__init__.py", line 76, in wait
    return super().wait()
  File "/opt/conda/lib/python3.10/site-packages/torch/futures/__init__.py", line 264, in raise_error
    raise fut_result
  File "/opt/conda/lib/python3.10/site-packages/torch/distributed/rpc/rref_proxy.py", line 58, in _complete_op
    result.set_result(fut.value())
RuntimeError: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)

Error on rank 1: Aborted (core dumped)

littlefatfat · May 30 '23 04:05

Log on rank 0:

Namespace(world_size=2, rank=0, master_addr='30.57.186.38', master_port='29500')
rpc init success for worker0.
DEBUG > WorkerInfo(id=0, name=worker0)
INFO > [PiPPy] Tracing model ...
DEBUG > Total model size: 1052938, per stage size: 526469
DEBUG > layer0_lin has params: {'weight': 262144, 'bias': 512}
DEBUG > layer1_lin has params: {'weight': 524288, 'bias': 1024}
DEBUG > layer2_lin has params: {'weight': 262144, 'bias': 256}
DEBUG > output_proj has params: {'weight': 2560, 'bias': 10}
INFO > Pipeline is in evaluation mode, backward pass not generated
DEBUG > {device(type='cuda', index=0): device(type='cuda', index=0)}
INFO > Found device cuda:0 for rank 0
device: cuda:0
INFO > GraphModule(
  (submod_0): PipeStageModule(
    (layer0_lin): Linear(in_features=512, out_features=512, bias=True)
  )
  (submod_1): PipeStageModule(
    (layer1_lin): Linear(in_features=512, out_features=1024, bias=True)
    (layer2_lin): Linear(in_features=1024, out_features=256, bias=True)
    (output_proj): Linear(in_features=256, out_features=10, bias=True)
  )
)



def forward(self, x):
    submod_0 = self.submod_0(x);  x = None
    submod_1 = self.submod_1(submod_0);  submod_0 = None
    return submod_1

# To see more debug info, please use `graph_module.print_readable()`
INFO > [PiPPy] Creating pipeline driver ...
INFO > [root] Creating pipeline driver with 2 workers: [0, 1]
DEBUG > [root] Sending stage_id = 0 mod to worker
INFO > [0] Instantiating RankWorker
DEBUG > [root] Sending stage_id = 1 mod to worker
INFO > Materializing submod_0 on cuda:0
INFO > Instantiating PipeStageExecutor for stage 0
DEBUG > [root] Waiting stage_id = 0 mod to be confirmed by worker
DEBUG > [root] Waiting stage_id = 1 mod to be confirmed by worker
x.
INFO > [root] Running pipeline with 2 micro-batches
DEBUG > [root] Instantiating microbatch interpreter for chunk 0
DEBUG > [root] RemoteInterpreter created with 4 nodes
DEBUG > [root] Instantiating microbatch interpreter for chunk 1
DEBUG > [root] RemoteInterpreter created with 4 nodes
DEBUG > [root] 2 instantiated
DEBUG > [0] Issue command to run %x : [#users=1] = placeholder[target=x]
DEBUG > [1] Issue command to run %x : [#users=1] = placeholder[target=x]
DEBUG > [0] Issue command to run %submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {})
DEBUG > [root][0] Issuing Phase.FORWARD invocation for target submod_0 on stage 0
DEBUG > [0] Issue command to run %submod_1 : [#users=1] = call_module[target=submod_1](args = (%submod_0,), kwargs = {})
DEBUG > [root][0] Issuing Phase.FORWARD invocation for target submod_1 on stage 1
DEBUG > [1] Issue command to run %submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {})
DEBUG > [0][0] Received invoke call for %submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {})
DEBUG > [root][1] Issuing Phase.FORWARD invocation for target submod_0 on stage 0
DEBUG > [0][0] Invoke call found 0 ValueReference arguments
DEBUG > [0][0] Invoke instantiated WorkItem WorkItem(%submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {})) with key 0_submod_0
DEBUG > [0][0] No RRef arguments. Scheduling directly as READY workitem
DEBUG > [0] Current ready runlist keys: dict_keys([])
DEBUG > [0] Dequeueing workitem from set of 1
DEBUG > [1] Issue command to run %submod_1 : [#users=1] = call_module[target=submod_1](args = (%submod_0,), kwargs = {})
DEBUG > [0][0] Got WorkItem WorkItem(%submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {}))
DEBUG > [root][1] Issuing Phase.FORWARD invocation for target submod_1 on stage 1
DEBUG > [0][1] Received invoke call for %submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {})
INFO > [0] Running forward module for microbatch 0
DEBUG > [root] Retrieving output values from 2 chunks
DEBUG > [0][1] Invoke call found 0 ValueReference arguments
DEBUG > [0] Issue command to run return submod_1
DEBUG > [0][0] Executing transfer of value ValueReference(0, 0_submod_0) initiated by stage 1 for 0_submod_1
DEBUG > [0][1] Invoke instantiated WorkItem WorkItem(%submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {})) with key 1_submod_0
DEBUG > [1] Issue command to run return submod_1
DEBUG > [0][1] No RRef arguments. Scheduling directly as READY workitem
DEBUG > [0] Current ready runlist keys: dict_keys([])
DEBUG > [0][1] Executing transfer of value ValueReference(0, 1_submod_0) initiated by stage 1 for 1_submod_1
DEBUG > [0][0] Populating result of type <class 'torch.Tensor'> for 0_submod_0
DEBUG > [0] Dequeueing workitem from set of 1
DEBUG > [0][1] Got WorkItem WorkItem(%submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {}))
INFO > [0] Running forward module for microbatch 1
DEBUG > [0][1] Populating result of type <class 'torch.Tensor'> for 1_submod_0
[W tensorpipe_agent.cpp:940] RPC agent for worker0 encountered error when reading incoming response from worker1: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:940] RPC agent for worker0 encountered error when reading incoming response from worker1: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:726] RPC agent for worker0 encountered error when reading incoming request from worker1: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
Traceback (most recent call last):
  File "/host/code/pytorch_dp/pytorch_tutorials/pipeline_parallel/pippy_test/example_gpu/issue/example.py", line 136, in <module>
    main(args)
  File "/host/code/pytorch_dp/pytorch_tutorials/pipeline_parallel/pippy_test/example_gpu/issue/example.py", line 110, in main
    output = pipe_driver(x)
  File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1194, in _call_impl
    return forward_call(*input, **kwargs)
  File "/host/code/pytorch/PiPPy/pippy/PipelineDriver.py", line 2199, in forward
    local_results_and_last_grads = self._retrieve_output_values(
  File "/host/code/pytorch/PiPPy/pippy/PipelineDriver.py", line 1657, in _retrieve_output_values
    return pippy.fx.node.map_aggregate(
  File "/host/code/pytorch/PiPPy/pippy/fx/node.py", line 620, in map_aggregate
    return immutable_list(map_aggregate(elem, fn, should_traverse_fn) for elem in a)
  File "/host/code/pytorch/PiPPy/pippy/fx/node.py", line 620, in <genexpr>
    return immutable_list(map_aggregate(elem, fn, should_traverse_fn) for elem in a)
  File "/host/code/pytorch/PiPPy/pippy/fx/node.py", line 627, in map_aggregate
    return fn(a)
  File "/host/code/pytorch/PiPPy/pippy/PipelineDriver.py", line 1659, in <lambda>
    lambda a: a.wait() if isinstance(a, torch._C.Future) else a,
  File "/opt/conda/lib/python3.10/site-packages/torch/futures/__init__.py", line 76, in wait
    return super().wait()
  File "/opt/conda/lib/python3.10/site-packages/torch/futures/__init__.py", line 264, in raise_error
    raise fut_result
  File "/opt/conda/lib/python3.10/site-packages/torch/distributed/rpc/rref_proxy.py", line 58, in _complete_op
    result.set_result(fut.value())
RuntimeError: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)

Log on rank 1:

Namespace(world_size=2, rank=1, master_addr='30.57.186.38', master_port='29500')
rpc init success for worker1.
DEBUG > WorkerInfo(id=1, name=worker1)
INFO > [PiPPy] Tracing model ...
DEBUG > Total model size: 1052938, per stage size: 526469
DEBUG > layer0_lin has params: {'weight': 262144, 'bias': 512}
DEBUG > layer1_lin has params: {'weight': 524288, 'bias': 1024}
DEBUG > layer2_lin has params: {'weight': 262144, 'bias': 256}
DEBUG > output_proj has params: {'weight': 2560, 'bias': 10}
INFO > Pipeline is in evaluation mode, backward pass not generated
DEBUG > {device(type='cuda', index=1): device(type='cuda', index=1)}
INFO > Found device cuda:1 for rank 1
device: cuda:1
INFO > [1] Instantiating RankWorker
x.
INFO > Materializing submod_1 on cuda:1
INFO > Instantiating PipeStageExecutor for stage 1
DEBUG > [1][0] Received invoke call for %submod_1 : [#users=1] = call_module[target=submod_1](args = (%submod_0,), kwargs = {})
DEBUG > [1][0] Invoke call found 1 ValueReference arguments
DEBUG > [1][0] Invoke instantiated WorkItem WorkItem(%submod_1 : [#users=1] = call_module[target=submod_1](args = (%submod_0,), kwargs = {})) with key 0_submod_1
DEBUG > [1][0] Scheduling WorkItem as WAITING workitem
DEBUG > [1] Current waiting runlist keys: dict_keys([])
DEBUG > [1][0] Launching RPC data transfer for ValueReference 0 ValueReference(0, 0_submod_0)
DEBUG > [1][0] Requesting transfer of value ValueReference(0, 0_submod_0) for runlist item 0_submod_1 arg_idx 0
DEBUG > [1][1] Received invoke call for %submod_1 : [#users=1] = call_module[target=submod_1](args = (%submod_0,), kwargs = {})
DEBUG > [1][1] Invoke call found 1 ValueReference arguments
DEBUG > [1][1] Invoke instantiated WorkItem WorkItem(%submod_1 : [#users=1] = call_module[target=submod_1](args = (%submod_0,), kwargs = {})) with key 1_submod_1
DEBUG > [1][1] Scheduling WorkItem as WAITING workitem
DEBUG > [1] Current waiting runlist keys: dict_keys(['0_submod_1'])
DEBUG > [1][1] Launching RPC data transfer for ValueReference 0 ValueReference(0, 1_submod_0)
DEBUG > [1][1] Requesting transfer of value ValueReference(0, 1_submod_0) for runlist item 1_submod_1 arg_idx 0
DEBUG > [1][-1] Executing transfer of value ValueReference(1, 0_submod_1) initiated by stage root for collect
DEBUG > [1][-1] Executing transfer of value ValueReference(1, 1_submod_1) initiated by stage root for collect
Aborted (core dumped)

littlefatfat · May 30 '23 06:05