Add Kvikio backend entrypoint
TODO:
- [x] https://github.com/pydata/xarray/pull/6874
- [x] https://github.com/pydata/xarray/pull/8100
- [ ] https://github.com/pydata/xarray/pull/8408
- [ ] add docs - https://cupy-xarray--10.org.readthedocs.build/generated/cupy_xarray.kvikio.KvikioBackendEntrypoint.html
- [x] add tests
This PR registers a "kvikio" backend that allows us to use kvikio to read data directly into GPU memory using GPUDirect Storage ("GDS").
To get this to work we need
- [x] https://github.com/zarr-developers/zarr-python/pull/934
- [x] https://github.com/pydata/xarray/pull/6874
- [x] kvikio.zarr working (https://github.com/rapidsai/kvikio)
This PR subclasses the existing ZarrStore and overrides the necessary methods. Most of the code is copied over from `xarray/backends/zarr.py`.
For a short demo see https://github.com/dcherian/cupy-xarray/blob/kvikio-entrypoint/docs/source/kvikio.ipynb
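For reference, here is a minimal usage sketch (hedged: the store path is a placeholder, and the GPU/kvikio stack is only exercised when it is actually installed):

```python
# Sketch of opening a Zarr store with the proposed "kvikio" engine.
# Requires a CUDA GPU plus xarray, cupy, kvikio, and this branch of
# cupy-xarray; "/tmp/air_temperature.zarr" below is a hypothetical path.
import importlib.util
import os

required = ("xarray", "cupy", "kvikio", "cupy_xarray")
HAVE_STACK = all(importlib.util.find_spec(mod) for mod in required)

if HAVE_STACK and os.path.isdir("/tmp/air_temperature.zarr"):
    import xarray as xr

    # Consolidated metadata must be off for this backend.
    ds = xr.open_dataset(
        "/tmp/air_temperature.zarr", engine="kvikio", consolidated=False
    )
    print(ds["air"]._variable._data)  # lazily indexed, cupy-backed Zarr array
else:
    print("GPU/kvikio stack not available; see the linked notebook for a full run.")
```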
Tip:
- Run `/usr/local/cuda-12/gds/tools/gdscheck -p` to check for GPU Direct Storage compatibility on your system.
Xarray + Zarr + GPUs!! So exciting...

So excited I couldn't wait to hack it together :P
Cool, great work @dcherian, it's like Christmas came early! I've tried to test this branch but am encountering some cuFile issues. Will just post what I've done so far in case anyone wants to reproduce this branch:
```bash
# May need to install nvidia-gds first
# https://docs.nvidia.com/cuda/cuda-installation-guide-linux/index.html#ubuntu-installation-common
sudo apt install nvidia-gds

git clone https://github.com/dcherian/cupy-xarray.git
cd cupy-xarray
mamba create --name cupy-xarray python=3.9 cupy=11.0 rapidsai-nightly::kvikio=22.10 jupyterlab=3.4.5 pooch=1.6.0 netcdf4=1.6.0 watermark=2.3.1
mamba activate cupy-xarray
python -m ipykernel install --user --name cupy-xarray

# https://github.com/pydata/xarray/pull/6874
pip install git+https://github.com/dcherian/xarray.git@kvikio

# https://github.com/zarr-developers/zarr-python/pull/934
# pip install git+https://github.com/madsbk/zarr-python.git@cupy_support
pip install zarr==2.13.0a2

# https://github.com/xarray-contrib/cupy-xarray/pull/10
git switch kvikio-entrypoint
pip install --editable=.

# Start jupyter lab
jupyter lab --no-browser
# Then open the docs/kvikio.ipynb notebook
```
With that, I got an error on the `ds = xr.open_dataset(store, engine="kvikio", consolidated=False)` cell; full traceback as follows:
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
Input In [4], in <cell line: 2>()
1 # Consolidated must be False
----> 2 ds = xr.open_dataset(store, engine="kvikio", consolidated=False)
3 print(ds.air._variable._data)
4 ds
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/backends/api.py:531, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, backend_kwargs, **kwargs)
519 decoders = _resolve_decoders_kwargs(
520 decode_cf,
521 open_backend_dataset_parameters=backend.open_dataset_parameters,
(...)
527 decode_coords=decode_coords,
528 )
530 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 531 backend_ds = backend.open_dataset(
532 filename_or_obj,
533 drop_variables=drop_variables,
534 **decoders,
535 **kwargs,
536 )
537 ds = _dataset_from_backend_dataset(
538 backend_ds,
539 filename_or_obj,
(...)
547 **kwargs,
548 )
549 return ds
File ~/Documents/github/cupy-xarray/cupy_xarray/kvikio.py:190, in KvikioBackendEntrypoint.open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, mode, synchronizer, consolidated, chunk_store, storage_options, stacklevel)
188 store_entrypoint = StoreBackendEntrypoint()
189 with close_on_error(store):
--> 190 ds = store_entrypoint.open_dataset(
191 store,
192 mask_and_scale=mask_and_scale,
193 decode_times=decode_times,
194 concat_characters=concat_characters,
195 decode_coords=decode_coords,
196 drop_variables=drop_variables,
197 use_cftime=use_cftime,
198 decode_timedelta=decode_timedelta,
199 )
200 return ds
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/backends/store.py:26, in StoreBackendEntrypoint.open_dataset(self, store, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta)
14 def open_dataset(
15 self,
16 store,
(...)
24 decode_timedelta=None,
25 ):
---> 26 vars, attrs = store.load()
27 encoding = store.get_encoding()
29 vars, attrs, coord_names = conventions.decode_cf_variables(
30 vars,
31 attrs,
(...)
38 decode_timedelta=decode_timedelta,
39 )
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/backends/common.py:125, in AbstractDataStore.load(self)
103 def load(self):
104 """
105 This loads the variables and attributes simultaneously.
106 A centralized loading function makes it easier to create
(...)
122 are requested, so care should be taken to make sure its fast.
123 """
124 variables = FrozenDict(
--> 125 (_decode_variable_name(k), v) for k, v in self.get_variables().items()
126 )
127 attributes = FrozenDict(self.get_attrs())
128 return variables, attributes
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/backends/zarr.py:461, in ZarrStore.get_variables(self)
460 def get_variables(self):
--> 461 return FrozenDict(
462 (k, self.open_store_variable(k, v)) for k, v in self.zarr_group.arrays()
463 )
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/core/utils.py:474, in FrozenDict(*args, **kwargs)
473 def FrozenDict(*args, **kwargs) -> Frozen:
--> 474 return Frozen(dict(*args, **kwargs))
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/backends/zarr.py:462, in <genexpr>(.0)
460 def get_variables(self):
461 return FrozenDict(
--> 462 (k, self.open_store_variable(k, v)) for k, v in self.zarr_group.arrays()
463 )
File ~/Documents/github/cupy-xarray/cupy_xarray/kvikio.py:130, in GDSZarrStore.open_store_variable(self, name, zarr_array)
128 else:
129 array_wrapper = CupyZarrArrayWrapper
--> 130 data = indexing.LazilyIndexedArray(array_wrapper(name, self))
132 attributes = dict(attributes)
133 encoding = {
134 "chunks": zarr_array.chunks,
135 "preferred_chunks": dict(zip(dimensions, zarr_array.chunks)),
136 "compressor": zarr_array.compressor,
137 "filters": zarr_array.filters,
138 }
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/xarray/backends/zarr.py:64, in ZarrArrayWrapper.__init__(self, variable_name, datastore)
61 self.datastore = datastore
62 self.variable_name = variable_name
---> 64 array = self.get_array()
65 self.shape = array.shape
67 dtype = array.dtype
File ~/Documents/github/cupy-xarray/cupy_xarray/kvikio.py:34, in EagerCupyZarrArrayWrapper.get_array(self)
33 def get_array(self):
---> 34 return np.asarray(self)
File ~/Documents/github/cupy-xarray/cupy_xarray/kvikio.py:31, in EagerCupyZarrArrayWrapper.__array__(self)
30 def __array__(self):
---> 31 return self.datastore.zarr_group[self.variable_name][:].get()
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/zarr/core.py:807, in Array.__getitem__(self, selection)
805 result = self.vindex[selection]
806 else:
--> 807 result = self.get_basic_selection(pure_selection, fields=fields)
808 return result
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/zarr/core.py:933, in Array.get_basic_selection(self, selection, out, fields)
930 return self._get_basic_selection_zd(selection=selection, out=out,
931 fields=fields)
932 else:
--> 933 return self._get_basic_selection_nd(selection=selection, out=out,
934 fields=fields)
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/zarr/core.py:976, in Array._get_basic_selection_nd(self, selection, out, fields)
970 def _get_basic_selection_nd(self, selection, out=None, fields=None):
971 # implementation of basic selection for array with at least one dimension
972
973 # setup indexer
974 indexer = BasicIndexer(selection, self)
--> 976 return self._get_selection(indexer=indexer, out=out, fields=fields)
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/zarr/core.py:1267, in Array._get_selection(self, indexer, out, fields)
1261 if not hasattr(self.chunk_store, "getitems") or \
1262 any(map(lambda x: x == 0, self.shape)):
1263 # sequentially get one key at a time from storage
1264 for chunk_coords, chunk_selection, out_selection in indexer:
1265
1266 # load chunk selection into output array
-> 1267 self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
1268 drop_axes=indexer.drop_axes, fields=fields)
1269 else:
1270 # allow storage to get multiple items at once
1271 lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/zarr/core.py:1966, in Array._chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, drop_axes, fields)
1962 ckey = self._chunk_key(chunk_coords)
1964 try:
1965 # obtain compressed data for chunk
-> 1966 cdata = self.chunk_store[ckey]
1968 except KeyError:
1969 # chunk not initialized
1970 if self._fill_value is not None:
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/zarr/storage.py:1066, in DirectoryStore.__getitem__(self, key)
1064 filepath = os.path.join(self.path, key)
1065 if os.path.isfile(filepath):
-> 1066 return self._fromfile(filepath)
1067 else:
1068 raise KeyError(key)
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/kvikio/zarr.py:43, in GDSStore._fromfile(self, fn)
41 else:
42 nbytes = os.path.getsize(fn)
---> 43 with kvikio.CuFile(fn, "r") as f:
44 ret = cupy.empty(nbytes, dtype="u1")
45 read = f.read(ret)
File ~/mambaforge/envs/cupy-xarray/lib/python3.9/site-packages/kvikio/cufile.py:67, in CuFile.__init__(self, file, flags)
49 def __init__(self, file: Union[pathlib.Path, str], flags: str = "r"):
50 """Open and register file for GDS IO operations
51
52 CuFile opens the file twice and maintains two file descriptors.
(...)
65 "+" -> "open for updating (reading and writing)"
66 """
---> 67 self._handle = libkvikio.CuFile(file, flags)
File libkvikio.pyx:103, in kvikio._lib.libkvikio.CuFile.__init__()
RuntimeError: cuFile error at: /workspace/.conda-bld/work/cpp/include/kvikio/file_handle.hpp:172: internal error
Interestingly, `nvidia-smi` is already showing some GPU memory being allocated (`0 N/A N/A 11852 C ...vs/cupy-xarray/bin/python 187MiB`), but I think there's some CUDA driver issue I might need to sort out first. Will do a bit more troubleshooting :smile:
Edit: Looking at https://docs.nvidia.com/gpudirect-storage/release-notes/index.html#known-limitations and https://docs.nvidia.com/gpudirect-storage/release-notes/index.html#support-matrix, it seems that GPUDirect Storage is only supported on certain filesystems like ext4, but I have btrfs on my laptop :sweat_smile:. Let me try it on my university HPC :slightly_smiling_face:
Edit2: Nevermind, I just remembered my root folder is ext4 so I just put the Zarr file in /tmp. Now it works!!
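Since GDS only supports certain filesystems, a quick stdlib-only way to check what a given store path lives on is to parse `/proc/mounts` (Linux-only sketch; `gdscheck -p` remains the authoritative check):

```python
import os

def filesystem_type(path: str) -> str:
    """Best-effort filesystem type ('ext4', 'btrfs', 'tmpfs', ...) of the
    mount containing *path*, via longest-prefix match over /proc/mounts."""
    path = os.path.realpath(path)
    best_mount, best_fstype = "", "unknown"
    try:
        with open("/proc/mounts") as mounts:
            for line in mounts:
                _, mount_point, fstype = line.split()[:3]
                if path == mount_point or path.startswith(mount_point.rstrip("/") + "/"):
                    if len(mount_point) > len(best_mount):
                        best_mount, best_fstype = mount_point, fstype
    except OSError:
        pass  # non-Linux: no /proc/mounts
    return best_fstype

print("/tmp lives on:", filesystem_type("/tmp"))
```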

As in https://github.com/xarray-contrib/xbatcher/issues/87#issuecomment-1240466368
Now available in zarr-python 2.13.0a2 for testing.
Should add that zarr 2.13 final is out as well.
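For context, the relevant piece in zarr 2.13 is the `meta_array` argument, which tells zarr what array type to allocate chunks as (pass `cupy.empty(())` to get GPU arrays back). A hedged sketch using numpy as a stand-in, skipped if a compatible zarr isn't available:

```python
# zarr >= 2.13 accepts ``meta_array`` so stores can return device arrays
# (e.g. cupy) instead of numpy. Demo uses numpy as a stand-in meta array.
import importlib.util

demo_ran = False
if importlib.util.find_spec("zarr"):
    import numpy as np
    import zarr

    try:
        z = zarr.zeros((4, 4), chunks=(2, 2), meta_array=np.empty(()))
        z[:] = 1
        assert isinstance(z[:], np.ndarray) and z[:].sum() == 16
        demo_ran = True
    except Exception:
        pass  # zarr version without the meta_array keyword
print("meta_array demo ran:", demo_ran)
```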
As an update, we need more upstream work in Xarray (https://github.com/pydata/xarray/pull/8100) and some work in dask to get that to work (see the notebook).
@negin513 This should be OK to try on derecho now. Dask will not work, but everything else should.
Cool blogpost @weiji14 ! Where are you running your ERA5 benchmark?
This kvikio engine can be optimized quite a bit given the work we (Earthmover) have recently done on the Xarray Zarr backend. There's a number of unnecessary metadata reads that can be removed.
> Cool blogpost @weiji14 ! Where are you running your ERA5 benchmark?
Thanks! I'm running the benchmark locally on my laptop (with an RTX A2000 8GB GPU), see also https://github.com/zarr-developers/zarr-benchmark/discussions/14, and I've got a few more technical details at my own site https://weiji14.github.io/blog/when-cloud-native-geospatial-meets-gpu-native-machine-learning :wink:
> This kvikio engine can be optimized quite a bit given the work we (Earthmover) have recently done on the Xarray Zarr backend. There's a number of unnecessary metadata reads that can be removed.
Yes, I saw https://earthmover.io/blog/cloud-native-dataloader, and I think a lot of the pieces are coming together. We really should wrap up this PR at some point; I'd be happy to give this another review if you'd like to push more changes to this branch.