Skip to content

Commit 019ecc8

Browse files
committed
refactors CodecPipelines
1 parent 9405fda commit 019ecc8

File tree

10 files changed

+688
-502
lines changed

10 files changed

+688
-502
lines changed

src/zarr/abc/codec.py

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
11
from __future__ import annotations
22

33
from abc import abstractmethod
4-
from typing import TYPE_CHECKING, Awaitable, Callable, Iterable, Optional, Tuple, TypeVar
4+
from typing import (
5+
TYPE_CHECKING,
6+
Awaitable,
7+
Callable,
8+
Iterable,
9+
Optional,
10+
Protocol,
11+
Tuple,
12+
TypeVar,
13+
runtime_checkable,
14+
)
515

616
import numpy as np
717
from zarr.abc.metadata import Metadata
818

919
from zarr.common import ArraySpec, concurrent_map
10-
from zarr.store import StorePath
1120

1221

1322
if TYPE_CHECKING:
@@ -21,18 +30,45 @@
2130

2231

2332
def noop_for_none(
33+
<<<<<<< HEAD:src/zarr/abc/codec.py
2434
func: Callable[[Optional[T], ArraySpec, RuntimeConfiguration], Awaitable[U]],
2535
) -> Callable[[T, ArraySpec, RuntimeConfiguration], Awaitable[U]]:
36+
=======
37+
func: Callable[[T, ArraySpec, RuntimeConfiguration], Awaitable[Optional[U]]],
38+
) -> Callable[[Optional[T], ArraySpec, RuntimeConfiguration], Awaitable[Optional[U]]]:
39+
>>>>>>> 51d3c921 (refactors CodecPipelines):src/zarr/v3/abc/codec.py
2640
async def wrap(
2741
chunk: Optional[T], chunk_spec: ArraySpec, runtime_configuration: RuntimeConfiguration
28-
) -> U:
42+
) -> Optional[U]:
2943
if chunk is None:
3044
return None
3145
return await func(chunk, chunk_spec, runtime_configuration)
3246

3347
return wrap
3448

3549

50+
@runtime_checkable
51+
class ByteGetter(Protocol):
52+
async def get(
53+
self, byte_range: Optional[Tuple[int, Optional[int]]] = None
54+
) -> Optional[BytesLike]:
55+
...
56+
57+
58+
@runtime_checkable
59+
class ByteSetter(Protocol):
60+
async def get(
61+
self, byte_range: Optional[Tuple[int, Optional[int]]] = None
62+
) -> Optional[BytesLike]:
63+
...
64+
65+
async def set(self, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None) -> None:
66+
...
67+
68+
async def delete(self) -> None:
69+
...
70+
71+
3672
class Codec(Metadata):
3773
is_fixed_size: bool
3874

@@ -62,9 +98,9 @@ async def decode(
6298

6399
async def decode_batch(
64100
self,
65-
chunk_arrays_and_specs: Iterable[Tuple[np.ndarray, ArraySpec]],
101+
chunk_arrays_and_specs: Iterable[Tuple[Optional[np.ndarray], ArraySpec]],
66102
runtime_configuration: RuntimeConfiguration,
67-
) -> Iterable[np.ndarray]:
103+
) -> Iterable[Optional[np.ndarray]]:
68104
return await concurrent_map(
69105
[
70106
(chunk_array, chunk_spec, runtime_configuration)
@@ -110,9 +146,9 @@ async def decode(
110146

111147
async def decode_batch(
112148
self,
113-
chunk_bytes_and_specs: Iterable[Tuple[BytesLike, ArraySpec]],
149+
chunk_bytes_and_specs: Iterable[Tuple[Optional[BytesLike], ArraySpec]],
114150
runtime_configuration: RuntimeConfiguration,
115-
) -> Iterable[np.ndarray]:
151+
) -> Iterable[Optional[np.ndarray]]:
116152
return await concurrent_map(
117153
[
118154
(chunk_bytes, chunk_spec, runtime_configuration)
@@ -150,7 +186,7 @@ class ArrayBytesCodecPartialDecodeMixin:
150186
@abstractmethod
151187
async def decode_partial(
152188
self,
153-
store_path: StorePath,
189+
byte_getter: ByteGetter,
154190
selection: SliceSelection,
155191
chunk_spec: ArraySpec,
156192
runtime_configuration: RuntimeConfiguration,
@@ -159,13 +195,13 @@ async def decode_partial(
159195

160196
async def decode_partial_batched(
161197
self,
162-
batch_info: Iterable[Tuple[StorePath, SliceSelection, ArraySpec]],
198+
batch_info: Iterable[Tuple[ByteGetter, SliceSelection, ArraySpec]],
163199
runtime_configuration: RuntimeConfiguration,
164200
) -> Iterable[Optional[np.ndarray]]:
165201
return await concurrent_map(
166202
[
167-
(store_path, selection, chunk_spec, runtime_configuration)
168-
for store_path, selection, chunk_spec in batch_info
203+
(byte_getter, selection, chunk_spec, runtime_configuration)
204+
for byte_getter, selection, chunk_spec in batch_info
169205
],
170206
self.decode_partial,
171207
runtime_configuration.concurrency,
@@ -176,7 +212,7 @@ class ArrayBytesCodecPartialEncodeMixin:
176212
@abstractmethod
177213
async def encode_partial(
178214
self,
179-
store_path: StorePath,
215+
byte_setter: ByteSetter,
180216
chunk_array: np.ndarray,
181217
selection: SliceSelection,
182218
chunk_spec: ArraySpec,
@@ -186,13 +222,13 @@ async def encode_partial(
186222

187223
async def encode_partial_batched(
188224
self,
189-
batch_info: Iterable[Tuple[StorePath, np.ndarray, SliceSelection, ArraySpec]],
225+
batch_info: Iterable[Tuple[ByteSetter, np.ndarray, SliceSelection, ArraySpec]],
190226
runtime_configuration: RuntimeConfiguration,
191227
) -> None:
192228
await concurrent_map(
193229
[
194-
(store_path, chunk_array, selection, chunk_spec, runtime_configuration)
195-
for store_path, chunk_array, selection, chunk_spec in batch_info
230+
(byte_setter, chunk_array, selection, chunk_spec, runtime_configuration)
231+
for byte_setter, chunk_array, selection, chunk_spec in batch_info
196232
],
197233
self.encode_partial,
198234
runtime_configuration.concurrency,
@@ -211,9 +247,9 @@ async def decode(
211247

212248
async def decode_batch(
213249
self,
214-
chunk_bytes_and_specs: Iterable[Tuple[BytesLike, ArraySpec]],
250+
chunk_bytes_and_specs: Iterable[Tuple[Optional[BytesLike], ArraySpec]],
215251
runtime_configuration: RuntimeConfiguration,
216-
) -> Iterable[BytesLike]:
252+
) -> Iterable[Optional[BytesLike]]:
217253
return await concurrent_map(
218254
[
219255
(chunk_bytes, chunk_spec, runtime_configuration)

src/zarr/array.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ async def getitem(self, selection: Selection):
207207
)
208208

209209
# reading chunks and decoding them
210-
await self.codecs.read_batched(
210+
await self.codecs.read_batch(
211211
[
212212
(
213213
self.store_path
@@ -253,7 +253,7 @@ async def setitem(self, selection: Selection, value: np.ndarray) -> None:
253253
value = value.astype(self.metadata.dtype, order="A")
254254

255255
# merging with existing data and encoding chunks
256-
await self.codecs.write_batched(
256+
await self.codecs.write_batch(
257257
[
258258
(
259259
self.store_path

src/zarr/codecs/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@
77
from zarr.codecs.sharding import ShardingCodec, ShardingCodecIndexLocation # noqa: F401
88
from zarr.codecs.transpose import TransposeCodec # noqa: F401
99
from zarr.codecs.zstd import ZstdCodec # noqa: F401
10+
from zarr.codecs.pipeline import CodecPipeline, BatchedCodecPipeline, InterleavedCodecPipeline # noqa: F401

src/zarr/codecs/pipeline/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from __future__ import annotations
2+
3+
from zarr.codecs.pipeline.core import CodecPipeline # noqa: F401
4+
from zarr.codecs.pipeline.batched import BatchedCodecPipeline # noqa: F401
5+
from zarr.codecs.pipeline.interleaved import InterleavedCodecPipeline # noqa: F401

0 commit comments

Comments
 (0)