# Dataloader doesn't load data while GPU is training
### System Info
- `Accelerate` version: 1.0.0
- Platform: Linux-6.10.11-amd64-x86_64-with-glibc2.40
- `accelerate` bash location: /disk/zdata0/home/xuyifan/anaconda3/envs/llava-transformers/bin/accelerate
- Python version: 3.11.9
- Numpy version: 1.26.4
- PyTorch version (GPU?): 2.4.0 (True)
- PyTorch XPU available: False
- PyTorch NPU available: False
- PyTorch MLU available: False
- PyTorch MUSA available: False
- System RAM: 503.67 GB
- GPU type: NVIDIA RTX A6000
- `Accelerate` default config:
- compute_environment: LOCAL_MACHINE
- distributed_type: DEEPSPEED
- mixed_precision: bf16
- use_cpu: False
- debug: False
- num_processes: 8
- machine_rank: 0
- num_machines: 1
- rdzv_backend: static
- same_network: True
- main_training_function: main
- enable_cpu_affinity: False
- deepspeed_config: {'gradient_accumulation_steps': 1, 'offload_optimizer_device': 'none', 'offload_param_device': 'none', 'zero3_init_flag': False, 'zero3_save_16bit_model': False, 'zero_stage': 3}
- downcast_bf16: no
- tpu_use_cluster: False
- tpu_use_sudo: False
- tpu_env: []
### Information
- [ ] The official example scripts
- [X] My own modified scripts
### Tasks
- [ ] One of the scripts in the examples/ folder of Accelerate or an officially supported `no_trainer` script in the `examples` folder of the `transformers` repo (such as `run_no_trainer_glue.py`)
- [X] My own task or dataset (give details below)
### Reproduction
I use a PyTorch dataloader with the Trainer from transformers. Since accelerate provides the dataloader backend for the Trainer, I think the problem is caused by accelerate.
I use a simple training script to distill a CLIP model. Part of my code:
```python
def train(
    # model/data params
    teacher_type: str = "",
    student_type: str = "",
    student_model: str = "checkpoints/clip-vit-base-patch16",
    from_pretrained: bool = True,
    data_config: str = "data.config",
    train_dataset: str = "msrvtt_train_single",
    num_frames: int = 8,
    mask_ratio: float = 0.8,
    output_dir: str = "./model",
    # training hyperparams
    batch_size: int = 256,
    micro_batch_size: int = 64,
    num_epochs: int = 1,
    learning_rate: float = 5e-4,
    warmup_ratio: float = 0.5,
    cutoff_len: int = 32,
    # llm hyperparams
    run_name: str = None,
    save_steps: int = 100,
    seed: int = 42,
    deepspeed: str = None,
    logging_steps: int = 10,
    grad_checkpoint: bool = False,
    # system_prompt: str = None,
    bf16: bool = False,
    # to avoid fire error
    local_rank: int = 0,
):
    # save args
    args = locals()
    if int(os.environ.get("LOCAL_RANK") or 0) == 0:
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        with open(os.path.join(output_dir, "args.json"), "w") as f:
            json.dump(args, f, indent=2)
    # set NCCL_DEBUG
    gradient_accumulation_steps = batch_size // micro_batch_size
    device_map = "cuda"
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    ddp = world_size != 1
    gradient_accumulation_steps = gradient_accumulation_steps // world_size
    # # if ddp and False:
    # if ddp:
    #     device_map = {"": int(os.environ.get("LOCAL_RANK") or 0)}
    #     gradient_accumulation_steps = gradient_accumulation_steps // world_size
    #     torch.distributed.init_process_group("nccl")
    #     rank, world_size = torch.distributed.get_rank(), torch.distributed.get_world_size()
    #     device_id = rank % torch.cuda.device_count()
    #     device = torch.device(device_id)
    #     torch.cuda.set_device(device)
    set_seed(seed)
    data_config = load_dataset_config(data_config, train_dataset)
    # initialize tokenizer
    tokenizer = CLIPTokenizer.from_pretrained(student_model)
    # initialize model
    feat = torch.load(data_config['feat_paths'][teacher_type], map_location='cpu')
    teacher_text_dim = feat[0]['text_embeds'].shape[-1]
    teacher_vision_dim = feat[0]['video_embeds'].shape[-1]
    config = CLIPConfigForDistill.from_pretrained(student_model)
    config.update({
        "teacher_text_dim": teacher_text_dim,
        "teacher_vision_dim": teacher_vision_dim,
        "student_text_dim": config.text_config.projection_dim,
        "student_image_dim": config.vision_config.projection_dim,
        "max_token_len": cutoff_len,
    })
    del feat
    model = CLIPModelForDistill(config)
    print(model.num_parameters(only_trainable=True))
    if grad_checkpoint:
        model.enable_input_require_grads()
    # initialize dataset
    train_data = DatasetForOfflineDistill(
        anno_path=data_config['anno_path'],
        data_root=data_config['data_root'],
        feat_path=data_config['feat_paths'][teacher_type],
        tokenize=False,
        num_frames=num_frames,
    )

    def custom_collate_fn(batch):
        # batch is a list of dicts
        # now = datetime.now()
        # dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
        # print(f'[{dt_string} {os.getpid()}] Rank {rank} is collating')
        collated_batch = {}
        for key in batch[0].keys():
            collated_batch[key] = [b[key] for b in batch]
        # collated_batch['video'] is a list of [num_frames, 3, 224, 224]
        # collated_batch['caption'] is a list of strings
        tokenized_caption = tokenizer(collated_batch['caption'], padding=True, return_tensors="pt")
        collated_batch['input_ids'] = tokenized_caption['input_ids']
        collated_batch['attention_mask'] = tokenized_caption['attention_mask']
        collated_batch['pixel_values'] = torch.stack(collated_batch['video'])
        collated_batch['vision_embeds'] = torch.stack(collated_batch['vision_embeds'])
        collated_batch['text_embeds'] = torch.stack(collated_batch['text_embeds'])
        return collated_batch

    trainer = TrainerForDistill(
        model=model,
        train_dataset=train_data,
        mask_ratio=mask_ratio,
        args=transformers.TrainingArguments(
            per_device_train_batch_size=micro_batch_size,
            gradient_accumulation_steps=gradient_accumulation_steps,
            warmup_ratio=warmup_ratio,
            num_train_epochs=num_epochs,
            learning_rate=learning_rate,
            fp16=not bf16,
            bf16=bf16,
            logging_steps=logging_steps,
            save_strategy="steps",
            eval_steps=None,
            save_steps=save_steps,
            output_dir=output_dir,
            save_total_limit=1,
            load_best_model_at_end=False,
            ddp_find_unused_parameters=True if ddp else None,
            run_name=run_name,
            report_to=None,
            deepspeed=deepspeed,
            gradient_checkpointing=grad_checkpoint,
            remove_unused_columns=False,
            dataloader_num_workers=32,
            dataloader_pin_memory=True,
            dataloader_prefetch_factor=10,
            dataloader_persistent_workers=True,
        ),
        data_collator=custom_collate_fn,
    )
    if torch.__version__ >= "2" and sys.platform != "win32":
        model = torch.compile(model)
    trainer.train()
    model.save_pretrained(output_dir)
```
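For context, the `dataloader_*` arguments above are forwarded by the Trainer to the underlying `torch.utils.data.DataLoader`. A rough sketch of the equivalent standalone construction (not the exact code the Trainer runs; it reuses `train_data` and `custom_collate_fn` from the script above):

```python
from torch.utils.data import DataLoader

# Sketch only: the Trainer builds (roughly) this DataLoader from the
# TrainingArguments above.
loader = DataLoader(
    train_data,
    batch_size=64,            # per_device_train_batch_size
    collate_fn=custom_collate_fn,
    num_workers=32,           # dataloader_num_workers
    pin_memory=True,          # dataloader_pin_memory
    prefetch_factor=10,       # dataloader_prefetch_factor
    persistent_workers=True,  # dataloader_persistent_workers
)
```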
The code of the dataloader:
```python
import torch
import os
import json
from utils.video import read_frames_decord
from torchvision.transforms.v2 import Compose, Resize, CenterCrop, RandomResizedCrop, RandomHorizontalFlip, ToTensor, Normalize
from PIL import Image
from datetime import datetime


class DatasetForOfflineDistill(torch.utils.data.Dataset):
    def __init__(
        self,
        anno_path: str | os.PathLike,
        data_root: str | os.PathLike,
        feat_path: str | os.PathLike,
        tokenizer: torch.nn.Module | None = None,
        tokenize: bool = False,
        num_frames: int = 8,
        test: bool = False,
    ):
        with open(anno_path) as f:
            self.anno = json.load(f)
        self.data_root = data_root
        # keys of each item: idx, text_embeds, video_embeds
        self.feat = torch.load(feat_path, weights_only=True)
        self.num_frames = num_frames
        self.transforms = self.build_transforms(test)
        self.tokenizer = tokenizer
        self.tokenize = tokenize

    def build_transforms(self, test: bool):
        image_mean = [0.48145466, 0.4578275, 0.40821073]
        image_std = [0.26862954, 0.26130258, 0.27577711]
        size = 224
        normalize = Normalize(mean=image_mean, std=image_std)
        train_transforms = Compose([
            RandomResizedCrop(size),
            RandomHorizontalFlip(),
            ToTensor(),
            normalize,
        ])
        val_transforms = Compose([
            Resize(size),
            CenterCrop(size),
            ToTensor(),
            normalize,
        ])
        if test:
            return val_transforms
        return train_transforms

    def __len__(self):
        return len(self.anno)

    def __getitem__(self, idx):
        rank = int(os.environ.get("LOCAL_RANK") or 0)
        now = datetime.now()
        dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
        print(f'[{dt_string} {os.getpid()}] Rank {rank} is loading', idx)
        item = self.feat[idx]
        anno_idx = item['idx']
        # [teacher_dim] -> [1, teacher_dim]
        text_embeds = item['text_embeds']
        video_embeds = item['video_embeds']
        caption = self.anno[anno_idx]['caption']
        if self.tokenizer is not None and self.tokenize:
            tokenized_caption = self.tokenizer(caption)
            caption = {
                'input_ids': tokenized_caption['input_ids'],
                'attention_mask': tokenized_caption['attention_mask'],
            }
        video_path = os.path.join(self.data_root, self.anno[anno_idx]['video'])
        # video = read_frames_decord(video_path, num_frames=self.num_frames).numpy()
        # frames = [self.transforms(Image.fromarray(frame)) for frame in video]
        video = read_frames_decord(video_path, num_frames=self.num_frames).permute(0, 3, 1, 2).float() / 255.0
        frames = self.transforms(video)
        return {
            'caption': caption,
            'video': frames,
            'text_embeds': text_embeds,
            'vision_embeds': video_embeds,
        }
```
The dataloader only loads data in bursts, roughly once every 1/100 of the total steps. For example, if I train for 4500 steps, the dataloader fetches enough data, then stops fetching while the GPUs train. After about 45 steps, the GPUs hang and the dataloader starts fetching again. GPU utilization is very low because of this. I think it is a bug (or maybe a designed feature?).
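For what it's worth, the dataloader settings above already imply a large up-front buffer. A quick back-of-the-envelope sketch (values taken from the script above; the arithmetic follows standard PyTorch `DataLoader` prefetching, which keeps `num_workers * prefetch_factor` batches in flight per process):

```python
num_workers = 32       # dataloader_num_workers
prefetch_factor = 10   # dataloader_prefetch_factor
micro_batch_size = 64  # per_device_train_batch_size

# Up to num_workers * prefetch_factor batches are prefetched across
# all workers of one process, so the workers can front-load:
buffered_batches = num_workers * prefetch_factor         # 320 batches
buffered_samples = buffered_batches * micro_batch_size   # 20480 samples

# If len(dataset) <= buffered_samples, the entire dataset fits in the
# prefetch queue and the workers go idle until batches are consumed.
print(buffered_batches, buffered_samples)
```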
### Expected behavior
The dataloader should continue to fetch data while the GPUs are training, so that the GPUs never stall waiting for data.
I think the GPU blocks the Python global interpreter lock (GIL).
This issue has been automatically marked as stale because it has not had recent activity. If you think this still needs to be addressed, please comment on this thread.
Please note that issues that do not follow the contributing guidelines are likely to be ignored.
It is not stale.
I apologize for my late reply. 😢 I have been busy with my project these days. I finally figured out today that it was because of my very small dataset: the dataloader loaded everything into memory, so it stopped loading during training. It was not easy to locate this cause until I happened to use a larger dataset. Thanks for your reply. @bghira
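In case it helps anyone who finds this issue later, here is a minimal sketch of the check that would have surfaced this (`train_data` and the numbers come from the script earlier in this issue):

```python
# Sketch: if the whole dataset fits inside the workers' prefetch queue,
# the loader front-loads everything and then goes idle, which looks
# exactly like "the dataloader stops fetching during training".
dataset_size = len(train_data)   # small in my case
buffered_samples = 32 * 10 * 64  # num_workers * prefetch_factor * batch_size
if dataset_size <= buffered_samples:
    print("Entire dataset fits in the prefetch buffer; bursty loading is expected.")
```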