DALI icon indicating copy to clipboard operation
DALI copied to clipboard

First‐batch filenames misaligned by two frames when using ExternalSource + DALIGenericIterator (DALI 1.49)

Open aafaqin opened this issue 11 months ago • 6 comments

Describe the question.

I’m trying to feed raw JPEG bytes and their corresponding frame‐indices into a DALI pipeline, then look up the real filenames on the Python side. My minimal repro looks like this (using DALI 1.49):


import os, numpy as np
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.pipeline import Pipeline
from nvidia.dali.plugin.pytorch import DALIGenericIterator
from nvidia.dali.plugin.base_iterator import LastBatchPolicy

class ExternalInputIterator:
    def __init__(self, batch_size, image_dir):
        self.files = sorted(os.listdir(image_dir))
        self.batch_size = batch_size
        self.i = 0
    def __iter__(self):
        self.i = 0
        return self
    def __next__(self):
        if self.i >= len(self.files):
            raise StopIteration()
        bufs, idxs = [], []
        for _ in range(self.batch_size):
            if self.i >= len(self.files): break
            fname = self.files[self.i]
            with open(os.path.join(image_dir, fname), 'rb') as f:
                bufs.append(np.frombuffer(f.read(), dtype=np.uint8))
            idxs.append(np.int64(self.i))
            self.i += 1
        return bufs, idxs

class SimplePipeline(Pipeline):
    def __init__(self, bs, nthreads, devid, external_iter, W, H):
        super().__init__(bs, nthreads, devid,
                         exec_async=False, exec_pipelined=False,
                         prefetch_queue_depth=1, seed=123)
        self.jpegs, self.idxs = fn.external_source(
            source=external_iter,
            num_outputs=2,
            dtype=[types.UINT8, types.INT64],
            parallel=False
        )
        self.decode  = fn.decoders.image(self.jpegs, device="mixed", output_type=types.RGB)
        self.resized = fn.resize(self.decode, device="gpu", resize_x=W, resize_y=H)
        self.normed  = fn.crop_mirror_normalize(
            self.resized, device="gpu", dtype=types.FLOAT, output_layout="CHW",
            mean=[0,0,0], std=[255,255,255], shift=0
        )
    def define_graph(self):
        return self.normed, self.idxs

# … in infer_on_DALI() …
all_files = sorted(os.listdir(image_dir))
external_it = ExternalInputIterator(batch_size, image_dir)
pipe = SimplePipeline(batch_size, 4, 0, external_it, WIDTH, HEIGHT)
pipe.build()

dali_iter = DALIGenericIterator(
    pipe,
    ["data", "idx"],
    size=len(all_files),
    last_batch_policy=LastBatchPolicy.PARTIAL,
    auto_reset=True,
    prepare_first_batch=False  # or omitted
)

for batch in dali_iter:
    images = batch[0]["data"]
    idxs   = batch[0]["idx"].cpu().numpy().astype(int)
    frame_names = [all_files[i] for i in idxs]
    print("Processing", frame_names)
    # … do detection …

Observed behavior

Every frame_names printed inside the loop is consistently two frames behind what the ExternalInputIterator logs on its own print calls.

Adding or removing prepare_first_batch=False and/or manual next(dali_iter) flushes changes how many batches are skipped, but I can’t get a 1:1 alignment.

I’m on DALI 1.49.

Expected behavior

The first batch inside the for batch in dali_iter: should correspond to the first images and indices yielded by ExternalInputIterator, with no offset.

Questions

Am I misusing prepare_first_batch vs. DALI’s default priming?

Are there any internal queues or prefetching details in DALI 1.49 that introduce this two-frame lag?

What is the recommended pattern for keeping an ExternalSource -> DALIGenericIterator pipeline perfectly in sync, so my Python-side indices (and thus filenames) line up exactly with the tensors I receive?

Any pointers or example snippets to fix this alignment issue would be greatly appreciated!

Check for duplicates

  • [x] I have searched the open bugs/issues and have found no duplicates for this bug report

aafaqin avatar May 24 '25 16:05 aafaqin

Hi @aafaqin,

Thank you for reaching out.

The current flow that employs the iterator is as follows:

  • Obtain a batch from the pipeline. If none has been scheduled, schedule the execution.
  • Schedule another run to have samples ready when the pipeline is queried again.
  • Return the obtained batch.

So what you see is:

  • ExternalInputIterator is called to get the current batch.
  • Before the current batch is returned, ExternalInputIterator is called again to have it in the pipeline for the next iterator call.

Can you tell me the reason you want to have this 1:1 correspondence between the ExternalSource call and the iterator output?

JanuszL avatar May 26 '25 11:05 JanuszL

Hi Janusz, thanks again for explaining DALI’s prefetch behavior. I’ve got one more wrinkle in my frame-to-inference sync:

What I’m doing

My ExternalInputIterator feeds frames into a DALI Pipeline (with default exec_async=True and prefetch_queue_depth=2).

For each DALI output I immediately call into TensorRT’s Python API:


# set up
logger        = trt.Logger(trt.Logger.WARNING)
runtime       = trt.Runtime(logger)
engine        = runtime.deserialize_cuda_engine(engine_bytes)
context       = engine.create_execution_context()
stream        = cuda.Stream()

# in infer():
context.set_binding_shape(...)
cuda.memcpy_dtod_async(..., stream)
context.execute_async_v3(stream.handle)
stream.synchronize()

What I see

The first frame lines up correctly.

Subsequent frames are offset by two—e.g. the second DALI output gets inference results for the first frame, the third maps to the second, etc.

Question Could DALI’s async/pipelined prefetch be advancing the iterator two frames ahead before I push into:

trt.Runtime → trt.ICudaEngine → trt.IExecutionContext.execute_async_v3

pycuda.driver.Stream + cuda.memcpy_dtod_async

What’s the recommended way to force a strict 1:1 sequence from ExternalSource → DALI output → IExecutionContext so there’s no hidden two-frame “lag”? Should I tune prefetch_queue_depth, disable exec_async/exec_pipelined, or adjust TensorRT’s stream usage? Any example settings would be extremely helpful.

Thanks, Aafaq

aafaqin avatar May 26 '25 11:05 aafaqin

Hi @aafaqin,

If you want to run the inference the above configuration should work fine and you can skip the iterator and just use run method:

import torch

o = pipe.run()
o0 = torch.from_dlpack(o[0][0].__dlpack__())
o1 = torch.from_dlpack(o[1][0].__dlpack__())
print(o0, o1)

JanuszL avatar May 26 '25 11:05 JanuszL

@aafaqin The code above will work for batch_size==1. If you need multiple samples to be represented as a single tensor (NHWC), then you can do:

o = pipe.run()
o0 = torch.from_dlpack(o[0].as_tensor().__dlpack__())
o1 = torch.from_dlpack(o[1].as_tensor().__dlpack__())

mzient avatar May 26 '25 12:05 mzient

Hi @JanuszL, @mzient and DALI team, thanks for the speedy replies

I’ve moved to driving DALI with pipe.run() + zero-copy DLPack conversion per batch, which should eliminate any hidden prefetch, but I still see the same two-frame offset:


# in infer_on_DALI(...)
steps = (n + batch_size - 1) // batch_size
for _ in range(steps):
    out_images, out_idxs = pipe.run()

    # zero-copy to PyTorch
    imgs = torch.from_dlpack(out_images.as_tensor().__dlpack__()).contiguous()
    idxs = torch.from_dlpack(out_idxs.as_tensor().__dlpack__()).cpu().numpy().astype(int)

    # Now imgs[i] should correspond exactly to frame file_list[idxs[i]]
    # But after the first batch, each subsequent frame is shifted by 2.

Suspecting the TensorRT side My TensorRTDetector does all allocation once in init, and then reuses a single CUDA stream for every inference:


# TensorRTDetector.__init__
cuda.init()
self.device = cuda.Device(0)
self.ctx    = self.device.make_context()
# … load engine, create IExecutionContext, allocate all device_mem …
self.stream = cuda.Stream()
self.ctx.pop()  # let DALI use the primary context

# TensorRTDetector.infer()
self.ctx.push()
cuda.memcpy_dtod_async(     # copy input into device_mem
    self.inputs[self.input_name]['device_mem'],
    input_tensor.data_ptr(),
    self.inputs[self.input_name]['size'],
    self.stream
)
self.context.execute_async_v3(self.stream.handle)
cuda.memcpy_dtod_async(     # copy output back
    output_tensor.data_ptr(),
    self.outputs[self.output_name]['device_mem'],
    self.outputs[self.output_name]['size'],
    self.stream
)
self.stream.synchronize()
self.ctx.pop()

Questions Stream reuse Could reusing the same pycuda.driver.Stream() for every batch lead to writes from batch n bleeding into batch n+1, causing a two-frame lag? Would creating a fresh Stream() inside infer() fix this?

Synchronization ordering Should I be calling stream.synchronize() before launching the next memcpy_dtod_async, not only after inference?

Device buffer allocation Is it safer to allocate new device buffers (cuda.mem_alloc) per batch rather than once at init, to guarantee each batch writes to a clean buffer?

CUDA context lifecycle

I call make_context() → ctx.pop() in init, then ctx.push()/ctx.pop() around every infer().

Could that pattern be stomping on DALI’s primary context or my own inference state?

Would it be better to hold the TRT context open (skip the initial pop()) or even recreate the context per frame?

Any best-practice recommendations or example snippets for TensorRT + PyCUDA stream/context management that ensure a strict 1:1 mapping from DALI output → TRT inference would be hugely appreciated!

aafaqin avatar May 26 '25 14:05 aafaqin

@aafaqin, are you observing this shift in the output of the DALI pipeline or from TensorRT (TRT)? Could you provide a self-contained code example that reproduces this issue? You can use a toy example with images from DALI_extra based on the initial code sample you provided.

I'm asking because I cannot reproduce the behavior you're seeing, so I might be missing some details.

Regarding your questions:

  • You don't need to create a new stream every time; DALI uses its own, and when you request the output, the data is ready to be used on any stream.
  • It should be enough to synchronize only after the copy out.
  • Reusing the same buffer should be safe.
  • DALI manages its context separately, so it should not interfere with the one from PyCUDA.

JanuszL avatar May 26 '25 14:05 JanuszL