[Data] Reduce internal Ray Data stack trace output by default
Why are these changes needed?
Whenever an error occurs while using Ray Data, the full stack trace is currently printed to stdout. If the exception originates from user code, the internal stack trace is usually not needed, and it obscures the relevant user code and the actual error. This PR omits internal Ray Data and Ray Core frames from the stack trace printed to stdout, and logs the full stack trace only to the Ray Data ray-data.log file. Users can print the full stack trace to stdout by setting DataContext.log_internal_stack_trace_to_stdout = True.
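As a rough illustration of the behavior described above, here is a minimal, self-contained sketch that does not use Ray. It is not the code in this PR. `SketchContext`, `TaskError`, `omit_internal_trace`, `_internal_step`, and `execute` are hypothetical names, the two marker exception classes are local stand-ins, and an in-memory buffer stands in for ray-data.log. The sketch always writes the full trace to the file logger, then re-raises the exception with its internal frames dropped unless the flag is set.

```python
import io
import logging
import traceback
from dataclasses import dataclass
from functools import wraps


class UserCodeException(Exception):
    """Local stand-in marker: the failure came from user-supplied code."""


class SystemException(Exception):
    """Local stand-in marker: the failure came from library-internal code."""


class TaskError(Exception):
    """Hypothetical wrapper for an error raised inside a user function."""


@dataclass
class SketchContext:
    # Illustrative only; plays the role of the flag described above.
    log_internal_stack_trace_to_stdout: bool = False


def omit_internal_trace(ctx, file_logger):
    """Log the full trace to file_logger and re-raise without inner frames."""

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception as e:
                # The full trace is always written to the log file.
                file_logger.error("Full stack trace:\n%s", traceback.format_exc())
                if ctx.log_internal_stack_trace_to_stdout:
                    raise  # keep the complete traceback
                if isinstance(e, TaskError):
                    marker = UserCodeException()
                else:
                    marker = SystemException()
                # Discard the frames collected so far and chain to the marker.
                raise e.with_traceback(None) from marker

        return wrapper

    return decorator


# Demo: a StringIO buffer stands in for the log file.
log_buffer = io.StringIO()
file_logger = logging.getLogger("trace_sketch")
file_logger.addHandler(logging.StreamHandler(log_buffer))
file_logger.propagate = False
ctx = SketchContext()


def _internal_step(udf):
    # Pretend library-internal frame that runs the user function.
    try:
        return udf()
    except ZeroDivisionError as err:
        raise TaskError(str(err))


@omit_internal_trace(ctx, file_logger)
def execute(udf):
    return _internal_step(udf)


try:
    execute(lambda: 1 / 0)
except TaskError as exc:
    caught = exc
    # Names of the frames that would still be printed for this exception.
    shown = [frame.name for frame in traceback.extract_tb(exc.__traceback__)]
```

In this sketch, `shown` no longer includes `_internal_step`, while the text in `log_buffer` still does. Setting `ctx.log_internal_stack_trace_to_stdout = True` before calling `execute` keeps the internal frames on the raised exception.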
Using the following sample user code, which contains an error, we compare the stack trace output for several Ray Data usages before and after this PR:
import ray

def f(x):
    1/0
    return x
The output is as follows:
# Example 1
ray.data.range(10).map(f).take_all()
Before (stdout):
Traceback (most recent call last):
File "/Users/sjl/Desktop/test_exception.py", line 7, in <module>
ray.data.range(10).map(f).take_all()
File "/Users/sjl/src/ray/python/ray/data/dataset.py", line 2437, in take_all
for row in self.iter_rows():
File "/Users/sjl/src/ray/python/ray/data/iterator.py", line 229, in _wrapped_iterator
for batch in batch_iterable:
File "/Users/sjl/src/ray/python/ray/data/iterator.py", line 164, in _create_iterator
block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
File "/Users/sjl/src/ray/python/ray/data/_internal/iterator/iterator_impl.py", line 33, in _to_block_iterator
block_iterator, stats, executor = ds._plan.execute_to_iterator()
File "/Users/sjl/src/ray/python/ray/data/_internal/plan.py", line 506, in execute_to_iterator
block_iter = itertools.chain([next(gen)], gen)
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/legacy_compat.py", line 45, in execute_to_legacy_block_iterator
for bundle in bundle_iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
return self.get_next()
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 144, in get_next
item = self._outer._output_node.get_output_blocking(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 277, in get_output_blocking
raise self._exception
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 211, in run
while self._scheduling_loop_step(self._topology) and not self._shutdown:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 259, in _scheduling_loop_step
num_errored_blocks = process_completed_tasks(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 454, in process_completed_tasks
raise e from None
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 421, in process_completed_tasks
num_blocks_read = task.on_data_ready(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 102, in on_data_ready
raise ex from None
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 98, in on_data_ready
ray.get(block_ref)
File "/Users/sjl/src/ray/python/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
return fn(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
return func(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/_private/worker.py", line 2653, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
File "/Users/sjl/src/ray/python/ray/_private/worker.py", line 870, in get_objects
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ZeroDivisionError): ray::Map(f)() (pid=50729, ip=127.0.0.1)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
for data in iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
yield from self._row_fn(input, ctx)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 233, in transform_fn
out_row = fn(row)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 119, in fn
return op_fn(item, *fn_args, **fn_kwargs)
File "/Users/sjl/Desktop/test_exception.py", line 4, in f
1/0
ZeroDivisionError: division by zero
After (stdout):
2024-02-19 00:49:28,350 ERROR dataset_logger.py:57 -- Exception occurred in user code, with the abbreviated stack trace below. By default, the Ray Data internal stack trace is omitted from stdout, and only written to the Ray Data log file at /tmp/ray/session_2024-02-19_00-49-24_680422_61222/logs/ray-data.log. To output the full stack trace to stdout, set `DataContext.internal_stack_trace_stdout` to True`.
ray.data.UserCodeException
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/sjl/Desktop/test_exception.py", line 8, in <module>
ray.data.range(10).map_batches(f).take_all()
File "/Users/sjl/src/ray/python/ray/data/dataset.py", line 2435, in take_all
for row in self.iter_rows():
File "/Users/sjl/src/ray/python/ray/data/iterator.py", line 236, in _wrapped_iterator
for batch in batch_iterable:
File "/Users/sjl/src/ray/python/ray/data/iterator.py", line 171, in _create_iterator
block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
File "/Users/sjl/src/ray/python/ray/data/_internal/iterator/iterator_impl.py", line 33, in _to_block_iterator
block_iterator, stats, executor = ds._plan.execute_to_iterator()
File "/Users/sjl/src/ray/python/ray/data/_internal/dataset_logger.py", line 79, in handle_trace
raise e.with_traceback(None) from UserCodeException()
ray.exceptions.RayTaskError(ZeroDivisionError): ray::MapBatches(f)() (pid=61238, ip=127.0.0.1)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
for data in iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 215, in __call__
yield from self._batch_fn(input, ctx)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 189, in transform_fn
res = fn(batch)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 119, in fn
return op_fn(item, *fn_args, **fn_kwargs)
File "/Users/sjl/Desktop/test_exception.py", line 5, in f
1/0
ZeroDivisionError: division by zero
After (ray-data.log):
2024-02-19 00:49:28,350 ERROR dataset_logger.py:57 -- Exception occurred in user code, with the abbreviated stack trace below. By default, the Ray Data internal stack trace is omitted from stdout, and only written to the Ray Data log file at /tmp/ray/session_2024-02-19_00-49-24_680422_61222/logs/ray-data.log. To output the full stack trace to stdout, set `DataContext.internal_stack_trace_stdout` to True`.
2024-02-19 00:49:28,350 ERROR dataset_logger.py:74 -- Full stack trace:
Traceback (most recent call last):
File "/Users/sjl/src/ray/python/ray/data/_internal/dataset_logger.py", line 44, in handle_trace
return fn(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/data/_internal/plan.py", line 507, in execute_to_iterator
block_iter = itertools.chain([next(gen)], gen)
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/legacy_compat.py", line 45, in execute_to_legacy_block_iterator
for bundle in bundle_iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
return self.get_next()
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 144, in get_next
item = self._outer._output_node.get_output_blocking(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 277, in get_output_blocking
raise self._exception
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 214, in run
continue_sched = self._scheduling_loop_step(self._topology)
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 264, in _scheduling_loop_step
num_errored_blocks = process_completed_tasks(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 454, in process_completed_tasks
raise e from None
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 421, in process_completed_tasks
num_blocks_read = task.on_data_ready(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 102, in on_data_ready
raise ex from None
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 98, in on_data_ready
ray.get(block_ref)
File "/Users/sjl/src/ray/python/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
return fn(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
return func(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/_private/worker.py", line 2653, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
File "/Users/sjl/src/ray/python/ray/_private/worker.py", line 870, in get_objects
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ZeroDivisionError): ray::MapBatches(f)() (pid=61238, ip=127.0.0.1)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
for data in iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 215, in __call__
yield from self._batch_fn(input, ctx)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 189, in transform_fn
res = fn(batch)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 119, in fn
return op_fn(item, *fn_args, **fn_kwargs)
File "/Users/sjl/Desktop/test_exception.py", line 5, in f
1/0
ZeroDivisionError: division by zero
# Example 2
import numpy as np
arr = np.arange(10)
list(ray.data.from_numpy(arr).map(f).iter_batches())
Before (stdout):
Traceback (most recent call last):
File "/Users/sjl/Desktop/test_exception.py", line 8, in <module>
list(ray.data.range(10).map(f).iter_batches())
File "/Users/sjl/src/ray/python/ray/data/iterator.py", line 164, in _create_iterator
block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
File "/Users/sjl/src/ray/python/ray/data/_internal/iterator/iterator_impl.py", line 33, in _to_block_iterator
block_iterator, stats, executor = ds._plan.execute_to_iterator()
File "/Users/sjl/src/ray/python/ray/data/_internal/plan.py", line 506, in execute_to_iterator
block_iter = itertools.chain([next(gen)], gen)
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/legacy_compat.py", line 45, in execute_to_legacy_block_iterator
for bundle in bundle_iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
return self.get_next()
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 144, in get_next
item = self._outer._output_node.get_output_blocking(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 277, in get_output_blocking
raise self._exception
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 211, in run
while self._scheduling_loop_step(self._topology) and not self._shutdown:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 259, in _scheduling_loop_step
num_errored_blocks = process_completed_tasks(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 454, in process_completed_tasks
raise e from None
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 421, in process_completed_tasks
num_blocks_read = task.on_data_ready(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 102, in on_data_ready
raise ex from None
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 98, in on_data_ready
ray.get(block_ref)
File "/Users/sjl/src/ray/python/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
return fn(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
return func(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/_private/worker.py", line 2653, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
File "/Users/sjl/src/ray/python/ray/_private/worker.py", line 870, in get_objects
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ZeroDivisionError): ray::Map(f)() (pid=50841, ip=127.0.0.1)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
for data in iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
yield from self._row_fn(input, ctx)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 233, in transform_fn
out_row = fn(row)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 119, in fn
return op_fn(item, *fn_args, **fn_kwargs)
File "/Users/sjl/Desktop/test_exception.py", line 4, in f
1/0
ZeroDivisionError: division by zero
After (stdout):
2024-02-19 00:51:20,994 ERROR dataset_logger.py:57 -- Exception occurred in user code, with the abbreviated stack trace below. By default, the Ray Data internal stack trace is omitted from stdout, and only written to the Ray Data log file at /tmp/ray/session_2024-02-19_00-51-17_453231_61516/logs/ray-data.log. To output the full stack trace to stdout, set `DataContext.internal_stack_trace_stdout` to True`.
ray.data.UserCodeException
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/sjl/Desktop/test_exception.py", line 11, in <module>
list(ray.data.from_numpy(arr).map(f).iter_batches())
File "/Users/sjl/src/ray/python/ray/data/iterator.py", line 171, in _create_iterator
block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
File "/Users/sjl/src/ray/python/ray/data/_internal/iterator/iterator_impl.py", line 33, in _to_block_iterator
block_iterator, stats, executor = ds._plan.execute_to_iterator()
File "/Users/sjl/src/ray/python/ray/data/_internal/dataset_logger.py", line 79, in handle_trace
raise e.with_traceback(None) from UserCodeException()
ray.exceptions.RayTaskError(ZeroDivisionError): ray::Map(f)() (pid=61539, ip=127.0.0.1)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
for data in iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
yield from self._row_fn(input, ctx)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 233, in transform_fn
out_row = fn(row)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 119, in fn
return op_fn(item, *fn_args, **fn_kwargs)
File "/Users/sjl/Desktop/test_exception.py", line 5, in f
1/0
ZeroDivisionError: division by zero
After (ray-data.log):
2024-02-19 00:51:20,994 ERROR dataset_logger.py:57 -- Exception occurred in user code, with the abbreviated stack trace below. By default, the Ray Data internal stack trace is omitted from stdout, and only written to the Ray Data log file at /tmp/ray/session_2024-02-19_00-51-17_453231_61516/logs/ray-data.log. To output the full stack trace to stdout, set `DataContext.internal_stack_trace_stdout` to True`.
2024-02-19 00:51:20,994 ERROR dataset_logger.py:74 -- Full stack trace:
Traceback (most recent call last):
File "/Users/sjl/src/ray/python/ray/data/_internal/dataset_logger.py", line 44, in handle_trace
return fn(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/data/_internal/plan.py", line 507, in execute_to_iterator
block_iter = itertools.chain([next(gen)], gen)
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/legacy_compat.py", line 45, in execute_to_legacy_block_iterator
for bundle in bundle_iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
return self.get_next()
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 144, in get_next
item = self._outer._output_node.get_output_blocking(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 277, in get_output_blocking
raise self._exception
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 214, in run
continue_sched = self._scheduling_loop_step(self._topology)
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 264, in _scheduling_loop_step
num_errored_blocks = process_completed_tasks(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 454, in process_completed_tasks
raise e from None
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 421, in process_completed_tasks
num_blocks_read = task.on_data_ready(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 102, in on_data_ready
raise ex from None
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 98, in on_data_ready
ray.get(block_ref)
File "/Users/sjl/src/ray/python/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
return fn(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
return func(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/_private/worker.py", line 2653, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
File "/Users/sjl/src/ray/python/ray/_private/worker.py", line 870, in get_objects
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ZeroDivisionError): ray::Map(f)() (pid=61539, ip=127.0.0.1)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
for data in iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
yield from self._row_fn(input, ctx)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 233, in transform_fn
out_row = fn(row)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 119, in fn
return op_fn(item, *fn_args, **fn_kwargs)
File "/Users/sjl/Desktop/test_exception.py", line 5, in f
1/0
ZeroDivisionError: division by zero
# Example 3
ray.data.read_csv("s3://anonymous@air-example-data/iris.csv").map(f).materialize()
Before (stdout):
Traceback (most recent call last):
File "/Users/sjl/Desktop/test_exception.py", line 9, in <module>
ray.data.read_csv("s3://anonymous@air-example-data/iris.csv").map(f).materialize()
File "/Users/sjl/src/ray/python/ray/data/dataset.py", line 4506, in materialize
copy._plan.execute(force_read=True)
File "/Users/sjl/src/ray/python/ray/data/_internal/plan.py", line 572, in execute
blocks = execute_to_legacy_block_list(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/legacy_compat.py", line 108, in execute_to_legacy_block_list
block_list = _bundles_to_block_list(bundles)
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/legacy_compat.py", line 204, in _bundles_to_block_list
for ref_bundle in bundles:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
return self.get_next()
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 144, in get_next
item = self._outer._output_node.get_output_blocking(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 277, in get_output_blocking
raise self._exception
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 211, in run
while self._scheduling_loop_step(self._topology) and not self._shutdown:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 259, in _scheduling_loop_step
num_errored_blocks = process_completed_tasks(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 454, in process_completed_tasks
raise e from None
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 421, in process_completed_tasks
num_blocks_read = task.on_data_ready(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 102, in on_data_ready
raise ex from None
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 98, in on_data_ready
ray.get(block_ref)
File "/Users/sjl/src/ray/python/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
return fn(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
return func(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/_private/worker.py", line 2653, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
File "/Users/sjl/src/ray/python/ray/_private/worker.py", line 870, in get_objects
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ZeroDivisionError): ray::Map(f)() (pid=51102, ip=127.0.0.1)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
for data in iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
yield from self._row_fn(input, ctx)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 233, in transform_fn
out_row = fn(row)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 119, in fn
return op_fn(item, *fn_args, **fn_kwargs)
File "/Users/sjl/Desktop/test_exception.py", line 4, in f
1/0
ZeroDivisionError: division by zero
After (stdout):
ERROR dataset_logger.py:57 -- Exception occurred in user code, with the abbreviated stack trace below. By default, the Ray Data internal stack trace is omitted from stdout, and only written to the Ray Data log file at /tmp/ray/session_2024-02-19_00-54-09_570692_61907/logs/ray-data.log. To output the full stack trace to stdout, set `DataContext.internal_stack_trace_stdout` to True`.
ray.data.UserCodeException
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/sjl/Desktop/test_exception.py", line 13, in <module>
ray.data.read_csv("s3://anonymous@air-example-data/iris.csv").map(f).materialize()
File "/Users/sjl/src/ray/python/ray/data/dataset.py", line 4610, in materialize
copy._plan.execute(force_read=True)
File "/Users/sjl/src/ray/python/ray/data/_internal/dataset_logger.py", line 79, in handle_trace
raise e.with_traceback(None) from UserCodeException()
ray.exceptions.RayTaskError(ZeroDivisionError): ray::Map(f)() (pid=61937, ip=127.0.0.1)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
for data in iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
yield from self._row_fn(input, ctx)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 233, in transform_fn
out_row = fn(row)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 119, in fn
return op_fn(item, *fn_args, **fn_kwargs)
File "/Users/sjl/Desktop/test_exception.py", line 5, in f
1/0
ZeroDivisionError: division by zero
After (ray-data.log):
2024-02-19 00:54:13,289 ERROR dataset_logger.py:57 -- Exception occurred in user code, with the abbreviated stack trace below. By default, the Ray Data internal stack trace is omitted from stdout, and only written to the Ray Data log file at /tmp/ray/session_2024-02-19_00-54-09_570692_61907/logs/ray-data.log. To output the full stack trace to stdout, set `DataContext.internal_stack_trace_stdout` to True`.
2024-02-19 00:54:13,289 ERROR dataset_logger.py:74 -- Full stack trace:
Traceback (most recent call last):
File "/Users/sjl/src/ray/python/ray/data/_internal/dataset_logger.py", line 44, in handle_trace
return fn(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/data/_internal/plan.py", line 574, in execute
blocks = execute_to_legacy_block_list(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/legacy_compat.py", line 108, in execute_to_legacy_block_list
block_list = _bundles_to_block_list(bundles)
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/legacy_compat.py", line 204, in _bundles_to_block_list
for ref_bundle in bundles:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
return self.get_next()
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 144, in get_next
item = self._outer._output_node.get_output_blocking(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 277, in get_output_blocking
raise self._exception
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 214, in run
continue_sched = self._scheduling_loop_step(self._topology)
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 264, in _scheduling_loop_step
num_errored_blocks = process_completed_tasks(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 454, in process_completed_tasks
raise e from None
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 421, in process_completed_tasks
num_blocks_read = task.on_data_ready(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 102, in on_data_ready
raise ex from None
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 98, in on_data_ready
ray.get(block_ref)
File "/Users/sjl/src/ray/python/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
return fn(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
return func(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/_private/worker.py", line 2653, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
File "/Users/sjl/src/ray/python/ray/_private/worker.py", line 870, in get_objects
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ZeroDivisionError): ray::Map(f)() (pid=61937, ip=127.0.0.1)
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
for data in iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
yield from self._row_fn(input, ctx)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 233, in transform_fn
out_row = fn(row)
File "/Users/sjl/src/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 119, in fn
return op_fn(item, *fn_args, **fn_kwargs)
File "/Users/sjl/Desktop/test_exception.py", line 5, in f
1/0
ZeroDivisionError: division by zero
Finally, here is a comparison of the log output when the error originates in internal Ray Data code (I added a 1/0 in the Ray Data code to trigger the exception):
# Example 1, with `ZeroDivisionError` caused by Ray Data internal code path
ray.data.range(10).map(f).take_all()
Before:
Traceback (most recent call last):
File "/Users/sjl/Desktop/test_exception.py", line 13, in <module>
ray.data.read_csv("s3://anonymous@air-example-data/iris.csv").map(f).materialize()
File "/Users/sjl/src/ray/python/ray/data/dataset.py", line 4506, in materialize
copy._plan.execute(force_read=True)
File "/Users/sjl/src/ray/python/ray/data/_internal/plan.py", line 572, in execute
blocks = execute_to_legacy_block_list(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/legacy_compat.py", line 108, in execute_to_legacy_block_list
block_list = _bundles_to_block_list(bundles)
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/legacy_compat.py", line 204, in _bundles_to_block_list
for ref_bundle in bundles:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
return self.get_next()
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 144, in get_next
item = self._outer._output_node.get_output_blocking(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 274, in get_output_blocking
1/0
ZeroDivisionError: division by zero
After (stdout):
2024-02-19 00:57:22,367 ERROR dataset_logger.py:67 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
ray.data.SystemException
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/sjl/Desktop/test_exception.py", line 8, in <module>
ray.data.range(10).map_batches(f).take_all()
File "/Users/sjl/src/ray/python/ray/data/dataset.py", line 2435, in take_all
for row in self.iter_rows():
File "/Users/sjl/src/ray/python/ray/data/iterator.py", line 236, in _wrapped_iterator
for batch in batch_iterable:
File "/Users/sjl/src/ray/python/ray/data/iterator.py", line 171, in _create_iterator
block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
File "/Users/sjl/src/ray/python/ray/data/_internal/iterator/iterator_impl.py", line 33, in _to_block_iterator
block_iterator, stats, executor = ds._plan.execute_to_iterator()
File "/Users/sjl/src/ray/python/ray/data/_internal/dataset_logger.py", line 81, in handle_trace
raise e.with_traceback(None) from SystemException()
ZeroDivisionError: division by zero
After (ray-data.log):
2024-02-19 00:57:22,367 ERROR dataset_logger.py:67 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
2024-02-19 00:57:22,367 ERROR dataset_logger.py:74 -- Full stack trace:
Traceback (most recent call last):
File "/Users/sjl/src/ray/python/ray/data/_internal/dataset_logger.py", line 44, in handle_trace
return fn(*args, **kwargs)
File "/Users/sjl/src/ray/python/ray/data/_internal/plan.py", line 507, in execute_to_iterator
block_iter = itertools.chain([next(gen)], gen)
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/legacy_compat.py", line 45, in execute_to_legacy_block_iterator
for bundle in bundle_iter:
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
return self.get_next()
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 144, in get_next
item = self._outer._output_node.get_output_blocking(
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 277, in get_output_blocking
raise self._exception
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 214, in run
continue_sched = self._scheduling_loop_step(self._topology)
File "/Users/sjl/src/ray/python/ray/data/_internal/execution/streaming_executor.py", line 264, in _scheduling_loop_step
1/0
ZeroDivisionError: division by zero
Related issue number
Checks
- [ ] I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- [ ] I've run scripts/format.sh to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
- [ ] Unit tests
- [ ] Release tests
- [ ] This PR is not tested :(