
Best practices for zarr and GCS streaming applications #595

@skgbanga

Description


Hello,

We are exploring zarr as a potential file format for our application, a streaming system that generates rows of data which are continuously appended to a 2D matrix.

I couldn't find any best-practice guidelines for streaming with zarr and GCS (or any cloud storage, for that matter). Please point me in the right direction if something like this already exists.

To evaluate zarr, I wrote a small script (kudos on the good docs! I was able to write this small app in very little time).
Note that it is NOT optimized at all. The point of this issue/post is to figure out the best practices for such an application.

import os
import shutil
import time
import argparse
import zarr
import numpy as np
import gcsfs

TEST_PROJECT = "..."
TEST_BUCKET = "..."

TEST_GOOGLE_SERVICE_ACCOUNT_INFO = {}

n = 100
xs = 2
chunk_size = 10


def timer(fn):
    def wrapper(*args, **kwargs):
        start = time.time()
        fn(*args, **kwargs)
        dur = time.time() - start
        return dur

    return wrapper


@timer
def iterate(store):
    z = zarr.create(store=store, shape=(chunk_size, xs), chunks=(chunk_size, None), dtype="float")

    for i in range(n):
        row = np.arange(xs, dtype="float")
        z[i, :] = row

        if (i + 1) % chunk_size == 0:  # time to add a new chunk
            a, b = z.shape
            z.resize(a + chunk_size, b)

    z.resize(n, xs)


def in_memory():
    return iterate(None)


def disc():
    # ignore_errors avoids a crash on the first run, when the path doesn't exist yet
    shutil.rmtree("data/example.zarr", ignore_errors=True)
    store = zarr.DirectoryStore("data/example.zarr")
    return iterate(store)


def google_cloud():
    gcs = gcsfs.GCSFileSystem(TEST_PROJECT, token=TEST_GOOGLE_SERVICE_ACCOUNT_INFO)
    root = os.path.join(TEST_BUCKET, "sandeep/example.zarr")
    for f in gcs.find(root):
        gcs.rm(f)

    store = gcsfs.GCSMap(root, gcs=gcs, check=False)
    return iterate(store)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--memory", action="store_true")
    group.add_argument("--disc", action="store_true")
    group.add_argument("--gcs", action="store_true")
    args = parser.parse_args()

    if args.memory:
        dur = in_memory()
    elif args.disc:
        dur = disc()
    elif args.gcs:
        dur = google_cloud()
    else:
        raise ValueError("Please specify an option")

    print(f"Time taken {dur:.6f}")

Results:

$ ./foo.py --memory
Time taken 0.018762
$ ./foo.py --disc
Time taken 0.070137
$ ./foo.py --gcs
Time taken 54.315994

The above is a 100 × 2 matrix, i.e. 200 floats.

As you can see, this naive method of appending rows to zarr is clearly not the right way to do it (for reference, manually uploading example.zarr to gcloud using gsutil takes ~1.6 s). My guess is that every time I do z[i, :] = row, it triggers a GCS write, and that is destroying the performance.
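To make that guess concrete, here is a small sketch (my own toy model, not zarr's actual code) of the write amplification: with one chunk per 10 rows, each single-row assignment has to rewrite the entire chunk object that holds the row, so 100 row writes cost ~100 uploads instead of 10. The `CountingStore` here is a plain dict standing in for a GCS bucket.

```python
import numpy as np

class CountingStore(dict):
    """Dict-backed store that counts writes, standing in for a GCS bucket."""
    def __init__(self):
        super().__init__()
        self.puts = 0

    def __setitem__(self, key, value):
        self.puts += 1
        super().__setitem__(key, value)

def write_rows(store, n_rows, chunk_rows):
    # Row-by-row: every assignment rewrites the chunk that holds the row.
    for i in range(n_rows):
        chunk_key = i // chunk_rows
        chunk = store.get(chunk_key, np.zeros((chunk_rows, 2)))
        chunk[i % chunk_rows] = np.arange(2, dtype="float")
        store[chunk_key] = chunk          # one PUT per row

def write_chunks(store, n_rows, chunk_rows):
    # Chunk-at-a-time: build each full chunk locally, upload it once.
    for start in range(0, n_rows, chunk_rows):
        block = np.tile(np.arange(2, dtype="float"), (chunk_rows, 1))
        store[start // chunk_rows] = block  # one PUT per chunk

per_row, per_chunk = CountingStore(), CountingStore()
write_rows(per_row, 100, 10)
write_chunks(per_chunk, 100, 10)
print(per_row.puts, per_chunk.puts)  # 100 vs 10
```

On local disk or in memory those extra writes are cheap, but against GCS each one is a network round trip, which would explain the ~50 s runtime above.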

So my major question is:

  • what is the right model for streaming data to zarr archive in gcs?
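One model I am considering (a sketch of my own, not an established zarr recipe) is to buffer incoming rows client-side and only hand off whole chunk-aligned blocks, so each cloud write covers a full chunk instead of a single row. `flush` below is whatever performs the actual upload; against a GCS-backed zarr array it could be something like `lambda block: z.append(block)`. The names here are illustrative, not a zarr API.

```python
import numpy as np

class ChunkedAppender:
    """Buffer incoming rows; emit one chunk-sized block per flush."""

    def __init__(self, n_cols, chunk_rows, flush):
        self.buf = np.empty((chunk_rows, n_cols))
        self.fill = 0
        self.flush = flush  # callable that uploads one block

    def append_row(self, row):
        self.buf[self.fill] = row
        self.fill += 1
        if self.fill == len(self.buf):       # chunk full: one upload
            self.flush(self.buf.copy())
            self.fill = 0

    def close(self):
        if self.fill:                        # partial final chunk
            self.flush(self.buf[:self.fill].copy())
            self.fill = 0

# Collect blocks in a list here to stand in for the real upload.
blocks = []
app = ChunkedAppender(n_cols=2, chunk_rows=10, flush=blocks.append)
for _ in range(25):
    app.append_row(np.arange(2, dtype="float"))
app.close()
print(len(blocks))  # 3 uploads (10 + 10 + 5 rows) instead of 25
```

Is something along these lines the intended pattern, or is there a better-supported way to do streaming appends?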

PS: A quick look at strace ./foo.py --gcs showed a lot of this:

futex(0x7f1edf5db4a4, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 29, {1598977280, 926829000}, ffffffff) = 0
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x7f1edf5db4a0, FUTEX_WAKE_PRIVATE, 1) = 1
futex(0x7f1edf5db4a4, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 31, {1598977280, 926990000}, ffffffff) = -1 EAGAIN (Resource temporarily unavailable)
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x7f1edf5db4a0, FUTEX_WAKE_PRIVATE, 1) = 1
futex(0x7f1edf5db4a4, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 33, {1598977280, 927165000}, ffffffff) = -1 EAGAIN (Resource temporarily unavailable)
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x7f1edf5db4a4, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 35, {1598977280, 927344000}, ffffffff) = 0
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x7f1edf5db4a4, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 37, {1598977280, 927526000}, ffffffff) = 0
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 0
futex(0x7f1edf5db4a4, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x7f1edf5db4a0, {FUTEX_OP_SET, 0, FUTEX_OP_CMP_GT, 1}) = 1
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 1
futex(0xf4c7a0, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 0, {1598977290, 923842000}, ffffffff) = 0
futex(0x7f1edf5db4a4, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 41, {1598977281, 292908000}, ffffffff) = -1 EAGAIN (Resource temporarily unavailable)
futex(0x7f1edf5db460, FUTEX_WAKE_PRIVATE, 1) = 0
sendto(5, "\0", 1, 0, NULL, 0)          = 1

I know zarr supports parallel writes to an archive. Are these futex calls a result of that?
