
Segfaulting issue when running distributed LightGBM on a large dataset

Open dthiagarajan opened this issue 2 years ago • 8 comments

Hello LightGBM community! I'm running distributed LightGBM training with a custom way of setting up the distributed environment (via MPI), but I'm hitting a segfault that only occurs once I use a large enough number of nodes, e.g. 300. I've pasted the script I use to reproduce the issue with dummy data; I'm running LightGBM version 3.3.5. The script creates a pre-partitioned dataset whose total size is proportional to the number of nodes.

import time
import random
from sklearn.datasets import make_regression
import traceback
import os
import torch
import socket
import sys
import numpy as np

from lightgbm import LGBMRegressor
import logging
from typing import Optional

logger = logging.getLogger(__name__)

# Number of samples per node I have for my internal dataset
SINGLE_NODE_SAMPLES = 11714612
MODEL_PARAMS = {
    "force_col_wise": True,
    "verbosity": 2,
    "tree_learner": "data",
    "n_estimators": 100,
    "num_leaves": 2048,
    "max_depth": 11,
    "histogram_pool_size": 16384,
}
class ProgressCallback:
    def __init__(self) -> None:
        super().__init__()
        self.epoch_start_time = time.time()

    def __call__(self, env) -> None:
        logger.info(
            f"Epoch {env.iteration} finished in {time.time() - self.epoch_start_time:.3f} seconds."
        )
        self.epoch_start_time = time.time()

def seed_everything():
    random.seed(0)
    np.random.seed(0)

def find_free_network_port() -> int:
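    # Bind to port 0 so the OS assigns a free port. Note: the port could be
    # taken by another process between close() here and LightGBM binding it.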
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(("", 0))
    port = s.getsockname()[1]
    s.close()
    return port

def get_host_ip(host_ip: Optional[str] = None) -> str:
    if host_ip == "dns":
        result = socket.getfqdn()
    elif host_ip is None or host_ip in ["auto", "ip"]:
        try:
            result = socket.gethostbyname(socket.getfqdn())
        except socket.gaierror:
            logger.warning("gethostbyname(socket.getfqdn()) failed... trying on hostname()")
            result = socket.gethostbyname(socket.gethostname())
        if result.startswith("127."):
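            # connect() on a UDP socket sends no packets; it only selects a route,
            # so getsockname() reveals the IP of the outbound interface.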
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(("10.255.255.255", 1))
            result = s.getsockname()[0]

    else:
        # Fall back to treating host_ip as an explicit address.
        result = host_ip

    assert result is not None
    return result

def setup_environment():
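    # torch.distributed (MPI backend) is used only to exchange host/port info
    # between ranks; LightGBM itself communicates over its own sockets
    # (machines + local_listen_port).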
    torch.distributed.init_process_group(backend="mpi")
    local_rank, local_world_size, rank, world_size = (
        int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"]),
        int(os.environ["OMPI_COMM_WORLD_LOCAL_SIZE"]),
        int(os.environ["OMPI_COMM_WORLD_RANK"]),
        int(os.environ["OMPI_COMM_WORLD_SIZE"]),
    )

    host_ip_list = [None for _ in range(world_size)]
    torch.distributed.all_gather_object(host_ip_list, get_host_ip())
    port_list = [None for _ in range(world_size)]
    port = find_free_network_port()
    torch.distributed.all_gather_object(port_list, port)
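    # LightGBM expects `machines` as a comma-separated list of "ip:port",
    # one entry per rank, identical on every worker.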
    machines = ",".join([f"{host_ip}:{port}" for host_ip, port in zip(host_ip_list, port_list)])
    local_listen_port = port
    logger.addHandler(logging.StreamHandler(sys.stdout))
    for plogger in [logging.getLogger(name) for name in logging.root.manager.loggerDict]:  # type: ignore[attr-defined]
        plogger.setLevel(level=logging.INFO if rank == 0 else logging.ERROR)

    return local_rank, local_world_size, rank, world_size, machines, local_listen_port

def main(
    local_rank, local_world_size, rank, world_size, machines: str, local_listen_port: int
) -> None:
    logger.info(f"Constructing the dataset of {SINGLE_NODE_SAMPLES * world_size} examples...")
    # Create a dataset of size proportional to the number of nodes, i.e. `world_size`
    train_x, train_y = make_regression(
        n_samples=SINGLE_NODE_SAMPLES, n_features=960, random_state=local_rank
    )
    logger.info(f"Constructing progress callback...")
    progress_cb = ProgressCallback()
    cbs = [progress_cb]
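    # pre_partition=True tells LightGBM the training data is already split
    # across machines, so it will not re-partition rows itself.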
    model = LGBMRegressor(
        pre_partition=True,
        machines=machines,
        local_listen_port=local_listen_port,
        num_machines=world_size,
        **MODEL_PARAMS,
    )
    logger.info(f"Starting fit for model on {len(train_x)} examples...")
    model.fit(
        train_x,
        train_y,
        sample_weight=np.ones_like(train_y),
        callbacks=cbs,
    )

if __name__ == "__main__":
    seed_everything()
    (
        local_rank,
        local_world_size,
        rank,
        world_size,
        machines,
        local_listen_port,
    ) = setup_environment()
    print("Environment has been set up.")
    exception = None
    try:
        main(local_rank, local_world_size, rank, world_size, machines, local_listen_port)
    except Exception as e:
        exception = e
        logger.error("".join(traceback.format_exception(type(exception), exception, exception.__traceback__)))
    finally:
        torch.distributed.destroy_process_group()
        if exception:
            raise exception

To run, I do:

MKL_CBWR=COMPATIBLE MKL_NUM_THREADS=8 mpirun --map-by ppr:1:node:pe=28 --bind-to core --report-bindings --mca orte_base_help_aggregate 0 --mca routed direct python3 train.py

The segfault I see (on all nodes) is:

Caught signal 11 (Segmentation fault: address not mapped to object at address (nil))
==== backtrace (tid:  29334) ====
 0 0x00000000000534e9 ucs_debug_print_backtrace()  ???:0
 1 0x0000000000012ce0 __funlockfile()  :0
 2 0x0000000000262213 LightGBM::Network::AllgatherBruck()  
 3 0x00000000002648ff LightGBM::Network::AllreduceByAllGather()  ???:0
 4 0x00000000001d186a LightGBM::DatasetLoader::ConstructFromSampleData()  ???:0
 5 0x00000000003b1211 LGBM_DatasetCreateFromMats()  ???:0
 6 0x00000000003b175c LGBM_DatasetCreateFromMat()  ???:0
 7 0x000000000000a6b4 ffi_call_unix64()  ???:0
 8 0x000000000000a0d1 ffi_call()  ???:0
 9 0x000000000000f8e5 PyCData_set()  ???:0
10 0x0000000000014049 _ctypes_callproc()  ???:0
11 0x000000000003aca3 __libc_start_main()  ???:0

Other notes:

  • The segfaulting issue occurs when I use a large number of nodes, e.g. 300, but not when I use a smaller number of nodes, e.g. 100, 50, 20, etc.
  • When the dataset is highly similar across the nodes, i.e. when I create the dummy dataset without tying random_state to the local rank, the script does not segfault.

Let me know if there's any other info I can provide to help debug this. Thanks in advance!

dthiagarajan avatar May 24 '23 17:05 dthiagarajan

Hi @jameslamb, I saw you changed the tags. Is there any other info I can provide to help pin down the issue? I've also tried a couple of other things, including explicitly setting the number of threads to the number of cores on each node I'm running on.

dthiagarajan avatar Jun 01 '23 16:06 dthiagarajan

It would be helpful if you could:

  • share some logs from training so it's possible to narrow down where in the training process failures are occurring
  • explain why you're doing all this custom setup involving torch instead of using lightgbm.dask or running the LightGBM CLI as an MPI application like https://lightgbm.readthedocs.io/en/v3.3.5/Parallel-Learning-Guide.html#id2 (a minimal sketch of the lightgbm.dask route follows this list)
  • mention ~what version of LightGBM you're using and~ exactly how you built + installed lightgbm
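
For illustration, a minimal sketch of what the lightgbm.dask route could look like (not the setup from this issue; the scheduler address, data sizes, and chunking below are placeholders, and it assumes a running dask.distributed cluster):

import dask.array as da
from dask.distributed import Client
from lightgbm import DaskLGBMRegressor

# Hypothetical scheduler address; any running dask.distributed cluster works.
client = Client("tcp://scheduler:8786")

# Placeholder data; each chunk becomes a per-worker partition.
X = da.random.random((1_000_000, 960), chunks=(100_000, 960))
y = da.random.random((1_000_000,), chunks=(100_000,))

# LightGBM discovers worker addresses and sets up machines/ports itself.
model = DaskLGBMRegressor(client=client, tree_learner="data", n_estimators=100)
model.fit(X, y)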

jameslamb avatar Jun 01 '23 16:06 jameslamb

Thanks! I can't upload logs due to company policy, but I'll summarize what's in them. I also found that the script works when I change tree_learner from data to voting and when I explicitly set num_threads to the core count on each node (I have 1 worker per node and 300 nodes total).

Environment has been set up.  # (logged 300 times)
Constructing the dataset of 3514383600 examples...
[LightGBM] [Info] Connected to rank 0  # (logged 298 times)
[LightGBM] [Info] Connected to rank 1  # (logged 298 times)
...
[LightGBM] [Info] Connected to rank 299  # (logged 298 times)
[LightGBM] [Warning] num_threads is set=28, n_jobs=-1 will be ignored. Current value: num_threads=28  # (logged 299 times)
[LightGBM] [Warning] Histogram LRU queue was enabled (histogram_pool_size=16384.000000). Will disable this to reduce communication costs  # (logged 299 times when this parameter is set)
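
To make that workaround concrete, the change that let training complete looks roughly like this (building on MODEL_PARAMS from the script above; 28 matches the cores per worker in the mpirun command):

# Voting parallel avoided the segfault; num_threads pins the thread count
# to the 28 cores each worker is bound to.
WORKAROUND_PARAMS = {**MODEL_PARAMS, "tree_learner": "voting", "num_threads": 28}
model = LGBMRegressor(
    pre_partition=True,
    machines=machines,
    local_listen_port=local_listen_port,
    num_machines=world_size,
    **WORKAROUND_PARAMS,
)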

The reason I'm using this custom setup is that I have a generalization over distributed environments that spans XGBoost, LightGBM, and Torch, and a separate CLI for submitting (potentially distributed) training jobs, so I don't have a good way to use the LightGBM CLI.

We built LightGBM with the default settings, so I think MPI is not supported (which would mean it's using sockets (https://github.com/microsoft/LightGBM/blob/master/src/network/linkers_socket.cpp) for distributed training, right?).
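
For reference, my understanding is that an MPI-enabled build would require rebuilding with the MPI flag, roughly like this (per the LightGBM 3.3.x installation guide; untested on our side):

git clone --recursive https://github.com/microsoft/LightGBM
cd LightGBM && mkdir build && cd build
cmake -DUSE_MPI=ON ..
make -j4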

dthiagarajan avatar Jun 01 '23 17:06 dthiagarajan

@dthiagarajan It seems there are some problems when constructing the Dataset in distributed mode. Can you provide the number of columns (features) of the dataset you're using?

guolinke avatar Jun 02 '23 09:06 guolinke

Yep, I'm using 960 features.

dthiagarajan avatar Jun 02 '23 14:06 dthiagarajan

I also found that the script works when I change tree_learner from data to voting and when I explicitly set num_threads to the core count on each node (I have 1 worker per node and 300 nodes total).

What happens if you only add num_threads explicitly?

guolinke avatar Jun 05 '23 02:06 guolinke

If I specify num_threads explicitly and keep tree_learner = 'data', then I see the same segfault error mentioned above.

dthiagarajan avatar Jun 05 '23 15:06 dthiagarajan

Hi, I'm facing a similar segmentation fault with 250 features. Has there been a resolution here?

PCIHD avatar Dec 05 '23 15:12 PCIHD