From dd105be2d4d8b32e4c621fbc5bcf02a1830f4123 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 16 Feb 2019 14:16:28 -0500 Subject: [PATCH 1/4] Use target checkpoint to verify sending ops in recovery --- .../recovery/RecoverySourceHandler.java | 6 +- .../recovery/RecoverySourceHandlerTests.java | 11 +- .../ESIndexLevelReplicationTestCase.java | 9 +- .../ShardFollowTaskReplicationTests.java | 573 +++++++++++------- 4 files changed, 370 insertions(+), 229 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index e7a8fbfb523a1..f5763b7b41663 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -546,7 +546,6 @@ void phase2( final AtomicInteger skippedOps = new AtomicInteger(); final AtomicInteger totalSentOps = new AtomicInteger(); - final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1); final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch. final CheckedSupplier, IOException> readNextBatch = () -> { // We need to synchronized Snapshot#next() because it's called by different threads through sendBatch. @@ -568,7 +567,6 @@ void phase2( ops.add(operation); batchSizeInBytes += operation.estimateSize(); totalSentOps.incrementAndGet(); - requiredOpsTracker.markSeqNoAsCompleted(seqNo); // check if this request is past bytes threshold, and if so, send it off if (batchSizeInBytes >= chunkSizeInBytes) { @@ -586,10 +584,10 @@ void phase2( assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get() : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get()); - if (requiredOpsTracker.getCheckpoint() < endingSeqNo) { + if (targetLocalCheckpoint < endingSeqNo) { throw new IllegalStateException("translog replay failed to cover required sequence numbers" + " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). 
first missing op is [" - + (requiredOpsTracker.getCheckpoint() + 1) + "]"); + + (targetLocalCheckpoint + 1) + "]"); } stopWatch.stop(); final TimeValue tookTime = stopWatch.totalTime(); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 8391827b2f83c..46a43d1dc1e22 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -62,6 +62,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; @@ -235,18 +236,19 @@ public void testSendSnapshotSendsOps() throws IOException { final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1); final List shippedOps = new ArrayList<>(); - final AtomicLong checkpointOnTarget = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final AtomicReference checkpointTrackerHolder = new AtomicReference<>(); RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { @Override public void indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu, RetentionLeases retentionLeases, ActionListener listener) { shippedOps.addAll(operations); - checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE)); - maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get())); + operations.forEach(op -> checkpointTrackerHolder.get().markSeqNoAsCompleted(op.seqNo())); + maybeExecuteAsync(() -> listener.onResponse(checkpointTrackerHolder.get().getCheckpoint())); } }; RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); PlainActionFuture future = new PlainActionFuture<>(); + checkpointTrackerHolder.set(new LocalCheckpointTracker(endingSeqNo, requiredStartingSeqNo - 1)); handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future); final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); @@ -257,7 +259,7 @@ public void indexTranslogOperations(List operations, int tot for (int i = 0; i < shippedOps.size(); i++) { assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs))); } - assertThat(result.targetLocalCheckpoint, equalTo(checkpointOnTarget.get())); + assertThat(result.targetLocalCheckpoint, equalTo(checkpointTrackerHolder.get().getCheckpoint())); if (endingSeqNo >= requiredStartingSeqNo + 1) { // check that missing ops blows up List requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker @@ -265,6 +267,7 @@ public void indexTranslogOperations(List operations, int tot List opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps); PlainActionFuture failedFuture = new PlainActionFuture<>(); expectThrows(IllegalStateException.class, () -> { + checkpointTrackerHolder.set(new LocalCheckpointTracker(endingSeqNo, requiredStartingSeqNo - 
1)); handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, opsToSkip), randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, failedFuture); failedFuture.actionGet(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index c396cdfe84570..37fc1c748c189 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -267,9 +267,7 @@ public synchronized int startReplicas(int numOfReplicasToStart) throws IOExcepti } public void startPrimary() throws IOException { - final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId()); - primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null)); - primary.recoverFromStore(); + recoverPrimary(primary); HashSet activeIds = new HashSet<>(); activeIds.addAll(activeIds()); activeIds.add(primary.routingEntry().allocationId().getId()); @@ -302,6 +300,11 @@ assert shardRoutings().stream() updateAllocationIDsOnPrimary(); } + protected synchronized void recoverPrimary(IndexShard primary) { + final DiscoveryNode pNode = getDiscoveryNode(primary.routingEntry().currentNodeId()); + primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(), pNode, null)); + primary.recoverFromStore(); + } public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardPath, final String nodeId) throws IOException { final ShardRouting shardRouting = TestShardRouting.newShardRouting( diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index dac1e70b86d8f..3b2dfaeaec296 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -7,6 +7,8 @@ import com.carrotsearch.hppc.LongHashSet; import com.carrotsearch.hppc.LongSet; +import org.apache.lucene.index.Term; +import org.apache.lucene.store.IOContext; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -17,9 +19,15 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -27,13 +35,21 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import 
org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.RestoreOnlyRepository; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; @@ -57,6 +73,8 @@ import java.util.function.LongConsumer; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -64,233 +82,241 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase { public void testSimpleCcrReplication() throws Exception { - try (ReplicationGroup leaderGroup = createGroup(randomInt(2)); - ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) { + try (ReplicationGroup leaderGroup = createLeaderGroup(randomInt(2))) { leaderGroup.startAll(); - int docCount = leaderGroup.appendDocs(randomInt(64)); - leaderGroup.assertAllEqual(docCount); - followerGroup.startAll(); - ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); - final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); - final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); - shardFollowTask.start( + try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, randomInt(2))) { + int docCount = leaderGroup.appendDocs(randomInt(64)); + leaderGroup.assertAllEqual(docCount); + followerGroup.startAll(); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); + final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); + shardFollowTask.start( followerGroup.getPrimary().getHistoryUUID(), leaderSeqNoStats.getGlobalCheckpoint(), leaderSeqNoStats.getMaxSeqNo(), followerSeqNoStats.getGlobalCheckpoint(), followerSeqNoStats.getMaxSeqNo()); - docCount += leaderGroup.appendDocs(randomInt(128)); - leaderGroup.syncGlobalCheckpoint(); - leaderGroup.assertAllEqual(docCount); - Set indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary()); - assertBusy(() -> { - assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); - followerGroup.assertAllEqual(indexedDocIds.size()); - }); - for (IndexShard shard : followerGroup) { - assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount)); - } - // Deletes should be replicated to the follower - List deleteDocIds = randomSubsetOf(indexedDocIds); - for (String deleteId : deleteDocIds) { - BulkItemResponse resp = 
leaderGroup.delete(new DeleteRequest(index.getName(), "type", deleteId)); - assertThat(resp.getResponse().getResult(), equalTo(DocWriteResponse.Result.DELETED)); + docCount += leaderGroup.appendDocs(randomInt(128)); + leaderGroup.syncGlobalCheckpoint(); + leaderGroup.assertAllEqual(docCount); + Set indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary()); + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); + followerGroup.assertAllEqual(indexedDocIds.size()); + }); + for (IndexShard shard : followerGroup) { + assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount)); + } + // Deletes should be replicated to the follower + List deleteDocIds = randomSubsetOf(indexedDocIds); + for (String deleteId : deleteDocIds) { + BulkItemResponse resp = leaderGroup.delete(new DeleteRequest(index.getName(), "type", deleteId)); + assertThat(resp.getResponse().getResult(), equalTo(DocWriteResponse.Result.DELETED)); + } + leaderGroup.syncGlobalCheckpoint(); + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); + followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size()); + }); + shardFollowTask.markAsCompleted(); + assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, true); } - leaderGroup.syncGlobalCheckpoint(); - assertBusy(() -> { - assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); - followerGroup.assertAllEqual(indexedDocIds.size() - deleteDocIds.size()); - }); - shardFollowTask.markAsCompleted(); - assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, true); } } public void testAddRemoveShardOnLeader() throws Exception { - try (ReplicationGroup leaderGroup = createGroup(1 + randomInt(1)); - ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) { + try (ReplicationGroup leaderGroup = createLeaderGroup(1 + randomInt(1))) { leaderGroup.startAll(); - followerGroup.startAll(); - ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); - final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); - final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); - shardFollowTask.start( + try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, randomInt(2))) { + followerGroup.startAll(); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); + final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); + shardFollowTask.start( followerGroup.getPrimary().getHistoryUUID(), leaderSeqNoStats.getGlobalCheckpoint(), leaderSeqNoStats.getMaxSeqNo(), followerSeqNoStats.getGlobalCheckpoint(), followerSeqNoStats.getMaxSeqNo()); - int batches = between(0, 10); - int docCount = 0; - boolean hasPromotion = false; - for (int i = 0; i < batches; i++) { - docCount += leaderGroup.indexDocs(between(1, 5)); - if (leaderGroup.getReplicas().isEmpty() == false && randomInt(100) < 5) { - IndexShard closingReplica = randomFrom(leaderGroup.getReplicas()); - leaderGroup.removeReplica(closingReplica); - closingReplica.close("test", false); - closingReplica.store().close(); - } else if (leaderGroup.getReplicas().isEmpty() == false && rarely()) { - IndexShard newPrimary = 
randomFrom(leaderGroup.getReplicas()); - leaderGroup.promoteReplicaToPrimary(newPrimary).get(); - hasPromotion = true; - } else if (randomInt(100) < 5) { - leaderGroup.addReplica(); - leaderGroup.startReplicas(1); + int batches = between(0, 10); + int docCount = 0; + boolean hasPromotion = false; + for (int i = 0; i < batches; i++) { + docCount += leaderGroup.indexDocs(between(1, 5)); + if (leaderGroup.getReplicas().isEmpty() == false && randomInt(100) < 5) { + IndexShard closingReplica = randomFrom(leaderGroup.getReplicas()); + leaderGroup.removeReplica(closingReplica); + closingReplica.close("test", false); + closingReplica.store().close(); + } else if (leaderGroup.getReplicas().isEmpty() == false && rarely()) { + IndexShard newPrimary = randomFrom(leaderGroup.getReplicas()); + leaderGroup.promoteReplicaToPrimary(newPrimary).get(); + hasPromotion = true; + } else if (randomInt(100) < 5) { + leaderGroup.addReplica(); + leaderGroup.startReplicas(1); + } + leaderGroup.syncGlobalCheckpoint(); } - leaderGroup.syncGlobalCheckpoint(); + leaderGroup.assertAllEqual(docCount); + assertThat(shardFollowTask.getFailure(), nullValue()); + int expectedDoc = docCount; + assertBusy(() -> followerGroup.assertAllEqual(expectedDoc)); + shardFollowTask.markAsCompleted(); + assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, hasPromotion == false); } - leaderGroup.assertAllEqual(docCount); - assertThat(shardFollowTask.getFailure(), nullValue()); - int expectedDoc = docCount; - assertBusy(() -> followerGroup.assertAllEqual(expectedDoc)); - shardFollowTask.markAsCompleted(); - assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, hasPromotion == false); } } public void testChangeLeaderHistoryUUID() throws Exception { - try (ReplicationGroup leaderGroup = createGroup(0); - ReplicationGroup followerGroup = createFollowGroup(0)) { - leaderGroup.startAll(); - int docCount = leaderGroup.appendDocs(randomInt(64)); - leaderGroup.assertAllEqual(docCount); - followerGroup.startAll(); - ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); - final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); - final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); - shardFollowTask.start( - followerGroup.getPrimary().getHistoryUUID(), - leaderSeqNoStats.getGlobalCheckpoint(), - leaderSeqNoStats.getMaxSeqNo(), - followerSeqNoStats.getGlobalCheckpoint(), - followerSeqNoStats.getMaxSeqNo()); - leaderGroup.syncGlobalCheckpoint(); - leaderGroup.assertAllEqual(docCount); - Set indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary()); - assertBusy(() -> { - assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); - followerGroup.assertAllEqual(indexedDocIds.size()); - }); - - String oldHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); - leaderGroup.reinitPrimaryShard(); - leaderGroup.getPrimary().store().bootstrapNewHistory(); - recoverShardFromStore(leaderGroup.getPrimary()); - String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); - - // force the global checkpoint on the leader to advance - leaderGroup.appendDocs(64); - - assertBusy(() -> { - assertThat(shardFollowTask.isStopped(), is(true)); - ElasticsearchException failure = shardFollowTask.getStatus().getFatalException(); - assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + - "], actual [" + newHistoryUUID + "]")); - }); 
+ try (ReplicationGroup leaderGroup = createLeaderGroup(0)) { + try (ReplicationGroup followerGroup = createFollowGroup(leaderGroup, 0)) { + leaderGroup.startAll(); + int docCount = leaderGroup.appendDocs(randomInt(64)); + leaderGroup.assertAllEqual(docCount); + followerGroup.startAll(); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); + final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); + shardFollowTask.start( + followerGroup.getPrimary().getHistoryUUID(), + leaderSeqNoStats.getGlobalCheckpoint(), + leaderSeqNoStats.getMaxSeqNo(), + followerSeqNoStats.getGlobalCheckpoint(), + followerSeqNoStats.getMaxSeqNo()); + leaderGroup.syncGlobalCheckpoint(); + leaderGroup.assertAllEqual(docCount); + Set indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary()); + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); + followerGroup.assertAllEqual(indexedDocIds.size()); + }); + + String oldHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); + leaderGroup.reinitPrimaryShard(); + leaderGroup.getPrimary().store().bootstrapNewHistory(); + recoverShardFromStore(leaderGroup.getPrimary()); + String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); + + // force the global checkpoint on the leader to advance + leaderGroup.appendDocs(64); + + assertBusy(() -> { + assertThat(shardFollowTask.isStopped(), is(true)); + ElasticsearchException failure = shardFollowTask.getStatus().getFatalException(); + assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + + "], actual [" + newHistoryUUID + "]")); + }); + } } } public void testChangeFollowerHistoryUUID() throws Exception { - try (ReplicationGroup leaderGroup = createGroup(0); - ReplicationGroup followerGroup = createFollowGroup(0)) { + try (ReplicationGroup leaderGroup = createLeaderGroup(0)) { leaderGroup.startAll(); - int docCount = leaderGroup.appendDocs(randomInt(64)); - leaderGroup.assertAllEqual(docCount); - followerGroup.startAll(); - ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); - final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); - final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); - shardFollowTask.start( - followerGroup.getPrimary().getHistoryUUID(), - leaderSeqNoStats.getGlobalCheckpoint(), - leaderSeqNoStats.getMaxSeqNo(), - followerSeqNoStats.getGlobalCheckpoint(), - followerSeqNoStats.getMaxSeqNo()); - leaderGroup.syncGlobalCheckpoint(); - leaderGroup.assertAllEqual(docCount); - Set indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary()); - assertBusy(() -> { - assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); - followerGroup.assertAllEqual(indexedDocIds.size()); - }); - - String oldHistoryUUID = followerGroup.getPrimary().getHistoryUUID(); - followerGroup.reinitPrimaryShard(); - followerGroup.getPrimary().store().bootstrapNewHistory(); - recoverShardFromStore(followerGroup.getPrimary()); - String newHistoryUUID = followerGroup.getPrimary().getHistoryUUID(); - - // force the global checkpoint on the leader to advance - leaderGroup.appendDocs(64); - - assertBusy(() -> { - assertThat(shardFollowTask.isStopped(), is(true)); - ElasticsearchException failure = 
shardFollowTask.getStatus().getFatalException(); - assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + - "], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated")); - }); + try(ReplicationGroup followerGroup = createFollowGroup(leaderGroup, 0)) { + int docCount = leaderGroup.appendDocs(randomInt(64)); + leaderGroup.assertAllEqual(docCount); + followerGroup.startAll(); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); + final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); + shardFollowTask.start( + followerGroup.getPrimary().getHistoryUUID(), + leaderSeqNoStats.getGlobalCheckpoint(), + leaderSeqNoStats.getMaxSeqNo(), + followerSeqNoStats.getGlobalCheckpoint(), + followerSeqNoStats.getMaxSeqNo()); + leaderGroup.syncGlobalCheckpoint(); + leaderGroup.assertAllEqual(docCount); + Set indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary()); + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); + followerGroup.assertAllEqual(indexedDocIds.size()); + }); + + String oldHistoryUUID = followerGroup.getPrimary().getHistoryUUID(); + followerGroup.reinitPrimaryShard(); + followerGroup.getPrimary().store().bootstrapNewHistory(); + recoverShardFromStore(followerGroup.getPrimary()); + String newHistoryUUID = followerGroup.getPrimary().getHistoryUUID(); + + // force the global checkpoint on the leader to advance + leaderGroup.appendDocs(64); + + assertBusy(() -> { + assertThat(shardFollowTask.isStopped(), is(true)); + ElasticsearchException failure = shardFollowTask.getStatus().getFatalException(); + assertThat(failure.getRootCause().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + + "], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated")); + }); + } } } public void testRetryBulkShardOperations() throws Exception { - try (ReplicationGroup leaderGroup = createGroup(between(0, 1)); - ReplicationGroup followerGroup = createFollowGroup(between(1, 3))) { + try (ReplicationGroup leaderGroup = createLeaderGroup(between(0, 1))) { leaderGroup.startAll(); - followerGroup.startAll(); - leaderGroup.appendDocs(between(10, 100)); - leaderGroup.refresh("test"); - for (int numNoOps = between(1, 10), i = 0; i < numNoOps; i++) { - long seqNo = leaderGroup.getPrimary().seqNoStats().getMaxSeqNo() + 1; - Engine.NoOp noOp = new Engine.NoOp(seqNo, leaderGroup.getPrimary().getOperationPrimaryTerm(), - Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), "test-" + i); - for (IndexShard shard : leaderGroup) { - getEngine(shard).noOp(noOp); + try(ReplicationGroup followerGroup = createFollowGroup(leaderGroup, between(1, 3))) { + followerGroup.startAll(); + leaderGroup.appendDocs(between(10, 100)); + leaderGroup.refresh("test"); + for (int numNoOps = between(1, 10), i = 0; i < numNoOps; i++) { + long seqNo = leaderGroup.getPrimary().seqNoStats().getMaxSeqNo() + 1; + Engine.NoOp noOp = new Engine.NoOp(seqNo, leaderGroup.getPrimary().getOperationPrimaryTerm(), + Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), "test-" + i); + for (IndexShard shard : leaderGroup) { + getEngine(shard).noOp(noOp); + } } - } - for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) { 
- BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId)); - assertThat(resp.getFailure(), nullValue()); - } - leaderGroup.syncGlobalCheckpoint(); - IndexShard leadingPrimary = leaderGroup.getPrimary(); - // Simulates some bulk requests are completed on the primary and replicated to some (but all) replicas of the follower - // but the primary of the follower crashed before these requests completed. - for (int numBulks = between(1, 5), i = 0; i < numBulks; i++) { - long fromSeqNo = randomLongBetween(0, leadingPrimary.getGlobalCheckpoint()); - long toSeqNo = randomLongBetween(fromSeqNo, leadingPrimary.getGlobalCheckpoint()); - int numOps = Math.toIntExact(toSeqNo + 1 - fromSeqNo); - Translog.Operation[] ops = ShardChangesAction.getOperations(leadingPrimary, leadingPrimary.getGlobalCheckpoint(), - fromSeqNo, numOps, leadingPrimary.getHistoryUUID(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)); - - IndexShard followingPrimary = followerGroup.getPrimary(); - TransportWriteAction.WritePrimaryResult primaryResult = - TransportBulkShardOperationsAction.shardOperationOnPrimary(followingPrimary.shardId(), - followingPrimary.getHistoryUUID(), Arrays.asList(ops), leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), - followingPrimary, logger); - for (IndexShard replica : randomSubsetOf(followerGroup.getReplicas())) { - final PlainActionFuture permitFuture = new PlainActionFuture<>(); - replica.acquireReplicaOperationPermit(followingPrimary.getOperationPrimaryTerm(), - followingPrimary.getGlobalCheckpoint(), followingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), - permitFuture, ThreadPool.Names.SAME, primaryResult); - try (Releasable ignored = permitFuture.get()) { - TransportBulkShardOperationsAction.shardOperationOnReplica(primaryResult.replicaRequest(), replica, logger); + for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) { + BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId)); + assertThat(resp.getFailure(), nullValue()); + } + leaderGroup.syncGlobalCheckpoint(); + IndexShard leadingPrimary = leaderGroup.getPrimary(); + // Simulates some bulk requests are completed on the primary and replicated to some (but all) replicas of the follower + // but the primary of the follower crashed before these requests completed. 
+ for (int numBulks = between(1, 5), i = 0; i < numBulks; i++) { + long fromSeqNo = randomLongBetween(0, leadingPrimary.getGlobalCheckpoint()); + long toSeqNo = randomLongBetween(fromSeqNo, leadingPrimary.getGlobalCheckpoint()); + int numOps = Math.toIntExact(toSeqNo + 1 - fromSeqNo); + Translog.Operation[] ops = ShardChangesAction.getOperations(leadingPrimary, leadingPrimary.getGlobalCheckpoint(), + fromSeqNo, numOps, leadingPrimary.getHistoryUUID(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)); + + IndexShard followingPrimary = followerGroup.getPrimary(); + TransportWriteAction.WritePrimaryResult primaryResult = + TransportBulkShardOperationsAction.shardOperationOnPrimary(followingPrimary.shardId(), + followingPrimary.getHistoryUUID(), Arrays.asList(ops), leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), + followingPrimary, logger); + for (IndexShard replica : randomSubsetOf(followerGroup.getReplicas())) { + final PlainActionFuture permitFuture = new PlainActionFuture<>(); + replica.acquireReplicaOperationPermit(followingPrimary.getOperationPrimaryTerm(), + followingPrimary.getGlobalCheckpoint(), followingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), + permitFuture, ThreadPool.Names.SAME, primaryResult); + try (Releasable ignored = permitFuture.get()) { + TransportBulkShardOperationsAction.shardOperationOnReplica(primaryResult.replicaRequest(), replica, logger); + } } } - } - // A follow-task retries these requests while the primary-replica resync is happening on the follower. - followerGroup.promoteReplicaToPrimary(randomFrom(followerGroup.getReplicas())); - ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); - SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); - shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(), leadingPrimary.getGlobalCheckpoint(), - leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), followerSeqNoStats.getGlobalCheckpoint(), followerSeqNoStats.getMaxSeqNo()); - try { - assertBusy(() -> { - assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint())); - assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, true); - }); - } finally { - shardFollowTask.markAsCompleted(); + // A follow-task retries these requests while the primary-replica resync is happening on the follower. 
+ followerGroup.promoteReplicaToPrimary(randomFrom(followerGroup.getReplicas())); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); + shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(), + leadingPrimary.getGlobalCheckpoint(), + leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), + followerSeqNoStats.getGlobalCheckpoint(), + followerSeqNoStats.getMaxSeqNo()); + try { + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint())); + assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup, true); + }); + } finally { + shardFollowTask.markAsCompleted(); + } } } } @@ -303,7 +329,17 @@ public void testAddNewFollowingReplica() throws Exception { operations.add(new Translog.Index("type", Integer.toString(i), i, primaryTerm, 0, source, null, -1)); } Future recoveryFuture = null; - try (ReplicationGroup group = createFollowGroup(between(0, 1))) { + Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB)) + .build(); + IndexMetaData indexMetaData = buildIndexMetaData(between(0, 1), settings, indexMapping); + try (ReplicationGroup group = new ReplicationGroup(indexMetaData) { + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + return new FollowingEngineFactory(); + } + }) { group.startAll(); while (operations.isEmpty() == false) { List bulkOps = randomSubsetOf(between(1, operations.size()), operations); @@ -330,35 +366,136 @@ public void testAddNewFollowingReplica() throws Exception { } } - @Override - protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException { - Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000) - .put(settings) - .build(); - if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(newSettings)) { - IndexMetaData metaData = buildIndexMetaData(replicas, newSettings, indexMapping); - return new ReplicationGroup(metaData) { + public void testSimpleRemoteRecovery() throws Exception { + try (ReplicationGroup leader = createLeaderGroup(between(0, 1))) { + leader.startAll(); + leader.appendDocs(between(0, 100)); + leader.flush(); + leader.syncGlobalCheckpoint(); + try (ReplicationGroup follower = createFollowGroup(leader, 0)) { + follower.startAll(); + ShardFollowNodeTask followTask = createShardFollowTask(leader, follower); + followTask.start( + follower.getPrimary().getHistoryUUID(), + leader.getPrimary().getGlobalCheckpoint(), + leader.getPrimary().seqNoStats().getMaxSeqNo(), + follower.getPrimary().getGlobalCheckpoint(), + follower.getPrimary().seqNoStats().getMaxSeqNo() + ); + leader.appendDocs(between(0, 100)); + if (randomBoolean()) { + follower.recoverReplica(follower.addReplica()); + } + assertBusy(() -> assertConsistentHistoryBetweenLeaderAndFollower(leader, follower, false)); + followTask.markAsCompleted(); + } + } + } - @Override - protected EngineFactory 
getEngineFactory(ShardRouting routing) { - return new FollowingEngineFactory(); + public void testRemoteRecoveryWithGapsInIndexCommit() throws Exception { + try (ReplicationGroup leader = createLeaderGroup(0)) { + leader.startAll(); + int numOps = between(0, 50); + List ops = new ArrayList<>(); + for (int i = 0; i < numOps; i++) { + final String id = Integer.toString(between(1, numOps)); + if (randomBoolean()) { + ops.add(new Translog.Index("_doc", id, i, 1L, 1L, "{}".getBytes(StandardCharsets.UTF_8), null, -1)); + } else { + ops.add(new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), i, 1L, 1L)); } - }; - } else { - return super.createGroup(replicas, newSettings); + } + Randomness.shuffle(ops); + int recoveredOps = 0; + for (int i = 0; i < ops.size(); i++) { + leader.getPrimary().advanceMaxSeqNoOfUpdatesOrDeletes(ops.get(i).seqNo()); + leader.getPrimary().applyTranslogOperation(ops.get(i), Engine.Operation.Origin.LOCAL_RESET); + if (randomInt(100) < 10) { + leader.getPrimary().flush(new FlushRequest()); + recoveredOps = i + 1; + } + } + leader.getPrimary().updateLocalCheckpointForShard(leader.getPrimary().routingEntry().allocationId().getId(), numOps - 1); + leader.syncGlobalCheckpoint(); + try (ReplicationGroup follower = createFollowGroup(leader, 0)) { + follower.startAll(); + try (Translog.Snapshot snapshot = follower.getPrimary().newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { + assertThat(snapshot.totalOperations(), equalTo(recoveredOps)); + } + ShardFollowNodeTask followTask = createShardFollowTask(leader, follower); + followTask.start( + follower.getPrimary().getHistoryUUID(), + leader.getPrimary().getGlobalCheckpoint(), + leader.getPrimary().seqNoStats().getMaxSeqNo(), + follower.getPrimary().getGlobalCheckpoint(), + follower.getPrimary().seqNoStats().getMaxSeqNo() + ); + if (randomBoolean()) { + leader.appendDocs(between(1, 20)); + } + if (randomBoolean()) { + follower.recoverReplica(follower.addReplica()); + } + if (randomBoolean()) { + leader.appendDocs(between(1, 20)); + } + assertBusy(() -> { + for (IndexShard shard : follower) { + assertThat(getShardDocUIDs(shard), equalTo(getShardDocUIDs(leader.getPrimary()))); + } + }); + followTask.markAsCompleted(); + } } } - private ReplicationGroup createFollowGroup(int replicas) throws IOException { - Settings.Builder settingsBuilder = Settings.builder(); - settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + private ReplicationGroup createLeaderGroup(int replicas) throws IOException { + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000) + .build(); + return createGroup(replicas, settings); + } + + private ReplicationGroup createFollowGroup(ReplicationGroup leaderGroup, int replicas) throws IOException { + Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB)); - return createGroup(replicas, settingsBuilder.build()); + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB)) + .build(); + IndexMetaData indexMetaData = buildIndexMetaData(replicas, settings, indexMapping); + return new ReplicationGroup(indexMetaData) { + @Override + protected 
EngineFactory getEngineFactory(ShardRouting routing) { + return new FollowingEngineFactory(); + } + @Override + protected synchronized void recoverPrimary(IndexShard primary) { + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID())); + ShardRouting routing = ShardRoutingHelper.newWithRestoreSource(primary.routingEntry(), + new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test")); + primary.markAsRecovering("remote recovery from leader", new RecoveryState(routing, localNode, null)); + primary.restoreFromRepository(new RestoreOnlyRepository(index.getName()) { + @Override + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, + IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { + try { + IndexShard leader = leaderGroup.getPrimary(); + Lucene.cleanLuceneIndex(primary.store().directory()); + try (Engine.IndexCommitRef sourceCommit = leader.acquireSafeIndexCommit()) { + Store.MetadataSnapshot sourceSnapshot = leader.store().getMetadata(sourceCommit.getIndexCommit()); + for (StoreFileMetaData md : sourceSnapshot) { + primary.store().directory().copyFrom( + leader.store().directory(), md.name(), md.name(), IOContext.DEFAULT); + } + } + } catch (Exception ex) { + throw new AssertionError(ex); + } + } + }); + } + }; } private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) { @@ -483,7 +620,7 @@ private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup le final List> docAndSeqNosOnLeader = getDocIdAndSeqNos(leader.getPrimary()).stream() .map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList()); final Set> operationsOnLeader = new HashSet<>(); - try (Translog.Snapshot snapshot = leader.getPrimary().getHistoryOperations("test", 0)) { + try (Translog.Snapshot snapshot = leader.getPrimary().newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { Translog.Operation op; while ((op = snapshot.next()) != null) { operationsOnLeader.add(Tuple.tuple(op.seqNo(), op.opType())); @@ -497,13 +634,13 @@ private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup le .map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList()); assertThat(docAndSeqNosOnFollower, equalTo(docAndSeqNosOnLeader)); final Set> operationsOnFollower = new HashSet<>(); - try (Translog.Snapshot snapshot = followingShard.getHistoryOperations("test", 0)) { + try (Translog.Snapshot snapshot = followingShard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { Translog.Operation op; while ((op = snapshot.next()) != null) { operationsOnFollower.add(Tuple.tuple(op.seqNo(), op.opType())); } } - assertThat(operationsOnFollower, equalTo(operationsOnLeader)); + assertThat(followingShard.routingEntry().toString(), operationsOnFollower, equalTo(operationsOnLeader)); } } From 01e2e952f5a0b13684d748bf3ea964329c1c60bb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 19 Feb 2019 17:01:54 -0500 Subject: [PATCH 2/4] remove check --- .../recovery/RecoverySourceHandler.java | 38 +----------- .../recovery/RecoverySourceHandlerTests.java | 35 +++-------- .../ShardFollowTaskReplicationTests.java | 60 ------------------- 3 files changed, 12 insertions(+), 121 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java 
b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index f5763b7b41663..6bca848a361fa 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -158,14 +158,12 @@ public void recoverToTarget(ActionListener listener) { final Closeable retentionLock = shard.acquireRetentionLock(); resources.add(retentionLock); final long startingSeqNo; - final long requiredSeqNoRangeStart; final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()); final SendFileResult sendFileResult; if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); - requiredSeqNoRangeStart = startingSeqNo; sendFileResult = SendFileResult.EMPTY; } else { final Engine.IndexCommitRef phase1Snapshot; @@ -174,9 +172,6 @@ public void recoverToTarget(ActionListener listener) { } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } - // We must have everything above the local checkpoint in the commit - requiredSeqNoRangeStart = - Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; // We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will // still filter out legacy operations without seqNo. startingSeqNo = 0; @@ -194,8 +189,6 @@ public void recoverToTarget(ActionListener listener) { } } assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; - assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" - + startingSeqNo + "]"; final StepListener prepareEngineStep = new StepListener<>(); // For a sequence based recovery, the target can keep its local translog @@ -213,13 +206,7 @@ public void recoverToTarget(ActionListener listener) { shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - /* - * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all - * operations in the required range will be available for replaying from the translog of the source. 
- */ - cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); if (logger.isTraceEnabled()) { - logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo); logger.trace("snapshot translog for recovery; current size is [{}]", shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo)); } @@ -232,15 +219,8 @@ public void recoverToTarget(ActionListener listener) { final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); final RetentionLeases retentionLeases = shard.getRetentionLeases(); - phase2( - startingSeqNo, - requiredSeqNoRangeStart, - endingSeqNo, - phase2Snapshot, - maxSeenAutoIdTimestamp, - maxSeqNoOfUpdatesOrDeletes, - retentionLeases, - sendSnapshotStep); + phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, + retentionLeases, sendSnapshotStep); sendSnapshotStep.whenComplete( r -> IOUtils.close(phase2Snapshot), e -> { @@ -518,7 +498,6 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A * * @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all * ops should be sent - * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) * @param endingSeqNo the highest sequence number that should be sent * @param snapshot a snapshot of the translog * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary @@ -527,22 +506,16 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A */ void phase2( final long startingSeqNo, - final long requiredSeqNoRangeStart, final long endingSeqNo, final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp, final long maxSeqNoOfUpdatesOrDeletes, final RetentionLeases retentionLeases, final ActionListener listener) throws IOException { - assert requiredSeqNoRangeStart <= endingSeqNo + 1: - "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; - assert startingSeqNo <= requiredSeqNoRangeStart : - "startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart; if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } - logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " + - "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); + logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]"); final AtomicInteger skippedOps = new AtomicInteger(); final AtomicInteger totalSentOps = new AtomicInteger(); @@ -584,11 +557,6 @@ void phase2( assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get() : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get()); - if (targetLocalCheckpoint < endingSeqNo) { - throw new IllegalStateException("translog replay failed to cover required sequence numbers" + - " (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). 
first missing op is [" - + (targetLocalCheckpoint + 1) + "]"); - } stopWatch.stop(); final TimeValue tookTime = stopWatch.totalTime(); logger.trace("recovery [phase2]: took [{}]", tookTime); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 46a43d1dc1e22..ae6eae8656403 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -62,7 +62,6 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; @@ -98,7 +97,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.IntSupplier; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.zip.CRC32; import static java.util.Collections.emptyMap; @@ -232,24 +230,22 @@ public void testSendSnapshotSendsOps() throws IOException { operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true))); } final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1); - final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); - final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1); + final long endingSeqNo = randomLongBetween(startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1); final List shippedOps = new ArrayList<>(); - final AtomicReference checkpointTrackerHolder = new AtomicReference<>(); + final AtomicLong checkpointOnTarget = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { @Override public void indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu, RetentionLeases retentionLeases, ActionListener listener) { shippedOps.addAll(operations); - operations.forEach(op -> checkpointTrackerHolder.get().markSeqNoAsCompleted(op.seqNo())); - maybeExecuteAsync(() -> listener.onResponse(checkpointTrackerHolder.get().getCheckpoint())); + checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE)); + maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get())); } }; RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); PlainActionFuture future = new PlainActionFuture<>(); - checkpointTrackerHolder.set(new LocalCheckpointTracker(endingSeqNo, requiredStartingSeqNo - 1)); - handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), + handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future); final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); RecoverySourceHandler.SendSnapshotResult result = future.actionGet(); @@ -259,20 +255,7 @@ public void indexTranslogOperations(List operations, int tot for (int i = 0; i < 
shippedOps.size(); i++) { assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs))); } - assertThat(result.targetLocalCheckpoint, equalTo(checkpointTrackerHolder.get().getCheckpoint())); - if (endingSeqNo >= requiredStartingSeqNo + 1) { - // check that missing ops blows up - List requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker - .filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList()); - List opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps); - PlainActionFuture failedFuture = new PlainActionFuture<>(); - expectThrows(IllegalStateException.class, () -> { - checkpointTrackerHolder.set(new LocalCheckpointTracker(endingSeqNo, requiredStartingSeqNo - 1)); - handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, opsToSkip), - randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, failedFuture); - failedFuture.actionGet(); - }); - } + assertThat(result.targetLocalCheckpoint, equalTo(checkpointOnTarget.get())); } public void testSendSnapshotStopOnError() throws Exception { @@ -302,7 +285,7 @@ public void indexTranslogOperations(List operations, int tot PlainActionFuture future = new PlainActionFuture<>(); final long startingSeqNo = randomLongBetween(0, ops.size() - 1L); final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L); - handler.phase2(startingSeqNo, startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()), + handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future); if (wasFailed.get()) { assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index")); @@ -501,11 +484,11 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A } @Override - void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, + void phase2(long startingSeqNo, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases, ActionListener listener) throws IOException { phase2Called.set(true); - super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, + super.phase2(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, listener); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 3b2dfaeaec296..0f82bfab10287 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -7,7 +7,6 @@ import com.carrotsearch.hppc.LongHashSet; import com.carrotsearch.hppc.LongSet; -import org.apache.lucene.index.Term; import org.apache.lucene.store.IOContext; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -23,7 +22,6 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingHelper; -import 
org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; @@ -35,7 +33,6 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; -import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; @@ -392,63 +389,6 @@ public void testSimpleRemoteRecovery() throws Exception { } } - public void testRemoteRecoveryWithGapsInIndexCommit() throws Exception { - try (ReplicationGroup leader = createLeaderGroup(0)) { - leader.startAll(); - int numOps = between(0, 50); - List ops = new ArrayList<>(); - for (int i = 0; i < numOps; i++) { - final String id = Integer.toString(between(1, numOps)); - if (randomBoolean()) { - ops.add(new Translog.Index("_doc", id, i, 1L, 1L, "{}".getBytes(StandardCharsets.UTF_8), null, -1)); - } else { - ops.add(new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), i, 1L, 1L)); - } - } - Randomness.shuffle(ops); - int recoveredOps = 0; - for (int i = 0; i < ops.size(); i++) { - leader.getPrimary().advanceMaxSeqNoOfUpdatesOrDeletes(ops.get(i).seqNo()); - leader.getPrimary().applyTranslogOperation(ops.get(i), Engine.Operation.Origin.LOCAL_RESET); - if (randomInt(100) < 10) { - leader.getPrimary().flush(new FlushRequest()); - recoveredOps = i + 1; - } - } - leader.getPrimary().updateLocalCheckpointForShard(leader.getPrimary().routingEntry().allocationId().getId(), numOps - 1); - leader.syncGlobalCheckpoint(); - try (ReplicationGroup follower = createFollowGroup(leader, 0)) { - follower.startAll(); - try (Translog.Snapshot snapshot = follower.getPrimary().newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { - assertThat(snapshot.totalOperations(), equalTo(recoveredOps)); - } - ShardFollowNodeTask followTask = createShardFollowTask(leader, follower); - followTask.start( - follower.getPrimary().getHistoryUUID(), - leader.getPrimary().getGlobalCheckpoint(), - leader.getPrimary().seqNoStats().getMaxSeqNo(), - follower.getPrimary().getGlobalCheckpoint(), - follower.getPrimary().seqNoStats().getMaxSeqNo() - ); - if (randomBoolean()) { - leader.appendDocs(between(1, 20)); - } - if (randomBoolean()) { - follower.recoverReplica(follower.addReplica()); - } - if (randomBoolean()) { - leader.appendDocs(between(1, 20)); - } - assertBusy(() -> { - for (IndexShard shard : follower) { - assertThat(getShardDocUIDs(shard), equalTo(getShardDocUIDs(leader.getPrimary()))); - } - }); - followTask.markAsCompleted(); - } - } - } - private ReplicationGroup createLeaderGroup(int replicas) throws IOException { Settings settings = Settings.builder() .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) From 81bf67b07d37ae1f19548d6ffbfbaa343fe94799 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 19 Feb 2019 17:10:44 -0500 Subject: [PATCH 3/4] await test --- .../test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index e0f71fe45155a..d9b75f416b38d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ 
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java
@@ -114,7 +114,6 @@ public class IndexFollowingIT extends CcrIntegTestCase {
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38949")
     public void testFollowIndex() throws Exception {
         final int numberOfPrimaryShards = randomIntBetween(1, 3);
         int numberOfReplicas = between(0, 1);
@@ -221,7 +220,6 @@ public void testFollowIndex() throws Exception {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38949")
     public void testFollowIndexWithConcurrentMappingChanges() throws Exception {
         final int numberOfPrimaryShards = randomIntBetween(1, 3);
         final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),

From ebcebd761d18689849e5a4e0a1743c6bc1952ca8 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Sat, 23 Feb 2019 15:58:25 -0500
Subject: [PATCH 4/4] Remove waitForOpsToComplete in IndexShard and Engine

---
 .../elasticsearch/index/engine/Engine.java    |  8 -----
 .../index/engine/InternalEngine.java          |  5 ----
 .../index/engine/ReadOnlyEngine.java          |  4 ---
 .../elasticsearch/index/shard/IndexShard.java | 10 -------
 .../index/engine/EngineTestCase.java          | 10 +++++++
 .../index/engine/FollowingEngineTests.java    | 30 +++++++++----------
 6 files changed, 25 insertions(+), 42 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index dbe779864fe47..de393881c896b 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -808,14 +808,6 @@ public final CommitStats commitStats() {
      */
     public abstract long getLocalCheckpoint();
 
-    /**
-     * Waits for all operations up to the provided sequence number to complete.
-     *
-     * @param seqNo the sequence number that the checkpoint must advance to before this method returns
-     * @throws InterruptedException if the thread was interrupted while blocking on the condition
-     */
-    public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException;
-
     /**
      * @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
      */
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 32354ab4b16d7..fb8ea628d4c9d 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -2415,11 +2415,6 @@ public long getLocalCheckpoint() {
         return localCheckpointTracker.getCheckpoint();
     }
 
-    @Override
-    public void waitForOpsToComplete(long seqNo) throws InterruptedException {
-        localCheckpointTracker.waitForOpsToComplete(seqNo);
-    }
-
     /**
      * Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value.
      */
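With the abstract method gone from Engine and the InternalEngine override removed, nothing in the production code paths shown here blocks on a sequence number any more; the remaining callers are tests, which go through the static helper added to EngineTestCase further down in this patch. A rough, self-contained sketch of what such a wait amounts to at the LocalCheckpointTracker level (illustrative class name and values, not part of the patch):

    import org.elasticsearch.index.seqno.LocalCheckpointTracker;
    import org.elasticsearch.index.seqno.SequenceNumbers;

    public class WaitForOpsSketch {
        public static void main(String[] args) throws InterruptedException {
            LocalCheckpointTracker tracker =
                new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED);
            // Simulated indexing: a second thread marks seq_nos 0..2 as completed.
            Thread indexer = new Thread(() -> {
                for (long seqNo = 0; seqNo <= 2; seqNo++) {
                    tracker.markSeqNoAsCompleted(seqNo);
                }
            });
            indexer.start();
            // Returns once the local checkpoint has advanced to at least 2,
            // i.e. every operation up to seq_no 2 has completed.
            tracker.waitForOpsToComplete(2);
            indexer.join();
        }
    }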
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
index c464a34e78b01..d5277a9f8683c 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
@@ -331,10 +331,6 @@ public long getLocalCheckpoint() {
         return seqNoStats.getLocalCheckpoint();
     }
 
-    @Override
-    public void waitForOpsToComplete(long seqNo) {
-    }
-
     @Override
     public SeqNoStats getSeqNoStats(long globalCheckpoint) {
         return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint);
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 7a5ec6bd28685..06bb9243201f1 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2042,16 +2042,6 @@ public void syncRetentionLeases() {
         }
     }
 
-    /**
-     * Waits for all operations up to the provided sequence number to complete.
-     *
-     * @param seqNo the sequence number that the checkpoint must advance to before this method returns
-     * @throws InterruptedException if the thread was interrupted while blocking on the condition
-     */
-    public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
-        getEngine().waitForOpsToComplete(seqNo);
-    }
-
     /**
      * Called when the recovery process for a shard has opened the engine on the target shard. Ensures that the right data structures
      * have been set up locally to track local checkpoint information for the shard and that the shard is added to the replication group.
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index a6765e4e44fa6..e7b3f39747124 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -1109,6 +1109,16 @@ public static Translog getTranslog(Engine engine) {
         return internalEngine.getTranslog();
     }
 
+    /**
+     * Waits for all operations up to the provided sequence number to complete in the given internal engine.
+     *
+     * @param seqNo the sequence number that the checkpoint must advance to before this method returns
+     * @throws InterruptedException if the thread was interrupted while blocking on the condition
+     */
+    public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throws InterruptedException {
+        engine.getLocalCheckpointTracker().waitForOpsToComplete(seqNo);
+    }
+
     public static boolean hasSnapshottedCommits(Engine engine) {
         assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass();
         InternalEngine internalEngine = (InternalEngine) engine;
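The helper above is the test-side replacement for the removed Engine and IndexShard methods; because it takes an InternalEngine, only engine-level tests can block this way, which matches the CCR call sites in the next file. The typical pattern, lifted from those tests (leader and follower are engine fixtures supplied by the surrounding test, and the assertion helpers come from its static imports; shown only as a sketch):

    // Apply a batch of operations to the leader on several threads; the test harness
    // replicates them to the follower (not shown). Then block until the follower's
    // local checkpoint has caught up with the leader's before asserting on its state.
    EngineTestCase.concurrentlyApplyOps(ops, leader);
    EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
    assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));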
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
index 69fa23bd3fbcd..b8d44f76026f2 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
@@ -338,7 +338,7 @@ public void testBasicOptimization() throws Exception {
         for (int i = 0; i < numDocs; i++) {
             leader.index(indexForPrimary(Integer.toString(i)));
         }
-        follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+        EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
         assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L));
         assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs));
         assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
@@ -351,7 +351,7 @@ public void testBasicOptimization() throws Exception {
                 leader.delete(deleteForPrimary(Integer.toString(i)));
             }
         }
-        follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+        EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
         assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
         assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs));
         assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
@@ -363,7 +363,7 @@ public void testBasicOptimization() throws Exception {
             docIds.add(docId);
             leader.index(indexForPrimary(docId));
         }
-        follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+        EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
         assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
         assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs + moreDocs));
         assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
@@ -379,7 +379,7 @@ public void testOptimizeAppendOnly() throws Exception {
         runFollowTest((leader, follower) -> {
             EngineTestCase.concurrentlyApplyOps(ops, leader);
             assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo((long) numOps));
         });
     }
@@ -397,13 +397,13 @@ public void testOptimizeMultipleVersions() throws Exception {
         Randomness.shuffle(ops);
         runFollowTest((leader, follower) -> {
             EngineTestCase.concurrentlyApplyOps(ops, leader);
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             final List appendOps = new ArrayList<>();
             for (int numAppends = scaledRandomIntBetween(0, 100), i = 0; i < numAppends; i++) {
                 appendOps.add(indexForPrimary("append-" + i));
             }
             EngineTestCase.concurrentlyApplyOps(appendOps, leader);
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), greaterThanOrEqualTo((long) appendOps.size()));
         });
     }
@@ -411,19 +411,19 @@ public void testOptimizeSingleDocSequentially() throws Exception {
         runFollowTest((leader, follower) -> {
             leader.index(indexForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L));
 
             leader.delete(deleteForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L));
 
             leader.index(indexForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L));
 
             leader.index(indexForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L));
         });
     }
@@ -433,20 +433,20 @@ public void testOptimizeSingleDocConcurrently() throws Exception {
         Randomness.shuffle(ops);
         runFollowTest((leader, follower) -> {
             EngineTestCase.concurrentlyApplyOps(ops, leader);
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
             long numOptimized = follower.getNumberOfOptimizedIndexing();
 
             leader.delete(deleteForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized));
 
             leader.index(indexForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L));
 
             leader.index(indexForPrimary("id"));
-            follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+            EngineTestCase.waitForOpsToComplete(follower, leader.getLocalCheckpoint());
             assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L));
         });
     }
@@ -473,7 +473,7 @@ private void runFollowTest(CheckedBiConsumer