Device assignment does not work in PyTorch
As of 12372f4c6562f296c510f6734e748ef54c375c33, device assignment in the PyTorch dataloader does not work correctly with multiple GPUs.
```python
import os

import pandas as pd

from merlin.dataloader.torch import Loader
from merlin.io.dataset import Dataset

dataset = Dataset(pd.DataFrame({"a": list(range(10))}))
dataset = dataset.repartition(npartitions=2)

rank = int(os.environ["LOCAL_RANK"])

with Loader(
    dataset,
    batch_size=1,
    global_rank=rank,
    global_size=2,
    device=rank,
) as loader:
    for idx, batch in enumerate(loader):
        x, y = batch
        device = x["a"].device
        print(f"rank: {rank}, device: {device}")
```
When I run the above, I get:
```
root@ba87ba84045a:/dataloader# torchrun --nproc_per_node=2 test_torch_multi_gpu.py
WARNING:torch.distributed.run:
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
/usr/local/lib/python3.8/dist-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
/usr/local/lib/python3.8/dist-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
rank: 0, device: cuda:0
rank: 0, device: cuda:0
rank: 0, device: cuda:0
rank: 0, device: cuda:0
rank: 0, device: cuda:0
rank: 1, device: cuda:0
rank: 1, device: cuda:0
rank: 1, device: cuda:0
rank: 1, device: cuda:0
rank: 1, device: cuda:0
```
But for rank 1, tensors are expected to be placed on cuda:1, not cuda:0.
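The invariant being violated can be stated as a tiny helper: each rank's batches should live on the GPU matching its local rank. (`expected_device` is an illustrative name, not part of the Merlin API.)

```python
import os


def expected_device(local_rank=None):
    """Device string a rank's batches should live on in this setup.

    Defaults to reading LOCAL_RANK, as torchrun sets it for each worker.
    """
    if local_rank is None:
        local_rank = int(os.environ.get("LOCAL_RANK", 0))
    return f"cuda:{local_rank}"


# In the run above, rank 1's batches report cuda:0, but the expectation is:
print(expected_device(1))  # cuda:1
```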
Even with #132, the device assignment still doesn't work for list columns:
```python
import os

import pandas as pd

from merlin.dataloader.torch import Loader
from merlin.io.dataset import Dataset

# dataset = Dataset(pd.DataFrame({"a": list(range(10))}))
dataset = Dataset(pd.DataFrame({"a": [[1], [2, 3]] * 5}))
dataset = dataset.repartition(npartitions=2)

rank = int(os.environ["LOCAL_RANK"])

with Loader(
    dataset,
    batch_size=1,
    global_rank=rank,
    global_size=2,
    device=rank,
) as loader:
    for idx, batch in enumerate(loader):
        x, y = batch
        values_device = x["a__values"].device
        offsets_device = x["a__offsets"].device
        print(f"rank: {rank}, values_device: {values_device}, offsets_device: {offsets_device}")
```
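For context, the list column "a" is delivered as two flat tensors: `a__values` holds the concatenated list elements and `a__offsets` marks the row boundaries. A framework-independent sketch of that encoding:

```python
def to_values_offsets(lists):
    """Encode a list column as flat values plus row-boundary offsets.

    Row i spans values[offsets[i]:offsets[i + 1]].
    """
    values, offsets = [], [0]
    for row in lists:
        values.extend(row)
        offsets.append(offsets[-1] + len(row))
    return values, offsets


values, offsets = to_values_offsets([[1], [2, 3]])
# values -> [1, 2, 3]; offsets -> [0, 1, 3]
```

Because rows are recovered by indexing `values` with `offsets`, both tensors must sit on the same device; a cross-device pair is consistent with the illegal-memory-access crash below.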
```
# torchrun --nproc_per_node=2 test_torch_multi_gpu.py
WARNING:torch.distributed.run:
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
/usr/local/lib/python3.8/dist-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
/usr/local/lib/python3.8/dist-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
rank: 0, values_device: cuda:0, offsets_device: cuda:0
rank: 0, values_device: cuda:0, offsets_device: cuda:0
rank: 0, values_device: cuda:0, offsets_device: cuda:0
rank: 0, values_device: cuda:0, offsets_device: cuda:0
rank: 0, values_device: cuda:0, offsets_device: cuda:0
terminate called after throwing an instance of 'thrust::system::system_error'
  what():  transform: failed to synchronize: cudaErrorIllegalAddress: an illegal memory access was encountered
ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: -6) local_rank: 1 (pid: 1218) of binary: /usr/bin/python3
Traceback (most recent call last):
  File "/usr/local/bin/torchrun", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/dist-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 346, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/torch/distributed/run.py", line 762, in main
    run(args)
  File "/usr/local/lib/python3.8/dist-packages/torch/distributed/run.py", line 753, in run
    elastic_launch(
  File "/usr/local/lib/python3.8/dist-packages/torch/distributed/launcher/api.py", line 132, in __call__
    return launch_agent(self._config, self._entrypoint, list(args))
  File "/usr/local/lib/python3.8/dist-packages/torch/distributed/launcher/api.py", line 246, in launch_agent
    raise ChildFailedError(
torch.distributed.elastic.multiprocessing.errors.ChildFailedError:
=====================================================
test_torch_multi_gpu.py FAILED
-----------------------------------------------------
Failures:
  <NO_OTHER_FAILURES>
-----------------------------------------------------
Root Cause (first observed failure):
[0]:
  time      : 2023-04-13_21:57:34
  host      : 0810733-lcedt
  rank      : 1 (local_rank: 1)
  exitcode  : -6 (pid: 1218)
  error_file: <N/A>
  traceback : Signal 6 (SIGABRT) received by PID 1218
=====================================================
```
Wondering if this may be related to which GPU the cuDF dataframe or series was created on. Selecting the device with cuDF requires dropping down to CuPy AFAIK, which I think means running `cupy.cuda.Device(1).use()` before creating dataframes.
#135 partially fixes the issue, but users have to set the device themselves by using cupy.cuda.Device() (example: https://github.com/NVIDIA-Merlin/Transformers4Rec/pull/675) or
```python
rank = int(os.environ["LOCAL_RANK"])

with cupy.cuda.Device(rank):
    dataset = Dataset(pd.DataFrame(
        {"a": [[1], [2, 3], [4, 5], [6, 7, 8], [9], [10, 11], [12], [13, 14]]}
    ))
    dataset = dataset.repartition(npartitions=2)

    with Loader(
        dataset,
        batch_size=2,
        global_rank=rank,
        global_size=2,
        device=rank,
    ) as loader:
        for idx, batch in enumerate(loader):
            x, y = batch
            device = x["a__values"].device
            print(f"rank: {rank}, device: {device}")
```
We should move this into Merlin Core by implementing something like a `device` parameter in `Dataset(..., device=device)`, so users don't have to manage the CuPy device themselves.
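A rough sketch of what that could look like. Everything here (`DatasetSketch`, `_device_context`) is hypothetical, not the actual Merlin Core API; the point is just that the dataset would make the requested device current before allocating, instead of leaving that to the caller:

```python
import contextlib

try:
    import cupy
    _HAS_GPU = cupy.cuda.runtime.getDeviceCount() > 0
except Exception:  # CuPy missing or no CUDA driver: fall back to CPU
    cupy = None
    _HAS_GPU = False


def _device_context(device):
    """Make `device` the current CUDA device for the enclosed block, or no-op."""
    if _HAS_GPU and device is not None:
        return cupy.cuda.Device(device)
    return contextlib.nullcontext()


class DatasetSketch:
    """Stand-in for merlin.io.Dataset illustrating the proposed parameter."""

    def __init__(self, source, device=None):
        self.device = device
        # GPU allocations made while loading land on the requested device,
        # so callers no longer need their own cupy.cuda.Device() wrapper.
        with _device_context(device):
            self._data = self._load(source)

    def _load(self, source):
        # A real implementation would construct a cuDF/pandas dataframe here.
        return source
```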
@edknv Could you create an issue for the proposed change to merlin.io.Dataset (if there isn't one already)?