`experimental_fetch_to_device` makes no difference since TF 2.14
Version
1.30
Describe the bug.
Following up on #5171, I ran some experiments to see whether the GPU-CPU-GPU roundtrip issue also occurs with strategies other than MirroredStrategy.
I trained a simple CNN with GPU-accelerated JPEG decoding, using a single worker with one GPU. For every strategy, a tf.distribute.DistributedDataset was created via tf.distribute.Strategy.distribute_datasets_from_function. The table below shows step durations in milliseconds.
| TF Version | No Strategy | OneDeviceStrategy | MirroredStrategy w/ InputOptions | MirroredStrategy w/o InputOptions | MultiWorkerMirroredStrategy |
|---|---|---|---|---|---|
| 2.12.0 / 2.13.0 / 2.13.1 | 495 | 578 | 565 | 494 | 617 |
| 2.14.0 | 515 | 556 | 616 | 600 | 616 |
As you can see, pre-2.14, MirroredStrategy w/ InputOptions is equivalent to having no strategy, while everything else incurs noticeable overhead, presumably due to the aforementioned issue. Interestingly, the overhead is not consistent across strategies; I wonder why that is, since no actual synchronization is happening.
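The step durations above boil down to averaging the wall-clock time of a fixed number of train steps. A minimal, stdlib-only sketch of such a harness (the `step_fn` callable and the batch iterator are hypothetical placeholders for the real train step and DALI dataset, not the code actually used for the table):

```python
import time


def mean_step_ms(batches, step_fn, num_steps, warmup=5):
    """Average duration of `step_fn(batch)` in milliseconds.

    `batches` and `step_fn` stand in for the real (distributed) dataset
    iterator and training step. The first `warmup` steps are discarded so
    one-off startup costs (tracing, allocator warmup) do not skew the mean.
    """
    durations = []
    for i, batch in enumerate(batches):
        if i >= warmup + num_steps:
            break
        start = time.perf_counter()
        step_fn(batch)
        if i >= warmup:  # only record post-warmup steps
            durations.append((time.perf_counter() - start) * 1000.0)
    return sum(durations) / len(durations)
```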
Is this something that can be easily fixed? How difficult would it be to implement the `experimental_fetch_to_device` option for the other strategies, most notably MultiWorkerMirroredStrategy? We need this strategy in particular because our infrastructure consists of A100 MIG instances, which cannot be addressed individually on a single host.
Minimal reproducible example
```python
import tensorflow as tf
import nvidia.dali as dali
import nvidia.dali.fn as fn
from nvidia.dali.plugin.tf import DALIDataset

# BATCH_SIZE, NUM_CPU, CLASSES, SHUFFLE_BUFFER_SIZE, RECORD_SHARD_PATHS,
# INDEX_SHARD_PATHS and Model are defined elsewhere.


def DALIPipeline(
    batch_size, num_threads, device_id, tfrecord_files, tfrecord_index_files, device, current_worker, num_workers
):
    pipeline = dali.pipeline.Pipeline(
        batch_size=batch_size,
        num_threads=num_threads,
        device_id=device_id,
    )
    with pipeline:
        inputs = fn.readers.tfrecord(
            path=tfrecord_files,
            index_path=tfrecord_index_files,
            features={
                "image": dali.tfrecord.FixedLenFeature((), dali.tfrecord.string, ""),
                "label": dali.tfrecord.FixedLenFeature([1], dali.tfrecord.int64, -1),
            },
            random_shuffle=True,
            initial_fill=SHUFFLE_BUFFER_SIZE,
            num_shards=num_workers,
            shard_id=current_worker,
        )
        images = inputs["image"]
        labels = inputs["label"]
        labels = fn.one_hot(labels, num_classes=len(CLASSES))
        images = fn.decoders.image(
            images,
            device="mixed",
            preallocate_height_hint=640,
            preallocate_width_hint=384,
            bytes_per_sample_hint=640 * 384 * 3,
        )
        images = fn.color_space_conversion(
            images,
            image_type=dali.types.DALIImageType.RGB,
            output_type=dali.types.DALIImageType.GRAY,
        )
        images = fn.cast(images, dtype=dali.types.DALIDataType.FLOAT, device=device)
        labels = labels.gpu() if device == "gpu" else labels
        pipeline.set_outputs(images, labels)
    return pipeline


def dataset_fn(input_context=None) -> tf.data.Dataset:
    with tf.device("/gpu:0"):
        pipeline = DALIPipeline(
            batch_size=BATCH_SIZE,
            num_threads=NUM_CPU,
            device_id=0,
            tfrecord_files=RECORD_SHARD_PATHS,
            tfrecord_index_files=INDEX_SHARD_PATHS,
            device="gpu",
            current_worker=0,
            num_workers=1,
        )
        return DALIDataset(
            pipeline=pipeline,
            batch_size=BATCH_SIZE,
            output_shapes=((BATCH_SIZE, 640, 384, 1), (BATCH_SIZE, len(CLASSES))),
            output_dtypes=(tf.float32, tf.float32),
            device_id=0,
            fail_on_device_mismatch=False,
        )


def train():
    # strategy = tf.distribute.MultiWorkerMirroredStrategy()
    strategy = tf.distribute.MirroredStrategy()
    # strategy = tf.distribute.OneDeviceStrategy("/gpu:0")
    input_options = tf.distribute.InputOptions(
        experimental_place_dataset_on_device=True,
        experimental_fetch_to_device=False,
        experimental_replication_mode=tf.distribute.InputReplicationMode.PER_REPLICA,
    )
    # Used only with MirroredStrategy.
    dataset = strategy.distribute_datasets_from_function(dataset_fn, input_options)
    # Used with every other strategy.
    # dataset = strategy.distribute_datasets_from_function(dataset_fn)
    # Used with no strategy.
    # dataset = dataset_fn()
    # Context used only with strategies.
    with strategy.scope():
        model = Model()
        model.compile(
            loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True, label_smoothing=0.1),
            optimizer=tf.keras.optimizers.Adam(
                learning_rate=3e-4,
                weight_decay=1e-5,
            ),
            metrics=[
                "accuracy",
            ],
            jit_compile=True,
        )
```
Relevant log output
No response
Other/Misc.
CUDA 11.8
Check for duplicates
- [X] I have searched the open bugs/issues and have found no duplicates for this bug report
Hello @cyanic-selkie, nice to hear from you again.
Thanks a lot for taking the time to run these tests.
> Is this something that can be easily fixed? How difficult would it be to implement the `experimental_fetch_to_device` option for the other strategies, most notably MultiWorkerMirroredStrategy? We need this strategy in particular because our infrastructure consists of A100 MIG instances, which cannot be addressed individually on a single host.
To best answer this, I need to contact some other teams at NVIDIA with whom we collaborated when this feature was developed. I will get back to you on that.
One important question here is what is your exact software stack? Mainly, do you use upstream TensorFlow or the NGC container?
@awolant
> One important question here is what is your exact software stack? Mainly, do you use upstream TensorFlow or the NGC container?
We use custom containers with `nvidia/cuda:11.8.0-cudnn8-devel-ubuntu22.04` as the base image, into which we simply install TensorFlow and DALI via pip.
One important thing that I completely forgot about is that we use tcmalloc.
Our driver version is 525.60.13.
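Since a preloaded allocator can affect performance numbers, it may be worth confirming that tcmalloc is actually mapped into the training process. A stdlib-only, Linux-only sketch (the helper name is hypothetical, not part of the original setup):

```python
def tcmalloc_loaded() -> bool:
    """Return True if a tcmalloc shared library is mapped into this process.

    Linux-only sketch: scans /proc/self/maps for a "tcmalloc" entry and
    returns False on platforms where that file does not exist.
    """
    try:
        with open("/proc/self/maps") as maps:
            return any("tcmalloc" in line for line in maps)
    except OSError:  # e.g. not on Linux
        return False
```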