diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java new file mode 100644 index 0000000000000..276d3013ed928 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java @@ -0,0 +1,206 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Stream; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.greaterThan; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2) +public class WriteMemoryLimitsIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + // Need at least two threads because we are going to block one + .put("thread_pool.write.size", 2) + .build(); + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class); + } + + @Override + protected int numberOfReplicas() { + return 1; + } + + @Override + protected int numberOfShards() { + return 1; + } + + public void testWriteBytesAreIncremented() throws Exception { + final String index = "test"; + assertAcked(prepareCreate(index, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); + ensureGreen(index); + + IndicesStatsResponse response = client().admin().indices().prepareStats(index).get(); + String primaryId = Stream.of(response.getShards()) + .map(ShardStats::getShardRouting) + .filter(ShardRouting::primary) + .findAny() + .get() + .currentNodeId(); + String replicaId = Stream.of(response.getShards()) 
+ .map(ShardStats::getShardRouting) + .filter(sr -> sr.primary() == false) + .findAny() + .get() + .currentNodeId(); + String primaryName = client().admin().cluster().prepareState().get().getState().nodes().get(primaryId).getName(); + String replicaName = client().admin().cluster().prepareState().get().getState().nodes().get(replicaId).getName(); + + final CountDownLatch replicationSendPointReached = new CountDownLatch(1); + final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1); + final CountDownLatch newActionsSendPointReached = new CountDownLatch(2); + final CountDownLatch latchBlockingReplication = new CountDownLatch(1); + + TransportService primaryService = internalCluster().getInstance(TransportService.class, primaryName); + final MockTransportService primaryTransportService = (MockTransportService) primaryService; + TransportService replicaService = internalCluster().getInstance(TransportService.class, replicaName); + final MockTransportService replicaTransportService = (MockTransportService) replicaService; + + primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(TransportShardBulkAction.ACTION_NAME + "[r]")) { + try { + replicationSendPointReached.countDown(); + latchBlockingReplicationSend.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + connection.sendRequest(requestId, action, request, options); + }); + + final BulkRequest bulkRequest = new BulkRequest(); + int totalRequestSize = 0; + for (int i = 0; i < 80; ++i) { + IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + totalRequestSize += request.ramBytesUsed(); + assertTrue(request.ramBytesUsed() > request.source().length()); + bulkRequest.add(request); + } + + final long bulkRequestSize = bulkRequest.ramBytesUsed(); + final long bulkShardRequestSize = totalRequestSize; + + try { + final ActionFuture successFuture = client(replicaName).bulk(bulkRequest); + replicationSendPointReached.await(); + + WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); + WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); + + assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize)); + assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getReplicaBytes()); + assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes()); + assertEquals(0, replicaWriteLimits.getPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getReplicaBytes()); + + ThreadPool replicaThreadPool = replicaTransportService.getThreadPool(); + // Block the replica Write thread pool + replicaThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> { + try { + newActionsSendPointReached.countDown(); + latchBlockingReplication.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + }); + replicaThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> { + try { + newActionsSendPointReached.countDown(); + latchBlockingReplication.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + }); + newActionsSendPointReached.await(); + latchBlockingReplicationSend.countDown(); + + IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", 
randomAlphaOfLength(50))); + final BulkRequest secondBulkRequest = new BulkRequest(); + secondBulkRequest.add(request); + + ActionFuture secondFuture = client(replicaName).bulk(secondBulkRequest); + + final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed(); + final long secondBulkShardRequestSize = request.ramBytesUsed(); + + assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes()); + assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(), + greaterThan(bulkShardRequestSize + secondBulkShardRequestSize))); + + latchBlockingReplication.countDown(); + + successFuture.actionGet(); + secondFuture.actionGet(); + + assertEquals(0, primaryWriteLimits.getCoordinatingBytes()); + assertEquals(0, primaryWriteLimits.getPrimaryBytes()); + assertEquals(0, primaryWriteLimits.getReplicaBytes()); + assertEquals(0, replicaWriteLimits.getCoordinatingBytes()); + assertEquals(0, replicaWriteLimits.getPrimaryBytes()); + assertEquals(0, replicaWriteLimits.getReplicaBytes()); + } finally { + if (replicationSendPointReached.getCount() > 0) { + replicationSendPointReached.countDown(); + } + while (newActionsSendPointReached.getCount() > 0) { + newActionsSendPointReached.countDown(); + } + if (latchBlockingReplicationSend.getCount() > 0) { + latchBlockingReplicationSend.countDown(); + } + if (latchBlockingReplication.getCount() > 0) { + latchBlockingReplication.countDown(); + } + primaryTransportService.clearAllRules(); + } + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java index 227fc1581a730..75cf424df4628 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/support/replication/TransportReplicationActionRetryOnClosedNodeIT.java @@ -118,8 +118,8 @@ protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary, } @Override - protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard replica) { - return new ReplicaResult(); + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + listener.onResponse(new ReplicaResult()); } } diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index 11acaca52f84b..679080fb41eb2 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action; +import org.apache.lucene.util.Accountable; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; @@ -40,7 +41,7 @@ * Generic interface to group ActionRequest, which perform writes to a single document * Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest} */ -public interface DocWriteRequest extends IndicesRequest { +public interface DocWriteRequest extends IndicesRequest, Accountable { /** * Set the index for this request diff --git 
a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 348855b7252a5..bcf6ccea3129b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -96,9 +96,11 @@ protected void shardOperationOnPrimary(final ShardRequest shardRequest, final In } @Override - protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws IOException { - executeShardOperation(shardRequest, replica); - return new ReplicaResult(); + protected void shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + executeShardOperation(shardRequest, replica); + return new ReplicaResult(); + }); } private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 397ce43747d53..2c5fd39557a0f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -75,10 +75,12 @@ protected void shardOperationOnPrimary(ShardFlushRequest shardRequest, IndexShar } @Override - protected ReplicaResult shardOperationOnReplica(ShardFlushRequest request, IndexShard replica) { - replica.flush(request.getRequest()); - logger.trace("{} flush request executed on replica", replica.shardId()); - return new ReplicaResult(); + protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + replica.flush(request.getRequest()); + logger.trace("{} flush request executed on replica", replica.shardId()); + return new ReplicaResult(); + }); } // TODO: Remove this transition in 9.0 diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index a0ce70503958c..3485028708d75 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -68,9 +68,12 @@ protected void shardOperationOnPrimary(BasicReplicationRequest shardRequest, Ind } @Override - protected ReplicaResult shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica) { - replica.refresh("api"); - logger.trace("{} refresh request executed on replica", replica.shardId()); - return new ReplicaResult(); + protected void shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica, + ActionListener listener) { + ActionListener.completeWith(listener, () -> { + replica.refresh("api"); + logger.trace("{} refresh request executed on replica", replica.shardId()); + return new ReplicaResult(); + }); } } diff --git 
a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 6ede31a3f05e2..9edfe2625a24c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.bulk; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -30,7 +32,9 @@ import java.io.IOException; import java.util.Objects; -public class BulkItemRequest implements Writeable { +public class BulkItemRequest implements Writeable, Accountable { + + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkItemRequest.class); private int id; private DocWriteRequest request; @@ -115,4 +119,9 @@ public void writeThin(StreamOutput out) throws IOException { DocWriteRequest.writeDocumentRequestThin(out, request); out.writeOptionalWriteable(primaryResponse == null ? null : primaryResponse::writeThin); } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + request.ramBytesUsed(); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 59f3cfc222a8f..312a906e4faaf 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.bulk; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; @@ -55,7 +57,9 @@ * Note that we only support refresh on the bulk request not per item. 
* @see org.elasticsearch.client.Client#bulk(BulkRequest) */ -public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest { +public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest, Accountable { + + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkRequest.class); private static final int REQUEST_OVERHEAD = 50; @@ -373,4 +377,9 @@ private static String valueOrDefault(String value, String globalDefault) { } return value; } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + requests.stream().mapToLong(Accountable::ramBytesUsed).sum(); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index 498b510a51472..4b780ee8861b2 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.bulk; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; @@ -32,12 +34,14 @@ import java.io.IOException; import java.util.HashSet; import java.util.Set; +import java.util.stream.Stream; -public class BulkShardRequest extends ReplicatedWriteRequest { +public class BulkShardRequest extends ReplicatedWriteRequest implements Accountable { public static final Version COMPACT_SHARD_ID_VERSION = Version.V_7_9_0; + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkShardRequest.class); - private BulkItemRequest[] items; + private final BulkItemRequest[] items; public BulkShardRequest(StreamInput in) throws IOException { super(in); @@ -164,4 +168,9 @@ public void onRetry() { } } } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + Stream.of(items).mapToLong(Accountable::ramBytesUsed).sum(); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index f22cbf1532df9..e443d0292f250 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -59,6 +59,7 @@ import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -111,21 +112,22 @@ public class TransportBulkAction extends HandledTransportAction docWriteRequest) { IndexRequest indexRequest = null; if (docWriteRequest instanceof IndexRequest) { indexRequest = (IndexRequest) docWriteRequest; @@ -158,6 +161,17 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest docWriteRequest) @Override protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { + long indexingBytes = bulkRequest.ramBytesUsed(); + final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(indexingBytes); + final ActionListener releasingListener = 
ActionListener.runBefore(listener, releasable::close); + try { + doInternalExecute(task, bulkRequest, releasingListener); + } catch (Exception e) { + releasingListener.onFailure(e); + } + } + + protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { final long startTime = relativeTime(); final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size()); @@ -727,7 +741,7 @@ private void processBulkIndexIngestRequest(Task task, BulkRequest original, Acti // before we continue the bulk request we should fork back on a write thread: if (originalThread == Thread.currentThread()) { assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE); - doExecute(task, bulkRequest, actionListener); + doInternalExecute(task, bulkRequest, actionListener); } else { threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { @Override @@ -737,7 +751,7 @@ public void onFailure(Exception e) { @Override protected void doRun() throws Exception { - doExecute(task, bulkRequest, actionListener); + doInternalExecute(task, bulkRequest, actionListener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 84c2e089d9a2c..d1c77df56b790 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -92,9 +92,10 @@ public class TransportShardBulkAction extends TransportWriteAction> listener) { ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, @@ -138,6 +139,11 @@ public void onTimeout(TimeValue timeout) { ); } + @Override + protected long primaryOperationSize(BulkShardRequest request) { + return request.ramBytesUsed(); + } + public static void performOnPrimary( BulkShardRequest request, IndexShard primary, @@ -407,11 +413,18 @@ private static BulkItemResponse processUpdateResponse(final UpdateRequest update } @Override - public WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { - final long startBulkTime = System.nanoTime(); - final Translog.Location location = performOnReplica(request, replica); - replica.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); - return new WriteReplicaResult<>(request, location, null, replica, logger); + protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + final long startBulkTime = System.nanoTime(); + final Translog.Location location = performOnReplica(request, replica); + replica.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); + return new WriteReplicaResult<>(request, location, null, replica, logger); + }); + } + + @Override + protected long replicaOperationSize(BulkShardRequest request) { + return request.ramBytesUsed(); } public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java new file mode 100644 
index 0000000000000..84c702f110622 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.common.lease.Releasable; + +import java.util.concurrent.atomic.AtomicLong; + +public class WriteMemoryLimits { + + private final AtomicLong coordinatingBytes = new AtomicLong(0); + private final AtomicLong primaryBytes = new AtomicLong(0); + private final AtomicLong replicaBytes = new AtomicLong(0); + + public Releasable markCoordinatingOperationStarted(long bytes) { + coordinatingBytes.addAndGet(bytes); + return () -> coordinatingBytes.getAndAdd(-bytes); + } + + public long getCoordinatingBytes() { + return coordinatingBytes.get(); + } + + public Releasable markPrimaryOperationStarted(long bytes) { + primaryBytes.addAndGet(bytes); + return () -> primaryBytes.getAndAdd(-bytes); + } + + public long getPrimaryBytes() { + return primaryBytes.get(); + } + + public Releasable markReplicaOperationStarted(long bytes) { + replicaBytes.getAndAdd(bytes); + return () -> replicaBytes.getAndAdd(-bytes); + } + + public long getReplicaBytes() { + return replicaBytes.get(); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index cfae37d701526..af7c24fa70b79 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.delete; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; @@ -53,6 +54,8 @@ public class DeleteRequest extends ReplicatedWriteRequest implements DocWriteRequest, CompositeIndicesRequest { + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(DeleteRequest.class); + private static final ShardId NO_SHARD_ID = null; private String id; @@ -261,4 +264,9 @@ private void writeBody(StreamOutput out) throws IOException { public String toString() { return "delete {[" + index + "][" + id + "]}"; } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id); + } } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index bc7b388da8e76..ed8caba150d1e 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -19,6 +19,7 
@@ package org.elasticsearch.action.index; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.ElasticsearchGenerationException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; @@ -76,6 +77,8 @@ */ public class IndexRequest extends ReplicatedWriteRequest implements DocWriteRequest, CompositeIndicesRequest { + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(IndexRequest.class); + /** * Max length of the source document to include into string() * @@ -691,4 +694,9 @@ public void onRetry() { public long getAutoGeneratedTimestamp() { return autoGeneratedTimestamp; } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + (source == null ? 0 : source.ramBytesUsed()); + } } diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 94dde512aa105..638371f414111 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -20,6 +20,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -45,6 +46,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.stream.Stream; public class TransportResyncReplicationAction extends TransportWriteAction implements PrimaryReplicaSyncer.SyncAction { @@ -54,10 +56,12 @@ public class TransportResyncReplicationAction extends TransportWriteAction> listener) { ActionListener.completeWith(listener, () -> new WritePrimaryResult<>(performOnPrimary(request), new ResyncReplicationResponse(), null, null, primary, logger)); } + @Override + protected long primaryOperationSize(ResyncReplicationRequest request) { + return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); + } + public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request) { return request; } @Override - protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest request, - IndexShard replica) throws Exception { - Translog.Location location = performOnReplica(request, replica); - return new WriteReplicaResult<>(request, location, null, replica, logger); + protected void dispatchedShardOperationOnReplica(ResyncReplicationRequest request, IndexShard replica, + ActionListener listener) { + ActionListener.completeWith(listener, () -> { + Translog.Location location = performOnReplica(request, replica); + return new WriteReplicaResult<>(request, location, null, replica, logger); + }); + } + + @Override + protected long replicaOperationSize(ResyncReplicationRequest request) { + return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); } public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java 
b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java index b93298eefe871..835546282d6cd 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java @@ -19,9 +19,6 @@ package org.elasticsearch.action.support; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; @@ -30,7 +27,6 @@ public final class ChannelActionListener< Response extends TransportResponse, Request extends TransportRequest> implements ActionListener { - private static final Logger logger = LogManager.getLogger(ChannelActionListener.class); private final TransportChannel channel; private final Request request; private final String actionName; @@ -52,12 +48,6 @@ public void onResponse(Response response) { @Override public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - e1.addSuppressed(e); - logger.warn(() -> new ParameterizedMessage( - "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); - } + TransportChannel.sendErrorResponse(channel, actionName, request, e); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 8ca3d8dafce4b..1c05b596d97f0 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -213,13 +213,14 @@ protected abstract void shardOperationOnPrimary(Request shardRequest, IndexShard ActionListener> listener); /** - * Synchronously execute the specified replica operation. This is done under a permit from + * Execute the specified replica operation. This is done under a permit from * {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)}. * * @param shardRequest the request to the replica shard * @param replica the replica shard to perform the operation on */ - protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest, IndexShard replica) throws Exception; + protected abstract void shardOperationOnReplica(ReplicaRequest shardRequest, IndexShard replica, + ActionListener listener); /** * Cluster level block to check before request execution. Returning null means that no blocks need to be checked. 
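The replica-side conversion applied throughout this patch follows one mechanical pattern: the old synchronous method body is wrapped in ActionListener.completeWith, which passes the supplier's return value to onResponse and routes any thrown exception to onFailure. Below is a minimal self-contained sketch of that behaviour; SimpleListener, completeWith and CompleteWithDemo are hypothetical stand-ins for illustration, not the Elasticsearch classes.

    import java.util.concurrent.Callable;

    // Stand-in for ActionListener: a result callback with a failure channel.
    interface SimpleListener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    final class CompleteWithDemo {
        // Stand-in for ActionListener.completeWith: run the synchronous body and
        // complete the listener with its result, or with the exception it threw.
        static <T> void completeWith(SimpleListener<T> listener, Callable<T> body) {
            T result;
            try {
                result = body.call();
            } catch (Exception e) {
                listener.onFailure(e);
                return;
            }
            listener.onResponse(result);
        }

        public static void main(String[] args) {
            SimpleListener<String> listener = new SimpleListener<String>() {
                @Override public void onResponse(String response) { System.out.println("ok: " + response); }
                @Override public void onFailure(Exception e) { System.out.println("failed: " + e); }
            };
            // Old style returned a result or threw; new style wraps the same body once.
            completeWith(listener, () -> "replica result");
            completeWith(listener, () -> { throw new IllegalStateException("simulated"); });
        }
    }

This is why most replica-operation conversions in this diff only add the wrapper and otherwise keep the body unchanged.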
@@ -272,13 +273,31 @@ boolean isRetryableClusterBlockException(final Throwable e) { return false; } - protected void handleOperationRequest(final Request request, final TransportChannel channel, Task task) { - execute(task, request, new ChannelActionListener<>(channel, actionName, request)); + private void handleOperationRequest(final Request request, final TransportChannel channel, Task task) { + Releasable releasable = checkOperationLimits(request); + ActionListener listener = + ActionListener.runBefore(new ChannelActionListener<>(channel, actionName, request), releasable::close); + execute(task, request, listener); + } + + protected Releasable checkOperationLimits(final Request request) { + return () -> {}; } protected void handlePrimaryRequest(final ConcreteShardRequest request, final TransportChannel channel, final Task task) { - new AsyncPrimaryAction( - request, new ChannelActionListener<>(channel, transportPrimaryAction, request), (ReplicationTask) task).run(); + Releasable releasable = checkPrimaryLimits(request.getRequest()); + ActionListener listener = + ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close); + + try { + new AsyncPrimaryAction(request, listener, (ReplicationTask) task).run(); + } catch (RuntimeException e) { + listener.onFailure(e); + } + } + + protected Releasable checkPrimaryLimits(final Request request) { + return () -> {}; } class AsyncPrimaryAction extends AbstractRunnable { @@ -489,10 +508,21 @@ public void runPostReplicaActions(ActionListener listener) { } } - protected void handleReplicaRequest(final ConcreteReplicaRequest replicaRequest, - final TransportChannel channel, final Task task) { - new AsyncReplicaAction( - replicaRequest, new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), (ReplicationTask) task).run(); + protected void handleReplicaRequest(final ConcreteReplicaRequest replicaRequest, final TransportChannel channel, + final Task task) { + Releasable releasable = checkReplicaLimits(replicaRequest.getRequest()); + ActionListener listener = + ActionListener.runBefore(new ChannelActionListener<>(channel, transportReplicaAction, replicaRequest), releasable::close); + + try { + new AsyncReplicaAction(replicaRequest, listener, (ReplicationTask) task).run(); + } catch (RuntimeException e) { + listener.onFailure(e); + } + } + + protected Releasable checkReplicaLimits(final ReplicaRequest request) { + return () -> {}; } public static class RetryOnReplicaException extends ElasticsearchException { @@ -531,27 +561,31 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio @Override public void onResponse(Releasable releasable) { + assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit"; try { - assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit"; - final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica); - replicaResult.runPostReplicaActions( - ActionListener.wrap(r -> { - final TransportReplicationAction.ReplicaResponse response = - new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint()); - releasable.close(); // release shard operation lock before responding to caller - if (logger.isTraceEnabled()) { - logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, - replicaRequest.getRequest().shardId(), - replicaRequest.getRequest()); - } - setPhase(task, 
"finished"); - onCompletionListener.onResponse(response); - }, e -> { - Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller - this.responseWithFailure(e); - }) - ); - } catch (final Exception e) { + shardOperationOnReplica(replicaRequest.getRequest(), replica, ActionListener.wrap((replicaResult) -> + replicaResult.runPostReplicaActions( + ActionListener.wrap(r -> { + final ReplicaResponse response = + new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint()); + releasable.close(); // release shard operation lock before responding to caller + if (logger.isTraceEnabled()) { + logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, + replicaRequest.getRequest().shardId(), + replicaRequest.getRequest()); + } + setPhase(task, "finished"); + onCompletionListener.onResponse(response); + }, e -> { + Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller + responseWithFailure(e); + }) + ), e -> { + Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller + AsyncReplicaAction.this.onFailure(e); + })); + // TODO: Evaludate if we still need to catch this exception + } catch (Exception e) { Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller AsyncReplicaAction.this.onFailure(e); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 07f8e96b4e796..8407dba506a29 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -22,6 +22,8 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.WriteRequest; @@ -32,6 +34,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperParsingException; @@ -57,12 +60,45 @@ public abstract class TransportWriteAction< Response extends ReplicationResponse & WriteResponse > extends TransportReplicationAction { + private final boolean forceExecutionOnPrimary; + private final WriteMemoryLimits writeMemoryLimits; + private final String executor; + protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader request, - Writeable.Reader replicaRequest, String executor, boolean forceExecutionOnPrimary) { + Writeable.Reader replicaRequest, String executor, boolean forceExecutionOnPrimary, + WriteMemoryLimits writeMemoryLimits) { + // We pass 
ThreadPool.Names.SAME to the super class as we control the dispatching to the + // ThreadPool.Names.WRITE thread pool in this class. super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - request, replicaRequest, executor, true, forceExecutionOnPrimary); + request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary); + this.executor = executor; + this.forceExecutionOnPrimary = forceExecutionOnPrimary; + this.writeMemoryLimits = writeMemoryLimits; + } + + @Override + protected Releasable checkOperationLimits(Request request) { + return writeMemoryLimits.markCoordinatingOperationStarted(primaryOperationSize(request)); + } + + @Override + protected Releasable checkPrimaryLimits(Request request) { + return writeMemoryLimits.markPrimaryOperationStarted(primaryOperationSize(request)); + } + + protected long primaryOperationSize(Request request) { + return 0; + } + + @Override + protected Releasable checkReplicaLimits(ReplicaRequest request) { + return writeMemoryLimits.markReplicaOperationStarted(replicaOperationSize(request)); + } + + protected long replicaOperationSize(ReplicaRequest request) { + return 0; } /** Syncs operation result to the translog or throws a shard not available failure */ @@ -104,18 +140,48 @@ protected ReplicationOperation.Replicas newReplicasProxy() { * and failure async refresh is performed on the primary shard according to the Request refresh policy */ @Override - protected abstract void shardOperationOnPrimary( - Request request, IndexShard primary, ActionListener> listener); + protected void shardOperationOnPrimary( + Request request, IndexShard primary, ActionListener> listener) { + threadPool.executor(executor).execute(new ActionRunnable<>(listener) { + @Override + protected void doRun() { + dispatchedShardOperationOnPrimary(request, primary, listener); + } + + @Override + public boolean isForceExecution() { + return forceExecutionOnPrimary; + } + }); + } + + protected abstract void dispatchedShardOperationOnPrimary( + Request request, IndexShard primary, ActionListener> listener); /** * Called once per replica with a reference to the replica {@linkplain IndexShard} to modify. * - * @return the result of the operation on replica, including current translog location and operation response and failure - * async refresh is performed on the replica shard according to the ReplicaRequest refresh policy + * @param listener listener for the result of the operation on replica, including current translog location and operation + * response and failure async refresh is performed on the replica shard according to the ReplicaRequest + * refresh policy */ @Override - protected abstract WriteReplicaResult shardOperationOnReplica( - ReplicaRequest request, IndexShard replica) throws Exception; + protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replica, ActionListener listener) { + threadPool.executor(executor).execute(new ActionRunnable<>(listener) { + @Override + protected void doRun() { + dispatchedShardOperationOnReplica(request, replica, listener); + } + + @Override + public boolean isForceExecution() { + return true; + } + }); + } + + protected abstract void dispatchedShardOperationOnReplica( + ReplicaRequest request, IndexShard replica, ActionListener listener); /** * Result of taking the action on the primary. 
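The accounting pattern that WriteMemoryLimits and the checkOperationLimits/checkPrimaryLimits/checkReplicaLimits hooks above implement is: estimate the request size up front (ramBytesUsed), add it to a per-node counter, and hand back a Releasable whose close() subtracts it again, with ActionListener.runBefore tying the release to both the success and the failure path. Below is a minimal standalone sketch of that idea; SimpleWriteLimits and plain AutoCloseable are illustrative stand-ins for the Elasticsearch types, and the byte value is made up.

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative stand-in for WriteMemoryLimits: mark*Started adds the byte
    // estimate to a counter and returns a token that subtracts it when closed.
    final class SimpleWriteLimits {
        private final AtomicLong coordinatingBytes = new AtomicLong();

        AutoCloseable markCoordinatingOperationStarted(long bytes) {
            coordinatingBytes.addAndGet(bytes);
            return () -> coordinatingBytes.addAndGet(-bytes);
        }

        long getCoordinatingBytes() {
            return coordinatingBytes.get();
        }

        public static void main(String[] args) throws Exception {
            SimpleWriteLimits limits = new SimpleWriteLimits();
            long requestBytes = 1024;  // in the real code this comes from BulkRequest#ramBytesUsed()
            AutoCloseable released = limits.markCoordinatingOperationStarted(requestBytes);
            try {
                // ... coordinating work happens while the bytes are tracked ...
                System.out.println("in flight: " + limits.getCoordinatingBytes());    // 1024
            } finally {
                released.close();  // analogous to ActionListener.runBefore(listener, releasable::close)
            }
            System.out.println("after completion: " + limits.getCoordinatingBytes()); // 0
        }
    }

The real class keeps three such counters (coordinating, primary and replica bytes), which is what the new assertions in WriteMemoryLimitsIT read back.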
diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index a695c26e134c9..6ee0469ffca4b 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.update; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.DocWriteRequest; @@ -59,6 +60,9 @@ public class UpdateRequest extends InstanceShardOperationRequest implements DocWriteRequest, WriteRequest, ToXContentObject { + + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(UpdateRequest.class); + private static ObjectParser PARSER; private static final ParseField SCRIPT_FIELD = new ParseField("script"); @@ -928,4 +932,16 @@ public String toString() { res.append(", detect_noop[").append(detectNoop).append("]"); return res.append("}").toString(); } + + @Override + public long ramBytesUsed() { + long childRequestBytes = 0; + if (doc != null) { + childRequestBytes += doc.ramBytesUsed(); + } + if (upsertRequest != null) { + childRequestBytes += upsertRequest.ramBytesUsed(); + } + return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + childRequestBytes; + } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 0cdecbf4306dc..17d29575f23c7 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -90,9 +90,11 @@ protected void shardOperationOnPrimary(Request request, IndexShard indexShard, } @Override - protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard indexShard) throws Exception { - maybeSyncTranslog(indexShard); - return new ReplicaResult(); + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + maybeSyncTranslog(replica); + return new ReplicaResult(); + }); } private void maybeSyncTranslog(final IndexShard indexShard) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index ed0f6efaf4d58..3e003fc2898f9 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -38,7 +38,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -165,12 +164,14 @@ protected void shardOperationOnPrimary( } @Override - protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) throws WriteStateException { - Objects.requireNonNull(request); - Objects.requireNonNull(replica); - 
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); - replica.persistRetentionLeases(); - return new ReplicaResult(); + protected void shardOperationOnReplica(Request request, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + Objects.requireNonNull(request); + Objects.requireNonNull(replica); + replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); + replica.persistRetentionLeases(); + return new ReplicaResult(); + }); } public static final class Request extends ReplicationRequest { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index b595a67932980..54a418fe673c7 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -25,6 +25,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteResponse; @@ -40,7 +41,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -79,7 +79,8 @@ public RetentionLeaseSyncAction( final IndicesService indicesService, final ThreadPool threadPool, final ShardStateAction shardStateAction, - final ActionFilters actionFilters) { + final ActionFilters actionFilters, + final WriteMemoryLimits writeMemoryLimits) { super( settings, ACTION_NAME, @@ -91,7 +92,7 @@ public RetentionLeaseSyncAction( actionFilters, RetentionLeaseSyncAction.Request::new, RetentionLeaseSyncAction.Request::new, - ThreadPool.Names.MANAGEMENT, false); + ThreadPool.Names.MANAGEMENT, false, writeMemoryLimits); } @Override @@ -146,7 +147,7 @@ public void handleException(TransportException e) { } @Override - protected void shardOperationOnPrimary(Request request, IndexShard primary, + protected void dispatchedShardOperationOnPrimary(Request request, IndexShard primary, ActionListener> listener) { ActionListener.completeWith(listener, () -> { assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards(); @@ -158,14 +159,15 @@ protected void shardOperationOnPrimary(Request request, IndexShard primary, } @Override - protected WriteReplicaResult shardOperationOnReplica( - final Request request, - final IndexShard replica) throws WriteStateException { - Objects.requireNonNull(request); - Objects.requireNonNull(replica); - replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); - replica.persistRetentionLeases(); - return new WriteReplicaResult<>(request, null, null, replica, getLogger()); + protected void dispatchedShardOperationOnReplica(Request request, IndexShard replica, + ActionListener listener) { + ActionListener.completeWith(listener, () -> { + Objects.requireNonNull(request); + Objects.requireNonNull(replica); + 
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); + replica.persistRetentionLeases(); + return new WriteReplicaResult<>(request, null, null, replica, getLogger()); + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 9d02ecc259902..e4503c07e273f 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -30,6 +30,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.search.SearchExecutionStatsCollector; import org.elasticsearch.action.search.SearchPhaseController; import org.elasticsearch.action.search.SearchTransportService; @@ -568,6 +569,7 @@ protected Node(final Environment initialEnvironment, new PersistentTasksClusterService(settings, registry, clusterService, threadPool); resourcesToClose.add(persistentTasksClusterService); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); + final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits(); modules.add(b -> { b.bind(Node.class).toInstance(this); @@ -586,6 +588,7 @@ protected Node(final Environment initialEnvironment, b.bind(ScriptService.class).toInstance(scriptService); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); + b.bind(WriteMemoryLimits.class).toInstance(bulkIndexingLimits); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TransportChannel.java index 17e538f04c43b..bdec69131156a 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportChannel.java @@ -19,6 +19,9 @@ package org.elasticsearch.transport; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import java.io.IOException; @@ -28,6 +31,8 @@ */ public interface TransportChannel { + Logger logger = LogManager.getLogger(TransportChannel.class); + String getProfileName(); String getChannelType(); @@ -42,4 +47,17 @@ public interface TransportChannel { default Version getVersion() { return Version.CURRENT; } + + /** + * A helper method to send an exception and handle and log a subsequent exception + */ + static void sendErrorResponse(TransportChannel channel, String actionName, TransportRequest request, Exception e) { + try { + channel.sendResponse(e); + } catch (Exception sendException) { + sendException.addSuppressed(e); + logger.warn(() -> new ParameterizedMessage( + "Failed to send error response for action [{}] and request [{}]", actionName, request), sendException); + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index e99d483787db3..3939cd98f5f88 
100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -120,7 +120,7 @@ private void indicesThatCannotBeCreatedTestCase(Set expected, final ExecutorService direct = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(direct); TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService, - null, null, mock(ActionFilters.class), null, null) { + null, null, mock(ActionFilters.class), null, null, new WriteMemoryLimits()) { @Override void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, AtomicArray responses, Map indicesThatCannotBeCreated) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index f2d617230c5c8..129a98459fdec 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -143,7 +143,7 @@ null, new ActionFilters(Collections.emptySet()), null, new AutoCreateIndex( SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), new IndexNameExpressionResolver() - ) + ), new WriteMemoryLimits() ); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index e8ed3abe2a602..95776586b962c 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; @@ -79,7 +80,8 @@ class TestTransportBulkAction extends TransportBulkAction { TestTransportBulkAction() { super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null, null, new ActionFilters(Collections.emptySet()), new Resolver(), - new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver())); + new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()), + new WriteMemoryLimits()); } @Override @@ -121,38 +123,36 @@ public void tearDown() throws Exception { public void testDeleteNonExistingDocDoesNotCreateIndex() throws Exception { BulkRequest bulkRequest = new BulkRequest().add(new DeleteRequest("index").id("id")); - ActionTestUtils.execute(bulkAction, null, bulkRequest, ActionListener.wrap(response -> { - assertFalse(bulkAction.indexCreated); - BulkItemResponse[] bulkResponses = ((BulkResponse) response).getItems(); - assertEquals(bulkResponses.length, 1); - assertTrue(bulkResponses[0].isFailed()); - assertTrue(bulkResponses[0].getFailure().getCause() instanceof IndexNotFoundException); - assertEquals("index", bulkResponses[0].getFailure().getIndex()); - }, exception -> { - throw new AssertionError(exception); - 
})); + PlainActionFuture future = PlainActionFuture.newFuture(); + ActionTestUtils.execute(bulkAction, null, bulkRequest, future); + + BulkResponse response = future.actionGet(); + assertFalse(bulkAction.indexCreated); + BulkItemResponse[] bulkResponses = ((BulkResponse) response).getItems(); + assertEquals(bulkResponses.length, 1); + assertTrue(bulkResponses[0].isFailed()); + assertTrue(bulkResponses[0].getFailure().getCause() instanceof IndexNotFoundException); + assertEquals("index", bulkResponses[0].getFailure().getIndex()); } public void testDeleteNonExistingDocExternalVersionCreatesIndex() throws Exception { BulkRequest bulkRequest = new BulkRequest() .add(new DeleteRequest("index").id("id").versionType(VersionType.EXTERNAL).version(0)); - ActionTestUtils.execute(bulkAction, null, bulkRequest, ActionListener.wrap(response -> { - assertTrue(bulkAction.indexCreated); - }, exception -> { - throw new AssertionError(exception); - })); + PlainActionFuture future = PlainActionFuture.newFuture(); + ActionTestUtils.execute(bulkAction, null, bulkRequest, future); + future.actionGet(); + assertTrue(bulkAction.indexCreated); } public void testDeleteNonExistingDocExternalGteVersionCreatesIndex() throws Exception { BulkRequest bulkRequest = new BulkRequest() .add(new DeleteRequest("index2").id("id").versionType(VersionType.EXTERNAL_GTE).version(0)); - ActionTestUtils.execute(bulkAction, null, bulkRequest, ActionListener.wrap(response -> { - assertTrue(bulkAction.indexCreated); - }, exception -> { - throw new AssertionError(exception); - })); + PlainActionFuture future = PlainActionFuture.newFuture(); + ActionTestUtils.execute(bulkAction, null, bulkRequest, future); + future.actionGet(); + assertTrue(bulkAction.indexCreated); } public void testGetIndexWriteRequest() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index e084a7a7bdcbc..89408f3d17f91 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -233,6 +233,7 @@ static class TestTransportBulkAction extends TransportBulkAction { actionFilters, indexNameExpressionResolver, autoCreateIndex, + new WriteMemoryLimits(), relativeTimeProvider); } diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index cfb94c3465dc5..6861fb4aa89a2 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; @@ -143,7 +144,8 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { when(indexServices.indexServiceSafe(eq(index))).thenReturn(indexService); final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService, - clusterService, indexServices, threadPool, shardStateAction, new 
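The test conversions above all follow the same listener-to-future pattern: PlainActionFuture implements ActionListener, so it can be handed to the action directly, and actionGet() then blocks the test thread and rethrows any failure there, presumably so failures surface on the test thread rather than inside a callback once the action may complete its listener asynchronously. A condensed sketch of the pattern, where someAsyncAction stands in for whatever method accepts the listener:

// Fragment only; someAsyncAction(...) is a placeholder for any method that takes an ActionListener<BulkResponse>.
PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
someAsyncAction(new BulkRequest().add(new DeleteRequest("index").id("id")), future);
BulkResponse response = future.actionGet();   // blocks until the listener is completed, rethrowing failures here
assertEquals(1, response.getItems().length);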
ActionFilters(new HashSet<>())); + clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), + new WriteMemoryLimits()); assertThat(action.globalBlockLevel(), nullValue()); assertThat(action.indexBlockLevel(), nullValue()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index fd6cacf773932..25f27d2abfb3b 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -921,14 +921,17 @@ public void testReplicasCounter() { final ReplicationTask task = maybeTask(); TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithExceptions", transportService, clusterService, shardStateAction, threadPool) { + @Override - protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { - assertIndexShardCounter(1); - assertPhase(task, "replica"); - if (throwException) { - throw new ElasticsearchException("simulated"); - } - return new ReplicaResult(); + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + assertIndexShardCounter(1); + assertPhase(task, "replica"); + if (throwException) { + throw new ElasticsearchException("simulated"); + } + return new ReplicaResult(); + }); } }; try { @@ -1057,12 +1060,14 @@ public void testRetryOnReplica() throws Exception { TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithExceptions", transportService, clusterService, shardStateAction, threadPool) { @Override - protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { - assertPhase(task, "replica"); - if (throwException.get()) { - throw new RetryOnReplicaException(shardId, "simulation"); - } - return new ReplicaResult(); + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + assertPhase(task, "replica"); + if (throwException.get()) { + throw new RetryOnReplicaException(shardId, "simulation"); + } + return new ReplicaResult(); + }); } }; final PlainActionFuture listener = new PlainActionFuture<>(); @@ -1124,13 +1129,15 @@ public void testRetryOnReplicaWithRealTransport() throws Exception { TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithExceptions", transportService, clusterService, shardStateAction, threadPool) { @Override - protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { - assertPhase(task, "replica"); - if (throwException.get()) { - throw new RetryOnReplicaException(shardId, "simulation"); - } - calledSuccessfully.set(true); - return new ReplicaResult(); + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + assertPhase(task, "replica"); + if (throwException.get()) { + throw new RetryOnReplicaException(shardId, "simulation"); + } + calledSuccessfully.set(true); + return new ReplicaResult(); + }); } }; final PlainActionFuture listener = new PlainActionFuture<>(); @@ -1282,9 +1289,9 @@ protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary, } @Override - 
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { + protected void shardOperationOnReplica(Request request, IndexShard replica, ActionListener listener) { request.processedOnReplicas.incrementAndGet(); - return new ReplicaResult(); + listener.onResponse(new ReplicaResult()); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index e2ce61047c4d0..901bd28cb7591 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -446,14 +446,14 @@ protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, } @Override - protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnReplica(Request shardRequest, IndexShard shard, ActionListener listener) { assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2").getId(), shard.routingEntry().currentNodeId()); executedOnReplica.set(true); // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here // that the permit has been acquired on the replica shard assertSame(replica, shard); - return new ReplicaResult(); + listener.onResponse(new ReplicaResult()); } } @@ -505,10 +505,10 @@ protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, } @Override - protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { assertNoBlocks("block must not exist when executing the operation on replica shard: it should have been blocked before"); - assertThat(shard.getActiveOperationsCount(), greaterThan(0)); - return super.shardOperationOnReplica(shardRequest, shard); + assertThat(replica.getActiveOperationsCount(), greaterThan(0)); + super.shardOperationOnReplica(shardRequest, replica, listener); } private void assertNoBlocks(final String error) { @@ -551,9 +551,9 @@ protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, } @Override - protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { - assertEquals("All permits must be acquired", IndexShard.OPERATIONS_BLOCKED, shard.getActiveOperationsCount()); - return super.shardOperationOnReplica(shardRequest, shard); + protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener listener) { + assertEquals("All permits must be acquired", IndexShard.OPERATIONS_BLOCKED, replica.getActiveOperationsCount()); + super.shardOperationOnReplica(shardRequest, replica, listener); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 42a696c3b2583..c1ef04b10d43d 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++
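The replica-side conversions above share one shape: shardOperationOnReplica no longer returns a ReplicaResult, it completes an ActionListener, and ActionListener.completeWith wraps the body so that anything it throws is delivered to listener.onFailure rather than escaping up the calling stack. A minimal sketch of that shape, with the request type and body left as placeholders:

@Override
protected void shardOperationOnReplica(Request request, IndexShard replica, ActionListener<ReplicaResult> listener) {
    // completeWith runs the supplier and feeds either its return value or any thrown
    // exception into the listener, so the caller always hears back exactly once.
    ActionListener.completeWith(listener, () -> {
        // replica-side work goes here (placeholder)
        return new ReplicaResult();
    });
}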
b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; @@ -137,7 +138,7 @@ public void testPrimaryNoRefreshCall() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit TestAction testAction = new TestAction(); - testAction.shardOperationOnPrimary(request, indexShard, + testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful)); @@ -152,8 +153,9 @@ public void testReplicaNoRefreshCall() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit TestAction testAction = new TestAction(); - TransportWriteAction.WriteReplicaResult result = - testAction.shardOperationOnReplica(request, indexShard); + final PlainActionFuture future = PlainActionFuture.newFuture(); + testAction.dispatchedShardOperationOnReplica(request, indexShard, future); + final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE)); assertNotNull(listener.response); @@ -166,7 +168,7 @@ public void testPrimaryImmediateRefresh() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.IMMEDIATE); TestAction testAction = new TestAction(); - testAction.shardOperationOnPrimary(request, indexShard, + testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful)); @@ -182,8 +184,9 @@ public void testReplicaImmediateRefresh() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.IMMEDIATE); TestAction testAction = new TestAction(); - TransportWriteAction.WriteReplicaResult result = - testAction.shardOperationOnReplica(request, indexShard); + final PlainActionFuture future = PlainActionFuture.newFuture(); + testAction.dispatchedShardOperationOnReplica(request, indexShard, future); + final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE)); assertNotNull(listener.response); @@ -197,7 +200,7 @@ public void testPrimaryWaitForRefresh() throws Exception { request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); TestAction testAction = new TestAction(); - testAction.shardOperationOnPrimary(request, indexShard, + testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { 
CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful)); @@ -221,7 +224,9 @@ public void testReplicaWaitForRefresh() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); TestAction testAction = new TestAction(); - TransportWriteAction.WriteReplicaResult result = testAction.shardOperationOnReplica(request, indexShard); + final PlainActionFuture future = PlainActionFuture.newFuture(); + testAction.dispatchedShardOperationOnReplica(request, indexShard, future); + final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE)); assertNull(listener.response); // Haven't responded yet @@ -240,7 +245,7 @@ public void testReplicaWaitForRefresh() throws Exception { public void testDocumentFailureInShardOperationOnPrimary() throws Exception { TestRequest request = new TestRequest(); TestAction testAction = new TestAction(true, true); - testAction.shardOperationOnPrimary(request, indexShard, + testAction.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful)); @@ -252,8 +257,9 @@ public void testDocumentFailureInShardOperationOnPrimary() throws Exception { public void testDocumentFailureInShardOperationOnReplica() throws Exception { TestRequest request = new TestRequest(); TestAction testAction = new TestAction(randomBoolean(), true); - TransportWriteAction.WriteReplicaResult result = - testAction.shardOperationOnReplica(request, indexShard); + final PlainActionFuture future = PlainActionFuture.newFuture(); + testAction.dispatchedShardOperationOnReplica(request, indexShard, future); + final TransportReplicationAction.ReplicaResult result = future.actionGet(); CapturingActionListener listener = new CapturingActionListener<>(); result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE)); assertNull(listener.response); @@ -360,7 +366,8 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF super(Settings.EMPTY, "internal:test", new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null, - new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false); + new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, + new WriteMemoryLimits()); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @@ -369,7 +376,8 @@ protected TestAction(Settings settings, String actionName, TransportService tran ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) { super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, shardStateAction, - new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false); + new ActionFilters(new 
HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false, + new WriteMemoryLimits()); this.withDocumentFailureOnPrimary = false; this.withDocumentFailureOnReplica = false; } @@ -381,7 +389,7 @@ protected TestResponse newResponseInstance(StreamInput in) throws IOException { } @Override - protected void shardOperationOnPrimary( + protected void dispatchedShardOperationOnPrimary( TestRequest request, IndexShard primary, ActionListener> listener) { ActionListener.completeWith(listener, () -> { if (withDocumentFailureOnPrimary) { @@ -393,14 +401,16 @@ protected void shardOperationOnPrimary( } @Override - protected WriteReplicaResult shardOperationOnReplica(TestRequest request, IndexShard replica) throws Exception { - final WriteReplicaResult replicaResult; - if (withDocumentFailureOnReplica) { - replicaResult = new WriteReplicaResult<>(request, null, new RuntimeException("simulated"), replica, logger); - } else { - replicaResult = new WriteReplicaResult<>(request, location, null, replica, logger); - } - return replicaResult; + protected void dispatchedShardOperationOnReplica(TestRequest request, IndexShard replica, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + final WriteReplicaResult replicaResult; + if (withDocumentFailureOnReplica) { + replicaResult = new WriteReplicaResult<>(request, null, new RuntimeException("simulated"), replica, logger); + } else { + replicaResult = new WriteReplicaResult<>(request, location, null, replica, logger); + } + return replicaResult; + }); } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index c22011ac71b41..bc44ae0c1d1ba 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -116,7 +116,8 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { if (randomBoolean()) { action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {})); } else { - action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard); + action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard, + ActionTestUtils.assertNoFailureListener(r -> {})); } if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index e380d3431b30f..0d8607ab01c76 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.service.ClusterService; @@ -149,7 +150,9 @@ public void 
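Taken together, the TransportWriteAction test changes above show the new extension points: concrete write actions now implement dispatchedShardOperationOnPrimary and dispatchedShardOperationOnReplica (the base class owns the dispatch onto the write thread pool and, presumably, the byte accounting against the WriteMemoryLimits instance passed to the constructor), and they report a per-request size through primaryOperationSize and replicaOperationSize. A rough skeleton of a subclass under this scheme; MyRequest, MyResponse and estimateRequestBytes are placeholders, not code from this change:

// Skeleton only: constructor wiring is omitted and the size estimate is a stand-in.
public class MyWriteAction extends TransportWriteAction<MyRequest, MyRequest, MyResponse> {

    @Override
    protected void dispatchedShardOperationOnPrimary(MyRequest request, IndexShard primary,
                                                     ActionListener<PrimaryResult<MyRequest, MyResponse>> listener) {
        ActionListener.completeWith(listener, () -> {
            // perform the primary-side write here, then wrap it up for replication
            return new WritePrimaryResult<>(request, new MyResponse(), null, null, primary, logger);
        });
    }

    @Override
    protected void dispatchedShardOperationOnReplica(MyRequest request, IndexShard replica,
                                                     ActionListener<ReplicaResult> listener) {
        ActionListener.completeWith(listener, () ->
            new WriteReplicaResult<>(request, null, null, replica, logger));
    }

    // Both hooks feed the write-memory accounting with an estimate of the request size in bytes.
    @Override
    protected long primaryOperationSize(MyRequest request) {
        return estimateRequestBytes(request);
    }

    @Override
    protected long replicaOperationSize(MyRequest request) {
        return estimateRequestBytes(request);
    }

    private static long estimateRequestBytes(MyRequest request) {
        return 0; // placeholder
    }
}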
testRetentionLeaseBackgroundSyncActionOnReplica() throws WriteStateE final RetentionLeaseBackgroundSyncAction.Request request = new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases); - final TransportReplicationAction.ReplicaResult result = action.shardOperationOnReplica(request, indexShard); + final PlainActionFuture listener = PlainActionFuture.newFuture(); + action.shardOperationOnReplica(request, indexShard, listener); + final TransportReplicationAction.ReplicaResult result = listener.actionGet(); // the retention leases on the shard should be updated verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); // the retention leases on the shard should be persisted diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 63b013325c5ce..7cfaf06f3e581 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -20,9 +20,11 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; -import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -103,10 +105,11 @@ public void testRetentionLeaseSyncActionOnPrimary() { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet())); + new ActionFilters(Collections.emptySet()), + new WriteMemoryLimits()); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); - action.shardOperationOnPrimary(request, indexShard, + action.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { // the retention leases on the shard should be persisted verify(indexShard).persistRetentionLeases(); @@ -139,12 +142,14 @@ public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet())); + new ActionFilters(Collections.emptySet()), + new WriteMemoryLimits()); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); - final TransportWriteAction.WriteReplicaResult result = - action.shardOperationOnReplica(request, indexShard); + PlainActionFuture listener = PlainActionFuture.newFuture(); + action.dispatchedShardOperationOnReplica(request, indexShard, listener); + final TransportReplicationAction.ReplicaResult result = listener.actionGet(); // the retention leases on the shard should be updated verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); // the retention leases on the shard should be persisted @@ -176,7 +181,8 @@ public void testBlocks() { indicesService, threadPool, 
shardStateAction, - new ActionFilters(Collections.emptySet())); + new ActionFilters(Collections.emptySet()), + new WriteMemoryLimits()); assertNull(action.indexBlockLevel()); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 856cd0c940e56..2bdebbd1b63d9 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -63,6 +63,7 @@ import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction; import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction; import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.TransportBulkAction; @@ -1467,7 +1468,8 @@ public void onFailure(final Exception e) { indicesService, threadPool, shardStateAction, - actionFilters)), + actionFilters, + new WriteMemoryLimits())), RetentionLeaseSyncer.EMPTY, client); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService); @@ -1482,6 +1484,7 @@ allocationService, new AliasValidator(), shardLimitValidator, environment, index actionFilters, indexNameExpressionResolver )); final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings); + final WriteMemoryLimits indexingMemoryLimits = new WriteMemoryLimits(); mappingUpdatedAction.setClient(client); actions.put(BulkAction.INSTANCE, new TransportBulkAction(threadPool, transportService, clusterService, @@ -1490,11 +1493,12 @@ allocationService, new AliasValidator(), shardLimitValidator, environment, index new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), Collections.emptyList(), client), client, actionFilters, indexNameExpressionResolver, - new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver) + new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver), + new WriteMemoryLimits() )); final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService), - actionFilters); + actionFilters, indexingMemoryLimits); actions.put(TransportShardBulkAction.TYPE, transportShardBulkAction); final RestoreService restoreService = new RestoreService( clusterService, repositoriesService, allocationService, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 48f41ef527de1..6918c4a96678a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.action.support.ActionFilters; import 
org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -43,7 +44,8 @@ public TransportBulkShardOperationsAction( final IndicesService indicesService, final ThreadPool threadPool, final ShardStateAction shardStateAction, - final ActionFilters actionFilters) { + final ActionFilters actionFilters, + final WriteMemoryLimits writeMemoryLimits) { super( settings, BulkShardOperationsAction.NAME, @@ -55,11 +57,11 @@ public TransportBulkShardOperationsAction( actionFilters, BulkShardOperationsRequest::new, BulkShardOperationsRequest::new, - ThreadPool.Names.WRITE, false); + ThreadPool.Names.WRITE, false, writeMemoryLimits); } @Override - protected void shardOperationOnPrimary(BulkShardOperationsRequest request, IndexShard primary, + protected void dispatchedShardOperationOnPrimary(BulkShardOperationsRequest request, IndexShard primary, ActionListener> listener) { if (logger.isTraceEnabled()) { logger.trace("index [{}] on the following primary shard {}", request.getOperations(), primary.routingEntry()); @@ -68,6 +70,11 @@ protected void shardOperationOnPrimary(BulkShardOperationsRequest request, Index request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger)); } + @Override + protected long primaryOperationSize(BulkShardOperationsRequest request) { + return request.getOperations().stream().mapToLong(Translog.Operation::estimateSize).sum(); + } + public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) { final Translog.Operation operationWithPrimaryTerm; switch (operation.opType()) { @@ -158,12 +165,19 @@ public static WritePrimaryResult shardOperationOnReplica( - final BulkShardOperationsRequest request, final IndexShard replica) throws Exception { - if (logger.isTraceEnabled()) { - logger.trace("index [{}] on the following replica shard {}", request.getOperations(), replica.routingEntry()); - } - return shardOperationOnReplica(request, replica, logger); + protected void dispatchedShardOperationOnReplica(BulkShardOperationsRequest request, IndexShard replica, + ActionListener listener) { + ActionListener.completeWith(listener, () -> { + if (logger.isTraceEnabled()) { + logger.trace("index [{}] on the following replica shard {}", request.getOperations(), replica.routingEntry()); + } + return shardOperationOnReplica(request, replica, logger); + }); + } + + @Override + protected long replicaOperationSize(BulkShardOperationsRequest request) { + return request.getOperations().stream().mapToLong(Translog.Operation::estimateSize).sum(); } // public for testing purposes only
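The primaryOperationSize/replicaOperationSize hooks above give a sense of how the accounting fits together: each write action reports the byte size of its request (here the summed Translog.Operation.estimateSize() of the CCR batch), and the shared WriteMemoryLimits instance bound in Node tracks those bytes while the operation is in flight. WriteMemoryLimits itself does not appear in these hunks, so the following is only a conceptual sketch with an invented name and invented methods, not the real API:

import java.util.concurrent.atomic.AtomicLong;

// Conceptual sketch only; the real WriteMemoryLimits API is not shown in this diff.
final class WriteBytesTracker {

    private final AtomicLong pendingWriteBytes = new AtomicLong();

    // Record the bytes of an in-flight write and return a handle that releases them when closed.
    AutoCloseable markOperationStarted(long bytes) {
        pendingWriteBytes.addAndGet(bytes);
        return () -> pendingWriteBytes.addAndGet(-bytes);
    }

    long pendingBytes() {
        return pendingWriteBytes.get();
    }
}

In such a scheme a caller would acquire the bytes before dispatching (markOperationStarted(primaryOperationSize(request))) and close the returned handle when the operation's listener completes, so the concurrent write load on a node is visible as a single pending-bytes number.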