
`experimental_fetch_to_device` makes no difference since TF 2.14

cyanic-selkie opened this issue 2 years ago · 2 comments

Version

1.30

Describe the bug.

Following up on #5171, I ran some experiments to see if the issue of GPU-CPU-GPU roundtrip was happening with strategies other than MirroredStrategy.

I trained a simple CNN with GPU-accelerated JPEG decoding. Only one worker with one GPU was used. For all strategies, a `tf.distribute.DistributedDataset` was created using `tf.distribute.Strategy.distribute_datasets_from_function`. The table below shows step durations in milliseconds.

| TF Version | No Strategy | OneDeviceStrategy | MirroredStrategy w/ InputOptions | MirroredStrategy w/o InputOptions | MultiWorkerMirroredStrategy |
| --- | --- | --- | --- | --- | --- |
| 2.12.0 / 2.13.0 / 2.13.1 | 495 | 578 | 565 | 494 | 617 |
| 2.14.0 | 515 | 556 | 616 | 600 | 616 |
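The issue does not include the measurement harness, so for reference here is a hypothetical sketch of how per-step durations like those above could be collected; the `time_steps` helper and its `warmup` parameter are my own names, not part of the original benchmark:

```python
import time


def time_steps(step_fn, num_steps, warmup=2):
    """Run step_fn repeatedly and return per-step wall-clock durations
    in milliseconds, discarding the first `warmup` iterations, which
    are typically dominated by graph tracing and allocator warm-up."""
    durations_ms = []
    for _ in range(num_steps + warmup):
        t0 = time.perf_counter()
        step_fn()
        durations_ms.append((time.perf_counter() - t0) * 1000.0)
    # Drop the warm-up iterations so they don't skew the average.
    return durations_ms[warmup:]
```

In a real run, `step_fn` would be one `strategy.run` (or `model.train_on_batch`) invocation, and the reported number would be the mean of the returned list.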

As you can see, pre-2.14, MirroredStrategy w/ InputOptions is equivalent to having no strategy; everything else incurs noticeable overhead, presumably due to the aforementioned issue. Interestingly, the overhead is not consistent across strategies. I wonder why that is, since no actual synchronization is happening?
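One way to check for the roundtrip directly is to inspect the `.device` attribute of tensors yielded by the distributed iterator. A small stdlib-only helper for decoding TensorFlow device strings could look like this (the `parse_tf_device` name is hypothetical, not an existing API):

```python
def parse_tf_device(device_str):
    """Parse a TensorFlow device string such as
    '/job:localhost/replica:0/task:0/device:GPU:0' into ('GPU', 0).

    Useful for asserting that batches produced by the distributed
    dataset actually stay resident on the GPU rather than bouncing
    through host memory."""
    # Keep only the trailing 'device:TYPE:INDEX' component.
    last = device_str.rsplit("/", 1)[-1]
    _, dev_type, index = last.split(":")
    return dev_type, int(index)
```

Iterating the distributed dataset and calling this on `batch.device` for each element would show whether any tensor is placed on a CPU device mid-pipeline.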

Is this something that can be easily fixed? How difficult would it be to implement the `experimental_fetch_to_device` option for the other strategies, most notably MultiWorkerMirroredStrategy? We need this strategy in particular since our infrastructure is made up of A100 MIGs, which cannot be addressed individually on a single host.

Minimum reproducible example

import tensorflow as tf

import nvidia.dali as dali
import nvidia.dali.fn as fn
from nvidia.dali.plugin.tf import DALIDataset


def DALIPipeline(
    batch_size, num_threads, device_id, tfrecord_files, tfrecord_index_files, device, current_worker, num_workers
):
    pipeline = dali.pipeline.Pipeline(
        batch_size=batch_size,
        num_threads=num_threads,
        device_id=device_id,
    )
    with pipeline:
        inputs = fn.readers.tfrecord(
            path=tfrecord_files,
            index_path=tfrecord_index_files,
            features={
                "image": dali.tfrecord.FixedLenFeature((), dali.tfrecord.string, ""),
                "label": dali.tfrecord.FixedLenFeature([1], dali.tfrecord.int64, -1),
            },
            random_shuffle=True,
            initial_fill=SHUFFLE_BUFFER_SIZE,
            num_shards=num_workers,
            shard_id=current_worker,
        )

        images = inputs["image"]
        labels = inputs["label"]

        labels = fn.one_hot(labels, num_classes=len(CLASSES))

        images = fn.decoders.image(
            images,
            device="mixed",
            preallocate_height_hint=640,
            preallocate_width_hint=384,
            bytes_per_sample_hint=640 * 384 * 3,
        )
        images = fn.color_space_conversion(
            images,
            image_type=dali.types.DALIImageType.RGB,
            output_type=dali.types.DALIImageType.GRAY,
        )
        images = fn.cast(images, dtype=dali.types.DALIDataType.FLOAT, device=device)

        labels = labels.gpu() if device == "gpu" else labels

        pipeline.set_outputs(images, labels)

    return pipeline

def dataset_fn(input_context = None) -> tf.data.Dataset:
    with tf.device("/gpu:0"):
        pipeline = DALIPipeline(
            batch_size=BATCH_SIZE,
            num_threads=NUM_CPU,
            device_id=0,
            tfrecord_files=RECORD_SHARD_PATHS,
            tfrecord_index_files=INDEX_SHARD_PATHS,
            device="gpu",
            current_worker=0,
            num_workers=1,
        )

        return DALIDataset(
            pipeline=pipeline,
            batch_size=BATCH_SIZE,
            output_shapes=((BATCH_SIZE, 640, 384, 1), (BATCH_SIZE, len(CLASSES))),
        output_dtypes=(tf.float32, tf.float32),
            device_id=0,
            fail_on_device_mismatch=False,
        )

def train():
    # strategy = tf.distribute.MultiWorkerMirroredStrategy()
    strategy = tf.distribute.MirroredStrategy()
    # strategy = tf.distribute.OneDeviceStrategy("/gpu:0")

    input_options = tf.distribute.InputOptions(
        experimental_place_dataset_on_device=True,
        experimental_fetch_to_device=False,
        experimental_replication_mode=tf.distribute.InputReplicationMode.PER_REPLICA,
    )
    # Used only with MirroredStrategy.
    dataset = strategy.distribute_datasets_from_function(dataset_fn, input_options)
    # Used with every other strategy. 
    # dataset = strategy.distribute_datasets_from_function(dataset_fn)
    # Used with no strategy.
    # dataset = dataset_fn()
    
    # Context used only with strategies.
    with strategy.scope():
        model = Model()
        model.compile(
            loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True, label_smoothing=0.1),
            optimizer=tf.keras.optimizers.Adam(
                learning_rate=3e-4,
                weight_decay=1e-5,
            ),
            metrics=[
                "accuracy",
            ],
            jit_compile=True,
        )

Relevant log output

No response

Other/Misc.

CUDA 11.8

Check for duplicates

  • [X] I have searched the open bugs/issues and have found no duplicates for this bug report

cyanic-selkie · Nov 23 '23

Hello @cyanic-selkie, nice to hear from you again.

Thanks a lot for taking the time to run these tests.

Is this something that can be easily fixed? How difficult would it be to implement the `experimental_fetch_to_device` option for the other strategies, most notably MultiWorkerMirroredStrategy? We need this strategy in particular since our infrastructure is made up of A100 MIGs, which cannot be addressed individually on a single host.

To answer this properly, I need to contact some other teams at NVIDIA that we collaborated with when this feature was developed. I will get back to you on that.

One important question here: what is your exact software stack? Mainly, do you use upstream TensorFlow or the NGC container?

awolant · Nov 23 '23

@awolant

One important question here: what is your exact software stack? Mainly, do you use upstream TensorFlow or the NGC container?

We use custom containers with nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04 as the base image. We then simply install TensorFlow and DALI via pip inside it.

One important thing I completely forgot to mention is that we use tcmalloc.

Our driver version is 525.60.13.

cyanic-selkie · Nov 24 '23