
bigearthnet apache_beam error: AssertionError: OutputStream realloc failed.

Open gnthibault opened this issue 4 years ago • 4 comments

Short description Trying simply to load the bigearthnet dataset from tfds. The download seems to work fine, but the preparation fails with a memory error in apache_beam. This is happening on a 256 GB RAM server running only this program.

Maybe I am not loading the data the right way, although I selected a single-sample split strategy to reduce the risk of a memory allocation error. There is probably something wrong either in the library or in the loading code that makes this dataset unusable. Thank you in advance for your help.

Environment information

  • Operating System: Ubuntu 20.04
  • Python version: 3.8.5
  • tensorflow-datasets/tfds-nightly version: tfds-nightly==4.3.0.dev202106180109
  • tensorflow/tf-nightly version: tensorflow==2.5.0
  • Does the issue still exist with the latest tfds-nightly package (pip install --upgrade tfds-nightly)? yes

Reproduction instructions

import apache_beam as beam
import tensorflow as tf
import tensorflow_datasets as tfds

dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(),
    download_mode=tfds.GenerateMode.REUSE_DATASET_IF_EXISTS,  
    register_checksums=False)

raw_train, metadata = tfds.load(
    "bigearthnet/all",
    data_dir="/workdir/code/BIGEARTHNET",
    split=["train[:1%]"],  # , "test[:1]"
    with_info=True,  # also return the DatasetInfo so the two-value unpacking is valid
    download_and_prepare_kwargs={'download_config': dl_config})

Link to logs

Downloading and preparing dataset 65.22 GiB (download: 65.22 GiB, generated: Unknown size, total: 65.22 GiB) to /workdir/code/BIGEARTHNET/bigearthnet/all/1.0.0...
Dl Completed...: 0 url [00:00, ? url/s]
Dl Size...: 0 MiB [00:00, ? MiB/s]
Generating splits...:   0%|          | 0/1 [00:00<?, ? splits/s]
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/root/.local/share/jupyter/runtime/kernel-b88d29d4-6a66-45a6-a231-c27985b44792.json']
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-2-ebc312630005> in <module>
     50     register_checksums=False)
     51 
---> 52 raw_train, metadata = tfds.load(
     53     "bigearthnet/all",
     54     data_dir="/workdir/code/BIGEARTHNET",

/usr/local/lib/python3.8/dist-packages/tensorflow_datasets/core/load.py in load(name, split, data_dir, batch_size, shuffle_files, download, as_supervised, decoders, read_config, with_info, builder_kwargs, download_and_prepare_kwargs, as_dataset_kwargs, try_gcs)
    316   if download:
    317     download_and_prepare_kwargs = download_and_prepare_kwargs or {}
--> 318     dbuilder.download_and_prepare(**download_and_prepare_kwargs)
    319 
    320   if as_dataset_kwargs is None:

/usr/local/lib/python3.8/dist-packages/tensorflow_datasets/core/dataset_builder.py in download_and_prepare(self, download_dir, download_config)
    437           # Old version of TF are not os.PathLike compatible
    438           with tf_compat.mock_gfile_pathlike():
--> 439             self._download_and_prepare(
    440                 dl_manager=dl_manager,
    441                 download_config=download_config,

/usr/local/lib/python3.8/dist-packages/tensorflow_datasets/core/dataset_builder.py in _download_and_prepare(self, dl_manager, download_config)
   1153           self.info.file_format].FILE_SUFFIX
   1154 
-> 1155       split_info_futures = [
   1156           split_builder.submit_split_generation(  # pylint: disable=g-complex-comprehension
   1157               split_name=split_name,

/usr/lib/python3.8/contextlib.py in __exit__(self, type, value, traceback)
    118         if type is None:
    119             try:
--> 120                 next(self.gen)
    121             except StopIteration:
    122                 return False

/usr/local/lib/python3.8/dist-packages/tensorflow_datasets/core/split_builder.py in maybe_beam_pipeline(self)
    171       # If the Beam pipeline was used, then exit it.
    172       if self._beam_pipeline is not None:
--> 173         self._beam_pipeline.__exit__(None, None, None)
    174     self._in_contextmanager = False
    175 

/usr/local/lib/python3.8/dist-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
    583     try:
    584       if not exc_type:
--> 585         self.result = self.run()
    586         self.result.wait_until_finish()
    587     finally:

/usr/local/lib/python3.8/dist-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    562         finally:
    563           shutil.rmtree(tmpdir)
--> 564       return self.runner.run_pipeline(self, self._options)
    565     finally:
    566       shutil.rmtree(self.local_tempdir, ignore_errors=True)

/usr/local/lib/python3.8/dist-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
    129       runner = BundleBasedDirectRunner()
    130 
--> 131     return runner.run_pipeline(pipeline, options)
    132 
    133 

/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
    187         options.view_as(pipeline_options.ProfilingOptions))
    188 
--> 189     self._latest_run_result = self.run_via_runner_api(
    190         pipeline.to_runner_api(default_environment=self._default_environment))
    191     return self._latest_run_result

/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
    198     # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
    199     #   the teststream (if any), and all the stages).
--> 200     return self.run_stages(stage_context, stages)
    201 
    202   @contextlib.contextmanager

/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
    362               runner_execution_context, stage, self._num_workers)
    363 
--> 364           stage_results = self._run_stage(
    365               runner_execution_context,
    366               bundle_context_manager,

/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
    553 
    554     while True:
--> 555       last_result, deferred_inputs, fired_timers = self._run_bundle(
    556               runner_execution_context,
    557               bundle_context_manager,

/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
    599         expected_timer_output)
    600 
--> 601     result, splits = bundle_manager.process_bundle(
    602         data_input, data_output, input_timers, expected_timer_output)
    603     # Now we collect all the deferred inputs remaining from bundle execution.

/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
    892       # If there is no split_manager, write all input data to the channel.
    893       for transform_id, elements in inputs.items():
--> 894         self._send_input_to_worker(process_bundle_id, transform_id, elements)
    895 
    896     # Actually start the bundle.

/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _send_input_to_worker(self, process_bundle_id, read_transform_id, byte_streams)
    768     data_out = self._worker_handler.data_conn.output_stream(
    769         process_bundle_id, read_transform_id)
--> 770     for byte_stream in byte_streams:
    771       data_out.write(byte_stream)
    772     data_out.close()

/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/execution.py in __iter__(self)
    250     _GroupingBuffer.
    251     """
--> 252     return itertools.chain(*self.partition(1))
    253 
    254   # these should never be accessed, but they allow this class to meet the

/usr/local/lib/python3.8/dist-packages/apache_beam/runners/portability/fn_api_runner/execution.py in partition(self, n)
    237         key = key_coder_impl.decode(encoded_key)
    238         for wkvs in windowed_key_values(key, windowed_values):
--> 239           coder_impl.encode_to_stream(wkvs, output_stream_list[idx % n], True)
    240       for ix, output_stream in enumerate(output_stream_list):
    241         self._grouped_output[ix] = [output_stream.get()]

/usr/local/lib/python3.8/dist-packages/apache_beam/coders/coder_impl.cpython-38-x86_64-linux-gnu.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.encode_to_stream()

/usr/local/lib/python3.8/dist-packages/apache_beam/coders/coder_impl.cpython-38-x86_64-linux-gnu.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.encode_to_stream()

/usr/local/lib/python3.8/dist-packages/apache_beam/coders/coder_impl.cpython-38-x86_64-linux-gnu.so in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.encode_to_stream()

/usr/local/lib/python3.8/dist-packages/apache_beam/coders/coder_impl.cpython-38-x86_64-linux-gnu.so in apache_beam.coders.coder_impl.SequenceCoderImpl.encode_to_stream()

/usr/local/lib/python3.8/dist-packages/apache_beam/coders/stream.pyx in apache_beam.coders.stream.OutputStream.write()

/usr/local/lib/python3.8/dist-packages/apache_beam/coders/stream.pyx in apache_beam.coders.stream.OutputStream.extend()

AssertionError: OutputStream realloc failed.

Expected behavior I would expect tfds to load just 1% of the dataset without needing a lot of memory.

Additional context It would be great if there were a way to stream the dataset from disk through a TensorFlow Dataset with prefetching capabilities.
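
For context, once a dataset has been generated on disk, tfds.load already returns a tf.data.Dataset that is read lazily and can be prefetched; a minimal sketch of what that reading step could look like (assuming generation had succeeded, which is not the case in this issue):

import tensorflow as tf
import tensorflow_datasets as tfds

# Sketch only: read an already-prepared dataset from disk without re-downloading,
# batching and prefetching so I/O overlaps with model computation.
ds = tfds.load("bigearthnet/all", split="train[:1%]",
               data_dir="/workdir/code/BIGEARTHNET", download=False)
ds = ds.batch(32).prefetch(tf.data.AUTOTUNE)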

gnthibault avatar Jun 18 '21 12:06 gnthibault

It seems that this exact problem has been reported here: https://github.com/tensorflow/datasets/issues/1007. However, that person did not mention whether the goal was to load the whole dataset or a single sample.

I think that, when loading only a single sample, the method should NOT raise a memory error.

gnthibault avatar Jun 18 '21 15:06 gnthibault

Additional note: interesting elements here: https://beam.apache.org/documentation/runners/direct/

In Beam 2.19.0 and newer, you can use the direct_running_mode pipeline option to set the running mode. direct_running_mode can be one of ['in_memory', 'multi_threading', 'multi_processing'].

in_memory: Runner and workers’ communication happens in memory (not through gRPC). This is a default mode.

multi_threading: Runner and workers communicate through gRPC and each worker runs in a thread.

multi_processing: Runner and workers communicate through gRPC and each worker runs in a subprocess.

Currently trying something different from in_memory. (Update: all 3 options resulted in the same error, unfortunately.)
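
For reference, a minimal sketch of how those modes can be selected on the DownloadConfig used in the reproduction above (the option names direct_running_mode and direct_num_workers follow the Beam DirectRunner documentation; the values shown are illustrative, not a known fix):

import apache_beam as beam
import tensorflow_datasets as tfds

# DirectRunner options available in Beam >= 2.19.0.
beam_options = beam.options.pipeline_options.PipelineOptions(
    direct_running_mode="multi_processing",  # or "in_memory" / "multi_threading"
    direct_num_workers=4,
)

dl_config = tfds.download.DownloadConfig(beam_options=beam_options)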

gnthibault avatar Jun 18 '21 15:06 gnthibault

When tfds.load is called:

  • TFDS will download, process, and generate the full dataset (all splits, ...)
  • Then TFDS will read the subset of the dataset specified by the split (train[:1%])

So train[:1%] still generates the full dataset. (Generating a new dataset each time the user changes the split would be quite inefficient.)

multi_processing likely won't change anything, as the generation will still be limited by the system memory (all processes share the same memory). The best way is to use Google Cloud or another distributed generation backend. See: https://www.tensorflow.org/datasets/beam_datasets#generating_a_beam_dataset
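
For illustration, the linked guide generates Beam datasets on Dataflow roughly along these lines (the project, bucket, and job names below are placeholders, not taken from this thread):

import apache_beam as beam
import tensorflow_datasets as tfds

# Placeholder GCP project and bucket names; adapt to your own setup.
beam_options = beam.options.pipeline_options.PipelineOptions(
    runner="DataflowRunner",
    project="my-gcp-project",
    job_name="bigearthnet-generation",
    staging_location="gs://my-bucket/binaries",
    temp_location="gs://my-bucket/temp",
)

dl_config = tfds.download.DownloadConfig(beam_options=beam_options)

builder = tfds.builder("bigearthnet/all", data_dir="gs://my-bucket/tensorflow_datasets")
builder.download_and_prepare(download_config=dl_config)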

Conchylicultor avatar Jun 21 '21 10:06 Conchylicultor

What is the size of the BigEarthNet dataset uncompressed? I started downloading yesterday, more than 700 GB has already been used on my external USB drive, and the download is not over yet. I was wondering whether the dataset is really that huge or there is something wrong with my files??

LeilaMaria avatar Apr 12 '22 09:04 LeilaMaria