dataset.to_zarr(store=zarr_store, compute=True) results in a memory spike that isn't cleared
What happened?
I am using rioxarray, which in turn uses xarray. We call `to_zarr` on the dataset, which is an xarray object.
When the dataset is written to a Zarr store via the `to_zarr` method, a spike in memory is observed that is not freed after the upload completes.
What did you expect to happen?
I expected the spike in memory during the write to be freed once `to_zarr` finishes executing.
The input is a stacked multiband raster created from a Sentinel tile.
Minimal Complete Verifiable Example
```python
import os

import rioxarray
import xarray
from fsspec.implementations.local import LocalFileSystem
from fsspec.mapping import FSMap
from memory_profiler import profile


@profile
def _update_zarr_store_with_mosaic(
    path_to_mosaic_file: str,
    date: str,
    index: int,
) -> bool:
    local_fs: LocalFileSystem = LocalFileSystem()
    parent_path = "/Users/punit/work/atlas"
    zarr_store_path = os.path.join(parent_path, "testdata/zarr_store")
    zarr_store: FSMap = local_fs.get_mapper(root=zarr_store_path)
    data = rioxarray.open_rasterio(path_to_mosaic_file, chunks=True)
    data = data.expand_dims({"time": 1})
    data = data.assign_coords(time=[date])
    data.attrs = {}
    dataset = data.to_dataset(name="data")
    try:
        if index == 0:
            # during first push, we need to create time dimension hence we don't call append dim
            zarr_out = dataset.to_zarr(store=zarr_store)
        else:
            # during consequent pushes to zarr store, we want new data along the time dimension
            zarr_out = dataset.to_zarr(store=zarr_store, append_dim="time")
    except Exception:
        return False
    return True


if __name__ == "__main__":
    date_to_mosaic_file_path_dict = {
        "2024-02-27": "path_to_file_1",
        "2024-03-03": "path_to_file_2",
        "2024-03-08": "path_to_file_3",
    }
    xarray.show_versions()
    for idx, date in enumerate(sorted(date_to_mosaic_file_path_dict.keys())):
        _update_zarr_store_with_mosaic(date_to_mosaic_file_path_dict[date], date, idx)
```
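For completeness, once the script has run, the store can be reopened to confirm that all three time slices were appended. This snippet just inspects the result, reusing the path from the script above:

```python
import xarray

ds = xarray.open_zarr("/Users/punit/work/atlas/testdata/zarr_store")
print(ds.sizes)  # expect time: 3 after all three pushes
```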
MVCE confirmation
- [X] Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
- [X] Complete example — the example is self-contained, including all data and the text of any traceback.
- [X] Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
- [X] New issue — a search of GitHub Issues suggests this is not a duplicate.
- [X] Recent environment — the issue occurs with the latest version of xarray and its dependencies.
Relevant log output
```
INSTALLED VERSIONS
------------------
commit: None
python: 3.11.6 (main, Nov 2 2023, 04:39:43) [Clang 14.0.3 (clang-1403.0.22.14.1)]
python-bits: 64
OS: Darwin
OS-release: 22.4.0
machine: arm64
processor: arm
byteorder: little
LC_ALL: None
LANG: None
LOCALE: (None, 'UTF-8')
libhdf5: None
libnetcdf: None
xarray: 2024.2.0
pandas: 2.2.1
numpy: 1.24.1
scipy: None
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: 2.13.3
cftime: None
nc_time_axis: None
iris: None
bottleneck: None
dask: 2022.12.1
distributed: None
matplotlib: 3.8.3
cartopy: None
seaborn: None
numbagg: None
fsspec: 2023.6.0
cupy: None
pint: None
sparse: None
flox: None
numpy_groupies: None
setuptools: 69.1.1
pip: 23.3.1
conda: None
pytest: 7.2.0
mypy: None
IPython: None
sphinx: None
Filename: /Users/punit/work/atlas/sample.py
Line # Mem usage Increment Occurrences Line Contents
=============================================================
9 134.3 MiB 134.3 MiB 1 @profile
10 def _update_zarr_store_with_mosaic(
11 path_to_mosaic_file: str,
12 date: str,
13 index: int,
14 ) -> bool:
15 134.3 MiB 0.0 MiB 1 local_fs: LocalFileSystem = LocalFileSystem()
16 134.3 MiB 0.0 MiB 1 parent_path = "/Users/punit/work/atlas"
17 134.3 MiB 0.0 MiB 1 zarr_store_path = os.path.join(parent_path, "testdata/zarr_store")
18 134.3 MiB 0.0 MiB 1 zarr_store: FSMap = local_fs.get_mapper(root=zarr_store_path)
19 142.8 MiB 8.5 MiB 1 data = rioxarray.open_rasterio(path_to_mosaic_file, chunks=True)
20
21 142.8 MiB 0.0 MiB 1 data = data.expand_dims({"time": 1})
22 142.8 MiB 0.1 MiB 1 data = data.assign_coords(time=[date])
23 142.8 MiB 0.0 MiB 1 data.attrs = {}
24 142.8 MiB 0.0 MiB 1 dataset = data.to_dataset(name="data")
25 142.8 MiB 0.0 MiB 1 try:
26 142.8 MiB 0.0 MiB 1 if index == 0:
27 # during first push, we need to create time dimension hence we don't call append dim
28 142.8 MiB 0.0 MiB 1 zarr_out = dataset.to_zarr(store=zarr_store)
29 else:
30 # during consequent pushes to zarr store, we want new data along the time dimension
31 zarr_out = dataset.to_zarr(store=zarr_store, append_dim="time")
32 142.8 MiB 0.0 MiB 1 except Exception:
33 142.8 MiB 0.0 MiB 1 return False
34 return True
Filename: /Users/punit/work/atlas/sample.py
Line # Mem usage Increment Occurrences Line Contents
=============================================================
9 142.8 MiB 142.8 MiB 1 @profile
10 def _update_zarr_store_with_mosaic(
11 path_to_mosaic_file: str,
12 date: str,
13 index: int,
14 ) -> bool:
15 142.8 MiB 0.0 MiB 1 local_fs: LocalFileSystem = LocalFileSystem()
16 142.8 MiB 0.0 MiB 1 parent_path = "/Users/punit/work/atlas"
17 142.8 MiB 0.0 MiB 1 zarr_store_path = os.path.join(parent_path, "testdata/zarr_store")
18 142.8 MiB 0.0 MiB 1 zarr_store: FSMap = local_fs.get_mapper(root=zarr_store_path)
19 143.2 MiB 0.3 MiB 1 data = rioxarray.open_rasterio(path_to_mosaic_file, chunks=True)
20
21 143.2 MiB 0.0 MiB 1 data = data.expand_dims({"time": 1})
22 143.2 MiB 0.0 MiB 1 data = data.assign_coords(time=[date])
23 143.2 MiB 0.0 MiB 1 data.attrs = {}
24 143.2 MiB 0.0 MiB 1 dataset = data.to_dataset(name="data")
25 143.2 MiB 0.0 MiB 1 try:
26 143.2 MiB 0.0 MiB 1 if index == 0:
27 # during first push, we need to create time dimension hence we don't call append dim
28 zarr_out = dataset.to_zarr(store=zarr_store)
29 else:
30 # during consequent pushes to zarr store, we want new data along the time dimension
31 511.9 MiB 368.7 MiB 1 zarr_out = dataset.to_zarr(store=zarr_store, append_dim="time")
32 except Exception:
33 return False
34 511.9 MiB 0.0 MiB 1 return True
Filename: /Users/punit/work/atlas/sample.py
Line # Mem usage Increment Occurrences Line Contents
=============================================================
9 511.9 MiB 511.9 MiB 1 @profile
10 def _update_zarr_store_with_mosaic(
11 path_to_mosaic_file: str,
12 date: str,
13 index: int,
14 ) -> bool:
15 511.9 MiB 0.0 MiB 1 local_fs: LocalFileSystem = LocalFileSystem()
16 511.9 MiB 0.0 MiB 1 parent_path = "/Users/punit/work/atlas"
17 511.9 MiB 0.0 MiB 1 zarr_store_path = os.path.join(parent_path, "testdata/zarr_store")
18 511.9 MiB 0.0 MiB 1 zarr_store: FSMap = local_fs.get_mapper(root=zarr_store_path)
19 512.2 MiB 0.3 MiB 1 data = rioxarray.open_rasterio(path_to_mosaic_file, chunks=True)
20
21 512.2 MiB 0.0 MiB 1 data = data.expand_dims({"time": 1})
22 512.2 MiB 0.0 MiB 1 data = data.assign_coords(time=[date])
23 512.2 MiB 0.0 MiB 1 data.attrs = {}
24 512.2 MiB 0.0 MiB 1 dataset = data.to_dataset(name="data")
25 512.2 MiB 0.0 MiB 1 try:
26 512.2 MiB 0.0 MiB 1 if index == 0:
27 # during first push, we need to create time dimension hence we don't call append dim
28 zarr_out = dataset.to_zarr(store=zarr_store)
29 else:
30 # during consequent pushes to zarr store, we want new data along the time dimension
31 543.0 MiB 30.9 MiB 1 zarr_out = dataset.to_zarr(store=zarr_store, append_dim="time")
32 except Exception:
33 return False
34 543.0 MiB -0.0 MiB 1 return True
```
Anything else we need to know?
- The log above is the output of running the code with three files, i.e. the dictionary contains three paths.
- I can provide the input TIFF files, but they are quite big (1.4 GB, 1.1 GB, and 1.1 GB respectively).
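Since the TIFFs are too large to attach, here is a minimal sketch that tries to mimic the workload with synthetic dask-backed data instead of the real rasters. The shape, chunking, and store path are placeholders I made up, and I have not verified that it reproduces the exact spike:

```python
import dask.array as da
import xarray as xr

store_path = "testdata/zarr_store_synthetic"  # placeholder path
shape = (4, 4096, 4096)  # placeholder (band, y, x), roughly like a stacked raster

for idx, date in enumerate(["2024-02-27", "2024-03-03", "2024-03-08"]):
    data = da.random.random(shape, chunks=(1, 1024, 1024)).astype("float32")
    ds = (
        xr.Dataset({"data": (("band", "y", "x"), data)})
        .expand_dims({"time": 1})
        .assign_coords(time=[date])
    )
    if idx == 0:
        ds.to_zarr(store_path)  # first push creates the store
    else:
        ds.to_zarr(store_path, append_dim="time")  # later pushes append along time
```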
Environment
Same as the `INSTALLED VERSIONS` block in the log output above.
When a memory profiler was added inside xarray's functions, the memory usage was observed to increase at the `writes = writer.sync(compute=compute)` call inside xarray's API.
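For reference, the per-line numbers below came from decorating the function in site-packages. A less invasive way to get the same trace is to wrap the backend function at runtime, as in this sketch; it assumes `Dataset.to_zarr` imports `xarray.backends.api.to_zarr` lazily at call time, which appears to be the case in this version:

```python
from memory_profiler import profile

import xarray.backends.api as api

# Replace the module-level to_zarr with a profiled wrapper so that
# memory_profiler prints line-by-line usage on every Dataset.to_zarr call.
api.to_zarr = profile(api.to_zarr)
```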
```
Filename: /Users/punit/work/atlas/venv/lib/python3.11/site-packages/xarray/backends/api.py
Line # Mem usage Increment Occurrences Line Contents
=============================================================
1700 126.1 MiB 126.1 MiB 1 @profile
1701 def to_zarr(
1702 dataset: Dataset,
1703 store: MutableMapping | str | os.PathLike[str] | None = None,
1704 chunk_store: MutableMapping | str | os.PathLike | None = None,
1705 mode: ZarrWriteModes | None = None,
1706 synchronizer=None,
1707 group: str | None = None,
1708 encoding: Mapping | None = None,
1709 *,
1710 compute: bool = True,
1711 consolidated: bool | None = None,
1712 append_dim: Hashable | None = None,
1713 region: Mapping[str, slice | Literal["auto"]] | Literal["auto"] | None = None,
1714 safe_chunks: bool = True,
1715 storage_options: dict[str, str] | None = None,
1716 zarr_version: int | None = None,
1717 write_empty_chunks: bool | None = None,
1718 chunkmanager_store_kwargs: dict[str, Any] | None = None,
1719 ) -> backends.ZarrStore | Delayed:
1720 """This function creates an appropriate datastore for writing a dataset to
1721 a zarr ztore
1722
1723 See `Dataset.to_zarr` for full API docs.
1724 """
1725
1726 # Load empty arrays to avoid bug saving zero length dimensions (Issue #5741)
1727 126.1 MiB 0.0 MiB 7 for v in dataset.variables.values():
1728 126.1 MiB 0.0 MiB 6 if v.size == 0:
1729 v.load()
1730
1731 # expand str and path-like arguments
1732 126.1 MiB 0.0 MiB 1 store = _normalize_path(store)
1733 126.1 MiB 0.0 MiB 1 chunk_store = _normalize_path(chunk_store)
1734
1735 126.1 MiB 0.0 MiB 1 if storage_options is None:
1736 126.1 MiB 0.0 MiB 1 mapper = store
1737 126.1 MiB 0.0 MiB 1 chunk_mapper = chunk_store
1738 else:
1739 from fsspec import get_mapper
1740
1741 if not isinstance(store, str):
1742 raise ValueError(
1743 f"store must be a string to use storage_options. Got {type(store)}"
1744 )
1745 mapper = get_mapper(store, **storage_options)
1746 if chunk_store is not None:
1747 chunk_mapper = get_mapper(chunk_store, **storage_options)
1748 else:
1749 chunk_mapper = chunk_store
1750
1751 126.1 MiB 0.0 MiB 1 if encoding is None:
1752 126.1 MiB 0.0 MiB 1 encoding = {}
1753
1754 126.1 MiB 0.0 MiB 1 if mode is None:
1755 126.1 MiB 0.0 MiB 1 if append_dim is not None:
1756 126.1 MiB 0.0 MiB 1 mode = "a"
1757 elif region is not None:
1758 mode = "r+"
1759 else:
1760 mode = "w-"
1761
1762 126.1 MiB 0.0 MiB 1 if mode not in ["a", "a-"] and append_dim is not None:
1763 raise ValueError("cannot set append_dim unless mode='a' or mode=None")
1764
1765 126.1 MiB 0.0 MiB 1 if mode not in ["a", "a-", "r+"] and region is not None:
1766 raise ValueError(
1767 "cannot set region unless mode='a', mode='a-', mode='r+' or mode=None"
1768 )
1769
1770 126.1 MiB 0.0 MiB 1 if mode not in ["w", "w-", "a", "a-", "r+"]:
1771 raise ValueError(
1772 "The only supported options for mode are 'w', "
1773 f"'w-', 'a', 'a-', and 'r+', but mode={mode!r}"
1774 )
1775
1776 # validate Dataset keys, DataArray names
1777 126.1 MiB 0.0 MiB 1 _validate_dataset_names(dataset)
1778
1779 126.1 MiB 0.0 MiB 1 if region is not None:
1780 open_kwargs = dict(
1781 store=store,
1782 synchronizer=synchronizer,
1783 group=group,
1784 consolidated=consolidated,
1785 storage_options=storage_options,
1786 zarr_version=zarr_version,
1787 )
1788 region, region_was_autodetected = _validate_and_autodetect_region(
1789 dataset, region, mode, open_kwargs
1790 )
1791 # drop indices to avoid potential race condition with auto region
1792 if region_was_autodetected:
1793 dataset = dataset.drop_vars(dataset.indexes)
1794 if append_dim is not None and append_dim in region:
1795 raise ValueError(
1796 f"cannot list the same dimension in both ``append_dim`` and "
1797 f"``region`` with to_zarr(), got {append_dim} in both"
1798 )
1799
1800 126.1 MiB 0.0 MiB 1 if zarr_version is None:
1801 # default to 2 if store doesn't specify it's version (e.g. a path)
1802 126.1 MiB 0.0 MiB 1 zarr_version = int(getattr(store, "_store_version", 2))
1803
1804 126.1 MiB 0.0 MiB 1 if consolidated is None and zarr_version > 2:
1805 consolidated = False
1806
1807 126.1 MiB 0.0 MiB 1 if mode == "r+":
1808 already_consolidated = consolidated
1809 consolidate_on_close = False
1810 else:
1811 126.1 MiB 0.0 MiB 1 already_consolidated = False
1812 126.1 MiB 0.0 MiB 1 consolidate_on_close = consolidated or consolidated is None
1813 126.1 MiB 0.0 MiB 2 zstore = backends.ZarrStore.open_group(
1814 126.1 MiB 0.0 MiB 1 store=mapper,
1815 126.1 MiB 0.0 MiB 1 mode=mode,
1816 126.1 MiB 0.0 MiB 1 synchronizer=synchronizer,
1817 126.1 MiB 0.0 MiB 1 group=group,
1818 126.1 MiB 0.0 MiB 1 consolidated=already_consolidated,
1819 126.1 MiB 0.0 MiB 1 consolidate_on_close=consolidate_on_close,
1820 126.1 MiB 0.0 MiB 1 chunk_store=chunk_mapper,
1821 126.1 MiB 0.0 MiB 1 append_dim=append_dim,
1822 126.1 MiB 0.0 MiB 1 write_region=region,
1823 126.1 MiB 0.0 MiB 1 safe_chunks=safe_chunks,
1824 126.1 MiB 0.0 MiB 1 stacklevel=4, # for Dataset.to_zarr()
1825 126.1 MiB 0.0 MiB 1 zarr_version=zarr_version,
1826 126.1 MiB 0.0 MiB 1 write_empty=write_empty_chunks,
1827 )
1828
1829 126.1 MiB 0.0 MiB 1 if mode in ["a", "a-", "r+"]:
1830 126.3 MiB 0.2 MiB 1 _validate_datatypes_for_zarr_append(zstore, dataset)
1831 126.3 MiB 0.0 MiB 1 if append_dim is not None:
1832 126.3 MiB 0.0 MiB 1 existing_dims = zstore.get_dimensions()
1833 126.3 MiB 0.0 MiB 1 if append_dim not in existing_dims:
1834 raise ValueError(
1835 f"append_dim={append_dim!r} does not match any existing "
1836 f"dataset dimensions {existing_dims}"
1837 )
1838 126.4 MiB 0.1 MiB 1 existing_var_names = set(zstore.zarr_group.array_keys())
1839 126.4 MiB 0.0 MiB 7 for var_name in existing_var_names:
1840 126.4 MiB 0.0 MiB 6 if var_name in encoding.keys():
1841 raise ValueError(
1842 f"variable {var_name!r} already exists, but encoding was provided"
1843 )
1844 126.4 MiB 0.0 MiB 1 if mode == "r+":
1845 new_names = [k for k in dataset.variables if k not in existing_var_names]
1846 if new_names:
1847 raise ValueError(
1848 f"dataset contains non-pre-existing variables {new_names}, "
1849 "which is not allowed in ``xarray.Dataset.to_zarr()`` with "
1850 "mode='r+'. To allow writing new variables, set mode='a'."
1851 )
1852
1853 126.4 MiB 0.0 MiB 1 writer = ArrayWriter()
1854 # TODO: figure out how to properly handle unlimited_dims
1855 126.6 MiB 0.2 MiB 1 dump_to_store(dataset, zstore, writer, encoding=encoding)
1856 484.8 MiB 358.2 MiB 2 writes = writer.sync(
1857 126.6 MiB 0.0 MiB 1 compute=compute, chunkmanager_store_kwargs=chunkmanager_store_kwargs
1858 )
1859
1860 484.8 MiB 0.0 MiB 1 if compute:
1861 484.8 MiB 0.0 MiB 1 _finalize_store(writes, zstore)
1862 else:
1863 import dask
1864
1865 return dask.delayed(_finalize_store)(writes, zstore)
1866
1867 484.8 MiB 0.0 MiB 1 return zstore
```
You should be able to trace it into Zarr. My guess is that it's the encoding/compression pipeline.
It is funny that it is not releasing that memory, though. Are you able to debug this further?
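To act on that suggestion, one way to take xarray out of the picture is to store the same dask chunks directly into a bare zarr array; if the spike still appears, it points at zarr's encoding/compression path rather than xarray. A rough sketch with placeholder shapes and paths:

```python
import dask.array as da
import zarr

# Placeholder data shaped like one appended (time, band, y, x) slice.
data = da.random.random((1, 4, 4096, 4096), chunks=(1, 1, 1024, 1024)).astype("float32")

store = zarr.DirectoryStore("testdata/zarr_isolation_test")  # placeholder path
target = zarr.open_array(
    store,
    mode="w",
    shape=data.shape,
    chunks=(1, 1, 1024, 1024),
    dtype=data.dtype,
)
da.store(data, target)  # roughly the dask -> zarr write path used under the hood
```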