Segfault when running distributed LightGBM on a large dataset
Hello LightGBM community, I'm running distributed LightGBM training with a custom way of setting up the distributed environment (via MPI), but I'm hitting a segfault that only occurs once I use a large enough number of nodes, e.g. 300. I've pasted the script I use to reproduce the issue with dummy data; I'm running LightGBM version 3.3.5. In the script, each node creates its own pre-partitioned chunk, so the total dataset size is proportional to the number of nodes.
import time
import random
from sklearn.datasets import make_regression
import traceback
import os
import torch
import socket
import sys
import numpy as np
from lightgbm import LGBMRegressor
import logging
from typing import Optional

logger = logging.getLogger(__name__)

# Number of samples per node I have for my internal dataset
SINGLE_NODE_SAMPLES = 11714612

MODEL_PARAMS = {
    "force_col_wise": True,
    "verbosity": 2,
    "tree_learner": "data",
    "n_estimators": 100,
    "num_leaves": 2048,
    "max_depth": 11,
    "histogram_pool_size": 16384,
}


class ProgressCallback:
    def __init__(self) -> None:
        super().__init__()
        self.epoch_start_time = time.time()

    def __call__(self, env) -> None:
        logger.info(
            f"Epoch {env.iteration} finished in {time.time() - self.epoch_start_time:.3f} seconds."
        )
        self.epoch_start_time = time.time()


def seed_everything():
    random.seed(0)
    np.random.seed(0)


def find_free_network_port() -> int:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(("", 0))
    port = s.getsockname()[1]
    s.close()
    return port


def get_host_ip(host_ip: Optional[str] = None) -> str:
    if host_ip == "dns":
        result = socket.getfqdn()
    elif host_ip is None or host_ip in ["auto", "ip"]:
        try:
            result = socket.gethostbyname(socket.getfqdn())
        except socket.gaierror:
            logger.warning("gethostbyname(socket.getfqdn()) failed... trying on hostname()")
            result = socket.gethostbyname(socket.gethostname())
        if result.startswith("127."):
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(("10.255.255.255", 1))
            result = s.getsockname()[0]
    assert result is not None
    return result


def setup_environment():
    torch.distributed.init_process_group(backend="mpi")
    local_rank, local_world_size, rank, world_size = (
        int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"]),
        int(os.environ["OMPI_COMM_WORLD_LOCAL_SIZE"]),
        int(os.environ["OMPI_COMM_WORLD_RANK"]),
        int(os.environ["OMPI_COMM_WORLD_SIZE"]),
    )
    host_ip_list = [None for _ in range(world_size)]
    torch.distributed.all_gather_object(host_ip_list, get_host_ip())
    port_list = [None for _ in range(world_size)]
    port = find_free_network_port()
    torch.distributed.all_gather_object(port_list, port)
    machines = ",".join([f"{host_ip}:{port}" for host_ip, port in zip(host_ip_list, port_list)])
    local_listen_port = port
    logger.addHandler(logging.StreamHandler(sys.stdout))
    for plogger in [logging.getLogger(name) for name in logging.root.manager.loggerDict]:  # type: ignore[attr-defined]
        plogger.setLevel(level=logging.INFO if rank == 0 else logging.ERROR)
    return local_rank, local_world_size, rank, world_size, machines, local_listen_port


def main(
    local_rank, local_world_size, rank, world_size, machines: str, local_listen_port: int
) -> None:
    logger.info(f"Constructing the dataset of {SINGLE_NODE_SAMPLES * world_size} examples...")
    # Create a dataset of size proportional to the number of nodes, i.e. `world_size`
    train_x, train_y = make_regression(
        n_samples=SINGLE_NODE_SAMPLES, n_features=960, random_state=local_rank
    )
    logger.info("Constructing progress callback...")
    progress_cb = ProgressCallback()
    cbs = [progress_cb]
    model = LGBMRegressor(
        pre_partition=True,
        machines=machines,
        local_listen_port=local_listen_port,
        num_machines=world_size,
        **MODEL_PARAMS,
    )
    logger.info(f"Starting fit for model on {len(train_x)} examples...")
    model.fit(
        train_x,
        train_y,
        sample_weight=np.ones_like(train_y),
        callbacks=cbs,
    )


if __name__ == "__main__":
    seed_everything()
    (
        local_rank,
        local_world_size,
        rank,
        world_size,
        machines,
        local_listen_port,
    ) = setup_environment()
    print("Environment has been set up.")
    exception = None
    try:
        main(local_rank, local_world_size, rank, world_size, machines, local_listen_port)
    except Exception as e:
        exception = e
        logger.error(
            "".join(traceback.format_exception(type(exception), exception, exception.__traceback__))
        )
    finally:
        torch.distributed.destroy_process_group()
    if exception:
        raise exception
To run, I do:
MKL_CBWR=COMPATIBLE MKL_NUM_THREADS=8 mpirun --map-by ppr:1:node:pe=28 --bind-to core --report-bindings --mca orte_base_help_aggregate 0 --mca routed direct python3 train.py
The segfaulting error I see (on all nodes) is:
Caught signal 11 (Segmentation fault: address not mapped to object at address (nil))
==== backtrace (tid: 29334) ====
0 0x00000000000534e9 ucs_debug_print_backtrace() ???:0
1 0x0000000000012ce0 __funlockfile() :0
2 0x0000000000262213 LightGBM::Network::AllgatherBruck()
3 0x00000000002648ff LightGBM::Network::AllreduceByAllGather() ???:0
4 0x00000000001d186a LightGBM::DatasetLoader::ConstructFromSampleData() ???:0
5 0x00000000003b1211 LGBM_DatasetCreateFromMats() ???:0
6 0x00000000003b175c LGBM_DatasetCreateFromMat() ???:0
7 0x000000000000a6b4 ffi_call_unix64() ???:0
8 0x000000000000a0d1 ffi_call() ???:0
9 0x000000000000f8e5 PyCData_set() ???:0
10 0x0000000000014049 _ctypes_callproc() ???:0
11 0x000000000003aca3 __libc_start_main() ???:0
Other notes:
- The segfaulting issue occurs when I use a large number of nodes, e.g. 300, but not when I use a smaller number of nodes, e.g. 100, 50, 20, etc.
- When the dataset is highly similar across the nodes, i.e. when I make the dummy dataset without specifying the `random_state` corresponding to the local rank, the script does not segfault (see the sketch just below).
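For clarity, the two variants look like this; since `seed_everything()` seeds the global NumPy RNG identically on every node, omitting `random_state` means each node generates nearly identical data:

```python
# Variant that segfaults at ~300 nodes: per-rank data on each node
# (this is what the script above does).
train_x, train_y = make_regression(
    n_samples=SINGLE_NODE_SAMPLES, n_features=960, random_state=local_rank
)

# Variant that does not segfault: every node draws from the same seeded
# global RNG, so the partitions are highly similar.
train_x, train_y = make_regression(n_samples=SINGLE_NODE_SAMPLES, n_features=960)
```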
Let me know if there's any other info I can provide to help debug this. Thanks in advance!
Hi @jameslamb, I saw you had changed the tags - is there any other info I can provide here to help assess what might be the issue? I've tried a couple other things as well, including explicitly setting the number of threads to the number of cores I have on each node that I'm running on.
It would be helpful if you could:
- share some logs from training so it's possible to narrow down where in the training process failures are occurring
- explain why you're doing all this custom setup involving `torch` instead of using `lightgbm.dask` or running the LightGBM CLI as an MPI application like https://lightgbm.readthedocs.io/en/v3.3.5/Parallel-Learning-Guide.html#id2 (a rough `lightgbm.dask` sketch follows this list)
- mention ~~what version of LightGBM you're using and~~ exactly how you built + installed `lightgbm`
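For reference, the `lightgbm.dask` route looks roughly like the sketch below (the scheduler address and array shapes are placeholders, not a drop-in for your job):

```python
import dask.array as da
from distributed import Client
from lightgbm import DaskLGBMRegressor

# Placeholder scheduler address -- point this at your own Dask cluster.
client = Client("tcp://scheduler-host:8786")

# Dask handles partitioning and worker discovery, so there's no manual
# `machines` / `local_listen_port` bookkeeping.
dX = da.random.random((1_000_000, 960), chunks=(100_000, 960))
dy = da.random.random((1_000_000,), chunks=(100_000,))

model = DaskLGBMRegressor(client=client, tree_learner="data", n_estimators=100)
model.fit(dX, dy)
```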
Thanks! I can't upload logs due to company policy, but I'll summarize what's in them. I also found that the script works when I change `tree_learner` from `data` to `voting` and when I explicitly set `num_threads` to the per-node core count (I have 1 worker per node, and 300 nodes total); see the parameter sketch after the log summary below.
Environment has been set up. # (logged 300 times)
Constructing the dataset of 3514383600 examples...
[LightGBM] [Info] Connected to rank 0 # (logged 298 times)
[LightGBM] [Info] Connected to rank 1 # (logged 298 times)
...
[LightGBM] [Info] Connected to rank 299 # (logged 298 times)
[LightGBM] [Warning] num_threads is set=28, n_jobs=-1 will be ignored. Current value: num_threads=28 # (logged 299 times)
[LightGBM] [Warning] Histogram LRU queue was enabled (histogram_pool_size=16384.000000). # (logged 299 times when this parameter is set)
Will disable this to reduce communication costs # not sure if this line is a continuation of the histogram_pool_size warning above?
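For concreteness, here's a sketch of the parameter changes that avoid the segfault for me (28 matches the per-node core count implied by `pe=28` in my `mpirun` command; adjust for your hardware):

```python
# Sketch: the same MODEL_PARAMS as in the script above, with the two
# changes that avoid the segfault in my runs.
WORKING_PARAMS = {
    **MODEL_PARAMS,
    "tree_learner": "voting",  # changed from "data"
    "num_threads": 28,         # set explicitly to the per-node core count
}
```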
The reason I'm using this custom setup is that I have a shared abstraction for distributed environments that spans XGBoost, LightGBM, and Torch, plus a separate CLI for submitting (potentially distributed) training jobs, so I don't have a good way of using the LightGBM CLI.
We built LightGBM using the default settings, so I think MPI is not supported (which would mean it's using sockets (https://github.com/microsoft/LightGBM/blob/master/src/network/linkers_socket.cpp) for distributed communication, right?).
@dthiagarajan It seems there is a problem when constructing the Dataset in distributed mode. Can you provide the number of columns (features) of the dataset you're using?
Yep, I'm using 960 features.
> I also found that the script works when I change `tree_learner` from `data` to `voting` and when I explicitly set `num_threads` to the per-node core count (I have 1 worker per node, and 300 nodes total).
What happens if you only add `num_threads` explicitly?
If I specify `num_threads` explicitly and keep `tree_learner='data'`, then I see the same segfault error mentioned above.
Hi, I am facing a similar segmentation fault with 250 features. Has there been a resolution here?