
I am trying to efficiently restructure a large multidimensional dataset. Let's assume I have a number of remotely sensed images collected over time, each with a number of bands, with coordinates x and y for pixel location, time for the acquisition time, and band for the different data collected.

In my use case, let's assume the xarray coordinate lengths are roughly x (3000), y (3000), and time (10), with 40 bands of floating-point data, so 100 GB+ of data.

I have been trying to work from this example but I am having trouble translating it to this case.

Small dataset example

NOTE: the actual data is much larger than this example.

import numpy as np
import dask.array as da
import xarray as xr

nrows = 100
ncols = 200
row_chunks = 50
col_chunks = 50

data = da.random.random(size=(1, nrows, ncols), chunks=(1, row_chunks, col_chunks))

def create_band(data, x, y, band_name):
    # Wrap the dask array as a DataArray with a single entry on the band dimension.
    return xr.DataArray(data,
                        dims=('band', 'y', 'x'),
                        coords={'band': [band_name],
                                'y': y,
                                'x': x})

def create_coords(data, left, top, celly, cellx):
    # Build x/y coordinates from the top-left corner and the cell size.
    nrows = data.shape[-2]
    ncols = data.shape[-1]
    right = left + cellx*ncols
    bottom = top - celly*nrows
    x = np.linspace(left, right, ncols) + cellx/2.0
    y = np.linspace(top, bottom, nrows) - celly/2.0
    
    return x, y

x, y = create_coords(data, 1000, 2000, 30, 30)

src = []

for time in ['t1', 't2', 't3']:

    src_t = xr.concat([create_band(data, x, y, band) for band in ['blue', 'green', 'red', 'nir']], dim='band')\
                    .expand_dims(dim='time')\
                    .assign_coords({'time': [time]})
    
    src.append(src_t)

src = xr.concat(src, dim='time')

print(src)


<xarray.DataArray 'random_sample-5840d8564d778d573dd403f27c3f47a5' (time: 3, band: 4, y: 100, x: 200)>
dask.array<concatenate, shape=(3, 4, 100, 200), dtype=float64, chunksize=(1, 1, 50, 50), chunktype=numpy.ndarray>
Coordinates:
  * x        (x) float64 1.015e+03 1.045e+03 1.075e+03 ... 6.985e+03 7.015e+03
  * band     (band) object 'blue' 'green' 'red' 'nir'
  * y        (y) float64 1.985e+03 1.955e+03 1.924e+03 ... -984.7 -1.015e+03
  * time     (time) object 't1' 't2' 't3'

Restructured - stacked and transposed

I need to store the output of the following:

print(src.stack(sample=('y','x','time')).T)

<xarray.DataArray 'random_sample-5840d8564d778d573dd403f27c3f47a5' (sample: 60000, band: 4)>
dask.array<transpose, shape=(60000, 4), dtype=float64, chunksize=(3600, 1), chunktype=numpy.ndarray>
Coordinates:
  * band     (band) object 'blue' 'green' 'red' 'nir'
  * sample   (sample) MultiIndex
  - y        (sample) float64 1.985e+03 1.985e+03 ... -1.015e+03 -1.015e+03
  - x        (sample) float64 1.015e+03 1.015e+03 ... 7.015e+03 7.015e+03
  - time     (sample) object 't1' 't2' 't3' 't1' 't2' ... 't3' 't1' 't2' 't3'

I am hoping to use dask and xarray to write the result to disk in chunks, in a form I can read back with open_mfdataset. Parquet seems like a good option, but I can't figure out how to write it in chunks (src is too big to hold in memory). These are the kinds of things I have tried, without success:

import dask

# Attempt 1: wrap the stack/transpose in a delayed function.
# (This only builds another Delayed object; nothing is ever computed or written.)
@dask.delayed
def stacker(data):
    return data.stack(sample=('y', 'x', 'time')).T.to_pandas()

stacker(src).to_parquet('out_*.parquet')

# Attempt 2: do it eagerly inside a function.
# (This pulls the whole stacked DataFrame into memory before writing.)
def stack_write(data):
    data.stack(sample=('y', 'x', 'time')).T.to_pandas().to_parquet('out_*.parquet')
    return None

stack_write(src)
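For reference, my understanding is that a chunked Parquet write would normally go through dask.dataframe rather than pandas, roughly as sketched below (out_parquet is just a placeholder directory, and the y/x/time values from the MultiIndex are not carried along here). It still seems to hit the same scaling wall, because the stack itself is the expensive part:

import dask.dataframe as dd

# Rough sketch only: turn the stacked (sample, band) dask array into a
# dask DataFrame and write it as partitioned Parquet files.
stacked = src.stack(sample=('y', 'x', 'time')).T

# Rechunk so each partition holds all band columns.
arr = stacked.data.rechunk({1: -1})

ddf = dd.from_dask_array(arr, columns=[str(b) for b in stacked['band'].values])
ddf.to_parquet('out_parquet')  # placeholder path; needs pyarrow or fastparquet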

At this point I am just hoping for some good ideas. Thanks!

5 Comments

  • Is src held in memory before the "stacked and transposed" operation? (Oct 26, 2020)
  • @Rivers No, it is not. It should be done lazily in chunks by dask. (Oct 27, 2020)
  • OK, I see. And have you already completed this "stack and transpose" operation successfully? Or is it impossible to complete precisely because src is too big to fit in memory? (Oct 27, 2020)
  • @Rivers No, it is too big to complete, hence the need to store chunks. (Oct 28, 2020)
  • Thanks. I'm asking because it makes a big difference for the rest of the code (we won't be able to proceed in the same ways). Could you show the line of code with which you load the data? (I'll be very busy in the coming days, so I'll be able to answer next week.) (Nov 4, 2020)

2 Answers


I have a solution here (https://github.com/pydata/xarray/issues/1077#issuecomment-644803374) for writing multiindexed datasets to file.

You'll have to manually "encode" the dataset into a form that can be written as netCDF, and then "decode" it when you read it back.

import numpy as np
import pandas as pd
import xarray as xr


def encode_multiindex(ds, idxname):
    encoded = ds.reset_index(idxname)
    coords = dict(zip(ds.indexes[idxname].names, ds.indexes[idxname].levels))
    for coord in coords:
        encoded[coord] = coords[coord].values
    shape = [encoded.sizes[coord] for coord in coords]
    encoded[idxname] = np.ravel_multi_index(ds.indexes[idxname].codes, shape)
    encoded[idxname].attrs["compress"] = " ".join(ds.indexes[idxname].names)
    return encoded


def decode_to_multiindex(encoded, idxname):
    names = encoded[idxname].attrs["compress"].split(" ")
    shape = [encoded.sizes[dim] for dim in names]
    indices = np.unravel_index(encoded[idxname].values, shape)  # fixed: was hardcoded to encoded.landpoint
    arrays = [encoded[dim].values[index] for dim, index in zip(names, indices)]
    mindex = pd.MultiIndex.from_arrays(arrays, names=names)

    decoded = xr.Dataset({}, {idxname: mindex})
    for varname in encoded.data_vars:
        if idxname in encoded[varname].dims:
            decoded[varname] = (idxname, encoded[varname].values)
    return decoded
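For illustration, here is a minimal round-trip sketch using these helpers, assuming the stacked DataArray from the question is first wrapped in a Dataset (the variable name reflectance and the file name stacked.nc are placeholders; for data of the size in the question you would still want a chunked store such as Zarr, see the comments below):

# Hypothetical usage of the helpers above (names are placeholders).
stacked = src.stack(sample=('y', 'x', 'time')).T
ds = stacked.to_dataset(name='reflectance')

encoded = encode_multiindex(ds, 'sample')   # MultiIndex -> plain integer index
encoded.to_netcdf('stacked.nc')             # note: this computes the dask graph

roundtrip = decode_to_multiindex(xr.open_dataset('stacked.nc'), 'sample')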

2 Comments

Thanks, that is helpful. Any thoughts on writing to the HDF file in chunks?
Nothing except "don't do it". HDF in parallel is messy. Zarr is a better option: it works well with parallel writes and with xarray/dask. You can then convert the Zarr store to HDF if you need to.
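A minimal sketch of the Zarr route suggested above, reusing encode_multiindex / decode_to_multiindex and the stacked DataArray from the question (the store name and chunk size are placeholders):

# Sketch: write the MultiIndex-free dataset chunk by chunk to a Zarr store,
# then open it lazily and rebuild the MultiIndex on read.
encoded = encode_multiindex(stacked.to_dataset(name='reflectance'), 'sample')

encoded.chunk({'sample': 1_000_000}).to_zarr('stacked.zarr', mode='w')

restored = decode_to_multiindex(xr.open_zarr('stacked.zarr'), 'sample')
# Note: decode_to_multiindex as written above pulls the variables into memory.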

This is not a solution yet, but here is a version of your code, modified so that it is easily reproducible if others want to try to solve this problem.

The problem is with the stack operation (concatenated.stack(sample=('y','x','time'))). At this step, memory usage keeps increasing until the process is killed.

The concatenated object is a Dask-backed xarray.DataArray, so we would expect the stack operation to be done lazily by Dask. Why, then, is the process killed at this step?

There are two possibilities for what is happening here:

  • The stack operation is in fact done lazily by Dask, but because the data are so large, even the minimum memory Dask needs for it is too much.

  • The stack operation is NOT Dask-backed.

(A quick way to check which of the two applies is sketched at the end of this answer.)


import numpy as np
import dask.array as da
import xarray as xr
from numpy.random import RandomState

nrows = 20000
ncols = 20000
row_chunks = 500
col_chunks = 500


# Create a reproducible random numpy array
# (note: this eagerly allocates the full ~3 GB band in memory before dask wraps it)
prng = RandomState(1234567890)
numpy_array = prng.rand(1, nrows, ncols)

data = da.from_array(numpy_array, chunks=(1, row_chunks, col_chunks))


def create_band(data, x, y, band_name):

    return xr.DataArray(data,
                        dims=('band', 'y', 'x'),
                        coords={'band': [band_name],
                                'y': y,
                                'x': x})

def create_coords(data, left, top, celly, cellx):
    nrows = data.shape[-2]
    ncols = data.shape[-1]
    right = left + cellx*ncols
    bottom = top - celly*nrows
    x = np.linspace(left, right, ncols) + cellx/2.0
    y = np.linspace(top, bottom, nrows) - celly/2.0
    
    return x, y


x, y = create_coords(data, 1000, 2000, 30, 30)

bands = ['blue', 'green', 'red', 'nir']
times = ['t1', 't2', 't3']
bands_list = [create_band(data, x, y, band) for band in bands]

src = []

for time in times:

    src_t = xr.concat(bands_list, dim='band')\
                    .expand_dims(dim='time')\
                    .assign_coords({'time': [time]})

    src.append(src_t)


concatenated = xr.concat(src, dim='time')
print(concatenated)
# computed = concatenated.compute() # "computed" is ~35.8GB

stacked = concatenated.stack(sample=('y','x','time'))

transposed = stacked.T

One can change the values of nrows and ncols to vary the size of concatenated, and for performance one could/should also vary the chunks.

Note: I even tried the following:

concatenated.to_netcdf("concatenated.nc")
concatenated = xr.open_dataarray("concatenated.nc", chunks=10)

This was to make sure it is a Dask-backed DataArray and to be able to adjust the chunks. I tried different values for chunks, but it always ran out of memory.
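A quick way to narrow down which of the two possibilities above applies is to check, before computing or writing anything, whether the stacked/transposed result is still a lazy dask array and how large its chunks and task graph are (a diagnostic sketch only, using the objects defined above):

# Diagnostic only: inspect laziness and chunking of the stacked result.
print(type(transposed.data))                  # dask.array.core.Array -> still lazy
print(transposed.data.chunksize)              # largest chunk shape after stack + transpose
print(transposed.data.numblocks)              # number of blocks along (sample, band)
print(len(transposed.data.__dask_graph__()))  # task count; a very large graph can itself exhaust memory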

