Commit f04c413 (parent 37fdfa2)

feat: python

2 files changed, +76 -27 lines

templates/python/api.mustache (1 addition, 1 deletion)

@@ -12,7 +12,7 @@ from algoliasearch.search.models import (
     SecuredApiKeyRestrictions,
 )
 
-from algoliasearch.ingestion.models import WatchResponse
+from algoliasearch.ingestion.models import (WatchResponse, Event, PushTaskRecords)
 from algoliasearch.ingestion.config import IngestionConfig
 from algoliasearch.ingestion.client import (IngestionClient, IngestionClientSync)
 {{/isSearchClient}}

templates/python/search_helpers.mustache (75 additions, 26 deletions)

@@ -74,7 +74,7 @@
                 "`apiKey` is required when waiting for an `update` operation."
             )
 
-        {{^isSyncClient}}async {{/isSyncClient}}def _func(_prev: Optional[GetApiKeyResponse]) -> GetApiKeyResponse:
+        {{^isSyncClient}}async {{/isSyncClient}}def _func(_: Optional[GetApiKeyResponse]) -> GetApiKeyResponse:
             try:
                 return {{^isSyncClient}}await {{/isSyncClient}}self.get_api_key(key=key, request_options=request_options)
             except RequestException as e:
@@ -209,7 +209,7 @@
         page = search_synonyms_params.page or 0
         search_synonyms_params.hits_per_page = hits_per_page
 
-        {{^isSyncClient}}async {{/isSyncClient}}def _func(_prev: Optional[SearchSynonymsResponse]) -> SearchSynonymsResponse:
+        {{^isSyncClient}}async {{/isSyncClient}}def _func(_: Optional[SearchSynonymsResponse]) -> SearchSynonymsResponse:
             nonlocal page
             resp = {{^isSyncClient}}await {{/isSyncClient}}self.search_synonyms(
                 index_name=index_name,
@@ -303,19 +303,7 @@
         """
         Helper: Similar to the `save_objects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must've been passed to the client's config at instantiation.
         """
-        if self._ingestion_transporter is None:
-            raise ValueError("`region` must be provided at client instantiation before calling this method.")
-
-        return {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.push(
-            index_name=index_name,
-            push_task_payload={
-                "action": Action.ADDOBJECT,
-                "records": objects,
-            },
-            watch=wait_for_tasks,
-            request_options=request_options,
-        )
-
+        return {{^isSyncClient}}await {{/isSyncClient}}self.chunked_push(index_name=index_name, objects=objects, action=Action.ADDOBJECT, wait_for_tasks=wait_for_tasks, batch_size=batch_size, request_options=request_options)
 
     {{^isSyncClient}}async {{/isSyncClient}}def delete_objects(
         self,
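
For reference, a minimal usage sketch of the rewired helper (not part of this commit). It assumes the generated SearchClientSync exposes this helper as `save_objects_with_transformation` and accepts the ingestion `region` at instantiation, as the docstring above requires; the app ID, API key, region, and index name are placeholders.

    from algoliasearch.search.client import SearchClientSync

    # Hypothetical setup: the `region` keyword is an assumption based on the
    # docstring ("The `region` must've been passed to the client's config").
    client = SearchClientSync("YOUR_APP_ID", "YOUR_API_KEY", region="eu")

    # After this commit, the helper delegates to chunked_push with
    # Action.ADDOBJECT instead of calling the ingestion transporter directly.
    responses = client.save_objects_with_transformation(
        index_name="products",
        objects=[{"objectID": "1", "name": "Phone case"}],
        wait_for_tasks=True,
    )
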
@@ -356,19 +344,80 @@
         """
         Helper: Similar to the `partial_update_objects` method but requires a Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/) to be created first, in order to transform records before indexing them to Algolia. The `region` must've been passed to the client instantiation method.
         """
-        if self._ingestion_transporter is None:
-            raise ValueError("`region` must be provided at client instantiation before calling this method.")
+        return {{^isSyncClient}}await {{/isSyncClient}}self.chunked_push(index_name=index_name, objects=objects, action=Action.PARTIALUPDATEOBJECT if create_if_not_exists else Action.PARTIALUPDATEOBJECTNOCREATE, wait_for_tasks=wait_for_tasks, batch_size=batch_size, request_options=request_options)
 
-        return {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.push(
-            index_name=index_name,
-            push_task_payload={
-                "action": Action.PARTIALUPDATEOBJECT if create_if_not_exists else Action.PARTIALUPDATEOBJECTNOCREATE,
-                "records": objects,
-            },
-            watch=wait_for_tasks,
-            request_options=request_options,
-        )
+    {{^isSyncClient}}async {{/isSyncClient}}def chunked_push(
+        self,
+        index_name: str,
+        objects: List[Dict[str, Any]],
+        action: Action = Action.ADDOBJECT,
+        wait_for_tasks: bool = False,
+        batch_size: int = 1000,
+        reference_index_name: Optional[str] = None,
+        request_options: Optional[Union[dict, RequestOptions]] = None,
+    ) -> List[WatchResponse]:
+        """
+        Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
+        """
 
+        if self._ingestion_transporter is None:
+            raise ValueError("`region` must be provided at client instantiation before calling this method.")
+        records: List[PushTaskRecords] = []
+        responses: List[WatchResponse] = []
+        for i, obj in enumerate(objects):
+            records.append(obj) # pyright: ignore
+            if len(records) == batch_size or i == len(objects) - 1:
+                responses.append(
+                    {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.push(
+                        index_name=index_name,
+                        push_task_payload={
+                            "action": action,
+                            "records": records,
+                        },
+                        watch=wait_for_tasks,
+                        reference_index_name=reference_index_name,
+                        request_options=request_options,
+                    )
+                )
+                requests = []
+        if wait_for_tasks:
+            for response in responses:
+                {{^isSyncClient}}async {{/isSyncClient}}def _func(_: Optional[Event]) -> Event:
+                    if self._ingestion_transporter is None:
+                        raise ValueError(
+                            "`region` must be provided at client instantiation before calling this method."
+                        )
+                    if response.event_id is None:
+                        raise ValueError(
+                            "received unexpected response from the push endpoint, eventID must not be undefined"
+                        )
+                    try:
+                        return {{^isSyncClient}}await {{/isSyncClient}}self._ingestion_transporter.get_event(run_id=response.run_id, event_id=response.event_id, request_options=request_options)
+                    except RequestException as e:
+                        if e.status_code == 404:
+                            return None # pyright: ignore
+                        raise e
+
+                _retry_count = 0
+
+                def _aggregator(_: Event | None) -> None:
+                    nonlocal _retry_count
+                    _retry_count += 1
+
+                def _validate(_resp: Event | None) -> bool:
+                    return _resp is not None
+
+                timeout = RetryTimeout()
+
+                {{^isSyncClient}}await {{/isSyncClient}}create_iterable{{#isSyncClient}}_sync{{/isSyncClient}}(
+                    func=_func,
+                    validate=_validate,
+                    aggregator=_aggregator,
+                    timeout=lambda: timeout(_retry_count),
+                    error_validate=lambda _: _retry_count >= 50,
+                    error_message=lambda _: f"The maximum number of retries exceeded. (${_retry_count}/${50})",
+                )
+        return responses
 
     {{^isSyncClient}}async {{/isSyncClient}}def chunked_batch(
         self,
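
A similar sketch for calling the new `chunked_push` helper directly, again not from this commit and under the same assumption about how the `region` reaches the client. When `wait_for_tasks` is set, the helper polls `get_event` for each push response until the event is visible, giving up after 50 retries.

    from algoliasearch.search.client import SearchClientSync

    # Hypothetical setup; the `region` keyword is an assumption (see above).
    client = SearchClientSync("YOUR_APP_ID", "YOUR_API_KEY", region="eu")

    # The objects list is sent through the Push connector in chunks; one
    # WatchResponse is returned per push request issued.
    responses = client.chunked_push(
        index_name="products",
        objects=[{"objectID": str(i), "price": i} for i in range(2500)],
        wait_for_tasks=True,  # poll each push's ingestion event before returning
        batch_size=1000,
    )
    print(len(responses))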