bigearthnet apache_beam error: AssertionError: OutputStream realloc failed.
Short description I am simply trying to load the bigearthnet dataset from tfds. The download works fine (it seems), but the preparation fails with a memory error in apache_beam: AssertionError: OutputStream realloc failed. This is happening on a 256GB RAM server running only this program.
Maybe I am not loading the data the right way, although I selected a single-sample split strategy to reduce the risk of a memory allocation error. There is probably something wrong either in the library or in the loading code that makes this dataset unusable. Thank you in advance for your help.
Environment information
- Operating System: ubuntu20.04
- Python version: 3.8.5
- tensorflow-datasets/tfds-nightly version: tfds-nightly==4.3.0.dev202106180109
- tensorflow/tf-nightly version: tensorflow==2.5.0
- Does the issue still exist with the latest tfds-nightly package (pip install --upgrade tfds-nightly)? yes
Reproduction instructions
import apache_beam as beam
import tensorflow as tf
import tensorflow_datasets as tfds
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(),
    download_mode=tfds.GenerateMode.REUSE_DATASET_IF_EXISTS,
    register_checksums=False)

raw_train, metadata = tfds.load(
    "bigearthnet/all",
    data_dir="/workdir/code/BIGEARTHNET",
    split=["train[:1%]"],  # , "test[:1]"
    with_info=True,  # return (datasets, info) so the unpacking above works
    download_and_prepare_kwargs={'download_config': dl_config})
Link to logs
Downloading and preparing dataset 65.22 GiB (download: 65.22 GiB, generated: Unknown size, total: 65.22 GiB) to /workdir/code/BIGEARTHNET/bigearthnet/all/1.0.0...
Dl Completed...: 0 url [00:00, ? url/s]
Dl Size...: 0 MiB [00:00, ? MiB/s]
Generating splits...: 0%| | 0/1 [00:00<?, ? splits/s]
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/root/.local/share/jupyter/runtime/kernel-b88d29d4-6a66-45a6-a231-c27985b44792.json']
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
<ipython-input-2-ebc312630005> in <module>
50 register_checksums=False)
51
---> 52 raw_train, metadata = tfds.load(
53 "bigearthnet/all",
54 data_dir="/workdir/code/BIGEARTHNET",
/usr/local/lib/python3.8/dist-packages/tensorflow_datasets/core/load.py in load(name, split, data_dir, batch_size, shuffle_files, download, as_supervised, decoders, read_config, with_info, builder_kwargs, download_and_prepare_kwargs, as_dataset_kwargs, try_gcs)
316 if download:
317 download_and_prepare_kwargs = download_and_prepare_kwargs or {}
--> 318 dbuilder.download_and_prepare(**download_and_prepare_kwargs)
319
320 if as_dataset_kwargs is None:
/usr/local/lib/python3.8/dist-packages/tensorflow_datasets/core/dataset_builder.py in download_and_prepare(self, download_dir, download_config)
437 # Old version of TF are not os.PathLike compatible
438 with tf_compat.mock_gfile_pathlike():
--> 439 self._download_and_prepare(
440 dl_manager=dl_manager,
441 download_config=download_config,
/usr/local/lib/python3.8/dist-packages/tensorflow_datasets/core/dataset_builder.py in _download_and_prepare(self, dl_manager, download_config)
1153 self.info.file_format].FILE_SUFFIX
1154
-> 1155 split_info_futures = [
1156 split_builder.submit_split_generation( # pylint: disable=g-complex-comprehension
1157 split_name=split_name,
/usr/lib/python3.8/contextlib.py in __exit__(self, type, value, traceback)
118 if type is None:
119 try:
--> 120 next(self.gen)
121 except StopIteration:
122 return False
/usr/local/lib/python3.8/dist-packages/tensorflow_datasets/core/split_builder.py in maybe_beam_pipeline(self)
171 # If the Beam pipeline was used, then exit it.
172 if self._beam_pipeline is not None:
--> 173 self._beam_pipeline.__exit__(None, None, None)
174 self._in_contextmanager = False
175
/usr/local/lib/python3.8/dist-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
583 try:
584 if not exc_type:
--> 585 self.result = self.run()
586 self.result.wait_until_finish()
587 finally:
/usr/local/lib/python3.8/dist-packages/apache_beam/pipeline.py in run(self, test_runner_api)
562 finally:
563 shutil.rmtree(tmpdir)
--> 564 return self.runner.run_pipeline(self, self._options)
565 finally:
566 shutil.rmtree(self.local_tempdir, ignore_errors=True)
/usr/local/lib/python3.8/dist-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
129 runner = BundleBasedDirectRunner()
130
--> 131 return runner.run_pipeline(pipeline, options)
132
133
/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
187 options.view_as(pipeline_options.ProfilingOptions))
188
--> 189 self._latest_run_result = self.run_via_runner_api(
190 pipeline.to_runner_api(default_environment=self._default_environment))
191 return self._latest_run_result
/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
198 # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
199 # the teststream (if any), and all the stages).
--> 200 return self.run_stages(stage_context, stages)
201
202 @contextlib.contextmanager
/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
362 runner_execution_context, stage, self._num_workers)
363
--> 364 stage_results = self._run_stage(
365 runner_execution_context,
366 bundle_context_manager,
/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
553
554 while True:
--> 555 last_result, deferred_inputs, fired_timers = self._run_bundle(
556 runner_execution_context,
557 bundle_context_manager,
/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
599 expected_timer_output)
600
--> 601 result, splits = bundle_manager.process_bundle(
602 data_input, data_output, input_timers, expected_timer_output)
603 # Now we collect all the deferred inputs remaining from bundle execution.
/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
892 # If there is no split_manager, write all input data to the channel.
893 for transform_id, elements in inputs.items():
--> 894 self._send_input_to_worker(process_bundle_id, transform_id, elements)
895
896 # Actually start the bundle.
/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _send_input_to_worker(self, process_bundle_id, read_transform_id, byte_streams)
768 data_out = self._worker_handler.data_conn.output_stream(
769 process_bundle_id, read_transform_id)
--> 770 for byte_stream in byte_streams:
771 data_out.write(byte_stream)
772 data_out.close()
/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/execution.py in __iter__(self)
250 _GroupingBuffer.
251 """
--> 252 return itertools.chain(*self.partition(1))
253
254 # these should never be accessed, but they allow this class to meet the
/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/execution.py in partition(self, n)
237 key = key_coder_impl.decode(encoded_key)
238 for wkvs in windowed_key_values(key, windowed_values):
--> 239 coder_impl.encode_to_stream(wkvs, output_stream_list[idx % n], True)
240 for ix, output_stream in enumerate(output_stream_list):
241 self._grouped_output[ix] = [output_stream.get()]
/usr/local/lib/python3.8/dist-packages/apache_beam/coders/coder_impl.cpython-38-x86_64-linux-gnu.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.encode_to_stream()
/usr/local/lib/python3.8/dist-packages/apache_beam/coders/coder_impl.cpython-38-x86_64-linux-gnu.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.encode_to_stream()
/usr/local/lib/python3.8/dist-packages/apache_beam/coders/coder_impl.cpython-38-x86_64-linux-gnu.so in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.encode_to_stream()
/usr/local/lib/python3.8/dist-packages/apache_beam/coders/coder_impl.cpython-38-x86_64-linux-gnu.so in apache_beam.coders.coder_impl.SequenceCoderImpl.encode_to_stream()
/usr/local/lib/python3.8/dist-packages/apache_beam/coders/stream.pyx in apache_beam.coders.stream.OutputStream.write()
/usr/local/lib/python3.8/dist-packages/apache_beam/coders/stream.pyx in apache_beam.coders.stream.OutputStream.extend()
AssertionError: OutputStream realloc failed.
Expected behavior I would expect tfds to load just 1% of the dataset without needing a lot of memory.
Additional context It would be great if there were a way to stream the dataset from disk through a tensorflow Dataset with prefetching capabilities.
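For what it's worth, my understanding is that once generation succeeds, tfds.load already returns tf.data.Dataset objects that read records from disk rather than holding them in memory, so prefetching would look roughly like this (a minimal sketch, assuming the preparation above had completed):

    # Sketch: streaming the prepared dataset from disk with prefetching.
    # With split=["train[:1%]"], tfds.load returns a list of datasets.
    raw_train, metadata = tfds.load(
        "bigearthnet/all",
        data_dir="/workdir/code/BIGEARTHNET",
        split=["train[:1%]"],
        with_info=True)
    train_ds = raw_train[0].prefetch(tf.data.AUTOTUNE)  # overlap read and compute
    for example in train_ds.take(1):
        print(example.keys())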
It seems that this exact problem has been reported here: https://github.com/tensorflow/datasets/issues/1007 However, the person did not mention whether the target was to load the whole dataset or a single sample.
I think that, when loading a single sample, the method should NOT raise a memory error.
Additional note: interesting elements here: https://beam.apache.org/documentation/runners/direct/
In Beam 2.19.0 and newer, you can use the direct_running_mode pipeline option to set the running mode. direct_running_mode can be one of ['in_memory', 'multi_threading', 'multi_processing'].
in_memory: Runner and workers’ communication happens in memory (not through gRPC). This is the default mode.
multi_threading: Runner and workers communicate through gRPC and each worker runs in a thread.
multi_processing: Runner and workers communicate through gRPC and each worker runs in a subprocess.
Currently trying something different from in_memory. Update: all 3 options resulted in the same error, unfortunately.
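For reference, this is roughly how I switched the running mode (a sketch; direct_running_mode and direct_num_workers are documented DirectRunner options, and the worker count is an arbitrary example):

    # Sketch: selecting the DirectRunner running mode (Beam >= 2.19.0).
    # direct_num_workers=2 is an arbitrary illustrative value.
    beam_options = beam.options.pipeline_options.PipelineOptions(
        direct_running_mode='multi_processing',  # or 'in_memory' / 'multi_threading'
        direct_num_workers=2)
    dl_config = tfds.download.DownloadConfig(beam_options=beam_options)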
When tfds.load is called:
- TFDS will download, process and generate the full dataset (all splits,...)
- Then TFDS will read the subset of the dataset specified by the split (train[:1%])
So train[:1%] still generates the full dataset. (Generating a new dataset each time the user changes the split would be quite inefficient.) The sketch below spells out the two phases.
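A sketch of the same thing using the builder API (equivalent to tfds.load, just with the two phases made explicit):

    # Sketch: generation and reading are separate phases.
    builder = tfds.builder("bigearthnet/all", data_dir="/workdir/code/BIGEARTHNET")
    builder.download_and_prepare(download_config=dl_config)  # generates ALL splits, in full
    train_ds = builder.as_dataset(split="train[:1%]")  # only the read is limited to 1%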
multi_processing likely won't change anything, as the generation will still be limited by the system memory (all processes share the same machine's memory). The best way is to use Google Cloud or another distributed-generation backend. See: https://www.tensorflow.org/datasets/beam_datasets#generating_a_beam_dataset
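Following that guide, distributed generation would look roughly like this (a sketch only; the project, bucket, and region names below are placeholders):

    # Sketch: distributed generation on Cloud Dataflow (placeholder names).
    beam_options = beam.options.pipeline_options.PipelineOptions(
        runner="DataflowRunner",
        project="my-gcp-project",             # placeholder
        job_name="bigearthnet-generation",
        temp_location="gs://my-bucket/temp",  # placeholder
        region="us-central1")                 # placeholder
    dl_config = tfds.download.DownloadConfig(beam_options=beam_options)
    builder = tfds.builder("bigearthnet/all",
                           data_dir="gs://my-bucket/tensorflow_datasets")
    builder.download_and_prepare(download_config=dl_config)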
What is the size of the BigEarthNet dataset uncompressed? I started downloading yesterday, more than 700 GB is already occupied on my external USB drive, and the download is not over yet. I was wondering whether the dataset is really that huge or there is something wrong with my files.