Skip to content

Commit 8a682e3

Browse files
committed
chore: maybe java idk
1 parent 6f38f39 commit 8a682e3

File tree

1 file changed

+94
-28
lines changed

1 file changed

+94
-28
lines changed

templates/java/api_helpers.mustache

Lines changed: 94 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,91 @@ public CompletableFuture<List<SearchForFacetValuesResponse>> searchForFacetsAsyn
585585
);
586586
}
587587
588+
/**
589+
* Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
590+
*
591+
* @summary Helper: Chunks the given `objects` list in subset of 1000 elements max in order to
592+
* make it fit in `batch` requests.
593+
* @param indexName - The `indexName` to replace `objects` in.
594+
* @param objects - The array of `objects` to store in the given Algolia `indexName`.
595+
* @param action - The `batch` `action` to perform on the given array of `objects`.
596+
* @param waitForTasks - Whether or not we should wait until every `batch` tasks has been
597+
* processed, this operation may slow the total execution time of this method but is more
598+
* reliable.
599+
* @param batchSize - The size of the chunk of `objects`. The number of `batch` calls will be
600+
* equal to `length(objects) / batchSize`. Defaults to 1000.
601+
* @param referenceIndexName - This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name).
602+
* @param requestOptions - The requestOptions to send along with the query, they will be forwarded
603+
* to the `getTask` method and merged with the transporter requestOptions.
604+
*/
605+
public <T> List<WatchResponse> chunkedPush(
606+
String indexName,
607+
Iterable<T> objects,
608+
Action action,
609+
boolean waitForTasks,
610+
int batchSize,
611+
String referenceIndexName,
612+
RequestOptions requestOptions
613+
) {
614+
if (this.ingestionTransporter == null) {
615+
throw new AlgoliaRuntimeException("`setTransformationRegion` must have been called before calling this method.");
616+
}
617+
618+
List<WatchResponse> responses = new ArrayList<>();
619+
Iterable<T> records = new ArrayList<>();
620+
621+
for (T item : objects) {
622+
if (requests.size() == batchSize) {
623+
WatchResponse watch = this.ingestionTransporter.push(
624+
indexName,
625+
new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
626+
waitForTasks,
627+
null,
628+
requestOptions
629+
);
630+
responses.add(watch);
631+
records.clear();
632+
}
633+
634+
records.add(item);
635+
}
636+
637+
if (records.size() > 0) {
638+
WatchResponse watch = this.ingestionTransporter.push(
639+
indexName,
640+
new PushTaskPayload().setAction(action).setRecords(this.objectsToPushTaskRecords(records)),
641+
waitForTasks,
642+
null,
643+
requestOptions
644+
);
645+
responses.add(watch);
646+
}
647+
648+
if (waitForTasks) {
649+
responses.forEach(response -> TaskUtils.retryUntil(
650+
() -> {
651+
try {
652+
return this.ingestionTransporter.getEvent(response.runID, response.eventID);
653+
} catch (AlgoliaApiException e) {
654+
if (e.getStatusCode() == 404) {
655+
return null;
656+
}
657+
658+
throw e;
659+
}
660+
},
661+
(Event response) -> {
662+
return response != null;
663+
},
664+
50,
665+
null
666+
)
667+
);
668+
}
669+
670+
return responses;
671+
}
672+
588673
/**
589674
* Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit
590675
* in `batch` requests.
@@ -730,17 +815,7 @@ public <T> List<WatchResponse> saveObjectsWithTransformation(
730815
int batchSize,
731816
RequestOptions requestOptions
732817
) {
733-
if (this.ingestionTransporter == null) {
734-
throw new AlgoliaRuntimeException("`setTransformationRegion` must have been called before calling this method.");
735-
}
736-
737-
return this.ingestionTransporter.push(
738-
indexName,
739-
new PushTaskPayload().setAction(com.algolia.model.ingestion.Action.ADD_OBJECT).setRecords(this.objectsToPushTaskRecords(objects)),
740-
waitForTasks,
741-
null,
742-
requestOptions
743-
);
818+
return chunkedPush(indexName, objects, Action.ADD_OBJECT, waitForTasks, batchSize, null, requestOptions);
744819
}
745820

746821
private <T> List<PushTaskRecords> objectsToPushTaskRecords(Iterable<T> objects) {
@@ -970,23 +1045,14 @@ public <T> List<WatchResponse> partialUpdateObjectsWithTransformation(
9701045
int batchSize,
9711046
RequestOptions requestOptions
9721047
) {
973-
if (this.ingestionTransporter == null) {
974-
throw new AlgoliaRuntimeException("`setTransformationRegion` must have been called before calling this method.");
975-
}
976-
977-
return this.ingestionTransporter.push(
978-
indexName,
979-
new PushTaskPayload()
980-
.setAction(
981-
createIfNotExists
982-
? com.algolia.model.ingestion.Action.PARTIAL_UPDATE_OBJECT
983-
: com.algolia.model.ingestion.Action.PARTIAL_UPDATE_OBJECT_NO_CREATE
984-
)
985-
.setRecords(this.objectsToPushTaskRecords(objects)),
986-
waitForTasks,
987-
null,
988-
requestOptions
989-
);
1048+
return chunkedPush(
1049+
indexName,
1050+
objects,
1051+
createIfNotExists ? Action.PARTIAL_UPDATE_OBJECT : Action.PARTIAL_UPDATE_OBJECT_NO_CREATE,
1052+
waitForTasks,
1053+
batchSize,
1054+
requestOptions
1055+
);
9901056
}
9911057

9921058
/**

0 commit comments

Comments
 (0)