[Bug] Poor performance when processing large amounts of LLM text data
What happened + What you expected to happen
I want to use Ray for large-scale text data cleaning. For testing, I extracted 5 million records (about 58 GB) from the open-source RedPajama GitHub dataset. Running on two nodes, each with 128 vCPUs and 256 GiB of memory, the job took more than 2 hours.
By contrast, on a single 128 vCPU, 256 GiB node without Ray, multi-process processing with the datasets API took only about 20 minutes.
With Ray, the job repeatedly reports this error: (raylet) A worker died or was killed while executing a task by an unexpected system error. The workers are probably being killed over and over because they run out of memory. I have tried increasing --object-store-memory, reducing concurrency, and other settings, but nothing has helped.
What should I do to improve performance? Right now, two nodes with Ray are much slower than a single machine without Ray.
Versions / Dependencies
- ray: 2.10.0
- python: 3.8.15
Reproduction script
ray script:
import ftfy
import time


def FixUnicodeMapper(sample):
    text_key = 'content'
    normalization = 'NFKC'
    sample[text_key] = ftfy.fix_text(sample[text_key], normalization=normalization)
    return sample


if __name__ == "__main__":
    import ray
    from ray.data import read_json

    ray.init()
    num_nodes = ray.nodes()
    num_nodes_used = len(num_nodes)
    print(">>> Number nodes:", num_nodes_used)
    data_path = "./datasets/redpajama_github/json_parts_500w/"  # 224 jsonl files
    export_path = './out_dir/'
    s_time = time.time()
    dataset = read_json(data_path)
    res_datasets = dataset.map(FixUnicodeMapper)
    res_datasets.write_json(export_path, force_ascii=False)
    print('cost time: ', time.time() - s_time)
    ray.shutdown()
Issue Severity
None
Hi, you should try using map_batches() to process the data in vectorized batches. map() is not efficient because it processes each row individually and creates a task...
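As a rough illustration of the difference, the sketch below contrasts a per-row function with a per-batch one on a plain dict of columns, which is only an assumed stand-in for the batch layout. `unicodedata.normalize` replaces `ftfy.fix_text` so the snippet runs without extra dependencies, and `fix_row` and `fix_batch` are hypothetical names, not part of the original script.

```python
import unicodedata

TEXT_KEY = "content"


def fix_row(row):
    # Per-row style: called once per record, as with Dataset.map().
    row[TEXT_KEY] = unicodedata.normalize("NFKC", row[TEXT_KEY])
    return row


def fix_batch(batch):
    # Per-batch style: called once per group of records, as with
    # Dataset.map_batches(); the whole column is rewritten in one call.
    batch[TEXT_KEY] = [unicodedata.normalize("NFKC", t) for t in batch[TEXT_KEY]]
    return batch


# Two records with compatibility characters: an "fi" ligature and
# full-width Latin letters.
rows = [{"content": "\ufb01le"}, {"content": "\uff28\uff45\uff4c\uff4c\uff4f"}]
batch = {"content": [r["content"] for r in rows]}

per_row = [fix_row(dict(r))["content"] for r in rows]
per_batch = fix_batch(dict(batch))["content"]
print(per_row == per_batch)
```

Both paths produce the same text; the batch version simply makes one function call per group of records instead of one per record.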
You can also change the degree of parallelism by setting concurrency. Also, based on the screenshot, the object store is not full but the workers still hit OOM. This typically happens because there is not enough memory left for the worker heap: a 30% object store leaves 70% for the heap, while a 60% object store leaves only 40%. So what you need to do is reduce the object store memory so that the worker heap can use more memory.
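The trade-off can be sketched with some simple arithmetic. This is a simplified model that assumes all node memory not reserved for the object store is available to worker heaps; `split_node_memory` and `ray_start_command` are hypothetical helpers, and only the `--object-store-memory` flag (a size in bytes) comes from Ray itself.

```python
GIB = 1024 ** 3


def split_node_memory(total_bytes, store_fraction):
    """Return (object_store_bytes, heap_bytes) for one node.

    Simplification: everything not reserved for the object store is
    treated as available to worker heaps.
    """
    if not 0.0 < store_fraction < 1.0:
        raise ValueError("store_fraction must be between 0 and 1")
    store = int(total_bytes * store_fraction)
    return store, total_bytes - store


def ray_start_command(store_bytes):
    # Build a head-node start command with an explicit object store size.
    return f"ray start --head --object-store-memory={store_bytes}"


node_total = 256 * GIB  # node size from the report
for fraction in (0.3, 0.6):
    store, heap = split_node_memory(node_total, fraction)
    print(f"{fraction:.0%} store: {store / GIB:.1f} GiB store, {heap / GIB:.1f} GiB heap")

print(ray_start_command(40 * GIB))
```

A smaller store fraction leaves more room for the heap, at the cost of less space for blocks held between operators.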
Thanks very much, I'll try it.
If your setup allows it, please also grab the latest version of Ray; we made a number of perf and stability updates between 2.10 and 2.24, which is the latest. (We've started releasing Ray weekly to keep up with the rapid pace of change in the AI/ML space in recent months.)
@Bye-legumes @anyscalesam
Thanks. Following the previous suggestions, I switched to map_batches and tuned the batch_size, concurrency, and memory parameters, and performance improved. But it still does not meet expectations: it currently takes about 1 hour on 2 nodes, compared with about half an hour on a single machine. I am still on 2.10, though, and will test again with the latest version. Thanks.
def FixUnicodeMapperBatches(samples):
    text_key = 'content'
    normalization = 'NFKC'
    normalize_text = list(map(lambda t: ftfy.fix_text(t, normalization=normalization), samples[text_key]))
    samples[text_key] = normalize_text
    return samples


dataset = read_json(data_path)
res_datasets = dataset.map_batches(
    FixUnicodeMapperBatches,
    batch_size=1024*2,
    # concurrency=256*2
)
@Bye-legumes @anyscalesam I used version 2.24, but the performance has not improved. Is there any way to speed it up?
On a single node without Ray, processing the data as a single 58G file with multiple processes took about 30 minutes, which was faster than splitting it into multiple small files.
I also tried merging the files into larger files and processing them with Ray. There was a slight improvement, but it still did not meet expectations.
2024-06-18 16:26:34,264 INFO usage_lib.py:467 -- Usage stats collection is enabled by default without user confirmation because this terminal is detected to be non-interactive. To disable this, add `--disable-usage-stats` to the command that starts the cluster, or run the following command: `ray disable-usage-stats` before starting the cluster. See https://docs.ray.io/en/master/cluster/usage-stats.html for more details.
2024-06-18 16:26:34,264 INFO scripts.py:767 -- Local node IP: dlc1osj4eeqgn24e-master-0
2024-06-18 16:26:37,124 SUCC scripts.py:804 -- --------------------
2024-06-18 16:26:37,125 SUCC scripts.py:805 -- Ray runtime started.
2024-06-18 16:26:37,125 SUCC scripts.py:806 -- --------------------
2024-06-18 16:26:37,125 INFO scripts.py:808 -- Next steps
2024-06-18 16:26:37,125 INFO scripts.py:811 -- To add another node to this Ray cluster, run
2024-06-18 16:26:37,125 INFO scripts.py:814 -- ray start --address='dlc1osj4eeqgn24e-master-0:23456'
2024-06-18 16:26:37,125 INFO scripts.py:823 -- To connect to this Ray cluster:
2024-06-18 16:26:37,125 INFO scripts.py:825 -- import ray
2024-06-18 16:26:37,125 INFO scripts.py:826 -- ray.init(_node_ip_address='dlc1osj4eeqgn24e-master-0')
2024-06-18 16:26:37,125 INFO scripts.py:857 -- To terminate the Ray runtime, run
2024-06-18 16:26:37,125 INFO scripts.py:858 -- ray stop
2024-06-18 16:26:37,125 INFO scripts.py:861 -- To view the status of the cluster, use
2024-06-18 16:26:37,125 INFO scripts.py:862 -- ray status
Network is under initialization...
Network successfully initialized.
2024-06-18 16:26:39,193 INFO worker.py:1568 -- Connecting to existing Ray cluster at address: dlc1osj4eeqgn24e-master-0:23456...
2024-06-18 16:26:39,200 INFO worker.py:1753 -- Connected to Ray cluster.
total number of nodes: 2
current rank: 0; execute cmd: ray start --head --port=23456 --node-ip-address=dlc1osj4eeqgn24e-master-0 --object-store-memory=42949672960 --system-config='{"worker_register_timeout_seconds": 600}'
Successfully created ray cluster!
2024-06-18 16:26:41,271 INFO worker.py:1568 -- Connecting to existing Ray cluster at address: dlc1osj4eeqgn24e-master-0:23456...
2024-06-18 16:26:41,280 INFO worker.py:1753 -- Connected to Ray cluster.
(raylet) It looks like you're creating a detached actor in an anonymous namespace. In order to access this actor in the future, you will need to explicitly connect to this namespace with ray.init(namespace="4b88bdbc-9522-478a-8723-c999135f12a4", ...)
INFO:root:Execute ray.shutdown before the program exits. Done ...
Rank: 0
2024-06-18 16:26:43,625 INFO worker.py:1568 -- Connecting to existing Ray cluster at address: dlc1osj4eeqgn24e-master-0:23456...
2024-06-18 16:26:43,632 INFO worker.py:1753 -- Connected to Ray cluster.
2024-06-18 16:26:44,791 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-06-18_16-26-34_265509_18/logs/ray-data
2024-06-18 16:26:44,791 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> TaskPoolMapOperator[MapBatches(FixUnicodeMapperBatches)->Write]
>>>>>> Number nodes: 2
- ReadJSON->SplitBlocks(2) 1: 0%| | 0/56 [00:00<?, ?it/s]
- MapBatches(FixUnicodeMapperBatches)->Write 2: 0%| | 0/56 [00:00<?, ?it/s]
Running 0: 0%| | 0/56 [00:00<?, ?it/s]
- ReadJSON->SplitBlocks(2): 56 active, 0 queued, [cpu: 56.0, objects: 14.0GB]: 0%| | 0/56 [00:06<?, ?it/s]
- ReadJSON->SplitBlocks(2): 56 active, 0 queued, [cpu: 56.0, objects: 14.0GB]: 2%|▏ | 1/56 [00:06<05:40, 6.19s/it]
Running: 56/256.0 CPU, 0/0.0 GPU, 4.8GB/56.6GB object_store_memory: 0%| | 0/56 [00:06<?, ?it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 2 active, 0 queued, [cpu: 2.0, objects: 512.0MB]: 0%| | 0/56 [00:06<?, ?it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 2 active, 0 queued, [cpu: 2.0, objects: 512.0MB]: 0%| | 0/112 [00:25<?, ?it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 2 active, 0 queued, [cpu: 2.0, objects: 512.0MB]: 1%| | 1/112 [00:25<47:43, 25.80s/it]
Running: 56/256.0 CPU, 0/0.0 GPU, 4.7GB/56.6GB object_store_memory: 0%| | 0/56 [00:25<?, ?it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 1 active, 0 queued, [cpu: 1.0, objects: 187.0B]: 1%| | 1/112 [00:33<47:43, 25.80s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 1 active, 0 queued, [cpu: 1.0, objects: 187.0B]: 2%|▏ | 2/112 [00:33<27:52, 15.21s/it]
Running: 55/256.0 CPU, 0/0.0 GPU, 4.6GB/56.6GB object_store_memory: 0%| | 0/56 [00:33<?, ?it/s]
Running: 55/256.0 CPU, 0/0.0 GPU, 4.6GB/56.6GB object_store_memory: 0%| | 0/112 [00:33<?, ?it/s]
Running: 55/256.0 CPU, 0/0.0 GPU, 4.6GB/56.6GB object_store_memory: 1%| | 1/112 [00:33<1:02:08, 33.59s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 4.6GB]: 2%|▏ | 1/56 [00:33<05:40, 6.19s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 4.6GB]: 4%|▎ | 2/56 [00:33<16:48, 18.67s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 4.6GB]: 2%|▏ | 2/112 [10:30<34:13, 18.67s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 4.6GB]: 3%|▎ | 3/112 [10:30<8:33:24, 282.61s/it]
Running: 55/256.0 CPU, 0/0.0 GPU, 5.4GB/56.6GB object_store_memory: 1%| | 1/112 [10:30<1:02:08, 33.59s/it]
Running: 55/256.0 CPU, 0/0.0 GPU, 5.4GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:30<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 8.0GB]: 3%|▎ | 3/112 [10:31<8:33:24, 282.61s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 8.0GB]: 23%|██▎ | 26/112 [10:31<25:22, 17.70s/it]
Running: 78/256.0 CPU, 0/0.0 GPU, 8.4GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:31<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 28 active, 0 queued, [cpu: 28.0, objects: 5.1KB]: 2%|▏ | 2/112 [10:31<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 11.0GB]: 23%|██▎ | 26/112 [10:32<25:22, 17.70s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 11.0GB]: 63%|██████▎ | 71/112 [10:32<03:21, 4.91s/it]
Running: 123/256.0 CPU, 0/0.0 GPU, 11.4GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:32<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 73 active, 0 queued, [cpu: 73.0, objects: 13.3KB]: 2%|▏ | 2/112 [10:32<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 13.5GB]: 63%|██████▎ | 71/112 [10:33<03:21, 4.91s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 13.5GB]: 100%|██████████| 112/112 [10:33<00:00, 2.54s/it]
Running: 164/256.0 CPU, 0/0.0 GPU, 13.8GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:33<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 113 active, 0 queued, [cpu: 113.0, objects: 20.6KB]: 2%|▏ | 2/112 [10:33<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 16.4GB]: 100%|██████████| 112/112 [10:34<00:00, 2.54s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 16.4GB]: 71%|███████▏ | 112/157 [10:34<01:54, 2.54s/it]
- ReadJSON->SplitBlocks(2): 55 active, 0 queued, [cpu: 55.0, objects: 16.4GB]: 100%|██████████| 157/157 [10:34<00:00, 1.45s/it]
Running: 209/256.0 CPU, 0/0.0 GPU, 16.6GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:34<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 157 active, 0 queued, [cpu: 157.0, objects: 28.7KB]: 2%|▏ | 2/112 [10:34<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 54 active, 0 queued, [cpu: 54.0, objects: 19.4GB]: 100%|██████████| 157/157 [10:35<00:00, 1.45s/it]
- ReadJSON->SplitBlocks(2): 54 active, 0 queued, [cpu: 54.0, objects: 19.4GB]: 31%|███ | 157/504 [10:35<08:24, 1.45s/it]
- ReadJSON->SplitBlocks(2): 54 active, 0 queued, [cpu: 54.0, objects: 19.4GB]: 41%|████ | 206/504 [10:35<04:21, 1.14it/s]
Running: 246/256.0 CPU, 0/0.0 GPU, 19.7GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:35<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 18 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:35<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 51 active, 0 queued, [cpu: 51.0, objects: 21.6GB]: 41%|████ | 206/504 [10:36<04:21, 1.14it/s]
- ReadJSON->SplitBlocks(2): 51 active, 0 queued, [cpu: 51.0, objects: 21.6GB]: 27%|██▋ | 206/765 [10:36<08:10, 1.14it/s]
- ReadJSON->SplitBlocks(2): 51 active, 0 queued, [cpu: 51.0, objects: 21.6GB]: 33%|███▎ | 249/765 [10:36<05:05, 1.69it/s]
Running: 242/256.0 CPU, 0/0.0 GPU, 21.9GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:36<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 60 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:36<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 46 active, 0 queued, [cpu: 46.0, objects: 25.6GB]: 33%|███▎ | 249/765 [10:38<05:05, 1.69it/s]
- ReadJSON->SplitBlocks(2): 46 active, 0 queued, [cpu: 46.0, objects: 25.6GB]: 30%|███ | 249/825 [10:38<05:40, 1.69it/s]
- ReadJSON->SplitBlocks(2): 46 active, 0 queued, [cpu: 46.0, objects: 25.6GB]: 39%|███▉ | 321/825 [10:38<02:52, 2.93it/s]
Running: 237/256.0 CPU, 0/0.0 GPU, 27.4GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:38<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 158 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:38<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 40 active, 0 queued, [cpu: 40.0, objects: 30.2GB]: 39%|███▉ | 321/825 [10:39<02:52, 2.93it/s]
- ReadJSON->SplitBlocks(2): 40 active, 0 queued, [cpu: 40.0, objects: 30.2GB]: 38%|███▊ | 321/852 [10:39<03:01, 2.93it/s]
- ReadJSON->SplitBlocks(2): 40 active, 0 queued, [cpu: 40.0, objects: 30.2GB]: 48%|████▊ | 407/852 [10:39<01:28, 5.01it/s]
Running: 230/256.0 CPU, 0/0.0 GPU, 30.3GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:39<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 217 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:39<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 37 active, 0 queued, [cpu: 37.0, objects: 32.5GB]: 48%|████▊ | 407/852 [10:40<01:28, 5.01it/s]
- ReadJSON->SplitBlocks(2): 37 active, 0 queued, [cpu: 37.0, objects: 32.5GB]: 48%|████▊ | 407/855 [10:40<01:29, 5.01it/s]
- ReadJSON->SplitBlocks(2): 37 active, 0 queued, [cpu: 37.0, objects: 32.5GB]: 52%|█████▏ | 448/855 [10:40<01:05, 6.25it/s]
Running: 229/256.0 CPU, 0/0.0 GPU, 32.6GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:40<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 255 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:40<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 37 active, 0 queued, [cpu: 37.0, objects: 33.3GB]: 52%|█████▏ | 448/855 [10:41<01:05, 6.25it/s]
- ReadJSON->SplitBlocks(2): 37 active, 0 queued, [cpu: 37.0, objects: 33.3GB]: 54%|█████▍ | 462/855 [10:41<00:59, 6.60it/s]
Running: 229/256.0 CPU, 0/0.0 GPU, 33.4GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:41<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 268 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:41<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 37 active, 0 queued, [cpu: 37.0, objects: 35.7GB]: 54%|█████▍ | 462/855 [10:43<00:59, 6.60it/s]
- ReadJSON->SplitBlocks(2): 37 active, 0 queued, [cpu: 37.0, objects: 35.7GB]: 58%|█████▊ | 499/855 [10:43<00:42, 8.46it/s]
Running: 229/256.0 CPU, 0/0.0 GPU, 35.9GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:43<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 307 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:43<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 35 active, 0 queued, [cpu: 35.0, objects: 36.8GB]: 58%|█████▊ | 499/855 [10:44<00:42, 8.46it/s]
- ReadJSON->SplitBlocks(2): 35 active, 0 queued, [cpu: 35.0, objects: 36.8GB]: 58%|█████▊ | 499/859 [10:44<00:42, 8.46it/s]
- ReadJSON->SplitBlocks(2): 35 active, 0 queued, [cpu: 35.0, objects: 36.8GB]: 61%|██████ | 522/859 [10:44<00:35, 9.53it/s]
Running: 227/256.0 CPU, 0/0.0 GPU, 37.0GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:44<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 330 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:44<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 35 active, 0 queued, [cpu: 35.0, objects: 38.0GB]: 61%|██████ | 522/859 [10:45<00:35, 9.53it/s]
- ReadJSON->SplitBlocks(2): 35 active, 0 queued, [cpu: 35.0, objects: 38.0GB]: 63%|██████▎ | 542/859 [10:45<00:30, 10.56it/s]
Running: 227/256.0 CPU, 0/0.0 GPU, 38.2GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:45<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 350 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:45<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 32 active, 0 queued, [cpu: 32.0, objects: 40.1GB]: 63%|██████▎ | 542/859 [10:46<00:30, 10.56it/s]
- ReadJSON->SplitBlocks(2): 32 active, 0 queued, [cpu: 32.0, objects: 40.1GB]: 63%|██████▎ | 542/867 [10:46<00:30, 10.56it/s]
- ReadJSON->SplitBlocks(2): 32 active, 0 queued, [cpu: 32.0, objects: 40.1GB]: 67%|██████▋ | 583/867 [10:46<00:19, 14.36it/s]
Running: 221/256.0 CPU, 0/0.0 GPU, 40.1GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:46<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 394 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:46<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 25 active, 0 queued, [cpu: 25.0, objects: 41.3GB]: 67%|██████▋ | 583/867 [10:47<00:19, 14.36it/s]
- ReadJSON->SplitBlocks(2): 25 active, 0 queued, [cpu: 25.0, objects: 41.3GB]: 67%|██████▋ | 583/871 [10:47<00:20, 14.36it/s]
- ReadJSON->SplitBlocks(2): 25 active, 0 queued, [cpu: 25.0, objects: 41.3GB]: 71%|███████ | 616/871 [10:47<00:14, 17.11it/s]
Running: 217/256.0 CPU, 0/0.0 GPU, 41.4GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:47<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 422 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:47<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.3GB]: 71%|███████ | 616/871 [10:48<00:14, 17.11it/s]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.3GB]: 71%|███████ | 616/872 [10:48<00:14, 17.11it/s]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.3GB]: 73%|███████▎ | 633/872 [10:48<00:14, 16.97it/s]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.4GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:48<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 440 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:48<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.3GB]: 73%|███████▎ | 633/872 [10:58<00:14, 16.97it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 440 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:58<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.4GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:58<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.3GB]: 73%|███████▎ | 633/872 [10:59<00:14, 16.97it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 440 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [10:59<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.4GB/56.6GB object_store_memory: 2%|▏ | 2/112 [10:59<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.3GB]: 73%|███████▎ | 633/872 [11:00<00:14, 16.97it/s]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.3GB]: 73%|███████▎ | 633/872 [11:04<00:14, 16.97it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 440 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:04<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.4GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:04<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.3GB]: 73%|███████▎ | 633/872 [11:04<00:14, 16.97it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 440 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:04<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.4GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:04<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.5GB]: 73%|███████▎ | 633/872 [11:08<00:14, 16.97it/s]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.5GB]: 73%|███████▎ | 636/872 [11:08<01:22, 2.88it/s]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.6GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:08<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 442 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:08<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.5GB]: 73%|███████▎ | 636/872 [11:09<01:22, 2.88it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 442 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:09<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.6GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:09<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.5GB]: 73%|███████▎ | 636/872 [11:12<01:22, 2.88it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 442 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:12<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.6GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:12<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.5GB]: 73%|███████▎ | 636/872 [11:14<01:22, 2.88it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 442 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:14<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.6GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:14<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.5GB]: 73%|███████▎ | 636/872 [11:17<01:22, 2.88it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 442 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:17<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.6GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:17<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.6GB]: 73%|███████▎ | 636/872 [11:18<01:22, 2.88it/s]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.6GB]: 73%|███████▎ | 637/872 [11:18<02:07, 1.84it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 443 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:18<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.6GB]: 73%|███████▎ | 637/872 [11:19<02:07, 1.84it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 443 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:19<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.6GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:19<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.6GB]: 73%|███████▎ | 637/872 [11:22<02:07, 1.84it/s]
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ca81462ea7cfdb052e51a238a2f1ecfe9da54a4005000000 Worker ID: 5009ccfe85e45de9985c32d789ce8ec7b89158fd84ec3792d35103d5 Node ID: b6930932da50e875bc0f8ba8218273645900e7f883e90760f2a61903 Worker IP address: dlc1osj4eeqgn24e-master-0 Worker port: 10096 Worker PID: 614 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 1898e9be82f5b8030000bb6c3bef70048961261a05000000 Worker ID: 0c3de944b1f326355367f984d44d7731284ed4464befee1db0562254 Node ID: 236fb800c19951ebfb8f8dc08b437314de33adbdd1fe686172651829 Worker IP address: 10.224.140.93 Worker port: 10053 Worker PID: 340 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned. RPC Error message: Socket closed; RPC Error details:
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 1c2f33e21f3c467741fc034db2b86eb4d90eca2305000000 Worker ID: 8da65aad398663df6dddb0c1db2bd295227f2f7b76f1fcc2f91e633e Node ID: 236fb800c19951ebfb8f8dc08b437314de33adbdd1fe686172651829 Worker IP address: 10.224.140.93 Worker port: 10074 Worker PID: 306 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors. [repeated 15x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options...
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 31d0048f4ca93d0267e90d3e66f30fe683e2443105000000 Worker ID: ea11a50d05dfc2158e3f6cb1a08ee4f974c13deae571568d7ba3c087 Node ID: 236fb800c19951ebfb8f8dc08b437314de33adbdd1fe686172651829 Worker IP address: 10.224.140.93 Worker port: 10103 Worker PID: 329 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned. RPC Error message: Socket closed; RPC Error details:
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 521b871f4d04630700daa28974c886d3cfd29ff405000000 Worker ID: 346b4995580165fabcdf93a6fbfc51259578f6b3989cd68dbb16a418 Node ID: b6930932da50e875bc0f8ba8218273645900e7f883e90760f2a61903 Worker IP address: dlc1osj4eeqgn24e-master-0 Worker port: 10123 Worker PID: 597 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors. [repeated 16x across cluster]
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 0ec8590a4a9db1b84c41193d1d4133d02d58946805000000 Worker ID: fb428679009feeb8c81cea19725c038a123ce4fd1d4aa8cd02213335 Node ID: b6930932da50e875bc0f8ba8218273645900e7f883e90760f2a61903 Worker IP address: dlc1osj4eeqgn24e-master-0 Worker port: 10070 Worker PID: 567 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned. RPC Error message: Socket closed; RPC Error details:
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: f550d6c78b79b5acd4ebf48f6a82e312656378de05000000 Worker ID: 34933b605774c1d2ed9af15c0c3821893d0b907ad10e456d74a00bd7 Node ID: b6930932da50e875bc0f8ba8218273645900e7f883e90760f2a61903 Worker IP address: dlc1osj4eeqgn24e-master-0 Worker port: 10040 Worker PID: 536 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors. [repeated 16x across cluster]
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: d8e366fd65e7dc84455aaaf4fbe8efd1445830a905000000 Worker ID: 8a6511142d53b97d6f7b930f9674e5105a942b347fe1696c0c34ee41 Node ID: b6930932da50e875bc0f8ba8218273645900e7f883e90760f2a61903 Worker IP address: dlc1osj4eeqgn24e-master-0 Worker port: 10061 Worker PID: 581 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned. RPC Error message: Socket closed; RPC Error details: [repeated 3x across cluster]
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: c5b01ca0967365e302409768a9bebba057bc1f2905000000 Worker ID: 563e92fcf2ac46fba07ea979445a30b899b15576da9b1d9a484c5108 Node ID: 236fb800c19951ebfb8f8dc08b437314de33adbdd1fe686172651829 Worker IP address: 10.224.140.93 Worker port: 10014 Worker PID: 288 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors. [repeated 14x across cluster]
(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 76095d6883ed5c0e66d4a73000ed732e389a2d1e05000000 Worker ID: 2fc852c323d2a8e5f84bd79df57f0851608589577b740c147e752201 Node ID: b6930932da50e875bc0f8ba8218273645900e7f883e90760f2a61903 Worker IP address: dlc1osj4eeqgn24e-master-0 Worker port: 10035 Worker PID: 531 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned. RPC Error message: Socket closed; RPC Error details: [repeated 6x across cluster]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 443 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:22<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.6GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:22<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.6GB]: 73%|███████▎ | 637/872 [11:24<02:07, 1.84it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 443 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:24<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.6GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:24<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.6GB]: 73%|███████▎ | 637/872 [11:27<02:07, 1.84it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 443 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:27<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.6GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:27<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.6GB]: 73%|███████▎ | 638/872 [11:29<03:11, 1.22it/s]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.7GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:29<11:08:52, 364.84s/it]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 444 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:29<27:52, 15.21s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.6GB]: 73%|███████▎ | 638/872 [11:29<03:11, 1.22it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 444 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:29<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.7GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:29<11:08:52, 364.84s/it]
- ReadJSON->SplitBlocks(2): 24 active, 0 queued, [cpu: 24.0, objects: 42.6GB]: 73%|███████▎ | 638/872 [11:32<03:11, 1.22it/s]
- MapBatches(FixUnicodeMapperBatches)->Write: 192 active, 444 queued, [cpu: 192.0, objects: 35.1KB]: 2%|▏ | 2/112 [11:33<27:52, 15.21s/it]
Running: 216/256.0 CPU, 0/0.0 GPU, 42.7GB/56.6GB object_store_memory: 2%|▏ | 2/112 [11:33<11:08:52, 364.84s/it]
Hi, your plot is very useful, thanks @Cathy0908! Can you turn the function into an actor instead of a task? An actor does not release its resources after each task finishes, which reduces task initialization time. After that test, if CPU usage is still not very high, can you lower the resource requirement, e.g. cpu=0.5 or 0.25, so that you get higher parallelism (concurrency)?
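To make the effect of a fractional CPU request concrete, here is a minimal sketch of the slot arithmetic, using the 256 CPUs shown in the progress output above. `max_parallel_workers` is a hypothetical helper; it counts CPU slots only and says nothing about memory, which every additional worker still has to share.

```python
def max_parallel_workers(total_cpus, cpus_per_worker):
    """Upper bound on concurrent workers by CPU accounting alone."""
    if cpus_per_worker <= 0:
        raise ValueError("cpus_per_worker must be positive")
    return int(total_cpus / cpus_per_worker)


cluster_cpus = 256  # two 128 vCPU nodes
slots = {c: max_parallel_workers(cluster_cpus, c) for c in (1, 0.5, 0.25)}
for cpus, workers in slots.items():
    print(f"{cpus} CPU per worker: up to {workers} workers")
```

Halving the per-worker CPU request doubles the number of workers that can be scheduled, so the same heap is divided among more processes.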
@Bye-legumes Thank you for your suggestions. I changed to an actor and tried several sets of parameters, but it didn't seem to help. Later I increased the machine's memory to 512G. That clearly improved things: performance now meets my expectations, and workers are no longer killed frequently.
However, it seems that once memory usage reaches a certain proportion, performance degrades unless there is plenty of memory available.
import ftfy
import time
import ray
from ray.data import read_json
from ray.data.context import DatasetContext

DatasetContext.max_errored_blocks = 2


class FixUnicodeMapperActor:
    def __init__(self):
        self.text_key = 'content'
        self.normalization = 'NFKC'

    def __call__(self, samples):
        normalize_text = list(map(
            lambda t: ftfy.fix_text(t, normalization=self.normalization), samples[self.text_key]))
        samples[self.text_key] = normalize_text
        return samples


if __name__ == "__main__":
    ray.init()
    num_nodes = ray.nodes()
    print(">>>>>> Number nodes:", len(num_nodes))
    data_path = "/mnt/data/json_parts_500w_v4/"
    export_path = '/mnt/data/outputs/actor_v4/'
    s_time = time.time()
    dataset = read_json(data_path)
    res_datasets = dataset.map_batches(
        FixUnicodeMapperActor,
        batch_size=1024*2,
        num_cpus=0.25,
        concurrency=1024
    )
    res_datasets.write_json(export_path, force_ascii=False)
    print('cost time: ', time.time() - s_time)
    ray.shutdown()
Later I tried to increase the machine's memory to 512G. It was obviously improved
Great to hear! So based on the above discussion / screenshots, it looks like the main issue is that Ray is not dedicating enough memory to the Ray Data tasks. @Cathy0908 What commands were you using to adjust the heap memory / object store capacity / etc?
@scottjlee I think the main friction here is that there is currently no way to adjust the ratio between the heap and the preallocated object store dynamically. In many cases users simply go with the default value, and the object store just sits there taking up memory the heap could use. @Bye-legumes and I are thinking of adding such a capability to the plasma store, so that it can scale elastically (shrinking to make room for the heap, and growing back when possible), and poor performance like this can be mitigated automatically in many use cases. What do you think? Users would specify a lower and upper bound for the object store instead of a fixed number.
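One way to picture the lower/upper bound idea is the sketch below. It is purely hypothetical and not an existing Ray or plasma API: `elastic_store_size` is an invented name, and the policy (give the heap what it asks for, then clamp what is left to the configured bounds) is just one assumption about how such a feature might behave.

```python
def elastic_store_size(total, heap_demand, lower, upper):
    """Hypothetical policy: the object store takes whatever the heap
    does not need, clamped to the range [lower, upper].

    All arguments share one unit (for example GiB).
    """
    if not 0 <= lower <= upper <= total:
        raise ValueError("expected 0 <= lower <= upper <= total")
    leftover = total - heap_demand
    return max(lower, min(upper, leftover))


# Example with a 256 GiB node and an object store bounded to 20..77 GiB.
for demand in (100, 200, 250):
    size = elastic_store_size(total=256, heap_demand=demand, lower=20, upper=77)
    print(f"heap demand {demand} GiB: object store {size} GiB")
```

With low heap demand the store stays at its upper bound; as demand grows it shrinks, but never below the lower bound.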
Yes, that sounds like it could work. Please keep us updated with a PR, or if you have any questions. Thanks!
@scottjlee can you assign a priority please?