4

I am loading in the following data using xr.mfdataset. There is 16GB of data, across many files.

import xarray as xr
from datetime import datetime
from pathlib import Path

from dask.diagnostics import ProgressBar
def add_time_dim(xda: xr.Dataset) -> xr.Dataset:
    # https://stackoverflow.com/a/65416801/9940782 
    xda = xda.expand_dims(time = [datetime.now()])
    return xda

raw_folder = data_dir / "raw/modis_ndvi_1000"
    
files = [f for f in raw_folder.glob("*.nc")]
data = xr.open_mfdataset(files, preprocess=add_time_dim)
data
<xarray.Dataset>
Dimensions:     (time: 647, lat: 5600, lon: 4480)
Coordinates:
  * time        (time) datetime64[ns] 2004-04-30 2005-10-10 ... 2018-10-31
  * lat         (lat) float64 -19.99 -19.98 -19.97 -19.96 ... 29.98 29.99 30.0
  * lon         (lon) float64 20.0 20.01 20.02 20.03 ... 59.96 59.97 59.98 59.99
Data variables:
    modis_ndvi  (time, lat, lon) float32 dask.array<chunksize=(1, 5600, 4480), meta=np.ndarray>

After selecting my region of interest I have halved the size of the dataset (~8GB)

<xarray.Dataset>
Dimensions:     (time: 647, lat: 1255, lon: 983)
Coordinates:
  * time        (time) datetime64[ns] 2004-04-30 2005-10-10 ... 2018-10-31
  * lat         (lat) float64 -5.196 -5.187 -5.179 -5.17 ... 5.982 5.991 6.0
  * lon         (lon) float64 33.51 33.52 33.53 33.54 ... 42.26 42.27 42.28
Data variables:
    modis_ndvi  (time, lat, lon) float32 dask.array<chunksize=(1, 1255, 983), meta=np.ndarray>

## Every time I try to save the data, the process is Killed. How can I write this large file to netcdf?

out_folder = data_dir / "interim/modis_ndvi_1000_preprocessed"
out_folder.mkdir(exist_ok=True)
out_file = out_folder / f"modis_ndvi_1000_{subset_str}.nc"

data.to_netcdf(out_file, compute=False)
with ProgressBar():
    print(f"Writing to {out_file}")
    data.compute()


Killed

What do I need to do? How to cope with this large dataset, how to write it out to disk in parallel?

The process is killed because all available memory gets used up (which can be seen when watching htop)

htop showing the memory usage

1 Answer 1

3

to_netcdf(compute=False) returns a dask.delayed.Delayed object. You should store that as a variable and compute that rather than computing the array:

write_job = data.to_netcdf(out_file, compute=False)
with ProgressBar():
    print(f"Writing to {out_file}")
    write_job.compute()

The code as you have it starts the delayed write job then tries to bring the whole array data into memory.

That said, zarr is better suited to parallel writes. Even with arrays backed with distributed task clusters, to_netcdf brings the array to the local thread (in chunks, but still) to write to the netcdf in the main thread. Writing with zarr schedules the write, then the workers write to the storage in parallel. If you're on a networked or cloud filesystem this can make a huge difference. If you're able to use zarr I'd check out that format!

Sign up to request clarification or add additional context in comments.

5 Comments

So would you recommend writing to zarr first and then reading from zarr, calling .compute() and then writing that object out to NetCDF format? Or just working with zarr format and forgetting entirely about NetCDF? Thanks!
The latter :) evolve past netcdf!
Fwiw though I do still use netCDF too - it still has wider adoption and compatibility with legacy tools. Important point though is don't call compute on the array - that brings whatever you're computing (in your code's case, the whole array) into memory.
when writing to zarr, should you still use the delayed object and then later call delayed.compute()? This is all super info, the zarr stuff is crazy cool!
if you want the progress bar then sure! there's no need to use compute=False unless you want to do something with the delayed object (such as using dask's ProgressBar). But you can always just write to netCDF or zarr with compute=True (the default) and everything should work fine.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.