From fb1c11107a8782819e7946afade98425cfdc90f0 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 25 Jun 2020 14:11:23 -0600 Subject: [PATCH 01/10] Count coordinating and primary bytes as write byte This is a follow-up to #57573. This commit combines coordinating and primary bytes under the same "write" bucket. Double accounting is prevented by only accounting the bytes at either the reroute phase or the primary phase. TransportBulkAction calls execute directly, so the operations handler is skipped and the bytes are not double accounted. --- .../action/bulk/WriteMemoryLimitsIT.java | 25 ++++++-------- .../action/bulk/TransportBulkAction.java | 2 +- .../action/bulk/WriteMemoryLimits.java | 34 +++++++------------ .../TransportResyncReplicationAction.java | 10 ++++++ .../replication/TransportWriteAction.java | 22 ++++++++++-- .../index/seqno/RetentionLeaseSyncAction.java | 5 +++ 6 files changed, 58 insertions(+), 40 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java index 276d3013ed928..2ec37b84bf7bb 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java @@ -134,12 +134,10 @@ public void testWriteBytesAreIncremented() throws Exception { WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); - assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize)); - assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getReplicaBytes()); - assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes()); - assertEquals(0, replicaWriteLimits.getPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getReplicaBytes()); + assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); + assertEquals(bulkRequestSize, replicaWriteLimits.getWriteBytes()); + assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); ThreadPool replicaThreadPool = replicaTransportService.getThreadPool(); // Block the replica Write thread pool @@ -172,8 +170,9 @@ public void testWriteBytesAreIncremented() throws Exception { final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed(); final long secondBulkShardRequestSize = request.ramBytesUsed(); - assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes()); - assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(), + assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getWriteBytes()); + assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkShardRequestSize))); latchBlockingReplication.countDown(); @@ -181,12 +180,10 @@ public void testWriteBytesAreIncremented() throws Exception { successFuture.actionGet(); secondFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getCoordinatingBytes()); - assertEquals(0, primaryWriteLimits.getPrimaryBytes()); - assertEquals(0, primaryWriteLimits.getReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCoordinatingBytes()); - assertEquals(0, replicaWriteLimits.getPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getReplicaBytes()); + assertEquals(0, primaryWriteLimits.getWriteBytes()); + assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); + assertEquals(0, replicaWriteLimits.getWriteBytes()); + assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); } finally { if (replicationSendPointReached.getCount() > 0) { replicationSendPointReached.countDown(); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index e443d0292f250..c37683b30ff12 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -162,7 +162,7 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest docWriteReque @Override protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { long indexingBytes = bulkRequest.ramBytesUsed(); - final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(indexingBytes); + final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes); final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); try { doInternalExecute(task, bulkRequest, releasingListener); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java index 84c702f110622..29371f0795088 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java @@ -25,34 +25,24 @@ public class WriteMemoryLimits { - private final AtomicLong coordinatingBytes = new AtomicLong(0); - private final AtomicLong primaryBytes = new AtomicLong(0); - private final AtomicLong replicaBytes = new AtomicLong(0); + private final AtomicLong writeBytes = new AtomicLong(0); + private final AtomicLong replicaWriteBytes = new AtomicLong(0); - public Releasable markCoordinatingOperationStarted(long bytes) { - coordinatingBytes.addAndGet(bytes); - return () -> coordinatingBytes.getAndAdd(-bytes); + public Releasable markWriteOperationStarted(long bytes) { + writeBytes.addAndGet(bytes); + return () -> writeBytes.getAndAdd(-bytes); } - public long getCoordinatingBytes() { - return coordinatingBytes.get(); + public long getWriteBytes() { + return writeBytes.get(); } - public Releasable markPrimaryOperationStarted(long bytes) { - primaryBytes.addAndGet(bytes); - return () -> primaryBytes.getAndAdd(-bytes); + public Releasable markReplicaWriteStarted(long bytes) { + replicaWriteBytes.getAndAdd(bytes); + return () -> replicaWriteBytes.getAndAdd(-bytes); } - public long getPrimaryBytes() { - return primaryBytes.get(); - } - - public Releasable markReplicaOperationStarted(long bytes) { - replicaBytes.getAndAdd(bytes); - return () -> replicaBytes.getAndAdd(-bytes); - } - - public long getReplicaBytes() { - return replicaBytes.get(); + public long getReplicaWriteBytes() { + return replicaWriteBytes.get(); } } diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 638371f414111..1f30df151745f 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -64,6 +64,11 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran writeMemoryLimits); } + @Override + protected void doExecute(Task parentTask, ResyncReplicationRequest request, ActionListener listener) { + assert false : "use TransportResyncReplicationAction#sync"; + } + @Override protected ResyncReplicationResponse newResponseInstance(StreamInput in) throws IOException { return new ResyncReplicationResponse(in); @@ -86,6 +91,11 @@ public ClusterBlockLevel indexBlockLevel() { return null; } + @Override + protected boolean rerouteBypassed() { + return true; + } + @Override protected void dispatchedShardOperationOnPrimary(ResyncReplicationRequest request, IndexShard primary, ActionListener> listener) { diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 8407dba506a29..0e8d9fd4dd6a1 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -80,12 +80,28 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe @Override protected Releasable checkOperationLimits(Request request) { - return writeMemoryLimits.markCoordinatingOperationStarted(primaryOperationSize(request)); + if (rerouteBypassed() == false) { + return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); + } else { + return () -> {}; + } } @Override protected Releasable checkPrimaryLimits(Request request) { - return writeMemoryLimits.markPrimaryOperationStarted(primaryOperationSize(request)); + if (rerouteBypassed()) { + return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); + } else { + return () -> {}; + } + } + + /** + * Some actions bypass the reroute phase and directly call the primary action. If this is the case, we + * need to mark the WRITE bytes when the primary request is received. + */ + protected boolean rerouteBypassed() { + return false; } protected long primaryOperationSize(Request request) { @@ -94,7 +110,7 @@ protected long primaryOperationSize(Request request) { @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return writeMemoryLimits.markReplicaOperationStarted(replicaOperationSize(request)); + return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request)); } protected long replicaOperationSize(ReplicaRequest request) { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 54a418fe673c7..9b45cf5b9e140 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -146,6 +146,11 @@ public void handleException(TransportException e) { } } + @Override + protected boolean rerouteBypassed() { + return true; + } + @Override protected void dispatchedShardOperationOnPrimary(Request request, IndexShard primary, ActionListener> listener) { From ce0cb30956ea226b7bff627afe6706500ef3d06d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 26 Jun 2020 12:35:03 -0600 Subject: [PATCH 02/10] Changes --- .../elasticsearch/test/InternalTestCluster.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 1fbfd11603ace..f3d8a18c99c8f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1164,19 +1164,14 @@ private void assertAllPendingWriteLimitsReleased() throws Exception { assertBusy(() -> { for (NodeAndClient nodeAndClient : nodes.values()) { WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name); - final long coordinatingBytes = writeMemoryLimits.getCoordinatingBytes(); - if (coordinatingBytes > 0) { - throw new AssertionError("pending coordinating write bytes [" + coordinatingBytes + "] bytes on node [" + final long writeBytes = writeMemoryLimits.getWriteBytes(); + if (writeBytes > 0) { + throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node [" + nodeAndClient.name + "]."); } - final long primaryBytes = writeMemoryLimits.getPrimaryBytes(); - if (primaryBytes > 0) { - throw new AssertionError("pending primary write bytes [" + coordinatingBytes + "] bytes on node [" - + nodeAndClient.name + "]."); - } - final long replicaBytes = writeMemoryLimits.getReplicaBytes(); - if (replicaBytes > 0) { - throw new AssertionError("pending replica write bytes [" + coordinatingBytes + "] bytes on node [" + final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes(); + if (replicaWriteBytes > 0) { + throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node [" + nodeAndClient.name + "]."); } } From c86d2162d902f900962cc56f5191aac480d263c5 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 26 Jun 2020 19:24:35 -0600 Subject: [PATCH 03/10] Changes --- .../action/bulk/WriteMemoryLimitsIT.java | 50 +++++++++++++------ .../TransportResyncReplicationAction.java | 4 +- .../replication/TransportWriteAction.java | 17 +++---- .../index/seqno/RetentionLeaseSyncAction.java | 4 +- .../TransportBulkShardOperationsAction.java | 17 +++++++ 5 files changed, 64 insertions(+), 28 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java index 2ec37b84bf7bb..a7cb2f5260fde 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -42,9 +43,11 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.greaterThan; -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2) +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 1) public class WriteMemoryLimitsIT extends ESIntegTestCase { + public static final String INDEX_NAME = "test"; + @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() @@ -70,13 +73,12 @@ protected int numberOfShards() { } public void testWriteBytesAreIncremented() throws Exception { - final String index = "test"; - assertAcked(prepareCreate(index, Settings.builder() + assertAcked(prepareCreate(INDEX_NAME, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); - ensureGreen(index); + ensureGreen(INDEX_NAME); - IndicesStatsResponse response = client().admin().indices().prepareStats(index).get(); + IndicesStatsResponse response = client().admin().indices().prepareStats(INDEX_NAME).get(); String primaryId = Stream.of(response.getShards()) .map(ShardStats::getShardRouting) .filter(ShardRouting::primary) @@ -89,8 +91,10 @@ public void testWriteBytesAreIncremented() throws Exception { .findAny() .get() .currentNodeId(); - String primaryName = client().admin().cluster().prepareState().get().getState().nodes().get(primaryId).getName(); - String replicaName = client().admin().cluster().prepareState().get().getState().nodes().get(replicaId).getName(); + DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes(); + String primaryName = nodes.get(primaryId).getName(); + String replicaName = nodes.get(replicaId).getName(); + String coordinatingOnlyNode = nodes.getCoordinatingOnlyNodes().iterator().next().value.getName(); final CountDownLatch replicationSendPointReached = new CountDownLatch(1); final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1); @@ -117,7 +121,7 @@ public void testWriteBytesAreIncremented() throws Exception { final BulkRequest bulkRequest = new BulkRequest(); int totalRequestSize = 0; for (int i = 0; i < 80; ++i) { - IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID()) + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) .source(Collections.singletonMap("key", randomAlphaOfLength(50))); totalRequestSize += request.ramBytesUsed(); assertTrue(request.ramBytesUsed() > request.source().length()); @@ -128,16 +132,19 @@ public void testWriteBytesAreIncremented() throws Exception { final long bulkShardRequestSize = totalRequestSize; try { - final ActionFuture successFuture = client(replicaName).bulk(bulkRequest); + final ActionFuture successFuture = client(coordinatingOnlyNode).bulk(bulkRequest); replicationSendPointReached.await(); WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); + WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); - assertEquals(bulkRequestSize, replicaWriteLimits.getWriteBytes()); + assertEquals(0, replicaWriteLimits.getWriteBytes()); assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); ThreadPool replicaThreadPool = replicaTransportService.getThreadPool(); // Block the replica Write thread pool @@ -160,18 +167,31 @@ public void testWriteBytesAreIncremented() throws Exception { newActionsSendPointReached.await(); latchBlockingReplicationSend.countDown(); - IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID()) + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) .source(Collections.singletonMap("key", randomAlphaOfLength(50))); final BulkRequest secondBulkRequest = new BulkRequest(); secondBulkRequest.add(request); - ActionFuture secondFuture = client(replicaName).bulk(secondBulkRequest); + // Use the primary or the replica data node as the coordinating node this time + boolean usePrimaryAsCoordinatingNode = randomBoolean(); + final ActionFuture secondFuture; + if (usePrimaryAsCoordinatingNode) { + secondFuture = client(primaryName).bulk(secondBulkRequest); + } else { + secondFuture = client(replicaName).bulk(secondBulkRequest); + } final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed(); final long secondBulkShardRequestSize = request.ramBytesUsed(); - assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getWriteBytes()); + if (usePrimaryAsCoordinatingNode) { + assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize)); + assertEquals(0, replicaWriteLimits.getWriteBytes()); + } else { + assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes()); + } + assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkShardRequestSize))); @@ -184,6 +204,8 @@ public void testWriteBytesAreIncremented() throws Exception { assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); assertEquals(0, replicaWriteLimits.getWriteBytes()); assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); } finally { if (replicationSendPointReached.getCount() > 0) { replicationSendPointReached.countDown(); diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 1f30df151745f..1159081f49e30 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -92,8 +92,8 @@ public ClusterBlockLevel indexBlockLevel() { } @Override - protected boolean rerouteBypassed() { - return true; + protected boolean supportsRerouteAction() { + return false; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 0e8d9fd4dd6a1..cb2e9be7535c7 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -80,19 +80,16 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe @Override protected Releasable checkOperationLimits(Request request) { - if (rerouteBypassed() == false) { - return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); - } else { - return () -> {}; - } + assert supportsRerouteAction() : "checkOperationLimits should be be called if reroute not supported by action"; + return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); } @Override protected Releasable checkPrimaryLimits(Request request) { - if (rerouteBypassed()) { - return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); - } else { + if (supportsRerouteAction()) { return () -> {}; + } else { + return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); } } @@ -100,8 +97,8 @@ protected Releasable checkPrimaryLimits(Request request) { * Some actions bypass the reroute phase and directly call the primary action. If this is the case, we * need to mark the WRITE bytes when the primary request is received. */ - protected boolean rerouteBypassed() { - return false; + protected boolean supportsRerouteAction() { + return true; } protected long primaryOperationSize(Request request) { diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 9b45cf5b9e140..f808070070f0e 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -147,8 +147,8 @@ public void handleException(TransportException e) { } @Override - protected boolean rerouteBypassed() { - return true; + protected boolean supportsRerouteAction() { + return false; } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 6918c4a96678a..512d300d25b3f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SeqNoStats; @@ -25,6 +26,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.index.engine.AlreadyProcessedFollowingEngineException; @@ -36,6 +38,8 @@ public class TransportBulkShardOperationsAction extends TransportWriteAction { + private final WriteMemoryLimits writeMemoryLimits; + @Inject public TransportBulkShardOperationsAction( final Settings settings, @@ -58,6 +62,19 @@ public TransportBulkShardOperationsAction( BulkShardOperationsRequest::new, BulkShardOperationsRequest::new, ThreadPool.Names.WRITE, false, writeMemoryLimits); + this.writeMemoryLimits = writeMemoryLimits; + } + + @Override + protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener listener) { + // This is executed on the follower coordinator node and we need to mark the bytes. + Releasable releasable = writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); + ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); + try { + super.doExecute(task, request, releasingListener); + } catch (Exception e) { + releasingListener.onFailure(e); + } } @Override From 695cffccb56735f4e3e07bde9e493b23cc467fe5 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 29 Jun 2020 16:33:18 -0600 Subject: [PATCH 04/10] Changes --- .../support/replication/TransportReplicationAction.java | 2 +- .../action/support/replication/TransportWriteAction.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 1c05b596d97f0..810af645009f8 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -584,7 +584,7 @@ public void onResponse(Releasable releasable) { Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller AsyncReplicaAction.this.onFailure(e); })); - // TODO: Evaludate if we still need to catch this exception + // TODO: Evaluate if we still need to catch this exception } catch (Exception e) { Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller AsyncReplicaAction.this.onFailure(e); diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index cb2e9be7535c7..d22771eed2fa2 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -86,7 +86,9 @@ protected Releasable checkOperationLimits(Request request) { @Override protected Releasable checkPrimaryLimits(Request request) { - if (supportsRerouteAction()) { + // If reroute is supported, the parent task is the reroute task. If the node-id is the same, we have + // already accounted the bytes. + if (supportsRerouteAction() && request.getParentTask().getNodeId().equals(clusterService.localNode().getId())) { return () -> {}; } else { return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); From 14dafcf32932d4fb154ecf529304502452b10943 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 29 Jun 2020 17:19:53 -0600 Subject: [PATCH 05/10] Changes --- .../TransportResyncReplicationAction.java | 5 ---- .../TransportReplicationAction.java | 27 +++++++++++++++---- .../replication/TransportWriteAction.java | 17 +++--------- .../index/seqno/RetentionLeaseSyncAction.java | 5 ---- 4 files changed, 26 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 1159081f49e30..74ddcf54b3212 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -91,11 +91,6 @@ public ClusterBlockLevel indexBlockLevel() { return null; } - @Override - protected boolean supportsRerouteAction() { - return false; - } - @Override protected void dispatchedShardOperationOnPrimary(ResyncReplicationRequest request, IndexShard primary, ActionListener> listener) { diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 810af645009f8..e443734c4d2eb 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionResponse; @@ -285,7 +286,7 @@ protected Releasable checkOperationLimits(final Request request) { } protected void handlePrimaryRequest(final ConcreteShardRequest request, final TransportChannel channel, final Task task) { - Releasable releasable = checkPrimaryLimits(request.getRequest()); + Releasable releasable = checkPrimaryLimits(request.getRequest(), request.rerouteWasLocal()); ActionListener listener = ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close); @@ -296,7 +297,7 @@ protected void handlePrimaryRequest(final ConcreteShardRequest request, } } - protected Releasable checkPrimaryLimits(final Request request) { + protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal) { return () -> {}; } @@ -371,8 +372,7 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(), - primaryRequest.getPrimaryTerm()), - transportOptions, + primaryRequest.getPrimaryTerm()), transportOptions, new ActionListenerResponseHandler<>(onCompletionListener, reader) { @Override public void handleResponse(Response response) { @@ -750,7 +750,7 @@ private void performLocalAction(ClusterState state, ShardRouting primary, Discov transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId()); } performAction(node, transportPrimaryAction, true, - new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()))); + new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()), true)); } private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) { @@ -1102,19 +1102,31 @@ public static class ConcreteShardRequest extends Tra private final String targetAllocationID; private final long primaryTerm; private final R request; + private final boolean rerouteWasLocal; public ConcreteShardRequest(Writeable.Reader requestReader, StreamInput in) throws IOException { targetAllocationID = in.readString(); primaryTerm = in.readVLong(); + // TODO: Change after backport + if (in.getVersion().onOrAfter(Version.CURRENT)) { + rerouteWasLocal = in.readBoolean(); + } else { + rerouteWasLocal = false; + } request = requestReader.read(in); } public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) { + this(request, targetAllocationID, primaryTerm, false); + } + + public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean rerouteWasLocal) { Objects.requireNonNull(request); Objects.requireNonNull(targetAllocationID); this.request = request; this.targetAllocationID = targetAllocationID; this.primaryTerm = primaryTerm; + this.rerouteWasLocal = rerouteWasLocal; } @Override @@ -1145,9 +1157,14 @@ public String getDescription() { public void writeTo(StreamOutput out) throws IOException { out.writeString(targetAllocationID); out.writeVLong(primaryTerm); + out.writeBoolean(rerouteWasLocal); request.writeTo(out); } + public boolean rerouteWasLocal() { + return rerouteWasLocal; + } + public R getRequest() { return request; } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index d22771eed2fa2..037147266be54 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -80,29 +80,20 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe @Override protected Releasable checkOperationLimits(Request request) { - assert supportsRerouteAction() : "checkOperationLimits should be be called if reroute not supported by action"; return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); } @Override - protected Releasable checkPrimaryLimits(Request request) { - // If reroute is supported, the parent task is the reroute task. If the node-id is the same, we have - // already accounted the bytes. - if (supportsRerouteAction() && request.getParentTask().getNodeId().equals(clusterService.localNode().getId())) { + protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal) { + // If the reroute this primary request was submitted by a reroute on this local node, we have already + // accounted the bytes. + if (rerouteWasLocal) { return () -> {}; } else { return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); } } - /** - * Some actions bypass the reroute phase and directly call the primary action. If this is the case, we - * need to mark the WRITE bytes when the primary request is received. - */ - protected boolean supportsRerouteAction() { - return true; - } - protected long primaryOperationSize(Request request) { return 0; } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index f808070070f0e..54a418fe673c7 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -146,11 +146,6 @@ public void handleException(TransportException e) { } } - @Override - protected boolean supportsRerouteAction() { - return false; - } - @Override protected void dispatchedShardOperationOnPrimary(Request request, IndexShard primary, ActionListener> listener) { From f8249eee8b4cacb21a021547a75219b9d3d77308 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 29 Jun 2020 21:59:10 -0600 Subject: [PATCH 06/10] Fix --- .../support/replication/TransportReplicationAction.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index e443734c4d2eb..aebf8cdd88e9b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -1108,7 +1108,7 @@ public ConcreteShardRequest(Writeable.Reader requestReader, StreamInput in) t targetAllocationID = in.readString(); primaryTerm = in.readVLong(); // TODO: Change after backport - if (in.getVersion().onOrAfter(Version.CURRENT)) { + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { rerouteWasLocal = in.readBoolean(); } else { rerouteWasLocal = false; @@ -1157,7 +1157,10 @@ public String getDescription() { public void writeTo(StreamOutput out) throws IOException { out.writeString(targetAllocationID); out.writeVLong(primaryTerm); - out.writeBoolean(rerouteWasLocal); + // TODO: Change after backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(rerouteWasLocal); + } request.writeTo(out); } From 0e452d54815c0dce366ebad08415d1a57fadab7e Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 30 Jun 2020 11:22:10 -0600 Subject: [PATCH 07/10] Naming --- .../TransportReplicationAction.java | 30 ++++++++----------- .../replication/TransportWriteAction.java | 2 +- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index aebf8cdd88e9b..6d5ae27949b73 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionResponse; @@ -286,7 +285,7 @@ protected Releasable checkOperationLimits(final Request request) { } protected void handlePrimaryRequest(final ConcreteShardRequest request, final TransportChannel channel, final Task task) { - Releasable releasable = checkPrimaryLimits(request.getRequest(), request.rerouteWasLocal()); + Releasable releasable = checkPrimaryLimits(request.getRequest(), request.sentFromLocalReroute()); ActionListener listener = ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close); @@ -1102,17 +1101,14 @@ public static class ConcreteShardRequest extends Tra private final String targetAllocationID; private final long primaryTerm; private final R request; - private final boolean rerouteWasLocal; + // Indicates if this primary shard request originated by a reroute on this local node. + private final boolean sentFromLocalReroute; public ConcreteShardRequest(Writeable.Reader requestReader, StreamInput in) throws IOException { + // sendFromLocalReroute targetAllocationID = in.readString(); primaryTerm = in.readVLong(); - // TODO: Change after backport - if (in.getVersion().onOrAfter(Version.V_8_0_0)) { - rerouteWasLocal = in.readBoolean(); - } else { - rerouteWasLocal = false; - } + sentFromLocalReroute = false; request = requestReader.read(in); } @@ -1120,13 +1116,13 @@ public ConcreteShardRequest(R request, String targetAllocationID, long primaryTe this(request, targetAllocationID, primaryTerm, false); } - public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean rerouteWasLocal) { + public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean sentFromLocalReroute) { Objects.requireNonNull(request); Objects.requireNonNull(targetAllocationID); this.request = request; this.targetAllocationID = targetAllocationID; this.primaryTerm = primaryTerm; - this.rerouteWasLocal = rerouteWasLocal; + this.sentFromLocalReroute = sentFromLocalReroute; } @Override @@ -1155,17 +1151,17 @@ public String getDescription() { @Override public void writeTo(StreamOutput out) throws IOException { + // If sentFromLocalReroute is marked true, then this request should just be looped back through + // the local transport. It should never be serialized to be sent over the wire. If it is sent over + // the wire, then it was NOT sent from a local reroute. + assert sentFromLocalReroute == false; out.writeString(targetAllocationID); out.writeVLong(primaryTerm); - // TODO: Change after backport - if (out.getVersion().onOrAfter(Version.V_8_0_0)) { - out.writeBoolean(rerouteWasLocal); - } request.writeTo(out); } - public boolean rerouteWasLocal() { - return rerouteWasLocal; + public boolean sentFromLocalReroute() { + return sentFromLocalReroute; } public R getRequest() { diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 037147266be54..dc5c790b2cf9e 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -85,7 +85,7 @@ protected Releasable checkOperationLimits(Request request) { @Override protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal) { - // If the reroute this primary request was submitted by a reroute on this local node, we have already + // If this primary request was submitted by a reroute performed on this local node, we have already // accounted the bytes. if (rerouteWasLocal) { return () -> {}; From 8b2d3ce6585948f1d232b37b8e0b50b4cea7376e Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 30 Jun 2020 14:44:28 -0600 Subject: [PATCH 08/10] Changes --- .../xpack/ccr/LocalIndexFollowingIT.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index 8ff18c4763e24..dfa561a27151c 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -6,12 +6,16 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.CcrSingleNodeTestCase; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; @@ -23,6 +27,8 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.stream.StreamSupport; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -74,6 +80,64 @@ public void testFollowIndex() throws Exception { ensureEmptyWriteBuffers(); } + public void testWriteLimitsIncremented() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, Collections.emptyMap()); + assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader"); + + // Use a sufficiently small number of docs to ensure that they are well below the number of docs that + // can be sent in a single TransportBulkShardOperationsAction + final long firstBatchNumDocs = randomIntBetween(10, 20); + long sourceSize = 0; + for (int i = 0; i < firstBatchNumDocs; i++) { + BytesArray source = new BytesArray("{}"); + sourceSize += source.length(); + client().prepareIndex("leader").setSource(source, XContentType.JSON).get(); + } + + ThreadPool nodeThreadPool = getInstanceFromNode(ThreadPool.class); + ThreadPool.Info writeInfo = StreamSupport.stream(nodeThreadPool.info().spliterator(), false) + .filter(i -> i.getName().equals(ThreadPool.Names.WRITE)).findAny().get(); + int numberOfThreads = writeInfo.getMax(); + CountDownLatch threadBlockedLatch = new CountDownLatch(numberOfThreads); + CountDownLatch blocker = new CountDownLatch(1); + + for (int i = 0; i < numberOfThreads; ++i) { + nodeThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> { + try { + threadBlockedLatch.countDown(); + blocker.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + }); + } + threadBlockedLatch.await(); + + try { + final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + WriteMemoryLimits memoryLimits = getInstanceFromNode(WriteMemoryLimits.class); + final long finalSourceSize = sourceSize; + assertBusy(() -> { + // The actual write bytes will be greater due to other request fields. However, this test is + // just spot checking that the bytes are incremented at all. + assertTrue(memoryLimits.getWriteBytes() > finalSourceSize); + }); + blocker.countDown(); + assertBusy(() -> { + assertThat(client().prepareSearch("follower").get().getHits().getTotalHits().value, equalTo(firstBatchNumDocs)); + }); + ensureEmptyWriteBuffers(); + } finally { + if (blocker.getCount() > 0) { + blocker.countDown(); + } + } + + } + public void testRemoveRemoteConnection() throws Exception { PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); request.setName("my_pattern"); From 0b1738277fdc5e868496a8cca1ccbec27c4b3760 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 30 Jun 2020 14:44:44 -0600 Subject: [PATCH 09/10] Changes --- .../java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index dfa561a27151c..8b55572d19b8b 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -10,7 +10,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; From a5c98a2794719b1cee71aa8ed0e0d61177c33ba7 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 1 Jul 2020 07:41:26 -0600 Subject: [PATCH 10/10] Remove comment --- .../action/support/replication/TransportReplicationAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 6d5ae27949b73..bce9517e3ef64 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -1105,7 +1105,6 @@ public static class ConcreteShardRequest extends Tra private final boolean sentFromLocalReroute; public ConcreteShardRequest(Writeable.Reader requestReader, StreamInput in) throws IOException { - // sendFromLocalReroute targetAllocationID = in.readString(); primaryTerm = in.readVLong(); sentFromLocalReroute = false;