Request for Examples of Pipeline Parallelism with Multiple Machines in PiPPy
I would like to use PiPPy for distributed inference with multiple machines and multiple GPUs. However, most of the test cases in the repository target a single machine. Could you add examples of pipeline parallelism across multiple machines?

I tried modifying the example.py file to support multiple machines, using two Docker containers on a host with four V100 GPUs. Each container uses two of the GPUs: Container 1 uses GPUs 0 and 1, and Container 2 uses GPUs 2 and 3.
The `docker run` command was:

```bash
docker run -itd --runtime=nvidia --shm-size=2gb --name $containername --network host pytorch/pytorch:latest
```
I used `pippy.all_compile` to compile the model across multiple GPUs. However, I am not sure how to map multiple devices across multiple machines. Based on my understanding, I set the `rpc_backend_options` for each worker as follows:
rpc_bk_options.set_device_map(f"worker0", {0: 0})
rpc_bk_options.set_device_map(f"worker1", {0: 1})
and
rpc_bk_options.set_device_map(f"worker0", {1: 0})
rpc_bk_options.set_device_map(f"worker1", {1: 1})
The full code is included below. However, I receive errors on both nodes. Could you please help me with this issue and provide examples of pipeline parallelism with multiple machines?
```python
# Copyright (c) Meta Platforms, Inc. and affiliates
import sys

sys.path.append("/host/code/pytorch/PiPPy/")

import argparse
import logging
import os
from typing import Any

import torch
import torch.distributed.rpc as rpc

import pippy
from pippy import split_into_equal_size
from pippy.microbatch import TensorChunkSpec

logging.getLogger().setLevel(logging.DEBUG)
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(filename)s %(funcName)s %(lineno)d: %(message)s',
    level=logging.DEBUG,
)
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"


class MyNetworkBlock(torch.nn.Module):
    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.lin = torch.nn.Linear(in_dim, out_dim)

    def forward(self, x):
        x = self.lin(x)
        x = torch.relu(x)
        return x


class MyNetwork(torch.nn.Module):
    def __init__(self, in_dim, layer_dims):
        super().__init__()
        prev_dim = in_dim
        for i, dim in enumerate(layer_dims):
            setattr(self, f"layer{i}", MyNetworkBlock(prev_dim, dim))
            prev_dim = dim
        self.num_layers = len(layer_dims)
        self.output_proj = torch.nn.Linear(layer_dims[-1], 10)

    def forward(self, x):
        for i in range(self.num_layers):
            x = getattr(self, f"layer{i}")(x)
        return self.output_proj(x)


def main(args):
    world_size = args.world_size
    rank = args.rank
    device = args.rank
    os.environ['MASTER_ADDR'] = args.master_addr
    os.environ['MASTER_PORT'] = args.master_port

    split_policy = split_into_equal_size(world_size)
    rpc_bk_options = rpc.TensorPipeRpcBackendOptions(rpc_timeout=10.0, num_worker_threads=256)
    if rank == 0:
        os.environ["CUDA_VISIBLE_DEVICES"] = '0,1'
        rpc_bk_options.set_device_map("worker0", {0: 0})
        rpc_bk_options.set_device_map("worker1", {0: 1})
    elif rank == 1:
        os.environ["CUDA_VISIBLE_DEVICES"] = '2,3'
        rpc_bk_options.set_device_map("worker0", {1: 0})
        rpc_bk_options.set_device_map("worker1", {1: 1})

    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size,
                 rpc_backend_options=rpc_bk_options)
    print(f"rpc init success for worker{rank}.")

    args_chunk_spec: Any = (TensorChunkSpec(0),)
    kwargs_chunk_spec: Any = {}
    output_chunk_spec: Any = TensorChunkSpec(0)

    mn = MyNetwork(512, [512, 1024, 256]).to(device)
    mn.eval()

    pipe_driver, stage_mod = pippy.all_compile(
        mn,
        num_ranks=world_size,
        num_chunks=2,
        split_policy=split_policy,
        schedule='FillDrain',
        args_chunk_spec=args_chunk_spec,
        kwargs_chunk_spec=kwargs_chunk_spec,
        output_chunk_spec=output_chunk_spec,
    )

    x = torch.randn(512, 512).to(device)
    if args.rank == 0:
        output = pipe_driver(x)
        print("output.")
        reference_output = mn(x)
        print("reference_output.")
        torch.testing.assert_close(output, reference_output)
        print(" Pipeline parallel model ran successfully! ".center(80, "*"))

    rpc.shutdown()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--world_size", type=int, default=int(os.getenv("WORLD_SIZE", 2))
    )
    parser.add_argument("--rank", type=int, default=int(os.getenv("RANK", 0)))
    parser.add_argument(
        "--master_addr", type=str, default=os.getenv("MASTER_ADDR", "xx.xx.xx.xx")
    )
    parser.add_argument(
        "--master_port", type=str, default=os.getenv("MASTER_PORT", "29500")
    )
    args = parser.parse_args()
    main(args)
```
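Separately, to isolate whether the failure is in PiPPy or in my basic RPC/device-map setup across the two containers, this is the kind of minimal `torch.distributed.rpc` sanity check I would expect to pass first. It is only a sketch (no PiPPy involved); `cuda_add_one` and the file name are illustrative, and `RANK`, `WORLD_SIZE`, `MASTER_ADDR`, and `MASTER_PORT` are assumed to be exported in each container, matching the script above:

```python
# minimal_rpc_check.py -- illustrative sanity check, not part of the PiPPy repo
import os

import torch
import torch.distributed.rpc as rpc


def cuda_add_one(t):
    # Executes on the callee; the caller's device map decides which callee
    # GPU `t` arrives on, and the result is mapped back to the caller's GPU.
    return t + 1


def main():
    rank = int(os.environ["RANK"])
    world_size = int(os.environ["WORLD_SIZE"])

    opts = rpc.TensorPipeRpcBackendOptions()
    if rank == 0:
        # Caller GPU 0 on rank 0 -> GPU 0 on worker1 (indices are local,
        # i.e. after CUDA_VISIBLE_DEVICES is applied in each container).
        opts.set_device_map("worker1", {0: 0})

    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size,
                 rpc_backend_options=opts)

    if rank == 0:
        x = torch.ones(4, device="cuda:0")
        y = rpc.rpc_sync("worker1", cuda_add_one, args=(x,))
        print(y)  # expect tensor([2., 2., 2., 2.]) back on cuda:0

    rpc.shutdown()


if __name__ == "__main__":
    main()
```

If even this fails with the same tensorpipe eof error, the problem is presumably in the container/transport setup rather than in PiPPy itself.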
Error on rank 0:

```
Traceback (most recent call last):
File "/host/code/pytorch_dp/pytorch_tutorials/pipeline_parallel/pippy_test/example_gpu/issue/example.py", line 136, in <module>
main(args)
File "/host/code/pytorch_dp/pytorch_tutorials/pipeline_parallel/pippy_test/example_gpu/issue/example.py", line 110, in main
output = pipe_driver(x)
File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1194, in _call_impl
return forward_call(*input, **kwargs)
File "/host/code/pytorch/PiPPy/pippy/PipelineDriver.py", line 2199, in forward
local_results_and_last_grads = self._retrieve_output_values(
File "/host/code/pytorch/PiPPy/pippy/PipelineDriver.py", line 1657, in _retrieve_output_values
return pippy.fx.node.map_aggregate(
File "/host/code/pytorch/PiPPy/pippy/fx/node.py", line 620, in map_aggregate
return immutable_list(map_aggregate(elem, fn, should_traverse_fn) for elem in a)
File "/host/code/pytorch/PiPPy/pippy/fx/node.py", line 620, in <genexpr>
return immutable_list(map_aggregate(elem, fn, should_traverse_fn) for elem in a)
File "/host/code/pytorch/PiPPy/pippy/fx/node.py", line 627, in map_aggregate
return fn(a)
File "/host/code/pytorch/PiPPy/pippy/PipelineDriver.py", line 1659, in <lambda>
lambda a: a.wait() if isinstance(a, torch._C.Future) else a,
File "/opt/conda/lib/python3.10/site-packages/torch/futures/__init__.py", line 76, in wait
return super().wait()
File "/opt/conda/lib/python3.10/site-packages/torch/futures/__init__.py", line 264, in raise_error
raise fut_result
File "/opt/conda/lib/python3.10/site-packages/torch/distributed/rpc/rref_proxy.py", line 58, in _complete_op
result.set_result(fut.value())
RuntimeError: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
```
Error on rank 1:

```
Aborted (core dumped)
```
Log on rank 0:

```
Namespace(world_size=2, rank=0, master_addr='30.57.186.38', master_port='29500')
rpc init success for worker0.
DEBUG > WorkerInfo(id=0, name=worker0)
INFO > [PiPPy] Tracing model ...
DEBUG > Total model size: 1052938, per stage size: 526469
DEBUG > layer0_lin has params: {'weight': 262144, 'bias': 512}
DEBUG > layer1_lin has params: {'weight': 524288, 'bias': 1024}
DEBUG > layer2_lin has params: {'weight': 262144, 'bias': 256}
DEBUG > output_proj has params: {'weight': 2560, 'bias': 10}
INFO > Pipeline is in evaluation mode, backward pass not generated
DEBUG > {device(type='cuda', index=0): device(type='cuda', index=0)}
INFO > Found device cuda:0 for rank 0
device: cuda:0
INFO > GraphModule(
  (submod_0): PipeStageModule(
    (layer0_lin): Linear(in_features=512, out_features=512, bias=True)
  )
  (submod_1): PipeStageModule(
    (layer1_lin): Linear(in_features=512, out_features=1024, bias=True)
    (layer2_lin): Linear(in_features=1024, out_features=256, bias=True)
    (output_proj): Linear(in_features=256, out_features=10, bias=True)
  )
)

def forward(self, x):
    submod_0 = self.submod_0(x); x = None
    submod_1 = self.submod_1(submod_0); submod_0 = None
    return submod_1

# To see more debug info, please use `graph_module.print_readable()`
INFO > [PiPPy] Creating pipeline driver ...
INFO > [root] Creating pipeline driver with 2 workers: [0, 1]
DEBUG > [root] Sending stage_id = 0 mod to worker
INFO > [0] Instantiating RankWorker
DEBUG > [root] Sending stage_id = 1 mod to worker
INFO > Materializing submod_0 on cuda:0
INFO > Instantiating PipeStageExecutor for stage 0
DEBUG > [root] Waiting stage_id = 0 mod to be confirmed by worker
DEBUG > [root] Waiting stage_id = 1 mod to be confirmed by worker
x.
INFO > [root] Running pipeline with 2 micro-batches
DEBUG > [root] Instantiating microbatch interpreter for chunk 0
DEBUG > [root] RemoteInterpreter created with 4 nodes
DEBUG > [root] Instantiating microbatch interpreter for chunk 1
DEBUG > [root] RemoteInterpreter created with 4 nodes
DEBUG > [root] 2 instantiated
DEBUG > [0] Issue command to run %x : [#users=1] = placeholder[target=x]
DEBUG > [1] Issue command to run %x : [#users=1] = placeholder[target=x]
DEBUG > [0] Issue command to run %submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {})
DEBUG > [root][0] Issuing Phase.FORWARD invocation for target submod_0 on stage 0
DEBUG > [0] Issue command to run %submod_1 : [#users=1] = call_module[target=submod_1](args = (%submod_0,), kwargs = {})
DEBUG > [root][0] Issuing Phase.FORWARD invocation for target submod_1 on stage 1
DEBUG > [1] Issue command to run %submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {})
DEBUG > [0][0] Received invoke call for %submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {})
DEBUG > [root][1] Issuing Phase.FORWARD invocation for target submod_0 on stage 0
DEBUG > [0][0] Invoke call found 0 ValueReference arguments
DEBUG > [0][0] Invoke instantiated WorkItem WorkItem(%submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {})) with key 0_submod_0
DEBUG > [0][0] No RRef arguments. Scheduling directly as READY workitem
DEBUG > [0] Current ready runlist keys: dict_keys([])
DEBUG > [0] Dequeueing workitem from set of 1
DEBUG > [1] Issue command to run %submod_1 : [#users=1] = call_module[target=submod_1](args = (%submod_0,), kwargs = {})
DEBUG > [0][0] Got WorkItem WorkItem(%submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {}))
DEBUG > [root][1] Issuing Phase.FORWARD invocation for target submod_1 on stage 1
DEBUG > [0][1] Received invoke call for %submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {})
INFO > [0] Running forward module for microbatch 0
DEBUG > [root] Retrieving output values from 2 chunks
DEBUG > [0][1] Invoke call found 0 ValueReference arguments
DEBUG > [0] Issue command to run return submod_1
DEBUG > [0][0] Executing transfer of value ValueReference(0, 0_submod_0) initiated by stage 1 for 0_submod_1
DEBUG > [0][1] Invoke instantiated WorkItem WorkItem(%submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {})) with key 1_submod_0
DEBUG > [1] Issue command to run return submod_1
DEBUG > [0][1] No RRef arguments. Scheduling directly as READY workitem
DEBUG > [0] Current ready runlist keys: dict_keys([])
DEBUG > [0][1] Executing transfer of value ValueReference(0, 1_submod_0) initiated by stage 1 for 1_submod_1
DEBUG > [0][0] Populating result of type <class 'torch.Tensor'> for 0_submod_0
DEBUG > [0] Dequeueing workitem from set of 1
DEBUG > [0][1] Got WorkItem WorkItem(%submod_0 : [#users=1] = call_module[target=submod_0](args = (%x,), kwargs = {}))
INFO > [0] Running forward module for microbatch 1
DEBUG > [0][1] Populating result of type <class 'torch.Tensor'> for 1_submod_0
[W tensorpipe_agent.cpp:940] RPC agent for worker0 encountered error when reading incoming response from worker1: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:940] RPC agent for worker0 encountered error when reading incoming response from worker1: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:726] RPC agent for worker0 encountered error when reading incoming request from worker1: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
Traceback (most recent call last):
File "/host/code/pytorch_dp/pytorch_tutorials/pipeline_parallel/pippy_test/example_gpu/issue/example.py", line 136, in <module>
main(args)
File "/host/code/pytorch_dp/pytorch_tutorials/pipeline_parallel/pippy_test/example_gpu/issue/example.py", line 110, in main
output = pipe_driver(x)
File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1194, in _call_impl
return forward_call(*input, **kwargs)
File "/host/code/pytorch/PiPPy/pippy/PipelineDriver.py", line 2199, in forward
local_results_and_last_grads = self._retrieve_output_values(
File "/host/code/pytorch/PiPPy/pippy/PipelineDriver.py", line 1657, in _retrieve_output_values
return pippy.fx.node.map_aggregate(
File "/host/code/pytorch/PiPPy/pippy/fx/node.py", line 620, in map_aggregate
return immutable_list(map_aggregate(elem, fn, should_traverse_fn) for elem in a)
File "/host/code/pytorch/PiPPy/pippy/fx/node.py", line 620, in <genexpr>
return immutable_list(map_aggregate(elem, fn, should_traverse_fn) for elem in a)
File "/host/code/pytorch/PiPPy/pippy/fx/node.py", line 627, in map_aggregate
return fn(a)
File "/host/code/pytorch/PiPPy/pippy/PipelineDriver.py", line 1659, in <lambda>
lambda a: a.wait() if isinstance(a, torch._C.Future) else a,
File "/opt/conda/lib/python3.10/site-packages/torch/futures/__init__.py", line 76, in wait
return super().wait()
File "/opt/conda/lib/python3.10/site-packages/torch/futures/__init__.py", line 264, in raise_error
raise fut_result
File "/opt/conda/lib/python3.10/site-packages/torch/distributed/rpc/rref_proxy.py", line 58, in _complete_op
result.set_result(fut.value())
RuntimeError: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
```
Log on rank 1:

```
Namespace(world_size=2, rank=1, master_addr='30.57.186.38', master_port='29500')
rpc init success for worker1.
DEBUG > WorkerInfo(id=1, name=worker1)
INFO > [PiPPy] Tracing model ...
DEBUG > Total model size: 1052938, per stage size: 526469
DEBUG > layer0_lin has params: {'weight': 262144, 'bias': 512}
DEBUG > layer1_lin has params: {'weight': 524288, 'bias': 1024}
DEBUG > layer2_lin has params: {'weight': 262144, 'bias': 256}
DEBUG > output_proj has params: {'weight': 2560, 'bias': 10}
INFO > Pipeline is in evaluation mode, backward pass not generated
DEBUG > {device(type='cuda', index=1): device(type='cuda', index=1)}
INFO > Found device cuda:1 for rank 1
device: cuda:1
INFO > [1] Instantiating RankWorker
x.
INFO > Materializing submod_1 on cuda:1
INFO > Instantiating PipeStageExecutor for stage 1
DEBUG > [1][0] Received invoke call for %submod_1 : [#users=1] = call_module[target=submod_1](args = (%submod_0,), kwargs = {})
DEBUG > [1][0] Invoke call found 1 ValueReference arguments
DEBUG > [1][0] Invoke instantiated WorkItem WorkItem(%submod_1 : [#users=1] = call_module[target=submod_1](args = (%submod_0,), kwargs = {})) with key 0_submod_1
DEBUG > [1][0] Scheduling WorkItem as WAITING workitem
DEBUG > [1] Current waiting runlist keys: dict_keys([])
DEBUG > [1][0] Launching RPC data transfer for ValueReference 0 ValueReference(0, 0_submod_0)
DEBUG > [1][0] Requesting transfer of value ValueReference(0, 0_submod_0) for runlist item 0_submod_1 arg_idx 0
DEBUG > [1][1] Received invoke call for %submod_1 : [#users=1] = call_module[target=submod_1](args = (%submod_0,), kwargs = {})
DEBUG > [1][1] Invoke call found 1 ValueReference arguments
DEBUG > [1][1] Invoke instantiated WorkItem WorkItem(%submod_1 : [#users=1] = call_module[target=submod_1](args = (%submod_0,), kwargs = {})) with key 1_submod_1
DEBUG > [1][1] Scheduling WorkItem as WAITING workitem
DEBUG > [1] Current waiting runlist keys: dict_keys(['0_submod_1'])
DEBUG > [1][1] Launching RPC data transfer for ValueReference 0 ValueReference(0, 1_submod_0)
DEBUG > [1][1] Requesting transfer of value ValueReference(0, 1_submod_0) for runlist item 1_submod_1 arg_idx 0
DEBUG > [1][-1] Executing transfer of value ValueReference(1, 0_submod_1) initiated by stage root for collect
DEBUG > [1][-1] Executing transfer of value ValueReference(1, 1_submod_1) initiated by stage root for collect
Aborted (core dumped)
```