data-juicer icon indicating copy to clipboard operation
data-juicer copied to clipboard

[Bug]: Ray connection fails with remote cluster - cfg parameter not passed during initialization

Open kyo-tom opened this issue 2 months ago • 4 comments

Before Reporting 报告之前

  • [x] I have pulled the latest code of main branch to run again and the bug still existed. 我已经拉取了主分支上最新的代码,重新运行之后,问题仍不能解决。

  • [x] I have read the README carefully and no error occurred during the installation process. (Otherwise, we recommend that you can ask a question using the Question template) 我已经仔细阅读了 README 上的操作指引,并且在安装过程中没有错误发生。(否则,我们建议您使用Question模板向我们进行提问)

Search before reporting 先搜索,再报告

  • [x] I have searched the Data-Juicer issues and found no similar bugs. 我已经在 issue列表 中搜索但是没有发现类似的bug报告。

OS 系统

Macos

Installation Method 安装方式

source

Data-Juicer Version Data-Juicer版本

latest

Python Version Python版本

3.12.12

Describe the bug 描述这个bug

The cfg parameter wasn't being passed through the initialization chain, causing Ray to use address='auto' (looking for a local Ray instance) instead of your configured remote Ray address (ray://k8s01:10001). Here is the stack:

(dj) kyotom@celebi data-juicer % python tools/process_data.py --config demos/process_on_ray/configs/demo.yaml                            
2025-11-06 11:34:58.135 | INFO     | data_juicer.ops:timing_context:12 - Importing operator modules took 4.01 seconds
2025-11-06 11:34:59.638 | INFO     | data_juicer.config.config:660 - dataset_path config is set and a valid local path
2025-11-06 11:34:59.989 | WARNING  | data_juicer.utils.ray_utils:30 - No ray config provided, using default ray address 'auto'.
2025-11-06 11:35:00.018 | ERROR    | __main__:36 - An error has been caught in function '<module>', process 'MainProcess' (29563), thread 'MainThread' (8441814784):
Traceback (most recent call last):

> File "/Users/kyotom/Program/pycharm/data-juicer/tools/process_data.py", line 36, in <module>
    main()
    └ <function main at 0x3283b3e20>

  File "/Users/kyotom/Program/pycharm/data-juicer/tools/process_data.py", line 21, in main
    cfg = init_configs()
          └ <function init_configs at 0x30a88d8a0>

  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/data_juicer/config/config.py", line 588, in init_configs
    cfg = init_setup_from_cfg(cfg, load_configs_only)
          │                   │    └ False
          │                   └ Namespace(config=[Path_fr(demos/process_on_ray/configs/demo.yaml, cwd=/Users/kyotom/Program/pycharm/data-juicer)], auto=False...
          └ <function init_setup_from_cfg at 0x30ac33d80>
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/data_juicer/config/config.py", line 676, in init_setup_from_cfg
    sys_cpu_count = cpu_count()
                    └ <function cpu_count at 0x108237ec0>
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/data_juicer/utils/resource_utils.py", line 100, in cpu_count
    if check_and_initialize_ray():
       └ <function check_and_initialize_ray at 0x108237880>
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/data_juicer/utils/ray_utils.py", line 40, in check_and_initialize_ray
    initialize_ray(cfg)
    │              └ None
    └ <function initialize_ray at 0x108236b60>
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/data_juicer/utils/ray_utils.py", line 35, in initialize_ray
    ray.init(ray_address, ignore_reinit_error=True, runtime_env=runtime_env)
    │   │    │                                                  └ {'env_vars': {'RAY_JOB': '1'}}
    │   │    └ 'auto'
    │   └ <function init at 0x328b7b600>
    └ <module 'ray' from '/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/ray/__init__.py'>
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
    return func(*args, **kwargs)
           │     │       └ {'ignore_reinit_error': True, 'runtime_env': {'env_vars': {'RAY_JOB': '1'}}}
           │     └ ('auto',)
           └ <function init at 0x328b7b420>
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/ray/_private/worker.py", line 1720, in init
    bootstrap_address = services.canonicalize_bootstrap_address(address, _temp_dir)
                        │        │                              │        └ None
                        │        │                              └ 'auto'
                        │        └ <function canonicalize_bootstrap_address at 0x16bfcb060>
                        └ <module 'ray._private.services' from '/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/ray/_private/services.py'>
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/ray/_private/services.py", line 529, in canonicalize_bootstrap_address
    addr = get_ray_address_from_environment(addr, temp_dir)
           │                                │     └ None
           │                                └ 'auto'
           └ <function get_ray_address_from_environment at 0x16bfcac00>
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/ray/_private/services.py", line 410, in get_ray_address_from_environment
    raise ConnectionError(

ConnectionError: Could not find any running Ray instance. Please specify the one to connect to by setting `--address` flag or `RAY_ADDRESS` environment variable.
Traceback (most recent call last):
  File "/Users/kyotom/Program/pycharm/data-juicer/tools/process_data.py", line 36, in <module>
    main()
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/loguru/_logger.py", line 1297, in catch_wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyotom/Program/pycharm/data-juicer/tools/process_data.py", line 21, in main
    cfg = init_configs()
          ^^^^^^^^^^^^^^
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/data_juicer/config/config.py", line 588, in init_configs
    cfg = init_setup_from_cfg(cfg, load_configs_only)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/data_juicer/config/config.py", line 676, in init_setup_from_cfg
    sys_cpu_count = cpu_count()
                    ^^^^^^^^^^^
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/data_juicer/utils/resource_utils.py", line 100, in cpu_count
    if check_and_initialize_ray():
       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/data_juicer/utils/ray_utils.py", line 40, in check_and_initialize_ray
    initialize_ray(cfg)
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/data_juicer/utils/ray_utils.py", line 35, in initialize_ray
    ray.init(ray_address, ignore_reinit_error=True, runtime_env=runtime_env)
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/ray/_private/worker.py", line 1720, in init
    bootstrap_address = services.canonicalize_bootstrap_address(address, _temp_dir)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/ray/_private/services.py", line 529, in canonicalize_bootstrap_address
    addr = get_ray_address_from_environment(addr, temp_dir)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kyotom/miniconda3/envs/dj/lib/python3.12/site-packages/ray/_private/services.py", line 410, in get_ray_address_from_environment
    raise ConnectionError(
ConnectionError: Could not find any running Ray instance. Please specify the one to connect to by setting `--address` flag or `RAY_ADDRESS` environment variable.

To Reproduce 如何复现

  1. Clone the source code
  2. Change the ray_address in data-juicer/demos/process_on_ray/configs/demo.yaml
executor_type: 'ray'
ray_address: 'ray://remote-host:10001'
  1. Start data-juicer on ray by exec python tools/process_data.py --config demos/process_on_ray/configs/demo.yaml

Configs 配置信息

# Process config example for dataset

# global parameters
project_name: 'ray-demo'
dataset_path: './demos/process_on_ray/data/demo-dataset.jsonl'  # path to your dataset directory or file
export_path: './outputs/demo/demo-processed'

executor_type: 'ray'
ray_address: 'ray://k8s01:10001'                     # change to your ray cluster address, e.g., ray://<hostname>:<port>

# process schedule
# a list of several process operators with their arguments
process:
  # Filter ops
  - alphanumeric_filter:                                    # filter text with alphabet/numeric ratio out of specific range.
      tokenization: false                                     # Whether to count the ratio of alphanumeric to the total number of tokens.
      min_ratio: 0.0                                          # the min ratio of filter range
      max_ratio: 0.9                                          # the max ratio of filter range
  - average_line_length_filter:                             # filter text with the average length of lines out of specific range.
      min_len: 10                                             # the min length of filter range
      max_len: 10000                                          # the max length of filter range
  - character_repetition_filter:                            # filter text with the character repetition ratio out of specific range
      rep_len: 10                                             # repetition length for char-level n-gram
      min_ratio: 0.0                                          # the min ratio of filter range
      max_ratio: 0.5                                          # the max ratio of filter range
  - flagged_words_filter:                                   # filter text with the flagged-word ratio larger than a specific max value
      lang: en                                                # consider flagged words in what language
      tokenization: false                                     # whether to use model to tokenize documents
      max_ratio: 0.0045                                       # the max ratio to filter text
      flagged_words_dir: ./assets                             # directory to store flagged words dictionaries
      use_words_aug: false                                    # whether to augment words, especially for Chinese and Vietnamese
      words_aug_group_sizes: [2]                              # the group size of words to augment
      words_aug_join_char: ""                                 # the join char between words to augment
  - language_id_score_filter:                               # filter text in specific language with language scores larger than a specific max value
      lang: en                                                # keep text in what language
      min_score: 0.8                                          # the min language scores to filter text
  - maximum_line_length_filter:                             # filter text with the maximum length of lines out of specific range
      min_len: 10                                             # the min length of filter range
      max_len: 10000                                          # the max length of filter range
  - perplexity_filter:                                      # filter text with perplexity score out of specific range
      lang: en                                                # compute perplexity in what language
      max_ppl: 1500                                           # the max perplexity score to filter text
  - special_characters_filter:                              # filter text with special-char ratio out of specific range
      min_ratio: 0.0                                          # the min ratio of filter range
      max_ratio: 0.25                                         # the max ratio of filter range
  - stopwords_filter:                                       # filter text with stopword ratio smaller than a specific min value
      lang: en                                                # consider stopwords in what language
      tokenization: false                                     # whether to use model to tokenize documents
      min_ratio: 0.3                                          # the min ratio to filter text
      stopwords_dir: ./assets                                 # directory to store stopwords dictionaries
      use_words_aug: false                                    # whether to augment words, especially for Chinese and Vietnamese
      words_aug_group_sizes: [2]                              # the group size of words to augment
      words_aug_join_char: ""                                 # the join char between words to augment
  - text_length_filter:                                     # filter text with length out of specific range
      min_len: 10                                             # the min length of filter range
      max_len: 10000                                          # the max length of filter range
  - words_num_filter:                                       # filter text with number of words out of specific range
      lang: en                                                # sample in which language
      tokenization: false                                     # whether to use model to tokenize documents
      min_num: 10                                             # the min number of filter range
      max_num: 10000                                          # the max number of filter range
  - word_repetition_filter:                                 # filter text with the word repetition ratio out of specific range
      lang: en                                                # sample in which language
      tokenization: false                                     # whether to use model to tokenize documents
      rep_len: 10                                             # repetition length for word-level n-gram
      min_ratio: 0.0                                          # the min ratio of filter range
      max_ratio: 0.5                                          # the max ratio of filter range

Logs 报错日志

No response

Screenshots 截图

No response

Additional 额外信息

No response

kyo-tom avatar Nov 06 '25 03:11 kyo-tom

Hi @kyo-tom , thanks for using data-juicer!

Currently, data-juicer supports starting distributed processing on the head node, but it does not support submitting data to the remote cluster for distributed processing, which is caused by some mechanism of ray.data.

HYLcool avatar Nov 06 '25 06:11 HYLcool

Hi @kyo-tom , thanks for using data-juicer!

Currently, data-juicer supports starting distributed processing on the head node, but it does not support submitting data to the remote cluster for distributed processing, which is caused by some mechanism of ray.data.

First of all, thank you for your reply. I didn't notice anywhere in the documentation that it "not support submitting data to the remote cluster," which led me to believe that submitting to the remote cluster was a viable approach. Anyway, could you explain what "some mechanism" refers to? By the way, after making some modifications locally, I tried submitting a simple text processing task to remote ray cluster, and it ran successfully and gave the correct results.

kyo-tom avatar Nov 06 '25 07:11 kyo-tom

Hi @kyo-tom , sorry for the late reply.

For now, data-juicer implemented a customized data source for reading json streamingly, which has to require ray==2.47.1. With this version, an error from ray.data, "AttributeError: 'Worker' object has no attribute 'core_worker'", occurs when connecting to a remote cluster. We found several related issues said that "Ray client does not work with Ray Data now" for your reference. https://github.com/ray-project/ray/issues/42739 https://github.com/ray-project/ray/issues/48682 https://github.com/ray-project/ray/issues/47759

Maybe latest ray has resolved it. We will test for them and try to update the customized data source to make it compatible with later versions of ray. And we will update our documents to make it clearer. If you have any good ideas on it, we will appreciate it if you can contribute to data-juicer. Thanks for your suggestion again!

HYLcool avatar Nov 10 '25 13:11 HYLcool

Hi @kyo-tom , sorry for the late reply.

For now, data-juicer implemented a customized data source for reading json streamingly, which has to require ray==2.47.1. With this version, an error from ray.data, "AttributeError: 'Worker' object has no attribute 'core_worker'", occurs when connecting to a remote cluster. We found several related issues said that "Ray client does not work with Ray Data now" for your reference. ray-project/ray#42739 ray-project/ray#48682 ray-project/ray#47759

Maybe latest ray has resolved it. We will test for them and try to update the customized data source to make it compatible with later versions of ray. And we will update our documents to make it clearer. If you have any good ideas on it, we will appreciate it if you can contribute to data-juicer. Thanks for your suggestion again!

After carefully reading the relevant Ray issues, I understand that it does not support submitting data to the remote cluster for distributed processing, which is caused by some mechanism of ray.data. However, the current problem is that data-juicer is not passing the ray head address, which causes it to not be able to find the ray instance. Even if ray fixes those issues in the future, data-juicer will still not be able to find the ray instance. I believe this is a real problem with data-juicer.

kyo-tom avatar Nov 11 '25 01:11 kyo-tom