From 9635997a5a37190a083debc3e3c153cef234c51b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 25 Sep 2018 09:48:25 -0400 Subject: [PATCH 1/3] CCR: replicates max seq_no of updates to follower This commit replicates the max_seq_no_of_updates on the leading index to the primaries of the following index via ShardFollowNodeTask. The max_seq_no_of_updates is then transmitted to the replicas of the follower via replication requests (that's BulkShardOperationsRequest). Relates #33656 --- .../xpack/ccr/action/ShardChangesAction.java | 25 ++++- .../xpack/ccr/action/ShardFollowNodeTask.java | 10 +- .../ccr/action/ShardFollowTasksExecutor.java | 4 +- .../bulk/BulkShardOperationsRequest.java | 11 +- .../TransportBulkShardOperationsAction.java | 14 ++- .../xpack/ccr/ShardChangesIT.java | 101 +++++++++++++++++- .../ccr/action/ShardChangesResponseTests.java | 2 + .../ShardFollowNodeTaskRandomTests.java | 7 +- .../ccr/action/ShardFollowNodeTaskTests.java | 5 +- .../ShardFollowTaskReplicationTests.java | 14 ++- .../action/bulk/BulkShardOperationsTests.java | 3 +- 11 files changed, 175 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 937ca0a009613..0cf700dee4efe 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -207,6 +207,12 @@ public long getMaxSeqNo() { return maxSeqNo; } + private long maxSeqNoOfUpdatesOrDeletes; + + public long getMaxSeqNoOfUpdatesOrDeletes() { + return maxSeqNoOfUpdatesOrDeletes; + } + private Translog.Operation[] operations; public Translog.Operation[] getOperations() { @@ -220,11 +226,13 @@ public Translog.Operation[] getOperations() { final long mappingVersion, final long globalCheckpoint, final long maxSeqNo, + final 
long maxSeqNoOfUpdatesOrDeletes, final Translog.Operation[] operations) { this.mappingVersion = mappingVersion; this.globalCheckpoint = globalCheckpoint; this.maxSeqNo = maxSeqNo; + this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; this.operations = operations; } @@ -234,6 +242,7 @@ public void readFrom(final StreamInput in) throws IOException { mappingVersion = in.readVLong(); globalCheckpoint = in.readZLong(); maxSeqNo = in.readZLong(); + maxSeqNoOfUpdatesOrDeletes = in.readZLong(); operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } @@ -243,6 +252,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVLong(mappingVersion); out.writeZLong(globalCheckpoint); out.writeZLong(maxSeqNo); + out.writeZLong(maxSeqNoOfUpdatesOrDeletes); out.writeArray(Translog.Operation::writeOperation, operations); } @@ -254,12 +264,13 @@ public boolean equals(final Object o) { return mappingVersion == that.mappingVersion && globalCheckpoint == that.globalCheckpoint && maxSeqNo == that.maxSeqNo && + maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes && Arrays.equals(operations, that.operations); } @Override public int hashCode() { - return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations)); + return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, Arrays.hashCode(operations)); } } @@ -294,7 +305,9 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc request.getMaxOperationCount(), request.getExpectedHistoryUUID(), request.getMaxOperationSizeInBytes()); - return getResponse(mappingVersion, seqNoStats, operations); + // must capture after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations. 
+ final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); + return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations); } @Override @@ -358,7 +371,8 @@ private void globalCheckpointAdvancementFailure( final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); - listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY)); + final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); + listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY)); } catch (final Exception caught) { caught.addSuppressed(e); listener.onFailure(caught); @@ -433,8 +447,9 @@ static Translog.Operation[] getOperations(IndexShard indexShard, return operations.toArray(EMPTY_OPERATIONS_ARRAY); } - static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, final Translog.Operation[] operations) { - return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); + static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, + final long maxSeqNoOfUpdates, final Translog.Operation[] operations) { + return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, operations); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 777efdd654b4f..516e022500b29 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.logging.Loggers; import 
org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -56,6 +57,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private long leaderGlobalCheckpoint; private long leaderMaxSeqNo; + private volatile long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO; private long lastRequestedSeqNo; private long followerGlobalCheckpoint = 0; private long followerMaxSeqNo = 0; @@ -262,6 +264,7 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar onOperationsFetched(response.getOperations()); leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint()); leaderMaxSeqNo = Math.max(leaderMaxSeqNo, response.getMaxSeqNo()); + leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.max(leaderMaxSeqNoOfUpdatesOrDeletes, response.getMaxSeqNoOfUpdatesOrDeletes()); final long newFromSeqNo; if (response.getOperations().length == 0) { newFromSeqNo = from; @@ -296,8 +299,9 @@ private void sendBulkShardOperationsRequest(List operations) } private void sendBulkShardOperationsRequest(List operations, AtomicInteger retryCounter) { + assert leaderMaxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "mus is not replicated"; final long startTime = relativeTimeProvider.getAsLong(); - innerSendBulkShardOperationsRequest(operations, + innerSendBulkShardOperationsRequest(operations, leaderMaxSeqNoOfUpdatesOrDeletes, response -> { synchronized (ShardFollowNodeTask.this) { totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); @@ -383,8 +387,8 @@ private static boolean shouldRetry(Exception e) { // These methods are protected for testing purposes: protected abstract void 
innerUpdateMapping(LongConsumer handler, Consumer errorHandler); - protected abstract void innerSendBulkShardOperationsRequest( - List operations, Consumer handler, Consumer errorHandler); + protected abstract void innerSendBulkShardOperationsRequest(List operations, long leaderMaxSeqNoOfUpdatesOrDeletes, + Consumer handler, Consumer errorHandler); protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer handler, Consumer errorHandler); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index d473091f80c31..c5dab5360daeb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -133,9 +133,11 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro @Override protected void innerSendBulkShardOperationsRequest( final List operations, + final long maxSeqNoOfUpdatesOrDeletes, final Consumer handler, final Consumer errorHandler) { - final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations); + final BulkShardOperationsRequest request = new BulkShardOperationsRequest( + params.getFollowShardId(), operations, maxSeqNoOfUpdatesOrDeletes); followerClient.execute(BulkShardOperationsAction.INSTANCE, request, ActionListener.wrap(response -> handler.accept(response), errorHandler)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java index c28789fb580a8..80efba7831e74 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java +++ 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java @@ -17,29 +17,37 @@ public final class BulkShardOperationsRequest extends ReplicatedWriteRequest { private List operations; + private long maxSeqNoOfUpdatesOrDeletes; public BulkShardOperationsRequest() { } - public BulkShardOperationsRequest(final ShardId shardId, final List operations) { + public BulkShardOperationsRequest(ShardId shardId, List operations, long maxSeqNoOfUpdatesOrDeletes) { super(shardId); setRefreshPolicy(RefreshPolicy.NONE); this.operations = operations; + this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; } public List getOperations() { return operations; } + public long getMaxSeqNoOfUpdatesOrDeletes() { + return maxSeqNoOfUpdatesOrDeletes; + } + @Override public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); + maxSeqNoOfUpdatesOrDeletes = in.readZLong(); operations = in.readList(Translog.Operation::readOperation); } @Override public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); + out.writeZLong(maxSeqNoOfUpdatesOrDeletes); out.writeVInt(operations.size()); for (Translog.Operation operation : operations) { Translog.Operation.writeOperation(out, operation); @@ -50,6 +58,7 @@ public void writeTo(final StreamOutput out) throws IOException { public String toString() { return "BulkShardOperationsRequest{" + "operations=" + operations.size()+ + ", maxSeqNoUpdates=" + maxSeqNoOfUpdatesOrDeletes + ", shardId=" + shardId + ", timeout=" + timeout + ", index='" + index + '\'' + diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 14f01b7754957..a2d11b3aea6e2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -60,13 +61,15 @@ public TransportBulkShardOperationsAction( @Override protected WritePrimaryResult shardOperationOnPrimary( final BulkShardOperationsRequest request, final IndexShard primary) throws Exception { - return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger); + return shardOperationOnPrimary( + request.shardId(), request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); } // public for testing purposes only public static WritePrimaryResult shardOperationOnPrimary( final ShardId shardId, final List sourceOperations, + final long maxSeqNoOfUpdatesOrDeletes, final IndexShard primary, final Logger logger) throws IOException { final List targetOperations = sourceOperations.stream().map(operation -> { @@ -103,16 +106,19 @@ public static WritePrimaryResult= SequenceNumbers.NO_OPS_PERFORMED : "invalid msu [" + maxSeqNoOfUpdatesOrDeletes + "]"; + primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY); - final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(shardId, targetOperations); + final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest( + shardId, targetOperations, maxSeqNoOfUpdatesOrDeletes); return new CcrWritePrimaryResult(replicaRequest, location, primary, logger); } @Override protected WriteReplicaResult shardOperationOnReplica( final 
BulkShardOperationsRequest request, final IndexShard replica) throws Exception { + assert replica.getMaxSeqNoOfUpdatesOrDeletes() >= request.getMaxSeqNoOfUpdatesOrDeletes() : + "mus on replica [" + replica + "] < mus of request [" + request.getMaxSeqNoOfUpdatesOrDeletes() + "]"; final Translog.Location location = applyTranslogOperations(request.getOperations(), replica, Engine.Operation.Origin.REPLICA); return new WriteReplicaResult<>(request, location, null, replica, logger); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 78715654a05e3..248c1ed0c54e0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; @@ -30,11 +31,15 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.plugins.Plugin; import 
org.elasticsearch.search.builder.SearchSourceBuilder; @@ -59,8 +64,10 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -221,6 +228,7 @@ public void testFollowIndex() throws Exception { assertBusy(assertExpectedDocumentRunnable(i)); } unfollowIndex("index2"); + assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfPrimaryShards); } public void testSyncMappings() throws Exception { @@ -258,6 +266,7 @@ public void testSyncMappings() throws Exception { assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer")); assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); unfollowIndex("index2"); + assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 2); } public void testNoMappingDefined() throws Exception { @@ -284,7 +293,8 @@ public void testNoMappingDefined() throws Exception { } public void testFollowIndex_backlog() throws Exception { - String leaderIndexSettings = getIndexSettings(between(1, 5), between(0, 1), + int numberOfShards = between(1, 5); + String leaderIndexSettings = getIndexSettings(numberOfShards, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); BulkProcessor.Listener listener = new BulkProcessor.Listener() { @@ -334,6 +344,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) assertSameDocCount("index1", "index2"); unfollowIndex("index2"); + 
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfShards); } @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33337") @@ -379,6 +390,7 @@ public void testFollowIndexAndCloseNode() throws Exception { assertSameDocCount("index1", "index2"); unfollowIndex("index2"); + assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 3); } public void testFollowIndexWithNestedField() throws Exception { @@ -419,6 +431,7 @@ public void testFollowIndexWithNestedField() throws Exception { }); } unfollowIndex("index2"); + assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1); } public void testUnfollowNonExistingIndex() { @@ -482,6 +495,7 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception { assertBusy(assertExpectedDocumentRunnable(i)); } unfollowIndex("index2"); + assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1); } public void testDontFollowTheWrongIndex() throws Exception { @@ -529,6 +543,53 @@ public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception { "this setting is managed via a dedicated API")); } + public void testTransferMaxSeqNoOfUpdates() throws Exception { + int numberOfReplicas = between(0, 2); + internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1); + String leaderIndexSettings = getIndexSettings(1, numberOfReplicas, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON).get()); + FollowIndexAction.Request followRequest = createFollowRequest("leader-index", "follower-index"); + CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); + client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); + ensureGreen("leader-index", 
"follower-index"); + int numDocs = between(1, 100); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("leader-index", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); + } + atLeastDocsIndexed("follower-index", between(1, numDocs)); + for (String node : internalCluster().nodesInclude("follower-index")) { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexShard shard = indicesService.getShardOrNull(new ShardId(resolveIndex("follower-index"), 1)); + if (shard != null) { + assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); + } + } + AtomicLong maxSeqNoOfUpdates = new AtomicLong(-1L); + for (int i = 0; i < numDocs; i++) { + long seqNo = numDocs + i; + if (randomBoolean()) { + client().prepareIndex("leader-index", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); + maxSeqNoOfUpdates.set(seqNo); + } else if (randomBoolean()) { + client().prepareDelete("leader-index", "doc", Integer.toString(i)).get(); + maxSeqNoOfUpdates.set(seqNo); + } + } + logger.info("--> waiting for max_seq_no_of_updates to be replicated"); + assertBusy(() -> { + for (String node : internalCluster().nodesInclude("follower-index")) { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexShard shard = indicesService.getShardOrNull(new ShardId(resolveIndex("follower-index"), 1)); + if (shard != null) { + assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNoOfUpdates.get())); + } + } + }); + assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("leader-index"), resolveIndex("follower-index"), 1); + unfollowIndex("follower-index"); + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); @@ -715,6 +776,44 @@ private void assertSameDocCount(String index1, String index2) throws Exception 
{ }, 60, TimeUnit.SECONDS); } + private void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index followerIndex, int numberOfShards) throws Exception { + assertBusy(() -> { + long[] msuOnLeader = new long[numberOfShards]; + for (int i = 0; i < msuOnLeader.length; i++) { + msuOnLeader[i] = SequenceNumbers.UNASSIGNED_SEQ_NO; + } + Set leaderNodes = internalCluster().nodesInclude(leaderIndex.getName()); + for (String leaderNode : leaderNodes) { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, leaderNode); + for (int i = 0; i < numberOfShards; i++) { + IndexShard shard = indicesService.getShardOrNull(new ShardId(leaderIndex, i)); + if (shard != null) { + try { + msuOnLeader[i] = SequenceNumbers.max(msuOnLeader[i], shard.getMaxSeqNoOfUpdatesOrDeletes()); + } catch (AlreadyClosedException ignored) { + return; + } + } + } + } + + Set followerNodes = internalCluster().nodesInclude(followerIndex.getName()); + for (String followerNode : followerNodes) { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, followerNode); + for (int i = 0; i < numberOfShards; i++) { + IndexShard shard = indicesService.getShardOrNull(new ShardId(leaderIndex, i)); + if (shard != null) { + try { + assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(msuOnLeader[i])); + } catch (AlreadyClosedException ignored) { + + } + } + } + } + }); + } + public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followerIndex) { FollowIndexAction.Request request = new FollowIndexAction.Request(); request.setLeaderIndex(leaderIndex); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index 04cf45f9d2c79..a99e930188cf0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ 
b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -15,6 +15,7 @@ protected ShardChangesAction.Response createTestInstance() { final long mappingVersion = randomNonNegativeLong(); final long leaderGlobalCheckpoint = randomNonNegativeLong(); final long leaderMaxSeqNo = randomLongBetween(leaderGlobalCheckpoint, Long.MAX_VALUE); + final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(-1, Long.MAX_VALUE); final int numOps = randomInt(8); final Translog.Operation[] operations = new Translog.Operation[numOps]; for (int i = 0; i < numOps; i++) { @@ -24,6 +25,7 @@ protected ShardChangesAction.Response createTestInstance() { mappingVersion, leaderGlobalCheckpoint, leaderMaxSeqNo, + maxSeqNoOfUpdatesOrDeletes, operations ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index e7d0987223bb9..1ff5e6c799bf5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -112,6 +112,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro @Override protected void innerSendBulkShardOperationsRequest( List operations, + long maxSeqNoOfUpdates, Consumer handler, Consumer errorHandler) { for(Translog.Operation op : operations) { @@ -157,7 +158,8 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co assert from >= testRun.finalExpectedGlobalCheckpoint; final long globalCheckpoint = tracker.getCheckpoint(); final long maxSeqNo = tracker.getMaxSeqNo(); - handler.accept(new ShardChangesAction.Response(0L,globalCheckpoint, maxSeqNo, new Translog.Operation[0])); + handler.accept(new ShardChangesAction.Response( + 0L, globalCheckpoint, maxSeqNo, 
randomNonNegativeLong(), new Translog.Operation[0])); } }; threadPool.generic().execute(task); @@ -231,6 +233,7 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, mappingVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, + randomNonNegativeLong(), ops.toArray(EMPTY)) ) ); @@ -253,6 +256,7 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, mappingVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, + randomNonNegativeLong(), EMPTY ); item.add(new TestResponse(null, mappingVersion, response)); @@ -269,6 +273,7 @@ private static TestRun createTestRun(long startSeqNo, long startMappingVersion, mappingVersion, localLeaderGCP, localLeaderGCP, + randomNonNegativeLong(), ops.toArray(EMPTY) ); item.add(new TestResponse(null, mappingVersion, response)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 7e813ae4cf67e..68ed46d9541a9 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -407,7 +407,7 @@ public void testReceiveNothingExpectedSomething() { assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); shardChangesRequests.clear(); - task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, new Translog.Operation[0])); + task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0])); assertThat(shardChangesRequests.size(), equalTo(1)); assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); @@ -714,6 +714,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro @Override protected void innerSendBulkShardOperationsRequest( final List operations, + final long maxSeqNoOfUpdates, final Consumer 
handler, final Consumer errorHandler) { bulkShardOperationRequests.add(operations); @@ -749,6 +750,7 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con mappingVersions.poll(), leaderGlobalCheckpoints.poll(), maxSeqNos.poll(), + randomNonNegativeLong(), operations ); handler.accept(response); @@ -785,6 +787,7 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro mappingVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, + randomNonNegativeLong(), ops.toArray(new Translog.Operation[0]) ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 0bb263d3c4406..2009d74f7c707 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -240,10 +240,12 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro @Override protected void innerSendBulkShardOperationsRequest( final List operations, + final long maxSeqNoOfUpdates, final Consumer handler, final Consumer errorHandler) { Runnable task = () -> { - BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations); + BulkShardOperationsRequest request = new BulkShardOperationsRequest( + params.getFollowShardId(), operations, maxSeqNoOfUpdates); ActionListener listener = ActionListener.wrap(handler::accept, errorHandler); new CCRAction(request, listener, followerGroup).execute(); }; @@ -262,8 +264,10 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co for (IndexShard indexShard : indexShards) { try { final SeqNoStats seqNoStats = indexShard.seqNoStats(); + final long maxSeqNoOfUpdatesOrDeletes = 
indexShard.getMaxSeqNoOfUpdatesOrDeletes(); if (from > seqNoStats.getGlobalCheckpoint()) { - handler.accept(ShardChangesAction.getResponse(1L, seqNoStats, ShardChangesAction.EMPTY_OPERATIONS_ARRAY)); + handler.accept(ShardChangesAction.getResponse(1L, seqNoStats, + maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY)); return; } Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, @@ -273,6 +277,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co 1L, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), + maxSeqNoOfUpdatesOrDeletes, ops ); handler.accept(response); @@ -315,6 +320,9 @@ private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup le for (IndexShard followingShard : follower) { assertThat(followingShard.estimateNumberOfHistoryOperations("test", 0), equalTo(totalOps)); } + for (IndexShard followingShard : follower) { + assertThat(followingShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getPrimary().getMaxSeqNoOfUpdatesOrDeletes())); + } } class CCRAction extends ReplicationAction { @@ -327,7 +335,7 @@ class CCRAction extends ReplicationAction result = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getOperations(), - primary, logger); + request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index 4c6c0c060e45a..88e6d4113d318 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -59,7 +59,8 @@ public void 
testPrimaryTermFromFollower() throws IOException { } final TransportWriteAction.WritePrimaryResult result = - TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, followerPrimary, logger); + TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, + numOps - 1, followerPrimary, logger); try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", 0)) { assertThat(snapshot.totalOperations(), equalTo(operations.size())); From adc5ae9a469ccaeb5960198949360bc7f448ec8b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 25 Sep 2018 18:16:13 -0400 Subject: [PATCH 2/3] pass max_seq_no_of_updates --- .../xpack/ccr/action/ShardFollowNodeTask.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 516e022500b29..45daff9b0368e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -57,7 +57,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private long leaderGlobalCheckpoint; private long leaderMaxSeqNo; - private volatile long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO; + private long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO; private long lastRequestedSeqNo; private long followerGlobalCheckpoint = 0; private long followerMaxSeqNo = 0; @@ -203,7 +203,7 @@ private synchronized void coordinateWrites() { numConcurrentWrites++; LOGGER.trace("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops.get(0).seqNo(), ops.get(ops.size() - 1).seqNo(), ops.size()); - sendBulkShardOperationsRequest(ops); + 
sendBulkShardOperationsRequest(ops, leaderMaxSeqNoOfUpdatesOrDeletes, new AtomicInteger(0)); } } @@ -294,11 +294,8 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar } } - private void sendBulkShardOperationsRequest(List operations) { - sendBulkShardOperationsRequest(operations, new AtomicInteger(0)); - } - - private void sendBulkShardOperationsRequest(List operations, AtomicInteger retryCounter) { + private void sendBulkShardOperationsRequest(List operations, long leaderMaxSeqNoOfUpdatesOrDeletes, + AtomicInteger retryCounter) { assert leaderMaxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "mus is not replicated"; final long startTime = relativeTimeProvider.getAsLong(); innerSendBulkShardOperationsRequest(operations, leaderMaxSeqNoOfUpdatesOrDeletes, @@ -315,7 +312,8 @@ private void sendBulkShardOperationsRequest(List operations, totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); numberOfFailedBulkOperations++; } - handleFailure(e, retryCounter, () -> sendBulkShardOperationsRequest(operations, retryCounter)); + handleFailure(e, retryCounter, + () -> sendBulkShardOperationsRequest(operations, leaderMaxSeqNoOfUpdatesOrDeletes, retryCounter)); } ); } From c82b3d0b32e0dfe57e4186b0e76e0da921a3988b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 25 Sep 2018 18:17:01 -0400 Subject: [PATCH 3/3] remove test --- .../xpack/ccr/ShardChangesIT.java | 48 ------------------- 1 file changed, 48 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 248c1ed0c54e0..c491b0231beb5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -67,7 +67,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -543,53 +542,6 @@ public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception { "this setting is managed via a dedicated API")); } - public void testTransferMaxSeqNoOfUpdates() throws Exception { - int numberOfReplicas = between(0, 2); - internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1); - String leaderIndexSettings = getIndexSettings(1, numberOfReplicas, - singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); - assertAcked(client().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON).get()); - FollowIndexAction.Request followRequest = createFollowRequest("leader-index", "follower-index"); - CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); - client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); - ensureGreen("leader-index", "follower-index"); - int numDocs = between(1, 100); - for (int i = 0; i < numDocs; i++) { - client().prepareIndex("leader-index", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get(); - } - atLeastDocsIndexed("follower-index", between(1, numDocs)); - for (String node : internalCluster().nodesInclude("follower-index")) { - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); - IndexShard shard = indicesService.getShardOrNull(new ShardId(resolveIndex("follower-index"), 1)); - if (shard != null) { - assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); - } - } - AtomicLong maxSeqNoOfUpdates = new AtomicLong(-1L); - for (int i = 0; i < numDocs; i++) { - long seqNo = numDocs + i; - if (randomBoolean()) { - client().prepareIndex("leader-index", "doc", 
Integer.toString(i)).setSource("{}", XContentType.JSON).get(); - maxSeqNoOfUpdates.set(seqNo); - } else if (randomBoolean()) { - client().prepareDelete("leader-index", "doc", Integer.toString(i)).get(); - maxSeqNoOfUpdates.set(seqNo); - } - } - logger.info("--> waiting for max_seq_no_of_updates to be replicated"); - assertBusy(() -> { - for (String node : internalCluster().nodesInclude("follower-index")) { - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); - IndexShard shard = indicesService.getShardOrNull(new ShardId(resolveIndex("follower-index"), 1)); - if (shard != null) { - assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNoOfUpdates.get())); - } - } - }); - assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("leader-index"), resolveIndex("follower-index"), 1); - unfollowIndex("follower-index"); - } - private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();