From 34f9a7f97959b578a806fe5ae77fbc3bdff10e5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Aguiar?= Date: Wed, 30 Jul 2025 17:19:28 -0300 Subject: [PATCH] fix: allow configuring max parallel write requests --- CHANGELOG.md | 1 + README.md | 5 +- .../openfga/sdk/api/client/OpenFgaClient.java | 55 +++++++++++++++---- .../api/configuration/ClientWriteOptions.java | 10 ++++ .../sdk/api/client/OpenFgaClientTest.java | 15 +++-- 5 files changed, 68 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ace0859c..e668121e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## [Unreleased](https://github.com/openfga/java-sdk/compare/v0.8.3...HEAD) +- fix: allow configuring maxParallelRequests for non-transaction writes (#187) ## v0.8.3 diff --git a/README.md b/README.md index 91f5bd09..37b5f1fa 100644 --- a/README.md +++ b/README.md @@ -542,7 +542,7 @@ Convenience `WriteTuples` and `DeleteTuples` methods are also available. ###### Non-transaction mode -The SDK will split the writes into separate requests and send them sequentially to avoid violating rate limits. +The SDK will split the writes into smaller transactions and send them with limited parallelization to avoid violating rate limits. > Passing `ClientWriteOptions` with `.disableTransactions(true)` is required to use non-transaction mode. > All other fields of `ClientWriteOptions` are optional. @@ -570,7 +570,8 @@ var options = new ClientWriteOptions() // You can rely on the model id set in the configuration or override it for this specific request .authorizationModelId("01GXSA8YR785C4FYS3C0RTG7B1") .disableTransactions(true) - .transactionChunkSize(5); // Maximum number of requests to be sent in a transaction in a particular chunk + .transactionChunkSize(5) // Maximum number of requests per transaction chunk, defaults to 1 + .maxParallelRequests(5); // Max number of requests to issue in parallel, defaults to 10 var response = fgaClient.write(request, options).get(); ``` diff --git a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java index 540cf7e2..783de265 100644 --- a/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java +++ b/src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java @@ -414,7 +414,9 @@ private CompletableFuture writeNonTransaction( var options = writeOptions != null ? writeOptions - : new ClientWriteOptions().transactionChunkSize(DEFAULT_MAX_METHOD_PARALLEL_REQS); + : new ClientWriteOptions() + .transactionChunkSize(1) + .maxParallelRequests(DEFAULT_MAX_METHOD_PARALLEL_REQS); if (options.getAdditionalHeaders() == null) { options.additionalHeaders(new HashMap<>()); @@ -434,19 +436,50 @@ private CompletableFuture writeNonTransaction( return this.writeTransactions(storeId, emptyTransaction, writeOptions); } - var futureResponse = this.writeTransactions(storeId, transactions.get(0), options); - - for (int i = 1; i < transactions.size(); i++) { - final int index = i; // Must be final in this scope for closure. + int maxParallelRequests = options.getMaxParallelRequests() != null + ? options.getMaxParallelRequests() + : DEFAULT_MAX_METHOD_PARALLEL_REQS; - // The resulting completable future of this chain will result in either: - // 1. The first exception thrown in a failed completion. Other thenCompose() will not be evaluated. - // 2. The final successful ClientWriteResponse. - futureResponse = futureResponse.thenCompose( - _response -> this.writeTransactions(storeId, transactions.get(index), options)); + if (maxParallelRequests <= 1) { + var futureResponse = this.writeTransactions(storeId, transactions.get(0), options); + for (int i = 1; i < transactions.size(); i++) { + final int index = i; + futureResponse = futureResponse.thenCompose( + _response -> this.writeTransactions(storeId, transactions.get(index), options)); + } + return futureResponse; } - return futureResponse; + var executor = Executors.newScheduledThreadPool(maxParallelRequests); + var latch = new CountDownLatch(transactions.size()); + var failure = new AtomicReference(); + var lastResponse = new AtomicReference(); + + Consumer singleWriteRequest = + tx -> this.writeTransactions(storeId, tx, options).whenComplete((response, throwable) -> { + try { + if (throwable != null) { + failure.compareAndSet(null, throwable); + } else { + lastResponse.set(response); + } + } finally { + latch.countDown(); + } + }); + + try { + transactions.forEach(tx -> executor.execute(() -> singleWriteRequest.accept(tx))); + latch.await(); + if (failure.get() != null) { + return CompletableFuture.failedFuture(failure.get()); + } + return CompletableFuture.completedFuture(lastResponse.get()); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } finally { + executor.shutdown(); + } } private Stream> chunksOf(int chunkSize, List list) { diff --git a/src/main/java/dev/openfga/sdk/api/configuration/ClientWriteOptions.java b/src/main/java/dev/openfga/sdk/api/configuration/ClientWriteOptions.java index b874d815..7b9c21be 100644 --- a/src/main/java/dev/openfga/sdk/api/configuration/ClientWriteOptions.java +++ b/src/main/java/dev/openfga/sdk/api/configuration/ClientWriteOptions.java @@ -19,6 +19,7 @@ public class ClientWriteOptions implements AdditionalHeadersSupplier { private String authorizationModelId; private Boolean disableTransactions = false; private int transactionChunkSize; + private Integer maxParallelRequests; public ClientWriteOptions additionalHeaders(Map additionalHeaders) { this.additionalHeaders = additionalHeaders; @@ -56,4 +57,13 @@ public ClientWriteOptions transactionChunkSize(int transactionChunkSize) { public int getTransactionChunkSize() { return transactionChunkSize > 0 ? transactionChunkSize : 1; } + + public ClientWriteOptions maxParallelRequests(Integer maxParallelRequests) { + this.maxParallelRequests = maxParallelRequests; + return this; + } + + public Integer getMaxParallelRequests() { + return maxParallelRequests; + } } diff --git a/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java b/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java index f111ac07..23bc17f6 100644 --- a/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java +++ b/src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java @@ -1216,8 +1216,10 @@ public void writeTest_nonTransaction() throws Exception { ClientWriteRequest request = new ClientWriteRequest() .writes(List.of(writeTuple, writeTuple, writeTuple, writeTuple, writeTuple)) .deletes(List.of(tuple, tuple, tuple, tuple, tuple)); - ClientWriteOptions options = - new ClientWriteOptions().disableTransactions(true).transactionChunkSize(2); + ClientWriteOptions options = new ClientWriteOptions() + .disableTransactions(true) + .transactionChunkSize(2) + .maxParallelRequests(1); // When var response = fga.write(request, options).get(); @@ -1284,8 +1286,10 @@ public void writeTest_nonTransactionsWithFailure() { .user(user) .condition(DEFAULT_CONDITION)) .collect(Collectors.toList())); - ClientWriteOptions options = - new ClientWriteOptions().disableTransactions(true).transactionChunkSize(1); + ClientWriteOptions options = new ClientWriteOptions() + .disableTransactions(true) + .transactionChunkSize(1) + .maxParallelRequests(1); // When var execException = assertThrows( @@ -2005,7 +2009,8 @@ public void shouldSplitBatchesSuccessfully(WireMockRuntimeInfo wireMockRuntimeIn .correlationId("cor-3"); ClientBatchCheckRequest request = new ClientBatchCheckRequest().checks(List.of(item1, item2, item3)); - ClientBatchCheckOptions options = new ClientBatchCheckOptions().maxBatchSize(2); + ClientBatchCheckOptions options = + new ClientBatchCheckOptions().maxBatchSize(2).maxParallelRequests(1); // When ClientBatchCheckResponse response = fga.batchCheck(request, options).join();