DataPipes for Error Handling and Retry
🚀 The feature
Provide a mechanism to catch exceptions raised by a previous DataPipe and retry. Error handling and retry can be related but separate DataPipes. Non-DataPipe implementations should also be considered.
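As a rough illustration of the non-DataPipe option, here is a minimal plain-Python sketch that retries a per-item function. The name map_with_retry and its parameters (max_retries, retry_on, delay) are hypothetical and not part of any existing API. Note that it only retries the function applied to each item; it does not retry a source iterator that itself raises.

```python
import time
from typing import Callable, Iterable, Iterator, Tuple, Type, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def map_with_retry(
    source: Iterable[T],
    fn: Callable[[T], R],
    max_retries: int = 3,  # hypothetical parameter: extra attempts after the first
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    delay: float = 0.0,  # seconds to wait between attempts
) -> Iterator[R]:
    """Apply fn to each item, retrying up to max_retries times on failure."""
    for item in source:
        for attempt in range(max_retries + 1):
            try:
                result = fn(item)
            except retry_on:
                if attempt == max_retries:
                    raise  # retries exhausted: surface the last error
                time.sleep(delay)
            else:
                # Yield outside the try body so that errors thrown in by the
                # consumer are not mistaken for failures of fn.
                yield result
                break
```

Usage would look like list(map_with_retry(urls, download, max_retries=2)), where urls and download stand in for whatever flaky step is being wrapped.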
Motivation, pitch
Currently, there is no standard mechanism to handle exceptions from a source DataPipe. One example of such an issue was raised on the PyTorch forum.
Alternatives
Given how iterators are implemented, it is unclear to me if this is feasible as a DataPipe implementation. We may have to modify the iterator wrapper within _hook_iterator.py (inside PyTorch core) to make this feature possible.
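To make the iterator concern concrete, here is a small plain-Python sketch (flaky_source is a made-up stand-in for a source DataPipe's __iter__). Once a generator raises, it is finished, so a downstream consumer that catches the exception cannot simply call next() again to resume from the failing item.

```python
def flaky_source():
    yield 1
    raise IOError("transient read failure")
    yield 2  # never reached


it = flaky_source()
first = next(it)  # the first item arrives normally

try:
    next(it)
    error = None
except IOError as exc:
    error = exc  # the downstream consumer can catch the error...

# ...but the generator is now exhausted: iterating it again yields nothing,
# so the item after the failure is lost unless the source is re-created.
remaining = list(it)
```

This is why a retry likely needs either cooperation from the source (catching and retrying inside its own __iter__) or a way to rebuild the source iterator and skip ahead to the failing position.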
We can also consider modifying specific DataPipes where custom error handling and retries are frequently needed.
Additional context
No response
I ran into similar issues working with messy data and flaky processing steps. I implemented a try_map pipe that wraps the map pipe and catches exceptions raised by fn, so failed samples come out as None instead of stopping the pipeline.
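from typing import Callable

from torch.utils.data import IterDataPipe, functional_datapipe


@functional_datapipe("try_map")
class TryMapIterDataPipe(IterDataPipe):
    def __new__(
        cls, datapipe: IterDataPipe, fn: Callable, input_col=None, output_col=None
    ) -> IterDataPipe:
        def try_fn(x):
            try:
                return fn(x)
            except Exception as e:
                # Log the error; the failed sample is replaced with None.
                print(e)
                return None

        # Delegate to the built-in map pipe with the exception-safe wrapper.
        return datapipe.map(try_fn, input_col=input_col, output_col=output_col)  # type: ignore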
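One caveat: try_map replaces failed samples with None rather than removing them, so a downstream step such as .filter(lambda x: x is not None) would still be needed, and that only works if None is never a legitimate output of fn. The plain-Python sketch below shows one way to drop failed samples without a sentinel value. The name skip_failures and the errors list are hypothetical, and this is an illustration of the idea rather than a DataPipe implementation.

```python
from typing import Callable, Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def skip_failures(
    source: Iterable[T],
    fn: Callable[[T], R],
    errors: List[Tuple[T, Exception]],
) -> Iterator[R]:
    """Yield fn(item) for each item, skipping items for which fn raises."""
    for item in source:
        try:
            result = fn(item)
        except Exception as exc:
            # Record the failure for later inspection instead of printing it.
            errors.append((item, exc))
            continue
        yield result


errors: List[Tuple[str, Exception]] = []
parsed = list(skip_failures(["1", "x", "3"], int, errors))
# parsed == [1, 3]; errors holds the ("x", ValueError) pair
```

Collecting the errors in a list keeps the failures visible to the caller, which may be preferable to print when many samples fail.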