Description
We are working on integrating zarr with xarray. In the process, we have encountered a performance issue that I am documenting here. At this point, it is not clear if the core issue is in zarr, gcsfs, dask, or xarray. I originally started posting this in zarr, but in the process, I became more convinced the issue was with xarray.
Dask Only
Here is an example using only dask and zarr.
# connect to a local dask scheduler
from dask.distributed import Client
client = Client('tcp://129.236.20.45:8786')
# create a big dask array
import dask.array as dsa
shape = (30, 50, 1080, 2160)
chunkshape = (1, 1, 1080, 2160)
ar = dsa.random.random(shape, chunks=chunkshape)
# connect to gcs and create MutableMapping
import gcsfs
fs = gcsfs.GCSFileSystem(project='pangeo-181919')
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/test999', gcs=fs, check=True,
create=True)
# create a zarr array to store into
import zarr
za = zarr.create(ar.shape, chunks=chunkshape, dtype=ar.dtype, store=gcsmap)
# write it
ar.store(za, lock=False)
When you do this, the client spends a long time serializing the task graph before the computation starts.
For a more fine-grained look at the process, one can instead do
delayed_obj = ar.store(za, compute=False, lock=False)
%prun future = client.compute(delayed_obj)
This reveals that the pre-compute step takes about 10 s. Monitoring the distributed scheduler, I can see that, once the computation starts, it takes about 90 seconds to store the array (27 GB). (This is actually not bad!)
Some debugging by @mrocklin revealed the following step is quite slow
import cloudpickle
%time len(cloudpickle.dumps(za))
On my system, this took close to 1 s. In contrast, when the store is a plain path rather than a GCSMap, pickling is in microsecond territory. So pickling GCSMap objects is relatively slow. I'm not sure whether this pickling happens when we call client.compute or during task execution.
There is room for improvement here, but overall, zarr + gcsfs + dask seem to integrate well and give decent performance.
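One possible improvement (sketched here with a hypothetical `LazyMap` class, not anything gcsfs currently does) is to customize pickling so that only the root path crosses the wire, and the heavy connection state is rebuilt lazily on the worker:

```python
import pickle
from collections.abc import MutableMapping

class LazyMap(MutableMapping):
    """Hypothetical mapping that pickles down to just its root path.

    GCSMap could do something similar: serialize only the bucket path,
    and rebuild the filesystem/session object on the worker on first use.
    """
    def __init__(self, root):
        self.root = root
        self._session = None   # heavy connection state, created on demand
        self._data = {}

    def _connect(self):
        if self._session is None:
            self._session = object()   # stand-in for real session setup

    def __reduce__(self):
        # Ship only the root path; the worker reconnects lazily. This is
        # safe for a cloud store because the data lives remotely, not in
        # the local object.
        return (LazyMap, (self.root,))

    def __getitem__(self, key):
        self._connect()
        return self._data[key]

    def __setitem__(self, key, value):
        self._connect()
        self._data[key] = value

    def __delitem__(self, key): del self._data[key]
    def __iter__(self): return iter(self._data)
    def __len__(self): return len(self._data)

m = LazyMap('pangeo-data/test999')
payload = pickle.dumps(m)
print(len(payload))   # tens of bytes, independent of session state
m2 = pickle.loads(payload)
```

With `__reduce__` defined this way, the pickle payload stays tiny no matter how much session state the live object has accumulated.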
Xarray
Things get much worse once xarray enters the picture. (Note that this example requires xarray PR #1528, which has not been merged yet.)
# wrap the dask array in an xarray
import xarray as xr
import numpy as np
Ny, Nx = shape[2], shape[3]  # 1080, 2160
ds = xr.DataArray(ar, dims=['time', 'depth', 'lat', 'lon'],
                  coords={'lat': np.linspace(-90, 90, Ny),
                          'lon': np.linspace(0, 360, Nx)}).to_dataset(name='temperature')
# store to a different bucket
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/test1', gcs=fs, check=True, create=True)
ds.to_zarr(store=gcsmap, mode='w')
Now the store step takes 18 minutes. Most of this time is spent upfront, during which there is little CPU activity and no network activity. After about 15 minutes, it finally starts computing, at which point the writes to GCS proceed at more or less the same rate as in the dask-only example.
Profiling the to_zarr call with snakeviz reveals that it spends most of its time waiting for thread locks. I don't understand this, since I specifically eliminated locks when storing the zarr arrays.
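For anyone wanting to reproduce the profile without snakeviz, the stdlib profiler can dump the same cumulative-time hotspots (lock acquires show up as `threading` / `acquire` entries near the top). A generic sketch; `profile_call` is a helper name of my own, and the commented usage line assumes the `ds` and `gcsmap` objects from the example above:

```python
import cProfile
import io
import pstats

def profile_call(fn, *args, **kwargs):
    """Run fn under cProfile and print the top cumulative-time entries.

    Sorting by cumulative time surfaces lock waits (e.g. Lock.acquire)
    that snakeviz displays graphically.
    """
    pr = cProfile.Profile()
    pr.enable()
    result = fn(*args, **kwargs)
    pr.disable()
    s = io.StringIO()
    pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats(10)
    print(s.getvalue())
    return result

# usage against the xarray example above (assumes ds and gcsmap exist):
# profile_call(ds.to_zarr, store=gcsmap, mode='w')

# self-contained demonstration on a trivial call:
profile_call(sum, range(1000))
```

If the lock waits sit under graph construction rather than task execution, that would point at xarray's backend layer (which wraps arrays with its own locking) rather than at zarr or gcsfs.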