
dataset.to_zarr(store=zarr_store, compute=True) results in memory spike which isn't cleared

Open punit-kulal opened this issue 1 year ago • 3 comments

What happened?

I am using rioxarray, which in turn uses xarray. We call `to_zarr` on a dataset, which is an xarray object. When writing the dataset to a zarr store with the `to_zarr` method, a spike in memory is observed that is not freed after the upload finishes.

What did you expect to happen?

When writing the dataset to a zarr store with the `to_zarr` method, any spike in memory should be freed once execution completes. The input is a stacked multiband raster created from a Sentinel tile.

Minimal Complete Verifiable Example

import os

import xarray
from fsspec.mapping import FSMap
import rioxarray
from fsspec.implementations.local import LocalFileSystem
from memory_profiler import profile

@profile
def _update_zarr_store_with_mosaic(
    path_to_mosaic_file: str,
    date: str,
    index: int,
) -> bool:
    local_fs: LocalFileSystem = LocalFileSystem()
    parent_path = "/Users/punit/work/atlas"
    zarr_store_path = os.path.join(parent_path, "testdata/zarr_store")
    zarr_store: FSMap = local_fs.get_mapper(root=zarr_store_path)
    data = rioxarray.open_rasterio(path_to_mosaic_file, chunks=True)

    data = data.expand_dims({"time": 1})
    data = data.assign_coords(time=[date])
    data.attrs = {}
    dataset = data.to_dataset(name="data")
    try:
        if index == 0:
            # during first push, we need to create time dimension hence we don't call append dim
            zarr_out = dataset.to_zarr(store=zarr_store)
        else:
            # during consequent pushes to zarr store, we want new data along the time dimension
            zarr_out = dataset.to_zarr(store=zarr_store, append_dim="time")
    except Exception:
        return False
    return True

if __name__ == "__main__":
    date_to_mosaic_file_path_dict = {
        "2024-02-27": "path_to_file_1",
        "2024-03-03": "path_to_file_2",
        "2024-03-08": "path_to_file_3",
    }
    xarray.show_versions()
    for idx, date in enumerate(sorted(date_to_mosaic_file_path_dict.keys())):
        _update_zarr_store_with_mosaic(date_to_mosaic_file_path_dict[date], date, idx)

MVCE confirmation

  • [X] Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • [X] Complete example — the example is self-contained, including all data and the text of any traceback.
  • [X] Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • [X] New issue — a search of GitHub Issues suggests this is not a duplicate.
  • [X] Recent environment — the issue occurs with the latest version of xarray and its dependencies.

Relevant log output

INSTALLED VERSIONS
------------------
commit: None
python: 3.11.6 (main, Nov  2 2023, 04:39:43) [Clang 14.0.3 (clang-1403.0.22.14.1)]
python-bits: 64
OS: Darwin
OS-release: 22.4.0
machine: arm64
processor: arm
byteorder: little
LC_ALL: None
LANG: None
LOCALE: (None, 'UTF-8')
libhdf5: None
libnetcdf: None

xarray: 2024.2.0
pandas: 2.2.1
numpy: 1.24.1
scipy: None
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: 2.13.3
cftime: None
nc_time_axis: None
iris: None
bottleneck: None
dask: 2022.12.1
distributed: None
matplotlib: 3.8.3
cartopy: None
seaborn: None
numbagg: None
fsspec: 2023.6.0
cupy: None
pint: None
sparse: None
flox: None
numpy_groupies: None
setuptools: 69.1.1
pip: 23.3.1
conda: None
pytest: 7.2.0
mypy: None
IPython: None
sphinx: None
Filename: /Users/punit/work/atlas/sample.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     9    134.3 MiB    134.3 MiB           1   @profile
    10                                         def _update_zarr_store_with_mosaic(
    11                                             path_to_mosaic_file: str,
    12                                             date: str,
    13                                             index: int,
    14                                         ) -> bool:
    15    134.3 MiB      0.0 MiB           1       local_fs: LocalFileSystem = LocalFileSystem()
    16    134.3 MiB      0.0 MiB           1       parent_path = "/Users/punit/work/atlas"
    17    134.3 MiB      0.0 MiB           1       zarr_store_path = os.path.join(parent_path, "testdata/zarr_store")
    18    134.3 MiB      0.0 MiB           1       zarr_store: FSMap = local_fs.get_mapper(root=zarr_store_path)
    19    142.8 MiB      8.5 MiB           1       data = rioxarray.open_rasterio(path_to_mosaic_file, chunks=True)
    20                                         
    21    142.8 MiB      0.0 MiB           1       data = data.expand_dims({"time": 1})
    22    142.8 MiB      0.1 MiB           1       data = data.assign_coords(time=[date])
    23    142.8 MiB      0.0 MiB           1       data.attrs = {}
    24    142.8 MiB      0.0 MiB           1       dataset = data.to_dataset(name="data")
    25    142.8 MiB      0.0 MiB           1       try:
    26    142.8 MiB      0.0 MiB           1           if index == 0:
    27                                                     # during first push, we need to create time dimension hence we don't call append dim
    28    142.8 MiB      0.0 MiB           1               zarr_out = dataset.to_zarr(store=zarr_store)
    29                                                 else:
    30                                                     # during consequent pushes to zarr store, we want new data along the time dimension
    31                                                     zarr_out = dataset.to_zarr(store=zarr_store, append_dim="time")
    32    142.8 MiB      0.0 MiB           1       except Exception:
    33    142.8 MiB      0.0 MiB           1           return False
    34                                             return True


Filename: /Users/punit/work/atlas/sample.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     9    142.8 MiB    142.8 MiB           1   @profile
    10                                         def _update_zarr_store_with_mosaic(
    11                                             path_to_mosaic_file: str,
    12                                             date: str,
    13                                             index: int,
    14                                         ) -> bool:
    15    142.8 MiB      0.0 MiB           1       local_fs: LocalFileSystem = LocalFileSystem()
    16    142.8 MiB      0.0 MiB           1       parent_path = "/Users/punit/work/atlas"
    17    142.8 MiB      0.0 MiB           1       zarr_store_path = os.path.join(parent_path, "testdata/zarr_store")
    18    142.8 MiB      0.0 MiB           1       zarr_store: FSMap = local_fs.get_mapper(root=zarr_store_path)
    19    143.2 MiB      0.3 MiB           1       data = rioxarray.open_rasterio(path_to_mosaic_file, chunks=True)
    20                                         
    21    143.2 MiB      0.0 MiB           1       data = data.expand_dims({"time": 1})
    22    143.2 MiB      0.0 MiB           1       data = data.assign_coords(time=[date])
    23    143.2 MiB      0.0 MiB           1       data.attrs = {}
    24    143.2 MiB      0.0 MiB           1       dataset = data.to_dataset(name="data")
    25    143.2 MiB      0.0 MiB           1       try:
    26    143.2 MiB      0.0 MiB           1           if index == 0:
    27                                                     # during first push, we need to create time dimension hence we don't call append dim
    28                                                     zarr_out = dataset.to_zarr(store=zarr_store)
    29                                                 else:
    30                                                     # during consequent pushes to zarr store, we want new data along the time dimension
    31    511.9 MiB    368.7 MiB           1               zarr_out = dataset.to_zarr(store=zarr_store, append_dim="time")
    32                                             except Exception:
    33                                                 return False
    34    511.9 MiB      0.0 MiB           1       return True


Filename: /Users/punit/work/atlas/sample.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     9    511.9 MiB    511.9 MiB           1   @profile
    10                                         def _update_zarr_store_with_mosaic(
    11                                             path_to_mosaic_file: str,
    12                                             date: str,
    13                                             index: int,
    14                                         ) -> bool:
    15    511.9 MiB      0.0 MiB           1       local_fs: LocalFileSystem = LocalFileSystem()
    16    511.9 MiB      0.0 MiB           1       parent_path = "/Users/punit/work/atlas"
    17    511.9 MiB      0.0 MiB           1       zarr_store_path = os.path.join(parent_path, "testdata/zarr_store")
    18    511.9 MiB      0.0 MiB           1       zarr_store: FSMap = local_fs.get_mapper(root=zarr_store_path)
    19    512.2 MiB      0.3 MiB           1       data = rioxarray.open_rasterio(path_to_mosaic_file, chunks=True)
    20                                         
    21    512.2 MiB      0.0 MiB           1       data = data.expand_dims({"time": 1})
    22    512.2 MiB      0.0 MiB           1       data = data.assign_coords(time=[date])
    23    512.2 MiB      0.0 MiB           1       data.attrs = {}
    24    512.2 MiB      0.0 MiB           1       dataset = data.to_dataset(name="data")
    25    512.2 MiB      0.0 MiB           1       try:
    26    512.2 MiB      0.0 MiB           1           if index == 0:
    27                                                     # during first push, we need to create time dimension hence we don't call append dim
    28                                                     zarr_out = dataset.to_zarr(store=zarr_store)
    29                                                 else:
    30                                                     # during consequent pushes to zarr store, we want new data along the time dimension
    31    543.0 MiB     30.9 MiB           1               zarr_out = dataset.to_zarr(store=zarr_store, append_dim="time")
    32                                             except Exception:
    33                                                 return False
    34    543.0 MiB     -0.0 MiB           1       return True

Anything else we need to know?

  1. The log above is the output of the code running with three files, i.e. the dictionary contains three paths.
  2. I can provide the input tiff files, but they are quite big (1.4, 1.1, and 1.1 GB respectively).


punit-kulal avatar Mar 13 '24 09:03 punit-kulal

Thanks for opening your first issue here at xarray! Be sure to follow the issue template! If you have an idea for a solution, we would really welcome a Pull Request with proposed changes. See the Contributing Guide for more. It may take us a while to respond here, but we really value your contribution. Contributors like you help make xarray better. Thank you!

welcome[bot] avatar Mar 13 '24 09:03 welcome[bot]

When a memory profiler was added inside xarray's own functions, the memory increase was observed in the `writes = writer.sync(compute=compute)` call inside xarray's `to_zarr` API.

Filename: /Users/punit/work/atlas/venv/lib/python3.11/site-packages/xarray/backends/api.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
  1700    126.1 MiB    126.1 MiB           1   @profile
  1701                                         def to_zarr(
  1702                                             dataset: Dataset,
  1703                                             store: MutableMapping | str | os.PathLike[str] | None = None,
  1704                                             chunk_store: MutableMapping | str | os.PathLike | None = None,
  1705                                             mode: ZarrWriteModes | None = None,
  1706                                             synchronizer=None,
  1707                                             group: str | None = None,
  1708                                             encoding: Mapping | None = None,
  1709                                             *,
  1710                                             compute: bool = True,
  1711                                             consolidated: bool | None = None,
  1712                                             append_dim: Hashable | None = None,
  1713                                             region: Mapping[str, slice | Literal["auto"]] | Literal["auto"] | None = None,
  1714                                             safe_chunks: bool = True,
  1715                                             storage_options: dict[str, str] | None = None,
  1716                                             zarr_version: int | None = None,
  1717                                             write_empty_chunks: bool | None = None,
  1718                                             chunkmanager_store_kwargs: dict[str, Any] | None = None,
  1719                                         ) -> backends.ZarrStore | Delayed:
  1720                                             """This function creates an appropriate datastore for writing a dataset to
  1721                                             a zarr ztore
  1722                                         
  1723                                             See `Dataset.to_zarr` for full API docs.
  1724                                             """
  1725                                         
  1726                                             # Load empty arrays to avoid bug saving zero length dimensions (Issue #5741)
  1727    126.1 MiB      0.0 MiB           7       for v in dataset.variables.values():
  1728    126.1 MiB      0.0 MiB           6           if v.size == 0:
  1729                                                     v.load()
  1730                                         
  1731                                             # expand str and path-like arguments
  1732    126.1 MiB      0.0 MiB           1       store = _normalize_path(store)
  1733    126.1 MiB      0.0 MiB           1       chunk_store = _normalize_path(chunk_store)
  1734                                         
  1735    126.1 MiB      0.0 MiB           1       if storage_options is None:
  1736    126.1 MiB      0.0 MiB           1           mapper = store
  1737    126.1 MiB      0.0 MiB           1           chunk_mapper = chunk_store
  1738                                             else:
  1739                                                 from fsspec import get_mapper
  1740                                         
  1741                                                 if not isinstance(store, str):
  1742                                                     raise ValueError(
  1743                                                         f"store must be a string to use storage_options. Got {type(store)}"
  1744                                                     )
  1745                                                 mapper = get_mapper(store, **storage_options)
  1746                                                 if chunk_store is not None:
  1747                                                     chunk_mapper = get_mapper(chunk_store, **storage_options)
  1748                                                 else:
  1749                                                     chunk_mapper = chunk_store
  1750                                         
  1751    126.1 MiB      0.0 MiB           1       if encoding is None:
  1752    126.1 MiB      0.0 MiB           1           encoding = {}
  1753                                         
  1754    126.1 MiB      0.0 MiB           1       if mode is None:
  1755    126.1 MiB      0.0 MiB           1           if append_dim is not None:
  1756    126.1 MiB      0.0 MiB           1               mode = "a"
  1757                                                 elif region is not None:
  1758                                                     mode = "r+"
  1759                                                 else:
  1760                                                     mode = "w-"
  1761                                         
  1762    126.1 MiB      0.0 MiB           1       if mode not in ["a", "a-"] and append_dim is not None:
  1763                                                 raise ValueError("cannot set append_dim unless mode='a' or mode=None")
  1764                                         
  1765    126.1 MiB      0.0 MiB           1       if mode not in ["a", "a-", "r+"] and region is not None:
  1766                                                 raise ValueError(
  1767                                                     "cannot set region unless mode='a', mode='a-', mode='r+' or mode=None"
  1768                                                 )
  1769                                         
  1770    126.1 MiB      0.0 MiB           1       if mode not in ["w", "w-", "a", "a-", "r+"]:
  1771                                                 raise ValueError(
  1772                                                     "The only supported options for mode are 'w', "
  1773                                                     f"'w-', 'a', 'a-', and 'r+', but mode={mode!r}"
  1774                                                 )
  1775                                         
  1776                                             # validate Dataset keys, DataArray names
  1777    126.1 MiB      0.0 MiB           1       _validate_dataset_names(dataset)
  1778                                         
  1779    126.1 MiB      0.0 MiB           1       if region is not None:
  1780                                                 open_kwargs = dict(
  1781                                                     store=store,
  1782                                                     synchronizer=synchronizer,
  1783                                                     group=group,
  1784                                                     consolidated=consolidated,
  1785                                                     storage_options=storage_options,
  1786                                                     zarr_version=zarr_version,
  1787                                                 )
  1788                                                 region, region_was_autodetected = _validate_and_autodetect_region(
  1789                                                     dataset, region, mode, open_kwargs
  1790                                                 )
  1791                                                 # drop indices to avoid potential race condition with auto region
  1792                                                 if region_was_autodetected:
  1793                                                     dataset = dataset.drop_vars(dataset.indexes)
  1794                                                 if append_dim is not None and append_dim in region:
  1795                                                     raise ValueError(
  1796                                                         f"cannot list the same dimension in both ``append_dim`` and "
  1797                                                         f"``region`` with to_zarr(), got {append_dim} in both"
  1798                                                     )
  1799                                         
  1800    126.1 MiB      0.0 MiB           1       if zarr_version is None:
  1801                                                 # default to 2 if store doesn't specify it's version (e.g. a path)
  1802    126.1 MiB      0.0 MiB           1           zarr_version = int(getattr(store, "_store_version", 2))
  1803                                         
  1804    126.1 MiB      0.0 MiB           1       if consolidated is None and zarr_version > 2:
  1805                                                 consolidated = False
  1806                                         
  1807    126.1 MiB      0.0 MiB           1       if mode == "r+":
  1808                                                 already_consolidated = consolidated
  1809                                                 consolidate_on_close = False
  1810                                             else:
  1811    126.1 MiB      0.0 MiB           1           already_consolidated = False
  1812    126.1 MiB      0.0 MiB           1           consolidate_on_close = consolidated or consolidated is None
  1813    126.1 MiB      0.0 MiB           2       zstore = backends.ZarrStore.open_group(
  1814    126.1 MiB      0.0 MiB           1           store=mapper,
  1815    126.1 MiB      0.0 MiB           1           mode=mode,
  1816    126.1 MiB      0.0 MiB           1           synchronizer=synchronizer,
  1817    126.1 MiB      0.0 MiB           1           group=group,
  1818    126.1 MiB      0.0 MiB           1           consolidated=already_consolidated,
  1819    126.1 MiB      0.0 MiB           1           consolidate_on_close=consolidate_on_close,
  1820    126.1 MiB      0.0 MiB           1           chunk_store=chunk_mapper,
  1821    126.1 MiB      0.0 MiB           1           append_dim=append_dim,
  1822    126.1 MiB      0.0 MiB           1           write_region=region,
  1823    126.1 MiB      0.0 MiB           1           safe_chunks=safe_chunks,
  1824    126.1 MiB      0.0 MiB           1           stacklevel=4,  # for Dataset.to_zarr()
  1825    126.1 MiB      0.0 MiB           1           zarr_version=zarr_version,
  1826    126.1 MiB      0.0 MiB           1           write_empty=write_empty_chunks,
  1827                                             )
  1828                                         
  1829    126.1 MiB      0.0 MiB           1       if mode in ["a", "a-", "r+"]:
  1830    126.3 MiB      0.2 MiB           1           _validate_datatypes_for_zarr_append(zstore, dataset)
  1831    126.3 MiB      0.0 MiB           1           if append_dim is not None:
  1832    126.3 MiB      0.0 MiB           1               existing_dims = zstore.get_dimensions()
  1833    126.3 MiB      0.0 MiB           1               if append_dim not in existing_dims:
  1834                                                         raise ValueError(
  1835                                                             f"append_dim={append_dim!r} does not match any existing "
  1836                                                             f"dataset dimensions {existing_dims}"
  1837                                                         )
  1838    126.4 MiB      0.1 MiB           1           existing_var_names = set(zstore.zarr_group.array_keys())
  1839    126.4 MiB      0.0 MiB           7           for var_name in existing_var_names:
  1840    126.4 MiB      0.0 MiB           6               if var_name in encoding.keys():
  1841                                                         raise ValueError(
  1842                                                             f"variable {var_name!r} already exists, but encoding was provided"
  1843                                                         )
  1844    126.4 MiB      0.0 MiB           1           if mode == "r+":
  1845                                                     new_names = [k for k in dataset.variables if k not in existing_var_names]
  1846                                                     if new_names:
  1847                                                         raise ValueError(
  1848                                                             f"dataset contains non-pre-existing variables {new_names}, "
  1849                                                             "which is not allowed in ``xarray.Dataset.to_zarr()`` with "
  1850                                                             "mode='r+'. To allow writing new variables, set mode='a'."
  1851                                                         )
  1852                                         
  1853    126.4 MiB      0.0 MiB           1       writer = ArrayWriter()
  1854                                             # TODO: figure out how to properly handle unlimited_dims
  1855    126.6 MiB      0.2 MiB           1       dump_to_store(dataset, zstore, writer, encoding=encoding)
  1856    484.8 MiB    358.2 MiB           2       writes = writer.sync(
  1857    126.6 MiB      0.0 MiB           1           compute=compute, chunkmanager_store_kwargs=chunkmanager_store_kwargs
  1858                                             )
  1859                                         
  1860    484.8 MiB      0.0 MiB           1       if compute:
  1861    484.8 MiB      0.0 MiB           1           _finalize_store(writes, zstore)
  1862                                             else:
  1863                                                 import dask
  1864                                         
  1865                                                 return dask.delayed(_finalize_store)(writes, zstore)
  1866                                         
  1867    484.8 MiB      0.0 MiB           1       return zstore

punit-kulal avatar Mar 13 '24 09:03 punit-kulal

You should be able to trace it into Zarr. My guess is that it's the encoding/compression pipeline.

It is funny that it is not releasing that memory though. Are you able to debug this further?
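For reference, a stdlib-only way to localize which lines retain memory is `tracemalloc`. This is a minimal sketch: `top_allocations` and `_allocate` are hypothetical helpers, and in the real case you would wrap the `dataset.to_zarr(...)` call instead of the stand-in below.

```python
import tracemalloc


def top_allocations(func, limit=5):
    """Run func and return the allocation sites that grew the most, by line."""
    tracemalloc.start()
    before = tracemalloc.take_snapshot()
    func()
    after = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # differences are sorted largest-first
    return after.compare_to(before, "lineno")[:limit]


_kept = []


def _allocate():
    # stand-in for dataset.to_zarr(...): allocates ~10 MB that stays referenced
    _kept.append(bytearray(10_000_000))


stats = top_allocations(_allocate)
for stat in stats:
    print(stat)
```

Memory that shows up here after the call returns is still referenced by Python objects; if `tracemalloc` shows nothing but RSS stays high, the retention is more likely in allocator caches or in a C extension (e.g. the compressor).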

dcherian avatar Mar 13 '24 17:03 dcherian