
Successful example of using MultiZarrToZarr

Open pbranson opened this issue 4 years ago • 9 comments

Not so much an issue, but just some examples of applying this to a large 30-year Sea Surface Temperature (SST) dataset (10576 x 4500 x 6000), chunked as (1, 2250, 3000).

I tested combining the single-time reference files into monthly and yearly aggregates, and found that, due to template-parsing overhead, monthly aggregates worked best as a compromise between granularity and parsing cost. I also tested using zarr versus json for the combined references and the difference in storage space is negligible, so the trade-off is really storage space versus having a single container for the references. Possibly zipped json is the way to go?
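On the zipped-json idea, a minimal stdlib sketch of the round trip (the reference dict below is a made-up stand-in for a real kerchunk reference set; bucket, path, and offsets are illustrative only):

```python
import gzip
import io
import json

# Hypothetical stand-in for a combined reference set ("version 1" layout);
# the bucket/path and byte offsets are made up for illustration.
refs = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',
        "sst/0.0.0": ["s3://some-bucket/sst_19920101.nc", 30000, 120000],
    },
}

# Write the references as gzip-compressed JSON into an in-memory buffer
buf = io.BytesIO()
with gzip.open(buf, "wt") as f:
    json.dump(refs, f)

# Read them back and check the round trip is lossless
loaded = json.loads(gzip.decompress(buf.getvalue()).decode())
print(loaded == refs)  # True
```

Since the reference JSON is highly repetitive (URLs, key prefixes), gzip typically shrinks it substantially while keeping the single-file convenience.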

Ultimately, using many small workers with the ReferenceFileSystem, extractions across time took ~15 s - compared to many hours using THREDDS or going via netCDF.

This notebook demonstrates creating aggregated references at monthly and yearly timescales: https://github.com/pbranson/pixeldrill/blob/main/notebooks/create_agg_index.ipynb The chunking layout changes sporadically from 2016 onwards, so there is some bespoke code to detect these changes and stack the matching files together. When the full timeseries is recomposed, a simple sort by time re-orders the data.

This notebook demonstrates using 180 single core workers with 1GB memory per worker to load the data: https://github.com/pbranson/pixeldrill/blob/main/notebooks/access_via_aggindex.ipynb

Thanks for everyone's work on this library!

pbranson avatar Sep 03 '21 09:09 pbranson

@pbranson thanks for sharing this success story! At least you had a few chunks in the spatial dimension but (1, 2250, 3000) is still pretty painful for time series extraction at a specific location, right?
But I'm guessing you were not in a position to rechunk this dataset.
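To put rough numbers on why (1, 2250, 3000) chunks are painful for point extraction: every timestep forces reading one full spatial chunk. A back-of-envelope sketch, assuming 2 bytes per element (e.g. int16-packed SST - an assumption, the real dtype may differ):

```python
# Rough cost of extracting one point's full time series from chunks of
# (1, 2250, 3000); 2 bytes/element is an assumption (e.g. int16-packed SST).
nt = 10576                      # timesteps in the ~30-year stack
chunk_elems = 1 * 2250 * 3000   # elements per chunk
bytes_per_elem = 2

useful = nt * bytes_per_elem                  # bytes actually wanted
touched = nt * chunk_elems * bytes_per_elem   # bytes read/decompressed

print(f"useful: {useful / 1e6:.2f} MB, touched: {touched / 1e9:.0f} GB")
```

So on the order of a hundred gigabytes is decompressed to yield a few tens of kilobytes, which is why concurrent small workers (and on-disk compression) matter so much here.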

rsignell-usgs avatar Sep 03 '21 12:09 rsignell-usgs

Cool, thanks for this!

Should we have a page or section of the README linking to creation and use notebooks?

due to the template parsing

Templates can be turned off in the combined output (template_count=0 in MultiZarrToZarr), and with decent compression, templates don't seem to help that much anyway. Having said that, simple_templates=True as an argument to ReferenceFileSystem speeds up parsing massively, and all reference templates currently made by reference-maker are "simple".
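For illustration, a small self-contained sketch of `simple_templates` using fsspec's in-memory filesystem as a stand-in for remote storage (the payload, path, and key names are all made up):

```python
import fsspec

# Put a small payload in fsspec's in-memory filesystem to stand in for a
# remote chunk store (path and key names are hypothetical).
mem = fsspec.filesystem("memory")
with mem.open("/data.bin", "wb") as f:
    f.write(b"hello world")

# A "version 1" reference set using one template; simple_templates=True
# expands {{u}} with plain string substitution rather than a full jinja
# render, which is much faster to parse for large reference sets.
refs = {
    "version": 1,
    "templates": {"u": "memory://data.bin"},
    "refs": {"piece": ["{{u}}", 0, 5]},  # 5 bytes at offset 0 of the target
}

fs = fsspec.filesystem("reference", fo=refs, simple_templates=True)
print(fs.cat("piece"))  # b"hello"
```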

Opening the full stack in a delayed fashion is too slow

The NWM example showed that it works well to do a two-stage combine with dask: calling MultiZarrToZarr on subsets of the files, and then MultiZarrToZarr again on those intermediate subsets. Indeed, you no longer need to write the files anywhere, you can just have dask pass the results around. @lsterzinger - please link your notebook here.
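The two-stage pattern is roughly: combine manageable batches first, then combine the intermediates. A sketch with a stand-in combine step - in a real pipeline each task would call `MultiZarrToZarr(...).translate()` instead of the toy dict merge below, and the batching helper is hypothetical:

```python
import dask


def combine(refs_list):
    # Stand-in for MultiZarrToZarr(refs_list, ...).translate(); here we
    # just merge dicts so the tree structure is visible.
    merged = {}
    for r in refs_list:
        merged.update(r)
    return merged


def two_stage_combine(all_refs, batch=31):
    # Stage 1: combine each batch (e.g. one month of daily references)
    firsts = [
        dask.delayed(combine)(all_refs[i:i + batch])
        for i in range(0, len(all_refs), batch)
    ]
    # Stage 2: combine the intermediates into one reference set; nothing
    # is written to disk - dask passes the results between tasks.
    return dask.delayed(combine)(firsts).compute()


toy = [{f"chunk{i}": i} for i in range(90)]
print(len(two_stage_combine(toy)))  # 90
```

The intermediates never need to be serialized, and stage 1 parallelizes across workers.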

The chunking layout changes in 2016 sporadically, so there is some bespoke code to detect these and stack them together when found.

@rabernat : this kind of custom processing would make it hard to fit this into a pangeo-forge recipe, at least one that calls the builtin recipe classes.

martindurant avatar Sep 03 '21 13:09 martindurant

this kind of custom processing would make it hard to fit this into a pangeo-forge recipe

I'm not sure. I looked at the notebook but can't quite figure out what the custom processing is and where it should happen. In Pangeo Forge, we would like to have whatever customization points are needed to accommodate messy real-world data. It would be fantastic to try to write a recipe for this use case. That would help us figure out where the recipe class needs to be expanded. Could someone open an issue to track that idea: https://github.com/pangeo-forge/staged-recipes/issues/new/choose?

rabernat avatar Sep 03 '21 13:09 rabernat

@rsignell-usgs the current delivery model for this (updating) dataset is netCDF, and the services associated with that. So this was a proof of concept of a possible interim solution to make the data more accessible. Certainly there is a fair whack of redundant data being read, but the chunks are still pretty small and maybe I am being helped by the async reads. I was surprised how well it scaled.

Rechunking is definitely an option, as is translating to Zarr, but neither is part of the current workflow for this data, though both are being considered. There is a fair amount of data, so the duplication will come at some cost until there is a transition away from all the tooling around opendap - and I wonder, is there an opendap interface to zarr?

I was just trying to catch up on the work being done in pangeo-forge, and it seems that if translation is an option, AODN should consider it as an approach. These datasets could definitely be put into the recipes, but they are all in ap-southeast-2 so would need to be processed 'down-under'.

Templates can be turned off in the combined output (template_count=0 in MultiZarrToZarr), and with decent compression, templates don't seem to help that much anyway. Having said that, simple_templates=True

Thanks @martindurant, I was looking for a way to disable the templating and use Zarr with compression, but didn't figure out that was what template_count was for. I will definitely try that, as well as the two-stage approach. It still seems that for these types of queries across finely chunked dimensions, resolving the dask graph to (a small number of) concrete values is a reasonable approach. Most people want to get a timeseries in their locality.

pbranson avatar Sep 03 '21 13:09 pbranson

but they are all in ap-southeast-2 so would need to processed 'down-under'

We need to get a bakery deployed in your region! Are you interested in collaborating on this?

https://github.com/pangeo-forge/pangeo-forge-aws-bakery

rabernat avatar Sep 03 '21 13:09 rabernat

I looked at the notebook but can't quite figure out what the custom processing is

It's in this notebook (I forgot to set the xr display to text): https://github.com/pbranson/pixeldrill/blob/main/notebooks/create_agg_index.ipynb

        # Deal with different chunk sizes: group the references by chunking
        # layout, so a separate aggregate file is created for each layout
        chunking = {}
        for r in references:
            ds = open_dataset('s3://' + r)
            key = ds['sea_surface_temperature'].chunks
            chunking.setdefault(key, []).append(r)

        # Label each set with a, b, c, ...
        labels = [chr(i) for i in range(97, 97 + len(chunking))]
        agg_files = []
        for i, (chunks, refs) in enumerate(chunking.items()):

            # set up the output location for this chunking layout
            agg_file = f"{dest}{mask}{suffix}.{extension}".replace(
                f'.{extension}', f'_{labels[i]}.{extension}')
...

Basically it creates a separate aggregate for each chunking layout, then reorders by time after the xr.concat.

We need to get a bakery deployed in your region! Are you interested in collaborating on this?

I agree, that would be great! I'm not really in a position to facilitate that, but will discuss it with a few people who may be. I expect that if AODN goes the route of translating the data to Zarr or ncZarr then it could be on the cards, if we can find the right funding. I will try to raise it at the next Pangeo-Oceania hook-up if I can make it.

pbranson avatar Sep 03 '21 13:09 pbranson

template_count=0 in MultiZarrToZarr

I have tested this, and it makes parsing about 30% faster than using json with simple_templates=True.

It also makes opening the larger stacks much faster; however, the overall extraction is much slower, I think due to increased graph size and scheduling overhead. Possibly I could fine-tune the dask chunks, but it seems that the monthly load is the best way to go in this instance (aside from rechunking on disk).

pbranson avatar Sep 03 '21 15:09 pbranson

Keep in mind that you can always override the on-disk chunking when you open with xarray / dask. In particular, it often makes sense to use larger dask chunks than the (relatively small) hdf5 chunks. Thanks to fsspec's async capabilities, it should work at close to the same speed as if there were one big chunk on disk.
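For example, with a toy in-memory dataset standing in for the SST stack (variable name and sizes are illustrative; on the real stack you would pass `chunks={...}` to `xr.open_dataset` instead):

```python
import numpy as np
import xarray as xr

# Toy stand-in for the SST stack; on disk the chunking is (1, y, x)
ds = xr.Dataset(
    {"sst": (("time", "y", "x"), np.arange(240.0).reshape(10, 4, 6))}
).chunk({"time": 1})

# Override with larger dask chunks along time; on a real open this would be
# e.g. xr.open_dataset(mapper, chunks={"time": 180})
ds_big = ds.chunk({"time": 5})
print(dict(ds_big.chunks)["time"])  # (5, 5)
```

Fewer, larger dask chunks shrink the task graph, while fsspec still issues the many small underlying reads concurrently.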

rabernat avatar Sep 03 '21 15:09 rabernat

I made a dask report of the load using larger workers; whilst it seems to be using async, a fair amount of time is spent waiting for a sync after 'cat'-ing the references.

I have put up the performance reports from the yearly 'delayed' approach, setting large dask chunks (time=180), and a second profile for the monthly 'eager' loading, which returns the concrete values. For similar compute resources the monthly eager approach is ~5x faster: https://gist.github.com/pbranson/18cee7f5f6f19ee578d6c2ac4540f182

pbranson avatar Sep 03 '21 16:09 pbranson