to_zarr(mode='w') does not overwrite correctly/successfully when changing chunk size
What happened?
Reproducible example:
import xarray as xr
import numpy as np
# Create dataset
data_vars = {'temperature': (['lat', 'lon', 'time'],
                             np.random.rand(400, 800, 1000),
                             {'units': 'Celsius'})}
# define coordinates
coords = {'time': (['time'], np.arange(1, 1001)),
          'lat': (['lat'], np.arange(1, 401)),
          'lon': (['lon'], np.arange(1, 801))}
# create dataset
ds = xr.Dataset(data_vars=data_vars, coords=coords)
ds = ds.chunk({"lat": 20, "lon": 80, "time": 100})
ds.to_zarr('temperature', mode='w', consolidated=True)
This works.
Note that ds.temperature.encoding is now empty, i.e. equal to {}.
If I load the data with ds = xr.open_zarr('temperature'), the chunk sizes are correct.
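For completeness, this is roughly how the chunking can be checked after re-opening (the direct zarr inspection is only an extra sanity check and assumes the store is a plain directory on disk):
import zarr
ds = xr.open_zarr('temperature')
print(ds.temperature.chunks)              # dask chunking of the re-opened dataset
print(ds.temperature.encoding['chunks'])  # chunking recorded in the zarr encoding
print(zarr.open_group('temperature')['temperature'].chunks)  # chunking of the on-disk zarr array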
Then, if I re-generate the dataset as above, change the chunk size, and overwrite the store, it works:
ds = xr.Dataset(data_vars=data_vars, coords=coords)
ds = ds.chunk({"lat": 100, "lon": 80, "time": 100})
ds.to_zarr('temperature', mode='w', consolidated=True)
When I re-load the zarr store now, the chunk sizes are (100, 80, 100).
However, if I do:
ds = xr.Dataset(data_vars=data_vars, coords=coords)
ds = ds.chunk({"lat": 20, "lon": 80, "time": 100})  # 20 for lat
ds.to_zarr('temperature', mode='w', consolidated=True)
ds = xr.open_zarr('temperature')
ds = ds.chunk({"lat": 100, "lon": 80, "time": 100})
ds.to_zarr('temperature', mode='w', consolidated=True)
When I then re-open the store, the chunk sizes are still (20, 80, 100).
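As an extra check of what actually ended up on disk, independent of xarray, one can read the zarr metadata directly; the path below assumes the zarr v2 directory layout, where the array metadata is written to temperature/temperature/.zarray:
import json
# chunk shape recorded in the on-disk zarr metadata for the 'temperature' array
with open('temperature/temperature/.zarray') as f:
    print(json.load(f)['chunks'])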
OK, then maybe it's the encoding; in fact, even if I change the chunk size using .chunk, the encoding remains unchanged:
ds = xr.Dataset(data_vars=data_vars, coords=coords)
ds = ds.chunk({"lat": 20, "lon": 80, "time": 100})  # 20 for lat
ds.to_zarr('temperature', mode='w', consolidated=True)
ds = xr.open_zarr('temperature')
ds = ds.chunk({"lat": 100, "lon": 80, "time": 100})
ds.temperature.encoding
gives
{'chunks': (20, 80, 100),
'preferred_chunks': {'lat': 20, 'lon': 80, 'time': 100},
'compressor': Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0),
'filters': None,
'_FillValue': nan,
'dtype': dtype('float64')}
But if I print ds to screen, it looks right, with chunks (100, 80, 100).
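A minimal way to see the mismatch side by side (purely illustrative, using the same dataset as above):
# dask chunking after .chunk(): block sizes of 100, 80, 100
print(ds.temperature.chunks)
# chunking still recorded in the encoding from the earlier open_zarr: (20, 80, 100)
print(ds.temperature.encoding.get('chunks'))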
I then tried to:
- set the encoding to empty: ds.temperature.encoding = {}
- overwrite the encoding with the new chunk values:
  ds.temperature.encoding['chunks'] = (100, 80, 100)
  ds.temperature.encoding['preferred_chunks'] = {'lat': 100, 'lon': 80, 'time': 100}
When I apply either of these two encoding fixes and then try to overwrite the zarr store, I get the error below:
ds = xr.Dataset(data_vars=data_vars, coords=coords)
ds = ds.chunk({"lat": 20, "lon": 80, "time": 100})  # 20 for lat
ds.to_zarr('temperature', mode='w', consolidated=True)
ds = xr.open_zarr('temperature')
ds.temperature.encoding = {}
ds = ds.chunk({"lat": 100, "lon": 80, "time": 100})
ds.to_zarr('temperature', mode='w', consolidated=True)
ValueError: destination buffer too small; expected at least 6400000, got 1280000
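For what it's worth, the two numbers in the error seem to match the byte size of one chunk under the new chunking versus one chunk under the old chunking (float64, 8 bytes per value); this is just my own arithmetic, not something I have confirmed in the code:
new_chunk_bytes = 100 * 80 * 100 * 8  # 6_400_000, the "expected at least" value
old_chunk_bytes = 20 * 80 * 100 * 8   # 1_280_000, the "got" value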
I searched for the error above in the open issues and didn't find anything.
What did you expect to happen?
I expected to be able to overwrite the store with whatever new combination of chunk sizes I want, especially after fixing the encoding.
Minimal Complete Verifiable Example
No response
MVCE confirmation
- [X] Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
- [X] Complete example — the example is self-contained, including all data and the text of any traceback.
- [X] Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
- [x] New issue — a search of GitHub Issues suggests this is not a duplicate.
Relevant log output
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Input In [46], in <cell line: 7>()
5 ds.temperature.encoding = {}
6 ds = ds.chunk({"lat": 100, "lon": 80, "time":100})
----> 7 ds.to_zarr('temperature',mode='w', consolidated=True)
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/xarray/core/dataset.py:2036, in Dataset.to_zarr(self, store, chunk_store, mode, synchronizer, group, encoding, compute, consolidated, append_dim, region, safe_chunks, storage_options)
2033 if encoding is None:
2034 encoding = {}
-> 2036 return to_zarr(
2037 self,
2038 store=store,
2039 chunk_store=chunk_store,
2040 storage_options=storage_options,
2041 mode=mode,
2042 synchronizer=synchronizer,
2043 group=group,
2044 encoding=encoding,
2045 compute=compute,
2046 consolidated=consolidated,
2047 append_dim=append_dim,
2048 region=region,
2049 safe_chunks=safe_chunks,
2050 )
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/xarray/backends/api.py:1432, in to_zarr(dataset, store, chunk_store, mode, synchronizer, group, encoding, compute, consolidated, append_dim, region, safe_chunks, storage_options)
1430 # TODO: figure out how to properly handle unlimited_dims
1431 dump_to_store(dataset, zstore, writer, encoding=encoding)
-> 1432 writes = writer.sync(compute=compute)
1434 if compute:
1435 _finalize_store(writes, zstore)
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/xarray/backends/common.py:166, in ArrayWriter.sync(self, compute)
160 import dask.array as da
162 # TODO: consider wrapping targets with dask.delayed, if this makes
163 # for any discernible difference in perforance, e.g.,
164 # targets = [dask.delayed(t) for t in self.targets]
--> 166 delayed_store = da.store(
167 self.sources,
168 self.targets,
169 lock=self.lock,
170 compute=compute,
171 flush=True,
172 regions=self.regions,
173 )
174 self.sources = []
175 self.targets = []
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/dask/array/core.py:1223, in store(***failed resolving arguments***)
1221 elif compute:
1222 store_dsk = HighLevelGraph(layers, dependencies)
-> 1223 compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
1224 return None
1226 else:
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/dask/base.py:344, in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
341 # see https://github.com/dask/dask/issues/8991.
342 # This merge should be removed once the underlying issue is fixed.
343 dsk2 = HighLevelGraph.merge(dsk2)
--> 344 return schedule(dsk2, keys, **kwargs)
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/dask/threaded.py:81, in get(dsk, result, cache, num_workers, pool, **kwargs)
78 elif isinstance(pool, multiprocessing.pool.Pool):
79 pool = MultiprocessingPoolExecutor(pool)
---> 81 results = get_async(
82 pool.submit,
83 pool._max_workers,
84 dsk,
85 result,
86 cache=cache,
87 get_id=_thread_get_id,
88 pack_exception=pack_exception,
89 **kwargs,
90 )
92 # Cleanup pools associated to dead threads
93 with pools_lock:
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/dask/local.py:508, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
506 _execute_task(task, data) # Re-execute locally
507 else:
--> 508 raise_exception(exc, tb)
509 res, worker_id = loads(res_info)
510 state["cache"][key] = res
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/dask/local.py:316, in reraise(exc, tb)
314 if exc.__traceback__ is not tb:
315 raise exc.with_traceback(tb)
--> 316 raise exc
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/dask/local.py:221, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
219 try:
220 task, data = loads(task_info)
--> 221 result = _execute_task(task, data)
222 id = get_id()
223 result = dumps((result, id))
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/dask/array/core.py:122, in getter(a, b, asarray, lock)
117 # Below we special-case `np.matrix` to force a conversion to
118 # `np.ndarray` and preserve original Dask behavior for `getter`,
119 # as for all purposes `np.matrix` is array-like and thus
120 # `is_arraylike` evaluates to `True` in that case.
121 if asarray and (not is_arraylike(c) or isinstance(c, np.matrix)):
--> 122 c = np.asarray(c)
123 finally:
124 if lock:
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/xarray/core/indexing.py:358, in ImplicitToExplicitIndexingAdapter.__array__(self, dtype)
357 def __array__(self, dtype=None):
--> 358 return np.asarray(self.array, dtype=dtype)
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/xarray/core/indexing.py:522, in CopyOnWriteArray.__array__(self, dtype)
521 def __array__(self, dtype=None):
--> 522 return np.asarray(self.array, dtype=dtype)
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/xarray/core/indexing.py:423, in LazilyIndexedArray.__array__(self, dtype)
421 def __array__(self, dtype=None):
422 array = as_indexable(self.array)
--> 423 return np.asarray(array[self.key], dtype=None)
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/xarray/backends/zarr.py:73, in ZarrArrayWrapper.__getitem__(self, key)
71 array = self.get_array()
72 if isinstance(key, indexing.BasicIndexer):
---> 73 return array[key.tuple]
74 elif isinstance(key, indexing.VectorizedIndexer):
75 return array.vindex[
76 indexing._arrayize_vectorized_indexer(key, self.shape).tuple
77 ]
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/zarr/core.py:788, in Array.__getitem__(self, selection)
786 result = self.vindex[selection]
787 else:
--> 788 result = self.get_basic_selection(pure_selection, fields=fields)
789 return result
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/zarr/core.py:914, in Array.get_basic_selection(self, selection, out, fields)
911 return self._get_basic_selection_zd(selection=selection, out=out,
912 fields=fields)
913 else:
--> 914 return self._get_basic_selection_nd(selection=selection, out=out,
915 fields=fields)
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/zarr/core.py:957, in Array._get_basic_selection_nd(self, selection, out, fields)
951 def _get_basic_selection_nd(self, selection, out=None, fields=None):
952 # implementation of basic selection for array with at least one dimension
953
954 # setup indexer
955 indexer = BasicIndexer(selection, self)
--> 957 return self._get_selection(indexer=indexer, out=out, fields=fields)
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/zarr/core.py:1247, in Array._get_selection(self, indexer, out, fields)
1241 if not hasattr(self.chunk_store, "getitems") or \
1242 any(map(lambda x: x == 0, self.shape)):
1243 # sequentially get one key at a time from storage
1244 for chunk_coords, chunk_selection, out_selection in indexer:
1245
1246 # load chunk selection into output array
-> 1247 self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
1248 drop_axes=indexer.drop_axes, fields=fields)
1249 else:
1250 # allow storage to get multiple items at once
1251 lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/zarr/core.py:1951, in Array._chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, drop_axes, fields)
1948 out[out_selection] = fill_value
1950 else:
-> 1951 self._process_chunk(out, cdata, chunk_selection, drop_axes,
1952 out_is_ndarray, fields, out_selection)
File /opt/miniconda3/envs/june2022/lib/python3.10/site-packages/zarr/core.py:1859, in Array._process_chunk(self, out, cdata, chunk_selection, drop_axes, out_is_ndarray, fields, out_selection, partial_read_decode)
1857 if isinstance(cdata, PartialReadBuffer):
1858 cdata = cdata.read_full()
-> 1859 self._compressor.decode(cdata, dest)
1860 else:
1861 chunk = ensure_ndarray(cdata).view(self._dtype)
File numcodecs/blosc.pyx:562, in numcodecs.blosc.Blosc.decode()
File numcodecs/blosc.pyx:371, in numcodecs.blosc.decompress()
ValueError: destination buffer too small; expected at least 6400000, got 1280000
Anything else we need to know?
No response
Environment
xarray: 2022.3.0
pandas: 1.4.3
numpy: 1.22.4
scipy: 1.8.1
netCDF4: 1.5.8
pydap: None
h5netcdf: 1.0.0
h5py: 3.6.0
Nio: None
zarr: 2.12.0
cftime: 1.6.0
nc_time_axis: 1.4.1
PseudoNetCDF: None
rasterio: 1.2.10
cfgrib: 0.9.10.1
iris: None
bottleneck: 1.3.4
dask: 2022.6.0
distributed: 2022.6.0
matplotlib: 3.5.2
cartopy: 0.20.2
seaborn: 0.11.2
numbagg: None
fsspec: 2022.5.0
cupy: None
pint: 0.19.2
sparse: 0.13.0
setuptools: 62.6.0
pip: 22.1.2
conda: None
pytest: None
IPython: 8.4.0
sphinx: None