Add Kvikio backend entrypoint

Open dcherian opened this issue 3 years ago • 8 comments

TODO:

  • [x] https://github.com/pydata/xarray/pull/6874
  • [x] https://github.com/pydata/xarray/pull/8100
  • [ ] https://github.com/pydata/xarray/pull/8408
  • [ ] add docs - https://cupy-xarray--10.org.readthedocs.build/generated/cupy_xarray.kvikio.KvikioBackendEntrypoint.html
  • [x] add tests

This PR registers a "kvikio" backend that allows us to use kvikio to read from network into GPU memory directly using "GDS".
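The read path can be sketched roughly as follows. The `kvikio.CuFile`/`cupy.empty` calls mirror what kvikio's own `GDSStore._fromfile` does (visible in the traceback later in this thread); the host-memory fallback is added here purely for illustration and is not part of kvikio.

```python
import os


def read_into_gpu(path):
    """Read a file's raw bytes into GPU memory via GDS when kvikio is available.

    A minimal sketch, not this PR's actual code: the GPU branch follows
    kvikio's GDSStore._fromfile; the CPU fallback is illustrative only.
    """
    try:
        import cupy
        import kvikio

        nbytes = os.path.getsize(path)
        buf = cupy.empty(nbytes, dtype="u1")  # uninitialized device buffer
        with kvikio.CuFile(path, "r") as f:
            f.read(buf)  # DMA from storage directly into GPU memory
        return buf
    except ImportError:
        # No GPU stack installed: plain host-memory read
        with open(path, "rb") as f:
            return f.read()
```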

To get this to work we need

  1. [x] https://github.com/zarr-developers/zarr-python/pull/934
  2. [x] https://github.com/pydata/xarray/pull/6874
  3. [x] kvikio.zarr working (https://github.com/rapidsai/kvikio)

This PR subclasses the existing ZarrStore and overrides the necessary methods. Most of the code is actually copied over from xarray/backends/zarr.py
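For context, xarray discovers third-party engines through the `xarray.backends` entry-point group, which is what makes `engine="kvikio"` resolvable by `xr.open_dataset`. A registration sketch (the module path matches the traceback below; the exact packaging in this repo may differ):

```toml
# Hypothetical pyproject.toml fragment registering the backend entrypoint
[project.entry-points."xarray.backends"]
kvikio = "cupy_xarray.kvikio:KvikioBackendEntrypoint"
```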

For a short demo see https://github.com/dcherian/cupy-xarray/blob/kvikio-entrypoint/docs/source/kvikio.ipynb

Tip:

  • Run /usr/local/cuda-12/gds/tools/gdscheck -p to check for GPU Direct Storage compatibility on your system

dcherian avatar Aug 02 '22 21:08 dcherian

Xarray + Zarr + GPUs!! So exciting...

andersy005 avatar Aug 02 '22 22:08 andersy005

So excited I couldn't wait to hack it together :P

dcherian avatar Aug 02 '22 22:08 dcherian

Cool, great work @dcherian, it's like Christmas came early! I've tried to test this branch but am encountering some cuFile issues. I'll just post what I've done so far in case anyone wants to reproduce:

# May need to install nvidia-gds first
# https://docs.nvidia.com/cuda/cuda-installation-guide-linux/index.html#ubuntu-installation-common
sudo apt install nvidia-gds

git clone https://github.com/dcherian/cupy-xarray.git
cd cupy-xarray

mamba create --name cupy-xarray python=3.9 cupy=11.0 rapidsai-nightly::kvikio=22.10 jupyterlab=3.4.5 pooch=1.6.0 netcdf4=1.6.0 watermark=2.3.1
mamba activate cupy-xarray
python -m ipykernel install --user --name cupy-xarray

# https://github.com/pydata/xarray/pull/6874
pip install git+https://github.com/dcherian/xarray.git@kvikio
# https://github.com/zarr-developers/zarr-python/pull/934
# pip install git+https://github.com/madsbk/zarr-python.git@cupy_support
pip install zarr==2.13.0a2
# https://github.com/xarray-contrib/cupy-xarray/pull/10
git switch kvikio-entrypoint
pip install --editable=.

# Start jupyter lab
jupyter lab --no-browser
# Then open the docs/kvikio.ipynb notebook

With that, I got an error on the ds = xr.open_dataset(store, engine="kvikio", consolidated=False) cell, full traceback as follows:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Input In [4], in <cell line: 2>()
      1 # Consolidated must be False
----> 2 ds = xr.open_dataset(store, engine="kvikio", consolidated=False)
      3 print(ds.air._variable._data)
      4 ds

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/backends/api.py:531, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, backend_kwargs, **kwargs)
    519 decoders = _resolve_decoders_kwargs(
    520     decode_cf,
    521     open_backend_dataset_parameters=backend.open_dataset_parameters,
   (...)
    527     decode_coords=decode_coords,
    528 )
    530 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 531 backend_ds = backend.open_dataset(
    532     filename_or_obj,
    533     drop_variables=drop_variables,
    534     **decoders,
    535     **kwargs,
    536 )
    537 ds = _dataset_from_backend_dataset(
    538     backend_ds,
    539     filename_or_obj,
   (...)
    547     **kwargs,
    548 )
    549 return ds

File ~/Documents/github/cupy-xarray/cupy_xarray/kvikio.py:190, in KvikioBackendEntrypoint.open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, mode, synchronizer, consolidated, chunk_store, storage_options, stacklevel)
    188 store_entrypoint = StoreBackendEntrypoint()
    189 with close_on_error(store):
--> 190     ds = store_entrypoint.open_dataset(
    191         store,
    192         mask_and_scale=mask_and_scale,
    193         decode_times=decode_times,
    194         concat_characters=concat_characters,
    195         decode_coords=decode_coords,
    196         drop_variables=drop_variables,
    197         use_cftime=use_cftime,
    198         decode_timedelta=decode_timedelta,
    199     )
    200 return ds

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/backends/store.py:26, in StoreBackendEntrypoint.open_dataset(self, store, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta)
     14 def open_dataset(
     15     self,
     16     store,
   (...)
     24     decode_timedelta=None,
     25 ):
---> 26     vars, attrs = store.load()
     27     encoding = store.get_encoding()
     29     vars, attrs, coord_names = conventions.decode_cf_variables(
     30         vars,
     31         attrs,
   (...)
     38         decode_timedelta=decode_timedelta,
     39     )

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/backends/common.py:125, in AbstractDataStore.load(self)
    103 def load(self):
    104     """
    105     This loads the variables and attributes simultaneously.
    106     A centralized loading function makes it easier to create
   (...)
    122     are requested, so care should be taken to make sure its fast.
    123     """
    124     variables = FrozenDict(
--> 125         (_decode_variable_name(k), v) for k, v in self.get_variables().items()
    126     )
    127     attributes = FrozenDict(self.get_attrs())
    128     return variables, attributes

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/backends/zarr.py:461, in ZarrStore.get_variables(self)
    460 def get_variables(self):
--> 461     return FrozenDict(
    462         (k, self.open_store_variable(k, v)) for k, v in self.zarr_group.arrays()
    463     )

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/core/utils.py:474, in FrozenDict(*args, **kwargs)
    473 def FrozenDict(*args, **kwargs) -> Frozen:
--> 474     return Frozen(dict(*args, **kwargs))

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/backends/zarr.py:462, in <genexpr>(.0)
    460 def get_variables(self):
    461     return FrozenDict(
--> 462         (k, self.open_store_variable(k, v)) for k, v in self.zarr_group.arrays()
    463     )

File ~/Documents/github/cupy-xarray/cupy_xarray/kvikio.py:130, in GDSZarrStore.open_store_variable(self, name, zarr_array)
    128 else:
    129     array_wrapper = CupyZarrArrayWrapper
--> 130 data = indexing.LazilyIndexedArray(array_wrapper(name, self))
    132 attributes = dict(attributes)
    133 encoding = {
    134     "chunks": zarr_array.chunks,
    135     "preferred_chunks": dict(zip(dimensions, zarr_array.chunks)),
    136     "compressor": zarr_array.compressor,
    137     "filters": zarr_array.filters,
    138 }

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/backends/zarr.py:64, in ZarrArrayWrapper.__init__(self, variable_name, datastore)
     61 self.datastore = datastore
     62 self.variable_name = variable_name
---> 64 array = self.get_array()
     65 self.shape = array.shape
     67 dtype = array.dtype

File ~/Documents/github/cupy-xarray/cupy_xarray/kvikio.py:34, in EagerCupyZarrArrayWrapper.get_array(self)
     33 def get_array(self):
---> 34     return np.asarray(self)

File ~/Documents/github/cupy-xarray/cupy_xarray/kvikio.py:31, in EagerCupyZarrArrayWrapper.__array__(self)
     30 def __array__(self):
---> 31     return self.datastore.zarr_group[self.variable_name][:].get()

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/zarr/core.py:807, in Array.__getitem__(self, selection)
    805     result = self.vindex[selection]
    806 else:
--> 807     result = self.get_basic_selection(pure_selection, fields=fields)
    808 return result

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/zarr/core.py:933, in Array.get_basic_selection(self, selection, out, fields)
    930     return self._get_basic_selection_zd(selection=selection, out=out,
    931                                         fields=fields)
    932 else:
--> 933     return self._get_basic_selection_nd(selection=selection, out=out,
    934                                         fields=fields)

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/zarr/core.py:976, in Array._get_basic_selection_nd(self, selection, out, fields)
    970 def _get_basic_selection_nd(self, selection, out=None, fields=None):
    971     # implementation of basic selection for array with at least one dimension
    972 
    973     # setup indexer
    974     indexer = BasicIndexer(selection, self)
--> 976     return self._get_selection(indexer=indexer, out=out, fields=fields)

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/zarr/core.py:1267, in Array._get_selection(self, indexer, out, fields)
   1261 if not hasattr(self.chunk_store, "getitems") or \
   1262    any(map(lambda x: x == 0, self.shape)):
   1263     # sequentially get one key at a time from storage
   1264     for chunk_coords, chunk_selection, out_selection in indexer:
   1265 
   1266         # load chunk selection into output array
-> 1267         self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
   1268                             drop_axes=indexer.drop_axes, fields=fields)
   1269 else:
   1270     # allow storage to get multiple items at once
   1271     lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/zarr/core.py:1966, in Array._chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, drop_axes, fields)
   1962 ckey = self._chunk_key(chunk_coords)
   1964 try:
   1965     # obtain compressed data for chunk
-> 1966     cdata = self.chunk_store[ckey]
   1968 except KeyError:
   1969     # chunk not initialized
   1970     if self._fill_value is not None:

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/zarr/storage.py:1066, in DirectoryStore.__getitem__(self, key)
   1064 filepath = os.path.join(self.path, key)
   1065 if os.path.isfile(filepath):
-> 1066     return self._fromfile(filepath)
   1067 else:
   1068     raise KeyError(key)

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/kvikio/zarr.py:43, in GDSStore._fromfile(self, fn)
     41 else:
     42     nbytes = os.path.getsize(fn)
---> 43     with kvikio.CuFile(fn, "r") as f:
     44         ret = cupy.empty(nbytes, dtype="u1")
     45         read = f.read(ret)

File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/kvikio/cufile.py:67, in CuFile.__init__(self, file, flags)
     49 def __init__(self, file: Union[pathlib.Path, str], flags: str = "r"):
     50     """Open and register file for GDS IO operations
     51 
     52     CuFile opens the file twice and maintains two file descriptors.
   (...)
     65         "+" -> "open for updating (reading and writing)"
     66     """
---> 67     self._handle = libkvikio.CuFile(file, flags)

File libkvikio.pyx:103, in kvikio._lib.libkvikio.CuFile.__init__()

RuntimeError: cuFile error at: /workspace/.conda-bld/work/cpp/include/kvikio/file_handle.hpp:172: internal error

Interestingly, nvidia-smi shows some GPU memory already allocated to the process (`...vs/cupy-xarray/bin/python 187MiB`), but I think there's a CUDA driver issue I might need to sort out first. Will do a bit more troubleshooting :smile:

Edit: Looking at https://docs.nvidia.com/gpudirect-storage/release-notes/index.html#known-limitations and https://docs.nvidia.com/gpudirect-storage/release-notes/index.html#support-matrix, it seems that gpudirect-storage is only supported for certain filesystems like ext4, but I have btrfs on my laptop :sweat_smile:. Let me try it on my university HPC :slightly_smiling_face:

Edit2: Nevermind, I just remembered my root folder is ext4 so I just put the Zarr file in /tmp. Now it works!!
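Since GDS only works on certain filesystems, it can save some head-scratching to check where the Zarr store lives before benchmarking. A rough Linux-only helper (my own sketch, not from this PR; it parses `/proc/mounts` and ignores escaped mount paths):

```python
import os


def filesystem_type(path):
    """Return the filesystem type of the mount containing `path` (Linux only).

    Rough illustrative helper: GDS supports e.g. ext4/xfs but not btrfs,
    so check before placing the Zarr store.
    """
    path = os.path.realpath(path)
    best_mount, best_type = "", None
    with open("/proc/mounts") as f:
        for line in f:
            device, mount, fstype = line.split()[:3]
            # Longest mount-point prefix containing `path` wins.
            inside = path == mount or path.startswith(mount.rstrip("/") + "/")
            if inside and len(mount) > len(best_mount):
                best_mount, best_type = mount, fstype
    return best_type


print(filesystem_type("/tmp"))  # e.g. "ext4", "xfs", "tmpfs", or "btrfs"
```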

[screenshot: notebook output showing the kvikio read working]

weiji14 avatar Aug 17 '22 18:08 weiji14

As in https://github.com/xarray-contrib/xbatcher/issues/87#issuecomment-1240466368

Now available in zarr-python 2.13.0a2 for testing.

joshmoore avatar Sep 08 '22 09:09 joshmoore

Should add that 2.13 final is out as well

jakirkham avatar Sep 29 '22 20:09 jakirkham

As an update, we need more upstream work in Xarray: https://github.com/pydata/xarray/pull/8100

and some changes to dask to get that to work (see the notebook)

@negin513 This should be OK to try on derecho now. Dask will not work, but everything else should.

dcherian avatar Aug 22 '23 18:08 dcherian

Cool blogpost @weiji14 ! Where are you running your ERA5 benchmark?

This kvikio engine can be optimized quite a bit given the work we (Earthmover) have recently done on the Xarray Zarr backend. There's a number of unnecessary metadata reads that can be removed.

dcherian avatar Mar 27 '24 21:03 dcherian

Cool blogpost @weiji14 ! Where are you running your ERA5 benchmark?

Thanks! I'm running the benchmark locally on my laptop (with an RTX A2000 8GB GPU), see also https://github.com/zarr-developers/zarr-benchmark/discussions/14, and I've got a few more extra technical details mentioned at my own site https://weiji14.github.io/blog/when-cloud-native-geospatial-meets-gpu-native-machine-learning :wink:

This kvikio engine can be optimized quite a bit given the work we (Earthmover) have recently done on the Xarray Zarr backend. There's a number of unnecessary metadata reads that can be removed.

Yes, I saw https://earthmover.io/blog/cloud-native-dataloader, and think a lot of the pieces are coming together. We really should wrap up this PR at some point; I'd be happy to give this another review if you'd like to push more changes to this branch.

weiji14 avatar Mar 27 '24 22:03 weiji14