Adding support for `MergeDim`s and Split Variables to `FilePatternToChunks` transform.
Fixes #29 and #38.
I'm testing this PR end-to-end with a script that uses this file pattern for a Grib 2 dataset (uses the cfgrib backend in XArray), deployed on GCP's Dataflow. Right now I'm getting what appears to be a deadlock. My pipeline ends with this error:
Root cause: The worker lost contact with the service.
Traces in logs show that threads are blocked waiting to acquire a lock, though it's unclear whether this is a genuine deadlock or just a large dataset that is legitimately taking a long time to process.
log 1
"Operation ongoing for over 1665.73 seconds in state process-msecs in step FilePatternToChunks/FlatMapTuple(_open_chunks)-ptransform-764 without returning. Current Traceback:
File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
self._work_item.run()
File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 356, in task
self._execute(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
response = task()
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 601, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded
self.output(decoded_value)
File "/Users/alxrsngrtn/Github/xarray-beam/xarray_beam/_src/pangeo_forge.py", line 152, in _open_chunks
with self._open_dataset(path) as dataset:
File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.8/site-packages/xarray_beam/_src/pangeo_forge.py", line 137, in _open_dataset
local_file = fsspec.open_local(
File "/usr/local/lib/python3.8/site-packages/fsspec/core.py", line 487, in open_local
with of as files:
File "/usr/local/lib/python3.8/site-packages/fsspec/core.py", line 184, in __enter__
self.files = fs.open_many(self)
File "/usr/local/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 394, in <lambda>
return lambda *args, **kw: getattr(type(self), item).__get__(self)(
File "/usr/local/lib/python3.8/site-packages/fsspec/implementations/cached.py", line 503, in open_many
self.fs.get(downpath, downfn)
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 91, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 59, in sync
if event.wait(1):
File "/usr/local/lib/python3.8/threading.py", line 558, in wait
signaled = self._cond.wait(timeout)
File "/usr/local/lib/python3.8/threading.py", line 306, in wait
gotit = waiter.acquire(True, timeout)
log 2
Operation ongoing for over 743.75 seconds in state process-msecs in step FilePatternToChunks/FlatMapTuple(_open_chunks)-ptransform-224 without returning. Current Traceback:
File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
self._bootstrap_inner()
File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
self._work_item.run()
File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 356, in task
self._execute(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
response = task()
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 601, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 222, in process_encoded
self.output(decoded_value)
File "/Users/alxrsngrtn/Github/xarray-beam/xarray_beam/_src/pangeo_forge.py", line 185, in _open_chunks
new_key, chunk.compute(num_workers=num_threads)
File "/usr/local/lib/python3.8/site-packages/xarray/core/dataset.py", line 1031, in compute
return new.load(**kwargs)
File "/usr/local/lib/python3.8/site-packages/xarray/core/dataset.py", line 865, in load
evaluated_data = da.compute(*lazy_data.values(), **kwargs)
File "/usr/local/lib/python3.8/site-packages/dask/base.py", line 570, in compute
results = schedule(dsk, keys, **kwargs)
File "/usr/local/lib/python3.8/site-packages/dask/threaded.py", line 79, in get
results = get_async(
File "/usr/local/lib/python3.8/site-packages/dask/local.py", line 506, in get_async
for key, res_info, failed in queue_get(queue).result():
File "/usr/local/lib/python3.8/site-packages/dask/local.py", line 134, in queue_get
return q.get()
File "/usr/local/lib/python3.8/queue.py", line 170, in get
self.not_empty.wait()
File "/usr/local/lib/python3.8/threading.py", line 302, in wait
waiter.acquire()
This to me seems like an instance of https://github.com/pydata/xarray/issues/4591. Right now, I'm going to experiment with changing the scheduler to use a single thread in the compute method of _open_chunks().
I should mention: the Dataflow diagnostics for the above report show unresponsive threads, which makes a deadlock scenario more plausible.
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "keepalive watchdog timeout" debug_error_string = "{"created":"@1630579134.284653312","description":"Error received from peer ipv6:[::1]:12371","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"keepalive watchdog timeout","grpc_status":14}" >
at _next (/usr/local/lib/python3.8/site-packages/grpc/_channel.py:803)
at __next__ (/usr/local/lib/python3.8/site-packages/grpc/_channel.py:416)
at run (/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:251)
at main (/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker_main.py:182)
Dead-locking seems plausible, but this is different from pydata/xarray#4591 which describes a serialization failure.
This has gone stale.