Adding support for `MergeDim`s and Split Variables to `FilePatternToChunks` transform.

Open alxmrs opened this issue 4 years ago • 3 comments

Fixes #29 and #38.

alxmrs avatar Sep 24 '21 00:09 alxmrs

I'm testing this PR end-to-end with a script that uses this file pattern for a GRIB2 dataset (read through the cfgrib backend in xarray), deployed on GCP's Dataflow. Right now I'm getting what appears to be a deadlock. My pipeline ends with this error:

Root cause: The worker lost contact with the service.

Tracebacks in the logs show threads waiting to acquire a lock, though it's unclear whether that's a real hang or just a big dataset taking a long time.

log 1
"Operation ongoing for over 1665.73 seconds in state process-msecs in step FilePatternToChunks/FlatMapTuple(_open_chunks)-ptransform-764  without returning. Current Traceback:
  File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()

  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()

  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()

  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 356, in task
    self._execute(

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 601, in do_instruction
    return getattr(self, request_type)(

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded
    self.output(decoded_value)

  File "/Users/alxrsngrtn/Github/xarray-beam/xarray_beam/_src/pangeo_forge.py", line 152, in _open_chunks
    with self._open_dataset(path) as dataset:

  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)

  File "/usr/local/lib/python3.8/site-packages/xarray_beam/_src/pangeo_forge.py", line 137, in _open_dataset
    local_file = fsspec.open_local(

  File "/usr/local/lib/python3.8/site-packages/fsspec/core.py", line 487, in open_local
    with of as files:

  File "/usr/local/lib/python3.8/site-packages/fsspec/core.py", line 184, in __enter__
    self.files = fs.open_many(self)

  File "/usr/local/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 394, in <lambda>
    return lambda *args, **kw: getattr(type(self), item).__get__(self)(

  File "/usr/local/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 503, in open_many
    self.fs.get(downpath, downfn)

  File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 91, in wrapper
    return sync(self.loop, func, *args, **kwargs)

  File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 59, in sync
    if event.wait(1):

  File "/usr/local/lib/python3.8/threading.py", line 558, in wait
    signaled = self._cond.wait(timeout)

  File "/usr/local/lib/python3.8/threading.py", line 306, in wait
    gotit = waiter.acquire(True, timeout)
log 2
Operation ongoing for over 743.75 seconds in state process-msecs in step FilePatternToChunks/FlatMapTuple(_open_chunks)-ptransform-224  without returning. Current Traceback:
  File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()

  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()

  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()

  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 356, in task
    self._execute(

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 601, in do_instruction
    return getattr(self, request_type)(

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded
    self.output(decoded_value)

  File "/Users/alxrsngrtn/Github/xarray-beam/xarray_beam/_src/pangeo_forge.py", line 185, in _open_chunks
    new_key, chunk.compute(num_workers=num_threads)

  File "/usr/local/lib/python3.8/site-packages/xarray/core/dataset.py", line 1031, in compute
    return new.load(**kwargs)

  File "/usr/local/lib/python3.8/site-packages/xarray/core/dataset.py", line 865, in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)

  File "/usr/local/lib/python3.8/site-packages/dask/base.py", line 570, in compute
    results = schedule(dsk, keys, **kwargs)

  File "/usr/local/lib/python3.8/site-packages/dask/threaded.py", line 79, in get
    results = get_async(

  File "/usr/local/lib/python3.8/site-packages/dask/local.py", line 506, in get_async
    for key, res_info, failed in queue_get(queue).result():

  File "/usr/local/lib/python3.8/site-packages/dask/local.py", line 134, in queue_get
    return q.get()

  File "/usr/local/lib/python3.8/queue.py", line 170, in get
    self.not_empty.wait()

  File "/usr/local/lib/python3.8/threading.py", line 302, in wait
    waiter.acquire()

To me, this looks like an instance of https://github.com/pydata/xarray/issues/4591. For now, I'm going to experiment with switching the scheduler to a single thread in the `compute` call inside `_open_chunks()`.
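A minimal sketch of that experiment, assuming the chunk is a dask-backed `xarray.Dataset`. The helper name `compute_single_threaded` and the toy dataset are hypothetical, not part of xarray-beam. `Dataset.compute` forwards keyword arguments to dask, and `scheduler="single-threaded"` asks dask to run the graph in the calling thread instead of a thread pool:

```python
import numpy as np
import xarray as xr


def compute_single_threaded(chunk: xr.Dataset) -> xr.Dataset:
    """Load a lazy chunk with dask's single-threaded scheduler.

    Hypothetical helper: it stands in for the
    `chunk.compute(num_workers=num_threads)` call seen in the traceback.
    """
    # No dask thread pool is created, so the calling thread cannot end up
    # waiting on a result queue fed by pool workers.
    return chunk.compute(scheduler="single-threaded")


# Toy stand-in for one chunk of the dataset (requires dask to be installed).
lazy = xr.Dataset({"temperature": (("time",), np.arange(8))}).chunk({"time": 2})
loaded = compute_single_threaded(lazy)
```

If the hang goes away with this change, that would point at the threaded scheduler rather than at the size of the dataset, though it would not by itself identify which lock is involved.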

alxmrs avatar Oct 07 '21 19:10 alxmrs

I should mention: the Dataflow diagnostics for the run above also show unresponsive threads, which makes a deadlock more plausible.

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "keepalive watchdog timeout" debug_error_string = "{"created":"@1630579134.284653312","description":"Error received from peer ipv6:[::1]:12371","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"keepalive watchdog timeout","grpc_status":14}" >
at _next (/usr/local/lib/python3.8/site-packages/grpc/_channel.py:803)
at __next__ (/usr/local/lib/python3.8/site-packages/grpc/_channel.py:416)
at run (/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:251)
at main (/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker_main.py:182)

alxmrs avatar Oct 07 '21 19:10 alxmrs

A deadlock seems plausible, but this is different from pydata/xarray#4591, which describes a serialization failure.

shoyer avatar Oct 07 '21 19:10 shoyer

This has gone stale.

shoyer avatar Jan 05 '24 03:01 shoyer