Skip to content

Commit 247ef3d

Browse files
committed
Allow filtering on chunked queries
They are always AND'ed together, for fairly obvious reasons. This removes the previous metric filter, because users can now add this in the filter field.
1 parent 1d8325d commit 247ef3d

File tree

1 file changed

+12
-12
lines changed

1 file changed

+12
-12
lines changed

src/obelisk/asynchronous/core.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import httpx
66
import json
77
from pydantic import BaseModel, AwareDatetime, ValidationError, model_validator
8-
from typing import AsyncGenerator, Dict, Generator, Iterator, List, Literal, Optional, Any, get_args
8+
from typing import AsyncIterator, Dict, Iterator, List, Literal, Optional, Any, get_args
99
from typing_extensions import Self
1010
from numbers import Number
1111

@@ -90,12 +90,12 @@ def to_dict(self) -> Dict:
9090

9191
class ChunkedParams(BaseModel):
9292
dataset: str
93-
metric: str
9493
groupBy: Optional[List[FieldName]] = None
9594
aggregator: Optional[Aggregator] = None
9695
fields: Optional[List[FieldName]] = None
9796
orderBy: Optional[List[str]] = None # More complex than just FieldName, can be prefixed with - to invert sort
9897
dataType: Optional[DataType] = None
98+
filter: Optional[str] = None
9999
start: datetime
100100
end: datetime
101101
jump: timedelta = timedelta(hours=1)
@@ -112,6 +112,9 @@ def chunks(self) -> Iterator[QueryParams]:
112112
current_start = self.start
113113
while current_start < self.end:
114114
current_end = current_start + self.jump
115+
filter=f'timestamp>={current_start.isoformat()};timestamp<{current_end.isoformat()}'
116+
if self.filter:
117+
filter += f';{self.filter}'
115118

116119
yield QueryParams(
117120
dataset=self.dataset,
@@ -120,7 +123,7 @@ def chunks(self) -> Iterator[QueryParams]:
120123
fields=self.fields,
121124
orderBy=self.orderBy,
122125
dataType=self.dataType,
123-
filter=f'timestamp>={current_start.isoformat()} and timestamp<{current_end.isoformat()} and metric=={self.metric}'
126+
filter=filter
124127
)
125128

126129
current_start += self.jump
@@ -219,12 +222,9 @@ async def query(
219222

220223
async def query_time_chunked(
221224
self,
222-
dataset: str,
223-
params: QueryParams,
224-
from_time: datetime,
225-
to_time: datetime,
226-
jump: timedelta,
227-
filter_: Optional[str] = None,
228-
direction: Literal["asc", "desc"] = "asc",
229-
) -> AsyncGenerator[List[Datapoint], None]:
230-
raise NotImplementedError()
225+
params: ChunkedParams
226+
) -> AsyncIterator[List[Datapoint]]
227+
for chunk in params.chunks():
228+
yield await self.query(
229+
chunk
230+
)

0 commit comments

Comments
 (0)