diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 2742de01e0b83..17afcba67df58 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -20,6 +20,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; @@ -728,6 +729,14 @@ public UpdateRequest doc(Object... source) { return this; } + /** + * Sets the doc to use for updates when a script is not specified. The doc is provided in a bytes form. + */ + public UpdateRequest doc(BytesReference source, XContentType contentType) { + safeDoc().source(source, contentType); + return this; + } + /** * Sets the doc to use for updates when a script is not specified, the doc provided * is a field and value pairs. diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java index 4408d13303d2a..9d52231835ff0 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -37,19 +38,21 @@ public TransportGetAsyncSearchAction(TransportService transportService, ClusterService clusterService, NamedWriteableRegistry registry, Client client, - ThreadPool threadPool) { + ThreadPool threadPool, + BigArrays bigArrays) { super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncResultRequest::new); this.transportService = transportService; - this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool); + this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool, bigArrays); } static AsyncResultsService createResultsService(TransportService transportService, ClusterService clusterService, NamedWriteableRegistry registry, Client client, - ThreadPool threadPool) { + ThreadPool threadPool, + BigArrays bigArrays) { AsyncTaskIndexService store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, - threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); + threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry, bigArrays); return new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener, transportService.getTaskManager(), clusterService); } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java index 6f1dedb01f20a..3ee66fe918ecf 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -37,16 +38,17 @@ public class TransportGetAsyncStatusAction extends HandledTransportAction(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, - threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); + threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry, bigArrays); } @Override diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index 07c3392da0287..7f8e72a134b14 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.engine.DocumentMissingException; @@ -64,14 +65,15 @@ public TransportSubmitAsyncSearchAction(ClusterService clusterService, Client client, NodeClient nodeClient, SearchService searchService, - TransportSearchAction searchAction) { + TransportSearchAction searchAction, + BigArrays bigArrays) { super(SubmitAsyncSearchAction.NAME, transportService, actionFilters, SubmitAsyncSearchRequest::new); this.nodeClient = nodeClient; this.requestToAggReduceContextBuilder = request -> searchService.aggReduceContextBuilder(request).forFinalReduction(); this.searchAction = searchAction; this.threadContext = transportService.getThreadPool().getThreadContext(); this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadContext, client, - ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry); + ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry, bigArrays); } @Override diff --git a/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java b/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java index 337c417a4fff8..9eca15005a8f8 100644 --- a/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java +++ b/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.async; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -24,9 +25,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.XPackPlugin; -import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService; -import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import java.util.ArrayList; import java.util.Collection; @@ -76,21 +75,12 @@ public Collection createComponents( List components = new ArrayList<>(); if (DiscoveryNode.canContainData(environment.settings())) { // only data nodes should be eligible to run the maintenance service. - AsyncTaskIndexService indexService = new AsyncTaskIndexService<>( - XPackPlugin.ASYNC_RESULTS_INDEX, - clusterService, - threadPool.getThreadContext(), - client, - ASYNC_SEARCH_ORIGIN, - AsyncSearchResponse::new, - namedWriteableRegistry - ); AsyncTaskMaintenanceService maintenanceService = new AsyncTaskMaintenanceService( clusterService, nodeEnvironment.nodeId(), settings, threadPool, - indexService + new OriginSettingClient(client, ASYNC_SEARCH_ORIGIN) ); components.add(maintenanceService); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index d6dca84f5cbc1..085a327394940 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -26,6 +26,8 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.core.Tuple; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; @@ -116,6 +118,7 @@ static XContentBuilder mappings() throws IOException { private final SecurityContext securityContext; private final NamedWriteableRegistry registry; private final Writeable.Reader reader; + private final BigArrays bigArrays; public AsyncTaskIndexService(String index, @@ -124,7 +127,8 @@ public AsyncTaskIndexService(String index, Client client, String origin, Writeable.Reader reader, - NamedWriteableRegistry registry) { + NamedWriteableRegistry registry, + BigArrays bigArrays) { this.index = index; this.clusterService = clusterService; this.securityContext = new SecurityContext(clusterService.getSettings(), threadContext); @@ -132,6 +136,7 @@ public AsyncTaskIndexService(String index, this.clientWithOrigin = new OriginSettingClient(client, origin); this.registry = registry; this.reader = reader; + this.bigArrays = bigArrays; } /** @@ -190,20 +195,25 @@ public Authentication getAuthentication() { public void createResponse(String docId, Map headers, R response, - ActionListener listener) throws IOException { - createIndexIfNecessary(listener.delegateFailure((ignored, l) -> { - // TODO: Integrate with circuit breaker + ActionListener outerListener) throws IOException { + createIndexIfNecessary(outerListener.delegateFailure((listener, ignored) -> { try { - final XContentBuilder source = XContentFactory.jsonBuilder() + final ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking()); + final XContentBuilder source = XContentFactory.jsonBuilder(buffer); + listener = ActionListener.runBefore(listener, source::close); + source .startObject() .field(HEADERS_FIELD, headers) .field(EXPIRATION_TIME_FIELD, response.getExpirationTime()) .directFieldAsBase64(RESULT_FIELD, os -> writeResponse(response, os)) .endObject(); + // do not close the buffer or the XContentBuilder until the IndexRequest is completed (i.e., listener is notified); + // otherwise, we underestimate the memory usage in case the circuit breaker does not use the real memory usage. + source.flush(); final IndexRequest indexRequest = new IndexRequest(index) .create(true) .id(docId) - .source(source); + .source(buffer.bytes(), source.contentType()); clientWithOrigin.index(indexRequest, listener); } catch (Exception e) { listener.onFailure(e); @@ -217,19 +227,24 @@ public void createResponse(String docId, public void updateResponse(String docId, Map> responseHeaders, R response, - ActionListener listener) { - createIndexIfNecessary(listener.delegateFailure((ignored, l) -> { + ActionListener outerListener) { + createIndexIfNecessary(outerListener.delegateFailure((listener, ignored) -> { try { - // TODO: Integrate with circuit breaker - final XContentBuilder source = XContentFactory.jsonBuilder() + final ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking()); + final XContentBuilder source = XContentFactory.jsonBuilder(buffer); + listener = ActionListener.runBefore(listener, source::close); + source .startObject() .field(RESPONSE_HEADERS_FIELD, responseHeaders) .directFieldAsBase64(RESULT_FIELD, os -> writeResponse(response, os)) .endObject(); + // do not close the buffer or the XContentBuilder until the UpdateRequest is completed (i.e., listener is notified); + // otherwise, we underestimate the memory usage in case the circuit breaker does not use the real memory usage. + source.flush(); final UpdateRequest request = new UpdateRequest() .index(index) .id(docId) - .doc(source) + .doc(buffer.bytes(), source.contentType()) .retryOnConflict(5); clientWithOrigin.update(request, listener); } catch (Exception e) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java index 5112f3970e72b..9877d25c7e8c8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -55,7 +56,7 @@ public class AsyncTaskMaintenanceService extends AbstractLifecycleComponent impl private final String index; private final String localNodeId; private final ThreadPool threadPool; - private final AsyncTaskIndexService indexService; + private final Client clientWithOrigin; private final TimeValue delay; private boolean isCleanupRunning; @@ -65,12 +66,12 @@ public AsyncTaskMaintenanceService(ClusterService clusterService, String localNodeId, Settings nodeSettings, ThreadPool threadPool, - AsyncTaskIndexService indexService) { + Client clientWithOrigin) { this.clusterService = clusterService; this.index = XPackPlugin.ASYNC_RESULTS_INDEX; this.localNodeId = localNodeId; this.threadPool = threadPool; - this.indexService = indexService; + this.clientWithOrigin = clientWithOrigin; this.delay = ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings); } @@ -125,8 +126,7 @@ synchronized void executeNextCleanup() { long nowInMillis = System.currentTimeMillis(); DeleteByQueryRequest toDelete = new DeleteByQueryRequest(index) .setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis)); - indexService.getClientWithOrigin() - .execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(this::scheduleNextCleanup)); + clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(this::scheduleNextCleanup)); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/TransportDeleteAsyncResultAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/TransportDeleteAsyncResultAction.java index f204d8039ecad..d178b41f918fa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/TransportDeleteAsyncResultAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/TransportDeleteAsyncResultAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -34,13 +35,14 @@ public TransportDeleteAsyncResultAction(TransportService transportService, ClusterService clusterService, NamedWriteableRegistry registry, Client client, - ThreadPool threadPool) { + ThreadPool threadPool, + BigArrays bigArrays) { super(DeleteAsyncResultAction.NAME, transportService, actionFilters, DeleteAsyncResultRequest::new); this.transportService = transportService; this.clusterService = clusterService; AsyncTaskIndexService store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, - (in) -> {throw new UnsupportedOperationException("Reading is not supported during deletion");}, registry); + (in) -> {throw new UnsupportedOperationException("Reading is not supported during deletion");}, registry, bigArrays); this.deleteResultsService = new DeleteAsyncResultsService(store, transportService.getTaskManager()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java index f083ed920c2aa..8af60129696de 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -123,9 +124,10 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, public void setup() { clusterService = getInstanceFromNode(ClusterService.class); TransportService transportService = getInstanceFromNode(TransportService.class); + BigArrays bigArrays = getInstanceFromNode(BigArrays.class); taskManager = transportService.getTaskManager(); indexService = new AsyncTaskIndexService<>("test", clusterService, transportService.getThreadPool().getThreadContext(), - client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry()); + client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry(), bigArrays); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java index 8ffb94a927efc..fc8c7fa597a48 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java @@ -11,8 +11,15 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.indices.breaker.AllCircuitBreakerStats; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerStats; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.transport.TransportService; @@ -85,9 +92,10 @@ public String toString() { @Before public void setup() { ClusterService clusterService = getInstanceFromNode(ClusterService.class); + BigArrays bigArrays = getInstanceFromNode(BigArrays.class); TransportService transportService = getInstanceFromNode(TransportService.class); indexService = new AsyncTaskIndexService<>("test", clusterService, transportService.getThreadPool().getThreadContext(), - client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry()); + client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry(), bigArrays); } public void testEncodeSearchResponse() throws IOException { @@ -133,4 +141,127 @@ public void testEncodeSearchResponse() throws IOException { } } } + + static class AdjustableLimitCircuitBreaker extends NoopCircuitBreaker { + private long used = 0; + private long limit = 0; + + AdjustableLimitCircuitBreaker(String name) { + super(name); + } + + @Override + public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { + if (bytes <= 0) { + addWithoutBreaking(bytes); + } else { + if (used + bytes > limit) { + throw new CircuitBreakingException("Current used [" + used + "] and requesting bytes [" + bytes + "] " + + "is greater than the limit [" + limit + "]", Durability.TRANSIENT); + } + used += bytes; + } + } + + @Override + public void addWithoutBreaking(long bytes) { + used += bytes; + } + + @Override + public long getUsed() { + return used; + } + + @Override + public long getLimit() { + return limit; + } + + void adjustLimit(long limit) { + if (limit < used) { + throw new IllegalArgumentException("Limit must not be smaller than used; used=" + used + "; limit=" + limit); + } + this.limit = limit; + } + } + + public void testCircuitBreaker() throws Exception { + AdjustableLimitCircuitBreaker circuitBreaker = new AdjustableLimitCircuitBreaker("test"); + CircuitBreakerService circuitBreakerService = new CircuitBreakerService() { + @Override + public CircuitBreaker getBreaker(String name) { + assertThat(name, equalTo(CircuitBreaker.REQUEST)); + return circuitBreaker; + } + + @Override + public AllCircuitBreakerStats stats() { + return null; + } + + @Override + public CircuitBreakerStats stats(String name) { + return null; + } + }; + BigArrays bigArrays = new BigArrays(null, circuitBreakerService, CircuitBreaker.REQUEST); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + TransportService transportService = getInstanceFromNode(TransportService.class); + indexService = new AsyncTaskIndexService<>("test", clusterService, transportService.getThreadPool().getThreadContext(), + client(), ASYNC_SEARCH_ORIGIN, TestAsyncResponse::new, writableRegistry(), bigArrays); + + AsyncExecutionId executionId = new AsyncExecutionId( + Long.toString(randomNonNegativeLong()), + new TaskId(randomAlphaOfLength(10), randomNonNegativeLong())); + long expirationTime = randomLong(); + String testMessage = randomAlphaOfLength(10); + { + circuitBreaker.adjustLimit(randomIntBetween(1, 64)); // small limit + TestAsyncResponse initialResponse = new TestAsyncResponse(testMessage, expirationTime); + PlainActionFuture createFuture = new PlainActionFuture<>(); + indexService.createResponse(executionId.getDocId(), Collections.emptyMap(), initialResponse, createFuture); + expectThrows(CircuitBreakingException.class, createFuture::actionGet); + assertThat(circuitBreaker.getUsed(), equalTo(0L)); + } + { + circuitBreaker.adjustLimit(randomIntBetween(16 * 1024, 1024 * 1024)); // large enough + TestAsyncResponse initialResponse = new TestAsyncResponse(testMessage, expirationTime); + PlainActionFuture createFuture = new PlainActionFuture<>(); + indexService.createResponse(executionId.getDocId(), Collections.emptyMap(), initialResponse, createFuture); + assertThat(createFuture.actionGet().getResult(), equalTo(DocWriteResponse.Result.CREATED)); + assertThat(circuitBreaker.getUsed(), equalTo(0L)); + if (randomBoolean()) { + PlainActionFuture getFuture = new PlainActionFuture<>(); + indexService.getResponse(executionId, randomBoolean(), getFuture); + assertThat(getFuture.actionGet(), equalTo(initialResponse)); + } + } + + int updates = randomIntBetween(1, 5); + for (int u = 0; u < updates; u++) { + if (randomBoolean()) { + circuitBreaker.adjustLimit(randomIntBetween(16 * 1024, 1024 * 1024)); + testMessage = randomAlphaOfLength(10); + TestAsyncResponse updateResponse = new TestAsyncResponse(testMessage, randomLong()); + PlainActionFuture updateFuture = new PlainActionFuture<>(); + indexService.updateResponse(executionId.getDocId(), Collections.emptyMap(), updateResponse, updateFuture); + updateFuture.actionGet(); + assertThat(circuitBreaker.getUsed(), equalTo(0L)); + } else { + circuitBreaker.adjustLimit(randomIntBetween(1, 64)); // small limit + PlainActionFuture updateFuture = new PlainActionFuture<>(); + TestAsyncResponse updateResponse = new TestAsyncResponse(randomAlphaOfLength(100), randomLong()); + indexService.updateResponse(executionId.getDocId(), Collections.emptyMap(), updateResponse, updateFuture); + expectThrows(CircuitBreakingException.class, updateFuture::actionGet); + assertThat(circuitBreaker.getUsed(), equalTo(0L)); + } + if (randomBoolean()) { + PlainActionFuture getFuture = new PlainActionFuture<>(); + indexService.getResponse(executionId, randomBoolean(), getFuture); + assertThat(getFuture.actionGet().test, equalTo(testMessage)); + assertThat(getFuture.actionGet().expirationTimeMillis, equalTo(expirationTime)); + } + } + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java index 74b2a56c2132d..a24448afe5131 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -37,9 +38,10 @@ public class AsyncTaskServiceTests extends ESSingleNodeTestCase { public void setup() { ClusterService clusterService = getInstanceFromNode(ClusterService.class); TransportService transportService = getInstanceFromNode(TransportService.class); + BigArrays bigArrays = getInstanceFromNode(BigArrays.class); indexService = new AsyncTaskIndexService<>(index, clusterService, transportService.getThreadPool().getThreadContext(), - client(), "test_origin", AsyncSearchResponse::new, writableRegistry()); + client(), "test_origin", AsyncSearchResponse::new, writableRegistry(), bigArrays); } public void testEnsuredAuthenticatedUserIsSame() throws IOException { diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java index 744228e507897..6d5914ab9ed01 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementService.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.VersionConflictEngineException; @@ -107,13 +108,14 @@ public String getDescription() { public AsyncTaskManagementService(String index, Client client, String origin, NamedWriteableRegistry registry, TaskManager taskManager, String action, AsyncOperation operation, Class taskClass, ClusterService clusterService, - ThreadPool threadPool) { + ThreadPool threadPool, + BigArrays bigArrays) { this.taskManager = taskManager; this.action = action; this.operation = operation; this.taskClass = taskClass; this.asyncTaskIndexService = new AsyncTaskIndexService<>(index, clusterService, threadPool.getThreadContext(), client, - origin, i -> new StoredAsyncResponse<>(operation::readResponse, i), registry); + origin, i -> new StoredAsyncResponse<>(operation::readResponse, i), registry, bigArrays); this.clusterService = clusterService; this.threadPool = threadPool; } diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java index 5e847e9af876e..8f9a231e2f1c6 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetResultAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -41,10 +42,11 @@ public TransportEqlAsyncGetResultAction(TransportService transportService, ClusterService clusterService, NamedWriteableRegistry registry, Client client, - ThreadPool threadPool) { + ThreadPool threadPool, + BigArrays bigArrays) { super(EqlAsyncActionNames.EQL_ASYNC_GET_RESULT_ACTION_NAME, transportService, actionFilters, GetAsyncResultRequest::new); this.transportService = transportService; - this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool); + this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool, bigArrays); } static AsyncResultsService> createResultsService( @@ -52,10 +54,11 @@ static AsyncResultsService ClusterService clusterService, NamedWriteableRegistry registry, Client client, - ThreadPool threadPool) { + ThreadPool threadPool, + BigArrays bigArrays) { Writeable.Reader> reader = in -> new StoredAsyncResponse<>(EqlSearchResponse::new, in); AsyncTaskIndexService> store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, - clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, reader, registry); + clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, reader, registry, bigArrays); return new AsyncResultsService<>(store, false, EqlSearchTask.class, (task, listener, timeout) -> AsyncTaskManagementService.addCompletionListener(threadPool, task, listener, timeout), transportService.getTaskManager(), clusterService); diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetStatusAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetStatusAction.java index f7b46a7a83c00..b2514a947112c 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetStatusAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlAsyncGetStatusAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -40,17 +41,18 @@ public class TransportEqlAsyncGetStatusAction extends HandledTransportAction> reader = in -> new StoredAsyncResponse<>(EqlSearchResponse::new, in); this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, - threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, reader, registry); + threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, reader, registry, bigArrays); } @Override diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java index c6c231be39e11..06d19d6552c3f 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateUtils; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.fetch.subphase.FieldAndFormat; @@ -64,7 +65,7 @@ public class TransportEqlSearchAction extends HandledTransportAction(XPackPlugin.ASYNC_RESULTS_INDEX, client, ASYNC_SEARCH_ORIGIN, - registry, taskManager, EqlSearchAction.INSTANCE.name(), this, EqlSearchTask.class, clusterService, threadPool); + registry, taskManager, EqlSearchAction.INSTANCE.name(), this, EqlSearchTask.class, clusterService, threadPool, bigArrays); } @Override diff --git a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java index f2743122ec03b..4a3c51be0528d 100644 --- a/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java +++ b/x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/async/AsyncTaskManagementServiceTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -129,9 +130,10 @@ public TestResponse readResponse(StreamInput inputStream) throws IOException { public void setup() { clusterService = getInstanceFromNode(ClusterService.class); transportService = getInstanceFromNode(TransportService.class); + BigArrays bigArrays = getInstanceFromNode(BigArrays.class); AsyncTaskIndexService> store = new AsyncTaskIndexService<>(index, clusterService, transportService.getThreadPool().getThreadContext(), client(), "test", - in -> new StoredAsyncResponse<>(TestResponse::new, in), writableRegistry()); + in -> new StoredAsyncResponse<>(TestResponse::new, in), writableRegistry(), bigArrays); results = new AsyncResultsService<>(store, true, TestTask.class, (task, listener, timeout) -> addCompletionListener(transportService.getThreadPool(), task, listener, timeout), transportService.getTaskManager(), clusterService); @@ -147,8 +149,10 @@ public void shutdownExec() { private AsyncTaskManagementService createManagementService( AsyncTaskManagementService.AsyncOperation operation) { + BigArrays bigArrays = getInstanceFromNode(BigArrays.class); return new AsyncTaskManagementService<>(index, client(), "test_origin", writableRegistry(), - transportService.getTaskManager(), "test_action", operation, TestTask.class, clusterService, transportService.getThreadPool()); + transportService.getTaskManager(), "test_action", operation, TestTask.class, clusterService, transportService.getThreadPool(), + bigArrays); } public void testReturnBeforeTimeout() throws Exception {