dataloader doesn't load data while gpu is training

Open geekifan opened this issue 1 year ago • 1 comments

System Info

Copy-and-paste the text below in your GitHub issue

- `Accelerate` version: 1.0.0
- Platform: Linux-6.10.11-amd64-x86_64-with-glibc2.40
- `accelerate` bash location: /disk/zdata0/home/xuyifan/anaconda3/envs/llava-transformers/bin/accelerate
- Python version: 3.11.9
- Numpy version: 1.26.4
- PyTorch version (GPU?): 2.4.0 (True)
- PyTorch XPU available: False
- PyTorch NPU available: False
- PyTorch MLU available: False
- PyTorch MUSA available: False
- System RAM: 503.67 GB
- GPU type: NVIDIA RTX A6000
- `Accelerate` default config:
        - compute_environment: LOCAL_MACHINE
        - distributed_type: DEEPSPEED
        - mixed_precision: bf16
        - use_cpu: False
        - debug: False
        - num_processes: 8
        - machine_rank: 0
        - num_machines: 1
        - rdzv_backend: static
        - same_network: True
        - main_training_function: main
        - enable_cpu_affinity: False
        - deepspeed_config: {'gradient_accumulation_steps': 1, 'offload_optimizer_device': 'none', 'offload_param_device': 'none', 'zero3_init_flag': False, 'zero3_save_16bit_model': False, 'zero_stage': 3}
        - downcast_bf16: no
        - tpu_use_cluster: False
        - tpu_use_sudo: False
        - tpu_env: []

Information

  • [ ] The official example scripts
  • [X] My own modified scripts

Tasks

  • [ ] One of the scripts in the examples/ folder of Accelerate or an officially supported no_trainer script in the examples folder of the transformers repo (such as run_no_trainer_glue.py)
  • [X] My own task or dataset (give details below)

Reproduction

I use a PyTorch dataloader through the Trainer from transformers. Since accelerate is the dataloader backend of Trainer, I think the problem is caused by accelerate.

I use a simple training script to distill a CLIP model. Part of my code:

def train(
    # model/data params
    teacher_type: str = "",
    student_type: str = "",
    student_model: str = "checkpoints/clip-vit-base-patch16",
    from_pretrained: bool = True,
    data_config: str = "data.config",
    train_dataset: str = "msrvtt_train_single",
    num_frames: int = 8,
    mask_ratio: float = 0.8,
    output_dir: str = "./model",
    # training hyperparams
    batch_size: int = 256,
    micro_batch_size: int = 64,
    num_epochs: int = 1,
    learning_rate: float = 5e-4,
    warmup_ratio: float = 0.5,
    cutoff_len: int = 32,
    # llm hyperparams
    run_name: str = None,
    save_steps: int = 100,
    seed: int = 42,
    deepspeed: str = None,
    logging_steps: int = 10,
    grad_checkpoint: bool = False,
    # system_prompt: str = None,
    bf16: bool = False,
    # to avoid fire error
    local_rank: int = 0,
):
    # save args
    args = locals()
    if int(os.environ.get("LOCAL_RANK") or 0) == 0:
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        with open(os.path.join(output_dir, "args.json"), "w") as f:
            json.dump(args, f, indent=2)

    # set NCCL_DEBUG

    gradient_accumulation_steps = batch_size // micro_batch_size

    device_map = "cuda"
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    ddp = world_size != 1
    gradient_accumulation_steps = gradient_accumulation_steps // world_size
    # # if ddp and False:
    # if ddp:
    #     device_map = {"": int(os.environ.get("LOCAL_RANK") or 0)}
    #     gradient_accumulation_steps = gradient_accumulation_steps // world_size
    #     torch.distributed.init_process_group("nccl")
    #     rank, world_size = torch.distributed.get_rank(), torch.distributed.get_world_size()
    #     device_id = rank % torch.cuda.device_count()
    #     device = torch.device(device_id)
    #     torch.cuda.set_device(device)

    set_seed(seed)

    data_config = load_dataset_config(data_config, train_dataset)

    # initialize tokenizer
    tokenizer = CLIPTokenizer.from_pretrained(student_model)

    # initialize model
    feat = torch.load(data_config['feat_paths'][teacher_type], map_location='cpu')
    teacher_text_dim = feat[0]['text_embeds'].shape[-1]
    teacher_vision_dim = feat[0]['video_embeds'].shape[-1]
    config = CLIPConfigForDistill.from_pretrained(student_model)
    config.update({
        "teacher_text_dim": teacher_text_dim, 
        "teacher_vision_dim": teacher_vision_dim,
        "student_text_dim": config.text_config.projection_dim, 
        "student_image_dim": config.vision_config.projection_dim,
        "max_token_len": cutoff_len,
    })
    del feat

    model = CLIPModelForDistill(config)
    print(model.num_parameters(only_trainable=True))

    if grad_checkpoint:
        model.enable_input_require_grads()
    
    # initialize dataset
    train_data = DatasetForOfflineDistill(
        anno_path=data_config['anno_path'],
        data_root=data_config['data_root'],
        feat_path=data_config['feat_paths'][teacher_type],
        tokenize=False,
        num_frames=num_frames,
    )

    def custom_collate_fn(batch):
        # batch is a list of dicts
        # now = datetime.now()
        # dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
        # print(f'[{dt_string} {os.getpid()}] Rank {rank} is collating')
        collated_batch = {}
        for key in batch[0].keys():
            collated_batch[key] = [b[key] for b in batch]
        # collated_batch['video'] is a list of [num_frames, 3, 224, 224]
        # collated_batch['caption'] is a list of strings
        tokenized_caption = tokenizer(collated_batch['caption'], padding=True, return_tensors="pt")
        collated_batch['input_ids'] = tokenized_caption['input_ids']
        collated_batch['attention_mask'] = tokenized_caption['attention_mask']
        collated_batch['pixel_values'] = torch.stack(collated_batch['video'])
        collated_batch['vision_embeds'] = torch.stack(collated_batch['vision_embeds'])
        collated_batch['text_embeds'] = torch.stack(collated_batch['text_embeds'])
        return collated_batch

    trainer = TrainerForDistill(
        model=model,
        train_dataset=train_data,
        mask_ratio=mask_ratio,
        args=transformers.TrainingArguments(
            per_device_train_batch_size=micro_batch_size,
            gradient_accumulation_steps=gradient_accumulation_steps,
            warmup_ratio=warmup_ratio,
            num_train_epochs=num_epochs,
            learning_rate=learning_rate,
            fp16=not bf16,
            bf16=bf16,
            logging_steps=logging_steps,
            save_strategy="steps",
            eval_steps=None,
            save_steps=save_steps,
            output_dir=output_dir,
            save_total_limit=1,
            load_best_model_at_end=False,
            ddp_find_unused_parameters=True if ddp else None,
            run_name=run_name,
            report_to=None,
            deepspeed=deepspeed,
            gradient_checkpointing=grad_checkpoint,
            remove_unused_columns=False,
            dataloader_num_workers=32,
            dataloader_pin_memory=True,
            dataloader_prefetch_factor=10,
            dataloader_persistent_workers=True,
        ),
        data_collator=custom_collate_fn,
    )

    if torch.__version__ >= "2" and sys.platform != "win32":
        model = torch.compile(model)

    trainer.train()

    model.save_pretrained(output_dir)

The code of the dataset used by the dataloader:

import torch
import os
import json
from utils.video import read_frames_decord
from torchvision.transforms.v2 import Compose, Resize, CenterCrop, RandomResizedCrop, RandomHorizontalFlip, ToTensor, Normalize
from PIL import Image
from datetime import datetime

class DatasetForOfflineDistill(torch.utils.data.Dataset):
    def __init__(
            self, 
            anno_path: str | os.PathLike, 
            data_root: str | os.PathLike,
            feat_path: str | os.PathLike,
            tokenizer: torch.nn.Module | None = None,
            tokenize: bool = False,
            num_frames: int = 8,
            test: bool = False
        ):
        with open(anno_path) as f:
            self.anno = json.load(f)
        self.data_root = data_root
        # keys of each item: idx, text_embeds, video_embeds
        self.feat = torch.load(feat_path, weights_only=True)
        self.num_frames = num_frames
        self.transforms = self.build_transforms(test)
        self.tokenizer = tokenizer
        self.tokenize = tokenize
    
    def build_transforms(self, test: bool):
        image_mean =  [0.48145466, 0.4578275, 0.40821073]
        image_std = [0.26862954, 0.26130258, 0.27577711]
        size = 224
        normalize = (
            Normalize(mean=image_mean, std=image_std)
        )
        train_transforms = Compose([
            RandomResizedCrop(size),
            RandomHorizontalFlip(),
            ToTensor(),
            normalize,
        ])
        val_transforms = Compose([
            Resize(size),
            CenterCrop(size),
            ToTensor(),
            normalize,
        ])
        if test:
            return val_transforms
        return train_transforms
    
    def __len__(self):
        return len(self.anno)
    
    def __getitem__(self, idx):
        rank = int(os.environ.get("LOCAL_RANK") or 0)
        now = datetime.now()
        dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
        print(f'[{dt_string} {os.getpid()}] Rank {rank} is loading', idx)
        item = self.feat[idx]
        anno_idx = item['idx']
        # [teacher_dim] -> [1, teacher_dim]
        text_embeds = item['text_embeds']
        video_embeds = item['video_embeds']
        caption = self.anno[anno_idx]['caption']
        if self.tokenizer is not None and self.tokenize:
            tokenized_caption = self.tokenizer(caption)
            caption = {
                'input_ids': tokenized_caption['input_ids'],
                'attention_mask': tokenized_caption['attention_mask'],
            }
        video_path = os.path.join(self.data_root, self.anno[anno_idx]['video'])
        # video = read_frames_decord(video_path, num_frames=self.num_frames).numpy()
        # frames = [self.transforms(Image.fromarray(frame)) for frame in video]
        video = read_frames_decord(video_path, num_frames=self.num_frames).permute(0, 3, 1, 2).float() / 255.0
        frames = self.transforms(video)
        return {
            'caption': caption, 
            'video': frames, 
            'text_embeds': text_embeds, 
            'vision_embeds': video_embeds
        }

The dataloader only loads data once every 1/100 of the total steps. For example, when I train for 4500 steps, the dataloader first fetches a chunk of data and then stops fetching while the GPUs start training. After 45 steps, the GPUs hang and the dataloader starts fetching data again. GPU utilization is very low because of this. I think it is a bug (or maybe an intended feature?).
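
One way to check whether the GPUs really are waiting on the dataloader is to time the batch fetch and the training step separately. The sketch below is not part of the original script: `time_data_vs_compute`, `slow_batches`, and the sleep-based stand-ins are hypothetical names used only for illustration. For real GPU work, the step function would also need to call `torch.cuda.synchronize()` before returning, because CUDA kernels launch asynchronously and the timer would otherwise stop too early.

```python
import time
from typing import Callable, Iterable, Iterator, List, Tuple


def time_data_vs_compute(
    batches: Iterable, step_fn: Callable[[object], None]
) -> List[Tuple[float, float]]:
    """Return (seconds waiting for the batch, seconds in step_fn) per step."""
    timings = []
    iterator = iter(batches)
    while True:
        t0 = time.perf_counter()
        try:
            batch = next(iterator)  # blocks until the loader has a batch ready
        except StopIteration:
            break
        t1 = time.perf_counter()
        step_fn(batch)  # stand-in for forward/backward/optimizer step
        t2 = time.perf_counter()
        timings.append((t1 - t0, t2 - t1))
    return timings


def slow_batches(n: int, delay: float) -> Iterator[int]:
    """Hypothetical loader that needs `delay` seconds to produce each batch."""
    for i in range(n):
        time.sleep(delay)
        yield i


# Toy run: fetching (50 ms) is slower than the fake step (10 ms),
# so most of each iteration is spent waiting for data.
timings = time_data_vs_compute(slow_batches(3, 0.05), lambda b: time.sleep(0.01))
for step, (wait, compute) in enumerate(timings):
    print(f"step {step}: data wait {wait:.3f}s, compute {compute:.3f}s")
```

If the data wait is close to zero for most steps and only spikes periodically, the loader is keeping up and the stalls have another cause, such as the start of a new epoch.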

Expected behavior

The dataloader keeps fetching data while the GPUs are running, so that the GPUs never have to stop and wait for data.

geekifan avatar Oct 14 '24 11:10 geekifan

I think the GPU blocks the Python global interpreter lock (GIL).

bghira avatar Oct 18 '24 17:10 bghira

This issue has been automatically marked as stale because it has not had recent activity. If you think this still needs to be addressed please comment on this thread.

Please note that issues that do not follow the contributing guidelines are likely to be ignored.

github-actions[bot] avatar Nov 13 '24 15:11 github-actions[bot]

it is not stale

bghira avatar Nov 13 '24 15:11 bghira

I apologize for my late reply. 😢 I have been busy with my project these days. I finally figured out today that it was because my dataset is very small. The dataloader loaded everything into memory, so it stopped loading during training. It was not easy for me to find the cause until I happened to use a larger dataset. Thanks for your reply. @bghira
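
A rough sketch of why a small dataset can produce this pattern, assuming the documented PyTorch behavior that a `DataLoader` with worker processes keeps up to `num_workers * prefetch_factor` batches prefetched. The worker and prefetch values come from the script above. The 45-batch epoch length is only an assumption inferred from the 45-step period in the report, and the function names are hypothetical.

```python
def prefetch_capacity(num_workers: int, prefetch_factor: int) -> int:
    """Maximum number of batches the workers may hold ahead of the training loop."""
    return num_workers * prefetch_factor


def epoch_fits_in_prefetch(
    batches_per_epoch: int, num_workers: int, prefetch_factor: int
) -> bool:
    """True if the workers can load a whole epoch before training consumes it."""
    return batches_per_epoch <= prefetch_capacity(num_workers, prefetch_factor)


# Settings from the TrainingArguments in the script above.
NUM_WORKERS = 32
PREFETCH_FACTOR = 10

# Assumed epoch length per process (not stated in the report).
ASSUMED_BATCHES_PER_EPOCH = 45

capacity = prefetch_capacity(NUM_WORKERS, PREFETCH_FACTOR)
fits = epoch_fits_in_prefetch(ASSUMED_BATCHES_PER_EPOCH, NUM_WORKERS, PREFETCH_FACTOR)
print(f"prefetch capacity: {capacity} batches per process")
print(f"whole epoch prefetched up front: {fits}")
```

Under these assumptions the workers can finish an entire epoch in one burst and then sit idle until the next epoch begins, which would look like the dataloader stopping while the GPUs train.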

geekifan avatar Nov 13 '24 15:11 geekifan