
Encountered a TypeError: expected string or bytes-like object when training the model using dual-GPU model sharding.

Open wenyuc55 opened this issue 1 year ago • 4 comments

Describe the bug

Originally, I trained the 4-speaker LibriMix model on a single 4090 GPU, using mixed datasets I had prepared myself. The results with wav8k/min weren't ideal, so I wanted to switch to the wav8k/max dataset, but that caused a CUDA out-of-memory error. I therefore decided to train on two 4090 GPUs. When I ran the command torchrun --nproc_per_node=2 train.py hparams/sepformer-libri4mix.yaml, I found that DDP only speeds up training and does not relieve memory pressure. So I tried model sharding across the two GPUs, but I keep hitting the error TypeError: expected string or bytes-like object. This is my CSV file: libri4mix_dev.csv. My train.py is basically the same as the original file.

(new0913-env) wenyu@wenyu:~/桌面/speechbrain-develop0912/speechbrain-develop/recipes/LibriMix/separation$ torchrun --nproc_per_node=2 train5.py hparams/sepformer-libri5mix.yaml
W1206 15:58:26.255389 140175740086080 torch/distributed/run.py:779]
W1206 15:58:26.255389 140175740086080 torch/distributed/run.py:779] *****************************************
W1206 15:58:26.255389 140175740086080 torch/distributed/run.py:779] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
W1206 15:58:26.255389 140175740086080 torch/distributed/run.py:779] *****************************************
speechbrain.core - Beginning experiment!
speechbrain.core - Experiment folder: results/sepformer-libri5mix/1234
[rank1]: Traceback (most recent call last):
[rank1]:   File "train5.py", line 689, in <module>
[rank1]:     train_data, valid_data, test_data = dataio_prep(hparams)
[rank1]:   File "train5.py", line 476, in dataio_prep
[rank1]:     train_data = sb.dataio.dataset.DynamicItemDataset.from_csv(
[rank1]:   File "/home/wenyu/桌面/speechbrain-develop0912/new0913-env/lib/python3.8/site-packages/speechbrain/dataio/dataset.py", line 408, in from_csv
[rank1]:     data = load_data_csv(csv_path, replacements)
[rank1]:   File "/home/wenyu/桌面/speechbrain-develop0912/new0913-env/lib/python3.8/site-packages/speechbrain/dataio/dataio.py", line 148, in load_data_csv
[rank1]:     row[key] = variable_finder.sub(
[rank1]: TypeError: expected string or bytes-like object
W1206 15:58:29.073572 140175740086080 torch/distributed/elastic/multiprocessing/api.py:858] Sending process 4984 closing signal SIGTERM
E1206 15:58:29.137494 140175740086080 torch/distributed/elastic/multiprocessing/api.py:833] failed (exitcode: 1) local_rank: 1 (pid: 4985) of binary: /home/wenyu/桌面/speechbrain-develop0912/new0913-env/bin/python3
Traceback (most recent call last):
  File "/home/wenyu/桌面/speechbrain-develop0912/new0913-env/bin/torchrun", line 8, in <module>
    sys.exit(main())
  File "/home/wenyu/桌面/speechbrain-develop0912/new0913-env/lib/python3.8/site-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 348, in wrapper
    return f(*args, **kwargs)
  File "/home/wenyu/桌面/speechbrain-develop0912/new0913-env/lib/python3.8/site-packages/torch/distributed/run.py", line 901, in main
    run(args)
  File "/home/wenyu/桌面/speechbrain-develop0912/new0913-env/lib/python3.8/site-packages/torch/distributed/run.py", line 892, in run
    elastic_launch(
  File "/home/wenyu/桌面/speechbrain-develop0912/new0913-env/lib/python3.8/site-packages/torch/distributed/launcher/api.py", line 133, in __call__
    return launch_agent(self._config, self._entrypoint, list(args))
  File "/home/wenyu/桌面/speechbrain-develop0912/new0913-env/lib/python3.8/site-packages/torch/distributed/launcher/api.py", line 264, in launch_agent
    raise ChildFailedError(
torch.distributed.elastic.multiprocessing.errors.ChildFailedError:

train5.py FAILED

Failures:
  <NO_OTHER_FAILURES>

Root Cause (first observed failure):
[0]:
  time      : 2024-12-06_15:58:29
  host      : wenyu
  rank      : 1 (local_rank: 1)
  exitcode  : 1 (pid: 4985)
  error_file: <N/A>
  traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html
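
The traceback above ends inside a regex substitution on a CSV cell, which is the kind of failure you get when a cell is not a string. A minimal sketch for checking a CSV for such rows, assuming the loader reads rows with Python's csv.DictReader (I have not verified this against the installed SpeechBrain version); the function name find_bad_csv_rows and the demo column names are made up for illustration:

```python
import csv
import os
import tempfile


def find_bad_csv_rows(csv_path):
    """Return (line_number, reason) pairs for rows whose cells are not plain strings."""
    problems = []
    with open(csv_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            for key, value in row.items():
                if key is None:
                    # Cells beyond the header width are collected in a list under key None.
                    problems.append((reader.line_num, "more fields than header columns"))
                elif value is None:
                    # Missing trailing cells are filled with None.
                    problems.append((reader.line_num, f"missing value for column '{key}'"))
    return problems


if __name__ == "__main__":
    # Demo on a throwaway file: one good row, one short row, one long row.
    with tempfile.NamedTemporaryFile(
        "w", suffix=".csv", delete=False, newline="", encoding="utf-8"
    ) as tmp:
        tmp.write("ID,mix_wav,s1_wav\n")
        tmp.write("a,mix1.wav,s1.wav\n")
        tmp.write("b,mix2.wav\n")
        tmp.write("c,mix3.wav,s1.wav,extra\n")
    for line_num, reason in find_bad_csv_rows(tmp.name):
        print(f"line {line_num}: {reason}")
    os.remove(tmp.name)
```

If this reports nothing on the real file, the CSV may instead have been read while another process was still rewriting it (skip_prep is False in the YAML further down), but that is a guess on my part, not something the log confirms.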

Expected behaviour

I hope to use model sharding with dual GPUs to train the Libri4Mix model.
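
For reference, the basic idea of splitting one model across two GPUs can be sketched in plain PyTorch as below. This is only a toy illustration under my own assumptions, not SpeechBrain's mechanism: TwoDeviceNet, its layer sizes, and pick_devices are hypothetical, and the sketch falls back to CPU when two GPUs are not available so that it still runs.

```python
import torch
import torch.nn as nn


def pick_devices():
    """Use two GPUs when available; otherwise fall back to CPU for both stages."""
    if torch.cuda.device_count() >= 2:
        return torch.device("cuda:0"), torch.device("cuda:1")
    return torch.device("cpu"), torch.device("cpu")


class TwoDeviceNet(nn.Module):
    """Toy stand-in: the first stage lives on one device, the second on another."""

    def __init__(self, dev0, dev1):
        super().__init__()
        self.dev0, self.dev1 = dev0, dev1
        self.stage0 = nn.Linear(16, 32).to(dev0)
        self.stage1 = nn.Linear(32, 4).to(dev1)

    def forward(self, x):
        h = torch.relu(self.stage0(x.to(self.dev0)))
        # Copy the activations to the second device; autograd tracks the transfer.
        return self.stage1(h.to(self.dev1))


dev0, dev1 = pick_devices()
model = TwoDeviceNet(dev0, dev1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

x = torch.randn(8, 16)
y = torch.randn(8, 4, device=dev1)  # the loss is computed where the output lives

out = model(x)
loss = nn.functional.mse_loss(out, y)
loss.backward()
optimizer.step()
print(out.shape, loss.item())
```

In this sketch a single process owns both devices, so it would be started with plain python rather than torchrun --nproc_per_node=2, which starts one process per GPU. How sb.Brain's own device handling interacts with manual placement like this is something I have not checked.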

To Reproduce

#!/usr/bin/env/python3
"""Recipe for training a neural speech separation system on Libri2/3Mix datasets.
The system employs an encoder, a decoder, and a masking network.

To run this recipe, do the following:

    python train.py hparams/sepformer-libri2mix.yaml
    python train.py hparams/sepformer-libri3mix.yaml

The experiment file is flexible enough to support different neural networks.
By properly changing the parameter files, you can try different architectures.
The script supports both libri2mix and libri3mix.

Authors
 * Cem Subakan 2020
 * Mirco Ravanelli 2020
 * Samuele Cornell 2020
 * Mirko Bronzi 2020
 * Jianyuan Zhong 2020
"""

import csv
import logging
import os
import sys

import numpy as np
import torch
import torch.nn.functional as F
import torchaudio
from hyperpyyaml import load_hyperpyyaml
from tqdm import tqdm

import speechbrain as sb
import speechbrain.nnet.schedulers as schedulers
from speechbrain.core import AMPConfig
from speechbrain.utils.distributed import run_on_main

logger = logging.getLogger("speechbrain_logger")
logger.setLevel(logging.INFO)

# Define training procedure

class Separation(sb.Brain):
def __init__(self, modules, opt_class, hparams, run_opts, checkpointer):
    super().__init__(modules, opt_class, hparams, run_opts, checkpointer)

    # Assign the model modules to different GPUs
    self.hparams.Encoder = self.hparams.Encoder.to('cuda:0')  # placed on GPU 0
    self.hparams.MaskNet = self.hparams.MaskNet.to('cuda:1')  # placed on GPU 1
    self.hparams.Decoder = self.hparams.Decoder.to('cuda:0')  # back on GPU 0 again

def compute_forward(self, mix, targets, stage, noise=None):
    """Forward computations from the mixture to the separated signals."""

    # Unpack lists and put tensors in the right device
    mix, mix_lens = mix
    mix, mix_lens = mix.to('cuda:0'), mix_lens.to('cuda:0')

    # Convert targets to tensor
    targets = torch.cat(
        [targets[i][0].unsqueeze(-1) for i in range(self.hparams.num_spks)],
        dim=-1,
    ).to('cuda:0')

    # Add speech distortions
    if stage == sb.Stage.TRAIN:
        with torch.no_grad():
            if self.hparams.use_speedperturb or self.hparams.use_rand_shift:
                mix, targets = self.add_speed_perturb(targets, mix_lens)

                mix = targets.sum(-1)

                if self.hparams.use_wham_noise:
                    noise = noise.to('cuda:0')
                    len_noise = noise.shape[1]
                    len_mix = mix.shape[1]
                    min_len = min(len_noise, len_mix)

                    # add the noise
                    mix = mix[:, :min_len] + noise[:, :min_len]

                    # fix the length of targets also
                    targets = targets[:, :min_len, :]

            if self.hparams.use_wavedrop:
                mix = self.hparams.drop_chunk(mix, mix_lens)
                mix = self.hparams.drop_freq(mix)

            if self.hparams.limit_training_signal_len:
                mix, targets = self.cut_signals(mix, targets)

    # Encoder computation
    mix_w = self.hparams.Encoder(mix)

    # Move the intermediate result to the GPU that hosts MaskNet
    mix_w = mix_w.to('cuda:1')

    # MaskNet computation
    est_mask = self.hparams.MaskNet(mix_w)

    # Move the MaskNet output back to the GPU that hosts the Decoder
    mix_w = mix_w.to('cuda:0')
    est_mask = est_mask.to('cuda:0')

    # Decoding (runs on cuda:0)
    mix_w = torch.stack([mix_w] * self.hparams.num_spks)
    sep_h = mix_w * est_mask
    est_source = torch.cat(
        [
            self.hparams.Decoder(sep_h[i]).unsqueeze(-1)
            for i in range(self.hparams.num_spks)
        ],
        dim=-1,
    )

    # T changed after conv1d in encoder, fix it here
    T_origin = mix.size(1)
    T_est = est_source.size(1)
    if T_origin > T_est:
        est_source = F.pad(est_source, (0, 0, 0, T_origin - T_est))
    else:
        est_source = est_source[:, :T_origin, :]

    return est_source, targets

def compute_objectives(self, predictions, targets):
    """Computes the si-snr loss"""
    return self.hparams.loss(targets, predictions)

def fit_batch(self, batch):
    """Trains one batch"""
    amp = AMPConfig.from_name(self.precision)
    should_step = (self.step % self.grad_accumulation_factor) == 0

    # Unpacking batch list
    mixture = batch.mix_sig
    targets = [batch.s1_sig, batch.s2_sig, batch.s3_sig]
    if self.hparams.use_wham_noise:
        noise = batch.noise_sig[0]
    else:
        noise = None

    if self.hparams.num_spks == 4:
        targets.append(batch.s4_sig)

    with self.no_sync(not should_step):
        if self.use_amp:
            with torch.autocast(
                dtype=amp.dtype,
                device_type=torch.device(self.device).type,
            ):
                predictions, targets = self.compute_forward(
                    mixture, targets, sb.Stage.TRAIN, noise
                )
                loss = self.compute_objectives(predictions, targets)

                # hard threshold the easy dataitems
                if self.hparams.threshold_byloss:
                    th = self.hparams.threshold
                    loss = loss[loss > th]
                    if loss.nelement() > 0:
                        loss = loss.mean()
                else:
                    loss = loss.mean()

            if (
                loss.nelement() > 0 and loss < self.hparams.loss_upper_lim
            ):  # the fix for computational problems
                self.scaler.scale(loss).backward()
                if self.hparams.clip_grad_norm >= 0:
                    self.scaler.unscale_(self.optimizer)
                    torch.nn.utils.clip_grad_norm_(
                        self.modules.parameters(),
                        self.hparams.clip_grad_norm,
                    )
                self.scaler.step(self.optimizer)
                self.scaler.update()
            else:
                self.nonfinite_count += 1
                logger.info(
                    "infinite loss or empty loss! it happened {} times so far - skipping this batch".format(
                        self.nonfinite_count
                    )
                )
                loss.data = torch.tensor(0.0).to(self.device)
        else:
            predictions, targets = self.compute_forward(
                mixture, targets, sb.Stage.TRAIN, noise
            )
            loss = self.compute_objectives(predictions, targets)

            if self.hparams.threshold_byloss:
                th = self.hparams.threshold
                loss = loss[loss > th]
                if loss.nelement() > 0:
                    loss = loss.mean()
            else:
                loss = loss.mean()

            if (
                loss.nelement() > 0 and loss < self.hparams.loss_upper_lim
            ):  # the fix for computational problems
                loss.backward()
                if self.hparams.clip_grad_norm >= 0:
                    torch.nn.utils.clip_grad_norm_(
                        self.modules.parameters(),
                        self.hparams.clip_grad_norm,
                    )
                self.optimizer.step()
            else:
                self.nonfinite_count += 1
                logger.info(
                    "infinite loss or empty loss! it happened {} times so far - skipping this batch".format(
                        self.nonfinite_count
                    )
                )
                loss.data = torch.tensor(0.0).to(self.device)
    self.optimizer.zero_grad()

    return loss.detach().cpu()

def evaluate_batch(self, batch, stage):
    """Computations needed for validation/test batches"""
    snt_id = batch.id
    mixture = batch.mix_sig
    targets = [batch.s1_sig, batch.s2_sig, batch.s3_sig]
    if self.hparams.num_spks == 4:
        targets.append(batch.s4_sig)

    with torch.no_grad():
        predictions, targets = self.compute_forward(mixture, targets, stage)
        loss = self.compute_objectives(predictions, targets)

    # Manage audio file saving
    if stage == sb.Stage.TEST and self.hparams.save_audio:
        if hasattr(self.hparams, "n_audio_to_save"):
            if self.hparams.n_audio_to_save > 0:
                self.save_audio(snt_id[0], mixture, targets, predictions)
                self.hparams.n_audio_to_save += -1
        else:
            self.save_audio(snt_id[0], mixture, targets, predictions)

    return loss.mean().detach()

def on_stage_end(self, stage, stage_loss, epoch):
    """Gets called at the end of a epoch."""
    # Compute/store important stats
    stage_stats = {"si-snr": stage_loss}
    if stage == sb.Stage.TRAIN:
        self.train_stats = stage_stats

    # Perform end-of-iteration things, like annealing, logging, etc.
    if stage == sb.Stage.VALID:
        # Learning rate annealing
        if isinstance(
            self.hparams.lr_scheduler, schedulers.ReduceLROnPlateau
        ):
            current_lr, next_lr = self.hparams.lr_scheduler(
                [self.optimizer], epoch, stage_loss
            )
            schedulers.update_learning_rate(self.optimizer, next_lr)
        else:
            # if we do not use the reducelronplateau, we do not change the lr
            current_lr = self.hparams.optimizer.optim.param_groups[0]["lr"]

        self.hparams.train_logger.log_stats(
            stats_meta={"epoch": epoch, "lr": current_lr},
            train_stats=self.train_stats,
            valid_stats=stage_stats,
        )
        self.checkpointer.save_and_keep_only(
            meta={"si-snr": stage_stats["si-snr"]},
            min_keys=["si-snr"],
        )
    elif stage == sb.Stage.TEST:
        self.hparams.train_logger.log_stats(
            stats_meta={"Epoch loaded": self.hparams.epoch_counter.current},
            test_stats=stage_stats,
        )

def add_speed_perturb(self, targets, targ_lens):
    """Adds speed perturbation and random_shift to the input signals"""

    min_len = -1
    recombine = False

    if self.hparams.use_speedperturb:
        # Performing speed change (independently on each source)
        new_targets = []
        recombine = True

        for i in range(targets.shape[-1]):
            new_target = self.hparams.speed_perturb(targets[:, :, i])
            new_targets.append(new_target)
            if i == 0:
                min_len = new_target.shape[-1]
            else:
                if new_target.shape[-1] < min_len:
                    min_len = new_target.shape[-1]

        if self.hparams.use_rand_shift:
            # Performing random_shift (independently on each source)
            recombine = True
            for i in range(targets.shape[-1]):
                rand_shift = torch.randint(
                    self.hparams.min_shift, self.hparams.max_shift, (1,)
                )
                new_targets[i] = new_targets[i].to(self.device)
                new_targets[i] = torch.roll(
                    new_targets[i], shifts=(rand_shift[0],), dims=1
                )

        # Re-combination
        if recombine:
            if self.hparams.use_speedperturb:
                targets = torch.zeros(
                    targets.shape[0],
                    min_len,
                    targets.shape[-1],
                    device=targets.device,
                    dtype=torch.float,
                )
            for i, new_target in enumerate(new_targets):
                targets[:, :, i] = new_targets[i][:, 0:min_len]

    mix = targets.sum(-1)
    return mix, targets

def cut_signals(self, mixture, targets):
    """This function selects a random segment of a given length within the mixture.
    The corresponding targets are selected accordingly"""
    randstart = torch.randint(
        0,
        1 + max(0, mixture.shape[1] - self.hparams.training_signal_len),
        (1,),
    ).item()
    targets = targets[
        :, randstart : randstart + self.hparams.training_signal_len, :
    ]
    mixture = mixture[
        :, randstart : randstart + self.hparams.training_signal_len
    ]
    return mixture, targets

def reset_layer_recursively(self, layer):
    """Reinitializes the parameters of the neural networks"""
    if hasattr(layer, "reset_parameters"):
        layer.reset_parameters()
    for child_layer in layer.modules():
        if layer != child_layer:
            self.reset_layer_recursively(child_layer)

def save_results(self, test_data):
    """This script computes the SDR and SI-SNR metrics and saves
    them into a csv file"""

    # This package is required for SDR computation
    from mir_eval.separation import bss_eval_sources

    # Create folders where to store audio
    save_file = os.path.join(self.hparams.output_folder, "test_results.csv")

    # Variable init
    all_sdrs = []
    all_sdrs_i = []
    all_sisnrs = []
    all_sisnrs_i = []
    csv_columns = ["snt_id", "sdr", "sdr_i", "si-snr", "si-snr_i"]

    test_loader = sb.dataio.dataloader.make_dataloader(
        test_data, **self.hparams.dataloader_opts
    )

    with open(save_file, "w", newline="", encoding="utf-8") as results_csv:
        writer = csv.DictWriter(results_csv, fieldnames=csv_columns)
        writer.writeheader()

        # Loop over all test sentence
        with tqdm(test_loader, dynamic_ncols=True) as t:
            for i, batch in enumerate(t):
                # Apply Separation
                mixture, mix_len = batch.mix_sig
                snt_id = batch.id
                targets = [batch.s1_sig, batch.s2_sig, batch.s3_sig]
                if self.hparams.num_spks == 4:
                    targets.append(batch.s4_sig)

                with torch.no_grad():
                    predictions, targets = self.compute_forward(
                        batch.mix_sig, targets, sb.Stage.TEST
                    )

                # Compute SI-SNR
                sisnr = self.compute_objectives(predictions, targets)

                # Compute SI-SNR improvement
                mixture_signal = torch.stack(
                    [mixture] * self.hparams.num_spks, dim=-1
                )
                mixture_signal = mixture_signal.to(targets.device)
                sisnr_baseline = self.compute_objectives(
                    mixture_signal, targets
                )
                sisnr_i = sisnr - sisnr_baseline

                # Compute SDR
                sdr, _, _, _ = bss_eval_sources(
                    targets[0].t().cpu().numpy(),
                    predictions[0].t().detach().cpu().numpy(),
                )

                sdr_baseline, _, _, _ = bss_eval_sources(
                    targets[0].t().cpu().numpy(),
                    mixture_signal[0].t().detach().cpu().numpy(),
                )

                sdr_i = sdr.mean() - sdr_baseline.mean()

                # Saving on a csv file
                row = {
                    "snt_id": snt_id[0],
                    "sdr": sdr.mean(),
                    "sdr_i": sdr_i,
                    "si-snr": -sisnr.item(),
                    "si-snr_i": -sisnr_i.item(),
                }
                writer.writerow(row)

                # Metric Accumulation
                all_sdrs.append(sdr.mean())
                all_sdrs_i.append(sdr_i.mean())
                all_sisnrs.append(-sisnr.item())
                all_sisnrs_i.append(-sisnr_i.item())

            row = {
                "snt_id": "avg",
                "sdr": np.array(all_sdrs).mean(),
                "sdr_i": np.array(all_sdrs_i).mean(),
                "si-snr": np.array(all_sisnrs).mean(),
                "si-snr_i": np.array(all_sisnrs_i).mean(),
            }
            writer.writerow(row)

    logger.info("Mean SISNR is {}".format(np.array(all_sisnrs).mean()))
    logger.info("Mean SISNRi is {}".format(np.array(all_sisnrs_i).mean()))
    logger.info("Mean SDR is {}".format(np.array(all_sdrs).mean()))
    logger.info("Mean SDRi is {}".format(np.array(all_sdrs_i).mean()))

def save_audio(self, snt_id, mixture, targets, predictions):
    "saves the test audio (mixture, targets, and estimated sources) on disk"

    # Create output folder
    save_path = os.path.join(self.hparams.save_folder, "audio_results")
    if not os.path.exists(save_path):
        os.mkdir(save_path)

    for ns in range(self.hparams.num_spks):
        # Estimated source
        signal = predictions[0, :, ns]
        signal = signal / signal.abs().max()
        save_file = os.path.join(
            save_path, "item{}_source{}hat.wav".format(snt_id, ns + 1)
        )
        torchaudio.save(
            save_file, signal.unsqueeze(0).cpu(), self.hparams.sample_rate
        )

        # Original source
        signal = targets[0, :, ns]
        signal = signal / signal.abs().max()
        save_file = os.path.join(
            save_path, "item{}_source{}.wav".format(snt_id, ns + 1)
        )
        torchaudio.save(
            save_file, signal.unsqueeze(0).cpu(), self.hparams.sample_rate
        )

    # Mixture
    signal = mixture[0][0, :]
    signal = signal / signal.abs().max()
    save_file = os.path.join(save_path, "item{}_mix.wav".format(snt_id))
    torchaudio.save(
        save_file, signal.unsqueeze(0).cpu(), self.hparams.sample_rate
    )

def dataio_prep(hparams):
"""Creates data processing pipeline"""

# 1. Define datasets
train_data = sb.dataio.dataset.DynamicItemDataset.from_csv(
    csv_path=hparams["train_data"],
    replacements={"data_root": hparams["data_folder"]},
)

valid_data = sb.dataio.dataset.DynamicItemDataset.from_csv(
    csv_path=hparams["valid_data"],
    replacements={"data_root": hparams["data_folder"]},
)

test_data = sb.dataio.dataset.DynamicItemDataset.from_csv(
    csv_path=hparams["test_data"],
    replacements={"data_root": hparams["data_folder"]},
)

datasets = [train_data, valid_data, test_data]

# Verify the data-processing results
print("Verifying the data-processing results...")
for i, sample in enumerate(train_data):
    print(f"Sample {i}: {sample}")
    if i >= 10:  # Print only the first few samples to avoid excessive output
        break

# 2. Provide audio pipelines

@sb.utils.data_pipeline.takes("mix_wav")
@sb.utils.data_pipeline.provides("mix_sig")
def audio_pipeline_mix(mix_wav):
    mix_sig = sb.dataio.dataio.read_audio(mix_wav)
    return mix_sig

@sb.utils.data_pipeline.takes("s1_wav")
@sb.utils.data_pipeline.provides("s1_sig")
def audio_pipeline_s1(s1_wav):
    s1_sig = sb.dataio.dataio.read_audio(s1_wav)
    return s1_sig

@sb.utils.data_pipeline.takes("s2_wav")
@sb.utils.data_pipeline.provides("s2_sig")
def audio_pipeline_s2(s2_wav):
    s2_sig = sb.dataio.dataio.read_audio(s2_wav)
    return s2_sig

@sb.utils.data_pipeline.takes("s3_wav")
@sb.utils.data_pipeline.provides("s3_sig")
def audio_pipeline_s3(s3_wav):
    s3_sig = sb.dataio.dataio.read_audio(s3_wav)
    return s3_sig
    
if hparams["num_spks"] == 4:

    @sb.utils.data_pipeline.takes("s4_wav")
    @sb.utils.data_pipeline.provides("s4_sig")
    def audio_pipeline_s4(s4_wav):
        s4_sig = sb.dataio.dataio.read_audio(s4_wav)
        return s4_sig

if hparams["use_wham_noise"]:

    @sb.utils.data_pipeline.takes("noise_wav")
    @sb.utils.data_pipeline.provides("noise_sig")
    def audio_pipeline_noise(noise_wav):
        noise_sig = sb.dataio.dataio.read_audio(noise_wav)
        return noise_sig

sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline_mix)
sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline_s1)
sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline_s2)
sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline_s3)
if hparams["num_spks"] == 4:
    sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline_s4)

if hparams["use_wham_noise"]:
    print("Using the WHAM! noise in the data pipeline")
    sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline_noise)

if (hparams["num_spks"] == 2) and hparams["use_wham_noise"]:
    sb.dataio.dataset.set_output_keys(
        datasets, ["id", "mix_sig", "s1_sig", "s2_sig", "noise_sig"]
    )
elif (hparams["num_spks"] == 3) and hparams["use_wham_noise"]:
    sb.dataio.dataset.set_output_keys(
        datasets,
        ["id", "mix_sig", "s1_sig", "s2_sig", "s3_sig", "noise_sig"],
    )
elif (hparams["num_spks"] == 4) and hparams["use_wham_noise"]:
    sb.dataio.dataset.set_output_keys(
        datasets,
        ["id", "mix_sig", "s1_sig", "s2_sig", "s3_sig", "s4_sig", "noise_sig"],
    )
elif (hparams["num_spks"] == 2) and not hparams["use_wham_noise"]:
    sb.dataio.dataset.set_output_keys(
        datasets, ["id", "mix_sig", "s1_sig", "s2_sig"]
    )
elif (hparams["num_spks"] == 3) and not hparams["use_wham_noise"]:
    sb.dataio.dataset.set_output_keys(
        datasets, ["id", "mix_sig", "s1_sig", "s2_sig", "s3_sig"]
    )
else:
    sb.dataio.dataset.set_output_keys(
        datasets, ["id", "mix_sig", "s1_sig", "s2_sig", "s3_sig", "s4_sig"]
    )

return train_data, valid_data, test_data

if __name__ == "__main__":
# Load hyperparameters file with command-line overrides
hparams_file, run_opts, overrides = sb.parse_arguments(sys.argv[1:])
with open(hparams_file, encoding="utf-8") as fin:
    hparams = load_hyperpyyaml(fin, overrides)

# Initialize ddp (useful only for multi-GPU DDP training)
sb.utils.distributed.ddp_init_group(run_opts)

# Create experiment directory
sb.create_experiment_directory(
    experiment_directory=hparams["output_folder"],
    hyperparams_to_save=hparams_file,
    overrides=overrides,
)

# Check if wsj0_tr is set with dynamic mixing
if hparams["dynamic_mixing"] and not os.path.exists(
    hparams["base_folder_dm"]
):
    raise ValueError(
        "Please, specify a valid base_folder_dm folder when using dynamic mixing"
    )

# Update precision to bf16 if the device is CPU and precision is fp16
if run_opts.get("device") == "cpu" and hparams.get("precision") == "fp16":
    hparams["precision"] = "bf16"

# Data preparation
from prepare_data import prepare_librimix

run_on_main(
    prepare_librimix,
    kwargs={
        "datapath": hparams["data_folder"],
        "savepath": hparams["save_folder"],
        "n_spks": hparams["num_spks"],
        "skip_prep": hparams["skip_prep"],
        "librimix_addnoise": hparams["use_wham_noise"],
        "fs": hparams["sample_rate"],
    },
)

# Create dataset objects
if hparams["dynamic_mixing"]:
    from dynamic_mixing import (
        dynamic_mix_data_prep_librimix as dynamic_mix_data_prep,
    )

    # if the base_folder for dm is not processed, preprocess them
    if "processed" not in hparams["base_folder_dm"]:
        # if the processed folder already exists we just use it otherwise we do the preprocessing
        if not os.path.exists(
            os.path.normpath(hparams["base_folder_dm"]) + "_processed"
        ):
            from recipes.LibriMix.meta.preprocess_dynamic_mixing import (
                resample_folder,
            )

            print("Resampling the base folder")
            run_on_main(
                resample_folder,
                kwargs={
                    "input_folder": hparams["base_folder_dm"],
                    "output_folder": os.path.normpath(
                        hparams["base_folder_dm"]
                    )
                    + "_processed",
                    "fs": hparams["sample_rate"],
                    "regex": "**/*.flac",
                },
            )
            # adjust the base_folder_dm path
            hparams["base_folder_dm"] = (
                os.path.normpath(hparams["base_folder_dm"]) + "_processed"
            )
        else:
            print(
                "Using the existing processed folder on the same directory as base_folder_dm"
            )
            hparams["base_folder_dm"] = (
                os.path.normpath(hparams["base_folder_dm"]) + "_processed"
            )

    dm_hparams = {
        "train_data": hparams["train_data"],
        "data_folder": hparams["data_folder"],
        "base_folder_dm": hparams["base_folder_dm"],
        "sample_rate": hparams["sample_rate"],
        "num_spks": hparams["num_spks"],
        "training_signal_len": hparams["training_signal_len"],
        "dataloader_opts": hparams["dataloader_opts"],
    }

    train_data = dynamic_mix_data_prep(dm_hparams)
    _, valid_data, test_data = dataio_prep(hparams)
else:
    train_data, valid_data, test_data = dataio_prep(hparams)

# Load pretrained model if pretrained_separator is present in the yaml
if "pretrained_separator" in hparams:
    run_on_main(hparams["pretrained_separator"].collect_files)
    hparams["pretrained_separator"].load_collected()

# Brain class initialization
separator = Separation(
    modules=hparams["modules"],
    opt_class=hparams["optimizer"],
    hparams=hparams,
    run_opts=run_opts,
    checkpointer=hparams["checkpointer"],
)

# GPU placement check
print(next(separator.hparams.Encoder.parameters()).device)  # should be cuda:0
print(next(separator.hparams.MaskNet.parameters()).device)  # should be cuda:1
print(next(separator.hparams.Decoder.parameters()).device)  # should be cuda:0

# re-initialize the parameters if we don't use a pretrained model
if "pretrained_separator" not in hparams:
    for module in separator.modules.values():
        separator.reset_layer_recursively(module)

# Training
separator.fit(
    separator.hparams.epoch_counter,
    train_data,
    valid_data,
    train_loader_kwargs=hparams["dataloader_opts"],
    valid_loader_kwargs=hparams["dataloader_opts"],
)

# Eval
separator.evaluate(test_data, min_key="si-snr")
separator.save_results(test_data)

Environment Details

1125 environment

Relevant Log Output

No response

Additional Context

No response

wenyuc55 avatar Dec 06 '24 13:12 wenyuc55

Hi! Before jumping into GPU sharding (which sounds a bit overkill on a 2-GPU setup tbh), I'd encourage you to just reduce the batch size and increase the gradient accumulation ;) It should also slow training down much less.
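
To illustrate what gradient accumulation means here, a minimal sketch in plain PyTorch (not SpeechBrain's implementation): gradients from several small batches are summed before a single optimizer step. The toy model, the random data, and the value of accumulation_steps are all made up for illustration.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.Linear(10, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
accumulation_steps = 4  # hypothetical value: one update every 4 batches

# Eight toy batches of size 1
data = [(torch.randn(1, 10), torch.randn(1, 1)) for _ in range(8)]

num_updates = 0
optimizer.zero_grad()
for step, (x, y) in enumerate(data, start=1):
    loss = nn.functional.mse_loss(model(x), y)
    # Scale so the accumulated gradient is the average over the group of batches
    (loss / accumulation_steps).backward()
    if step % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
        num_updates += 1

print(num_updates)
```

Note that the fit_batch posted in the issue computes should_step from self.grad_accumulation_factor but still calls the optimizer step and zero_grad on every batch, so as written it would probably need a similar check before accumulation has any effect.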

TParcollet avatar Dec 09 '24 21:12 TParcollet

My batch size is already set to 1. How can I further reduce it?

Additionally, I’ve encountered an issue. When I train using the original LibriMix with multi-GPU training, the first run executes smoothly. However, if I stop the training and rerun the same command to start training again, I encounter the same TypeError: expected string or bytes-like object issue mentioned earlier.

Also, when I try to stop the training using Ctrl+Z or Ctrl+C, it doesn’t work. I have to close the entire terminal and reopen it. After that, without making any changes, rerunning the same command results in the issue.

Below is my YAML file.

################################# #Model: SepFormer for source separation #https://arxiv.org/abs/2010.13154 #Dataset : Libri4Mix #################################

#Basic parameters #Seed needs to be set at top of yaml, before objects with parameters are made

seed: 1234 __set_seed: !apply:torch.manual_seed [!ref ]

#Data params

#e.g. '/yourpath/Libri4Mix/train-clean-360/' #the data folder is needed even if dynamic mixing is applied data_folder: /home/wenyu/桌面/LibriMix-master0912/LibriMix-master/storage_dir/Libri4Mix #media/wy/1ba42eac-2b5d-438f-bf3e-08c619080bac/home/wenyu/桌面/LibriMix-master0912/LibriMix-master/storage_dir/Libri4Mix #/home/wenyu/桌面/LibriMix-master0912/LibriMix-master/storage_dir/Libri4Mix

#This is needed only if dynamic mixing is applied base_folder_dm: /home/wenyu/桌面/LibriMix-master0912/LibriMix-master/storage_dir/LibriSpeech/train-clean-360/ #media/wy/1ba42eac-2b5d-438f-bf3e-08c619080bac/home/wenyu/桌面/LibriMix-master0912/LibriMix-master/storage_dir/LibriSpeech/train-clean-360/ #/home/wenyu/桌面/LibriMix-master0912/LibriMix-master/storage_dir/LibriSpeech/train-clean-360/

experiment_name: sepformer-libri4mix output_folder: !ref results/<experiment_name>/ train_log: !ref <output_folder>/train_log.txt save_folder: !ref <output_folder>/save train_data: !ref <save_folder>/libri4mix_train-360.csv valid_data: !ref <save_folder>/libri4mix_dev.csv test_data: !ref <save_folder>/libri4mix_test.csv skip_prep: False

ckpt_interval_minutes: 60

#Experiment params
precision: fp16 # bf16, fp16 or fp32 # Set it to True for mixed precision
num_spks: 4
noprogressbar: False
save_audio: True # Save estimated sources on disk
sample_rate: 8000

####################### Training Parameters ####################################
N_epochs: 250
batch_size: 1
lr: 0.00015
clip_grad_norm: 5
loss_upper_lim: 999999 # this is the upper limit for an acceptable loss
#if True, the training sequences are cut to a specified length
limit_training_signal_len: False
#this is the length of sequences if we choose to limit
#the signal length of training sequences
training_signal_len: 32000000

#Set it to True to dynamically create mixtures at training time
dynamic_mixing: False
use_wham_noise: False

#Parameters for data augmentation
use_wavedrop: False
use_speedperturb: True
use_rand_shift: False
min_shift: -8000
max_shift: 8000

#Speed perturbation
speed_changes: [95, 100, 105] # List of speed changes for time-stretching

speed_perturb: !new:speechbrain.augment.time_domain.SpeedPerturb
    orig_freq: !ref <sample_rate>
    speeds: !ref <speed_changes>

#Frequency drop: randomly drops a number of frequency bands to zero.
drop_freq_low: 0 # Min frequency band dropout probability
drop_freq_high: 1 # Max frequency band dropout probability
drop_freq_count_low: 1 # Min number of frequency bands to drop
drop_freq_count_high: 3 # Max number of frequency bands to drop
drop_freq_width: 0.05 # Width of frequency bands to drop

drop_freq: !new:speechbrain.augment.time_domain.DropFreq
    drop_freq_low: !ref <drop_freq_low>
    drop_freq_high: !ref <drop_freq_high>
    drop_freq_count_low: !ref <drop_freq_count_low>
    drop_freq_count_high: !ref <drop_freq_count_high>
    drop_freq_width: !ref <drop_freq_width>

#Time drop: randomly drops a number of temporal chunks.
drop_chunk_count_low: 1 # Min number of audio chunks to drop
drop_chunk_count_high: 5 # Max number of audio chunks to drop
drop_chunk_length_low: 1000 # Min length of audio chunks to drop
drop_chunk_length_high: 2000 # Max length of audio chunks to drop

drop_chunk: !new:speechbrain.augment.time_domain.DropChunk
    drop_length_low: !ref <drop_chunk_length_low>
    drop_length_high: !ref <drop_chunk_length_high>
    drop_count_low: !ref <drop_chunk_count_low>
    drop_count_high: !ref <drop_chunk_count_high>

#loss thresholding -- this thresholds the training loss
threshold_byloss: True
threshold: -30

#Encoder parameters
N_encoder_out: 256
out_channels: 256
kernel_size: 16
kernel_stride: 8
d_ffn: 1024
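For a rough sense of how utterance length drives memory use with these encoder settings, the sketch below computes the number of frames an unpadded 1-D convolution would produce for a given number of samples. It assumes the encoder is a plain strided convolution with no padding, using kernel_size 16 and kernel_stride 8 from the config above. The function name encoder_frames and the example durations are hypothetical.

```python
def encoder_frames(num_samples, kernel_size=16, stride=8):
    """Output length of an unpadded 1-D convolution (assumed encoder layout)."""
    if num_samples < kernel_size:
        return 0
    return (num_samples - kernel_size) // stride + 1


sample_rate = 8000  # matches sample_rate in the config above

# Hypothetical utterance lengths in seconds, for illustration only.
for seconds in (4, 10, 20):
    frames = encoder_frames(seconds * sample_rate)
    print(seconds, "s ->", frames, "frames")
```

The frame count grows linearly with the number of samples, so longer mixtures (with limit_training_signal_len set to False) give the masking network proportionally longer sequences to process.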

#Dataloader options
dataloader_opts:
    batch_size: !ref <batch_size>
    num_workers: 3

#Specifying the network
Encoder: !new:speechbrain.lobes.models.dual_path.Encoder
    kernel_size: !ref <kernel_size>
    out_channels: !ref <N_encoder_out>

SBtfintra: !new:speechbrain.lobes.models.dual_path.SBTransformerBlock
    num_layers: 8
    d_model: !ref <out_channels>
    nhead: 8
    d_ffn: !ref <d_ffn>
    dropout: 0
    use_positional_encoding: True
    norm_before: True

SBtfinter: !new:speechbrain.lobes.models.dual_path.SBTransformerBlock
    num_layers: 8
    d_model: !ref <out_channels>
    nhead: 8
    d_ffn: !ref <d_ffn>
    dropout: 0
    use_positional_encoding: True
    norm_before: True

MaskNet: !new:speechbrain.lobes.models.dual_path.Dual_Path_Model
    num_spks: !ref <num_spks>
    in_channels: !ref <N_encoder_out>
    out_channels: !ref <out_channels>
    num_layers: 2
    K: 250
    intra_model: !ref <SBtfintra>
    inter_model: !ref <SBtfinter>
    norm: ln
    linear_layer_after_inter_intra: False
    skip_around_intra: True

Decoder: !new:speechbrain.lobes.models.dual_path.Decoder
    in_channels: !ref <N_encoder_out>
    out_channels: 1
    kernel_size: !ref <kernel_size>
    stride: !ref <kernel_stride>
    bias: False

optimizer: !name:torch.optim.Adam
    lr: !ref
    weight_decay: 0

loss: !name:speechbrain.nnet.losses.get_si_snr_with_pitwrapper

lr_scheduler: !new:speechbrain.nnet.schedulers.ReduceLROnPlateau
    factor: 0.5
    patience: 2
    dont_halve_until_epoch: 5

epoch_counter: !new:speechbrain.utils.epoch_loop.EpochCounter
    limit: !ref <N_epochs>

modules:
    encoder: !ref <Encoder>
    decoder: !ref <Decoder>
    masknet: !ref <MaskNet>

checkpointer: !new:speechbrain.utils.checkpoints.Checkpointer
    checkpoints_dir: !ref <save_folder>
    recoverables:
        encoder: !ref <Encoder>
        decoder: !ref <Decoder>
        masknet: !ref <MaskNet>
        counter: !ref <epoch_counter>
        # lr_scheduler: !ref <lr_scheduler>

train_logger: !new:speechbrain.utils.train_logger.FileTrainLogger
    save_file: !ref <train_log>

Thank you!

wenyuc55 avatar Dec 10 '24 08:12 wenyuc55

@ycemsubakan any chance that you could have a look at this?

TParcollet avatar Jan 31 '25 15:01 TParcollet

Sure, I will try to do it this weekend.

ycemsubakan avatar Jan 31 '25 15:01 ycemsubakan