From 3c24dffeecc3ceb6fe27ac83a20c9d98a79f341a Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 1 Feb 2019 14:08:14 -0700 Subject: [PATCH 01/13] WIP --- .../index/shard/StoreRecovery.java | 5 + .../xpack/ccr/IndexFollowingIT.java | 161 +++++++++++------- 2 files changed, 109 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index feb48ef85d1ba..87d44f6a91ccd 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -467,6 +467,11 @@ private void restore(final IndexShard indexShard, final Repository repository, f store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); + final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + if (maxSeqNo != localCheckpoint) { + int i = 0; + } + logger.error("FUCK: " + localCheckpoint); final String translogUUID = Translog.createEmptyTranslog( indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 74c44704e2e1c..fddcfb3413d15 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -75,9 +75,13 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; import java.util.function.BooleanSupplier; import java.util.stream.Collectors; @@ -199,72 +203,115 @@ public void testFollowIndex() throws Exception { } public void testFollowIndexWithConcurrentMappingChanges() throws Exception { - final int numberOfPrimaryShards = randomIntBetween(1, 3); - final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), - singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); - assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - ensureLeaderYellow("index1"); + for (int n = 0; n < 10; ++n) { + logger.error("FUCK " + n); + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + final int firstBatchNumDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); + for (int i = 0; i < firstBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", 
Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + leaderClient().admin().indices().prepareFlush("index1").setForce(true).get(); - final int firstBatchNumDocs = randomIntBetween(2, 64); - logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); - for (int i = 0; i < firstBatchNumDocs; i++) { - final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); - leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); - } + int nThreads = 4; + ExecutorService executorService = Executors.newFixedThreadPool(nThreads); + AtomicBoolean isRunning = new AtomicBoolean(true); + char[] chars = "abcde".toCharArray(); + int docsPerChar = 200; + try { + CountDownLatch firstTwoLatch = new CountDownLatch(nThreads); + CountDownLatch doneLatch = new CountDownLatch(nThreads); + for (int i = 0; i < nThreads; ++i) { + final int initialDocId = 100000 * (i + 1); + executorService.execute(() -> { + int currentDocId = initialDocId; + for (char c : chars) { + if (isRunning.get() == false) { + break; + } + if (c == 'c') { + firstTwoLatch.countDown(); + } + for (int j = 0; j < docsPerChar; j++) { + long valueToPutInDoc = randomLongBetween(0, 50000); + final String source; + if (randomBoolean()) { + source = String.format(Locale.ROOT, "{\"%c\":%d, \"f\": %d}", c, valueToPutInDoc, currentDocId); + } else { + source = String.format(Locale.ROOT, "{\"%c\":\"%d\", \"f\": %d}", c, valueToPutInDoc, currentDocId); + } + if (isRunning.get() == false) { + break; + } + leaderClient().prepareIndex("index1", "doc", Long.toString(currentDocId)) + .setSource(source, XContentType.JSON).get(); + currentDocId++; + } + if (randomBoolean()) { + leaderClient().admin().indices().prepareFlush("index1").setForce(true).get(); + } + } + doneLatch.countDown(); + }); + } - AtomicBoolean isRunning = new AtomicBoolean(true); + firstTwoLatch.await(); - // Concurrently index new docs with mapping changes - Thread thread = new Thread(() -> { - int docID = 10000; - char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray(); - for (char c : chars) { - if (isRunning.get() == false) { - break; - } - final String source; - long valueToPutInDoc = randomLongBetween(0, 50000); - if (randomBoolean()) { - source = String.format(Locale.ROOT, "{\"%c\":%d}", c, valueToPutInDoc); - } else { - source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, valueToPutInDoc); - } - for (int i = 1; i < 10; i++) { - if (isRunning.get() == false) { - break; - } - leaderClient().prepareIndex("index1", "doc", Long.toString(docID++)).setSource(source, XContentType.JSON).get(); - if (rarely()) { - leaderClient().admin().indices().prepareFlush("index1").setForce(true).get(); - } - } - leaderClient().admin().indices().prepareFlush("index1").setForce(true).setWaitIfOngoing(true).get(); - } - }); - thread.start(); + final PutFollowAction.Request followRequest = putFollow("index1", "index2", ActiveShardCount.NONE); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); - final PutFollowAction.Request followRequest = putFollow("index1", "index2", ActiveShardCount.NONE); - followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + ensureFollowerGreen("index2"); - ensureFollowerGreen("index2"); + doneLatch.await(); - for (int i = 0; i < firstBatchNumDocs; i++) { - assertBusy(assertExpectedDocumentRunnable(i)); - } + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = + 
leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } - final int secondBatchNumDocs = randomIntBetween(2, 64); - logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); - for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { - final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); - leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); - } + assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); - for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { - assertBusy(assertExpectedDocumentRunnable(i)); - } + for (int i = 0; i < firstBatchNumDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } - isRunning.set(false); - thread.join(); + for (int i = 0; i < nThreads; ++i) { + int currentDocId = 100000 * (i + 1); + for (int j = 0; j < chars.length * docsPerChar; ++j) { + try { + assertBusy(assertExpectedDocumentRunnable(currentDocId++)); + } catch (AssertionError e) { + final Map followerDocsPerShard = new HashMap<>(); + final ShardStats[] followerShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : followerShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + followerDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + int f = 0; + } + } + } + } finally { + isRunning.set(false); + executorService.shutdown(); + executorService.awaitTermination(30, TimeUnit.SECONDS); + } + afterTest(); + } } public void testFollowIndexWithoutWaitForComplete() throws Exception { From e7b6e7c798cf0f90167ee42d13956341f2d673e9 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 1 Feb 2019 16:17:33 -0700 Subject: [PATCH 02/13] WIP --- .../org/elasticsearch/index/shard/StoreRecovery.java | 10 ++++------ .../org/elasticsearch/xpack/ccr/IndexFollowingIT.java | 3 +-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 87d44f6a91ccd..75ae78d9444cc 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -464,16 +464,14 @@ private void restore(final IndexShard indexShard, final Repository repository, f repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState()); final Store store = indexShard.store(); - store.bootstrapNewHistory(); +// store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - if (maxSeqNo != localCheckpoint) { - int i = 0; - } - logger.error("FUCK: " + localCheckpoint); + logger.error("Max Sequence Number: " + maxSeqNo); + 
logger.error("Local Checkpoint: " + localCheckpoint); final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm()); + indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; indexShard.openEngineAndRecoverFromTranslog(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index fddcfb3413d15..1c084c79f8875 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -81,7 +81,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.LockSupport; import java.util.function.BooleanSupplier; import java.util.stream.Collectors; @@ -204,7 +203,7 @@ public void testFollowIndex() throws Exception { public void testFollowIndexWithConcurrentMappingChanges() throws Exception { for (int n = 0; n < 10; ++n) { - logger.error("FUCK " + n); + logger.error("Follow Iteration: " + n); final int numberOfPrimaryShards = randomIntBetween(1, 3); final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); From 58b125ec3528f822357a44279d9f0912f0ebc09f Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 1 Feb 2019 16:25:26 -0700 Subject: [PATCH 03/13] WIP --- .../main/java/org/elasticsearch/index/shard/StoreRecovery.java | 1 + .../test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 75ae78d9444cc..9a9efe82293c1 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -476,6 +476,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; indexShard.openEngineAndRecoverFromTranslog(); indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); +// indexShard.updateLocalCheckpointForShard(indexShard.routingEntry().allocationId().getId(), indexShard.getLocalCheckpoint()); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); } catch (Exception e) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 1c084c79f8875..ddb40a4e11f81 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -202,7 +202,7 @@ public void testFollowIndex() throws Exception { } public void testFollowIndexWithConcurrentMappingChanges() throws Exception { - for (int n = 0; n < 10; ++n) { + for (int n = 0; n < 5; ++n) { logger.error("Follow Iteration: " + n); final int 
numberOfPrimaryShards = randomIntBetween(1, 3); final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), From 07b8a6de6c9e4f684c76bd1b817fccd9dffef2fb Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 1 Feb 2019 19:22:45 -0700 Subject: [PATCH 04/13] Changes --- .../xpack/ccr/IndexFollowingIT.java | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index ddb40a4e11f81..cfa702e325c04 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -218,7 +218,7 @@ public void testFollowIndexWithConcurrentMappingChanges() throws Exception { } leaderClient().admin().indices().prepareFlush("index1").setForce(true).get(); - int nThreads = 4; + int nThreads = 6; ExecutorService executorService = Executors.newFixedThreadPool(nThreads); AtomicBoolean isRunning = new AtomicBoolean(true); char[] chars = "abcde".toCharArray(); @@ -288,20 +288,7 @@ public void testFollowIndexWithConcurrentMappingChanges() throws Exception { for (int i = 0; i < nThreads; ++i) { int currentDocId = 100000 * (i + 1); for (int j = 0; j < chars.length * docsPerChar; ++j) { - try { - assertBusy(assertExpectedDocumentRunnable(currentDocId++)); - } catch (AssertionError e) { - final Map followerDocsPerShard = new HashMap<>(); - final ShardStats[] followerShardStats = - leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); - for (final ShardStats shardStats : followerShardStats) { - if (shardStats.getShardRouting().primary()) { - long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; - followerDocsPerShard.put(shardStats.getShardRouting().shardId(), value); - } - } - int f = 0; - } + assertBusy(assertExpectedDocumentRunnable(currentDocId++)); } } } finally { From 40b400e362d577ba18d13ed0155604ce345cf854 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Sun, 3 Feb 2019 13:53:12 +0100 Subject: [PATCH 05/13] adapt bootstrapNewHistory --- .../org/elasticsearch/index/shard/StoreRecovery.java | 7 +++---- .../java/org/elasticsearch/index/store/Store.java | 11 ++++++----- .../snapshots/SourceOnlySnapshotRepository.java | 3 ++- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 9a9efe82293c1..5225ec1e7b111 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -397,9 +397,9 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe assert indexShouldExists; store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); - final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); + final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm()); + indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); 
store.associateIndexWithNewTranslog(translogUUID); } else if (indexShouldExists) { if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) { @@ -464,7 +464,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState()); final Store store = indexShard.store(); -// store.bootstrapNewHistory(); + store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); @@ -476,7 +476,6 @@ private void restore(final IndexShard indexShard, final Repository repository, f assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; indexShard.openEngineAndRecoverFromTranslog(); indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); -// indexShard.updateLocalCheckpointForShard(indexShard.routingEntry().allocationId().getId(), indexShard.getLocalCheckpoint()); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 66e3e4d5558d8..73ac8a65d3007 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1428,26 +1428,27 @@ public void bootstrapNewHistory() throws IOException { try { Map userData = readLastCommittedSegmentsInfo().getUserData(); final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); - bootstrapNewHistory(maxSeqNo); + final long localCheckpoint = Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + bootstrapNewHistory(localCheckpoint, maxSeqNo); } finally { metadataLock.writeLock().unlock(); } } /** - * Marks an existing lucene index with a new history uuid and sets the given maxSeqNo as the local checkpoint + * Marks an existing lucene index with a new history uuid and sets the given local checkpoint * as well as the maximum sequence number. - * This is used to make sure no existing shard will recovery from this index using ops based recovery. + * This is used to make sure no existing shard will recover from this index using ops based recovery. 
* @see SequenceNumbers#LOCAL_CHECKPOINT_KEY * @see SequenceNumbers#MAX_SEQ_NO */ - public void bootstrapNewHistory(long maxSeqNo) throws IOException { + public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException { metadataLock.writeLock().lock(); try (IndexWriter writer = newAppendingIndexWriter(directory, null)) { final Map map = new HashMap<>(); map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); + map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); - map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); updateCommitData(writer, map); } finally { metadataLock.writeLock().unlock(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index 18e96619ec822..3e36fc5977491 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -128,7 +128,8 @@ protected void closeInternal() { snapshot.syncSnapshot(snapshotIndexCommit); // we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo(); - tempStore.bootstrapNewHistory(segmentInfos.totalMaxDoc()); + final long maxDoc = segmentInfos.totalMaxDoc(); + tempStore.bootstrapNewHistory(maxDoc, maxDoc); store.incRef(); try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) { IndexCommit indexCommit = reader.getIndexCommit(); From bb71b3db84fe92a9f4b46f25fac7ee7cc0f3eafe Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 4 Feb 2019 10:08:29 +0100 Subject: [PATCH 06/13] Add test for FollowEngine --- .../index/shard/StoreRecovery.java | 3 - .../index/shard/IndexShardTests.java | 128 ++------------- .../index/shard/RestoreOnlyRepository.java | 146 ++++++++++++++++++ .../engine/FollowEngineIndexShardTests.java | 78 ++++++++++ 4 files changed, 234 insertions(+), 121 deletions(-) create mode 100644 test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 5225ec1e7b111..f30a31bc9aa9b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -466,10 +466,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f final Store store = indexShard.store(); store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); - final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - logger.error("Max Sequence Number: " + maxSeqNo); - logger.error("Local Checkpoint: " + localCheckpoint); final String translogUUID = Translog.createEmptyTranslog( indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java 
b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 12a7fad466e29..a21fab0d02fdd 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -21,7 +21,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; @@ -46,8 +45,6 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -62,7 +59,6 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; @@ -107,7 +103,6 @@ import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; -import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.store.StoreUtils; @@ -121,12 +116,8 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.repositories.Repository; -import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.snapshots.SnapshotInfo; -import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.FieldMaskingReader; @@ -143,7 +134,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -174,7 +164,6 @@ import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; -import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; @@ -2162,6 +2151,8 @@ public void testRestoreShard() throws IOException { IndexShard target = newStartedShard(true); indexDoc(source, "_doc", "0"); + EngineTestCase.generateNewSeqNo(source.getEngine()); + indexDoc(source, "_doc", "2"); if (randomBoolean()) { source.refresh("test"); } @@ -2197,16 +2188,18 @@ public void 
restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio } } })); - assertThat(target.getLocalCheckpoint(), equalTo(0L)); - assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(0L)); - assertThat(target.getReplicationTracker().getGlobalCheckpoint(), equalTo(0L)); + assertThat(target.getLocalCheckpoint(), equalTo(2L)); + assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L)); + assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted()); assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard( - target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(0L)); + target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(2L)); + assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(2L)); - assertDocs(target, "0"); + assertDocs(target, "0", "2"); - closeShards(source, target); + closeShard(source, false); + closeShards(target); } public void testSearcherWrapperIsUsed() throws IOException { @@ -3131,107 +3124,6 @@ private Result indexOnReplicaWithGaps( return new Result(localCheckpoint, max); } - /** A dummy repository for testing which just needs restore overridden */ - private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { - private final String indexName; - - RestoreOnlyRepository(String indexName) { - this.indexName = indexName; - } - - @Override - protected void doStart() { - } - - @Override - protected void doStop() { - } - - @Override - protected void doClose() { - } - - @Override - public RepositoryMetaData getMetadata() { - return null; - } - - @Override - public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { - return null; - } - - @Override - public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { - return null; - } - - @Override - public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { - return null; - } - - @Override - public RepositoryData getRepositoryData() { - Map> map = new HashMap<>(); - map.put(new IndexId(indexName, "blah"), emptySet()); - return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map, Collections.emptyList()); - } - - @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { - } - - @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, - int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState) { - return null; - } - - @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { - } - - @Override - public long getSnapshotThrottleTimeInNanos() { - return 0; - } - - @Override - public long getRestoreThrottleTimeInNanos() { - return 0; - } - - @Override - public String startVerification() { - return null; - } - - @Override - public void endVerification(String verificationToken) { - } - - @Override - public boolean isReadOnly() { - return false; - } - - @Override - public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, - IndexShardSnapshotStatus snapshotStatus) { - } - - @Override - public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { - return null; - } - - @Override - public void verify(String verificationToken, DiscoveryNode localNode) { - 
} - } - public void testIsSearchIdle() throws Exception { Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java new file mode 100644 index 0000000000000..11bdfb7bcc741 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -0,0 +1,146 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotShardFailure; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.util.Collections.emptySet; +import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; + +/** A dummy repository for testing which just needs restore overridden */ +public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { + private final String indexName; + + public RestoreOnlyRepository(String indexName) { + this.indexName = indexName; + } + + @Override + protected void doStart() { + } + + @Override + protected void doStop() { + } + + @Override + protected void doClose() { + } + + @Override + public RepositoryMetaData getMetadata() { + return null; + } + + @Override + public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { + return null; + } + + @Override + public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { + return null; + } + + @Override + public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { + return null; + } + + @Override + public RepositoryData getRepositoryData() { + Map> map = new HashMap<>(); + map.put(new IndexId(indexName, "blah"), emptySet()); + return new RepositoryData(EMPTY_REPO_GEN, 
Collections.emptyMap(), Collections.emptyMap(), map, Collections.emptyList()); + } + + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) { + } + + @Override + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState) { + return null; + } + + @Override + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { + } + + @Override + public long getSnapshotThrottleTimeInNanos() { + return 0; + } + + @Override + public long getRestoreThrottleTimeInNanos() { + return 0; + } + + @Override + public String startVerification() { + return null; + } + + @Override + public void endVerification(String verificationToken) { + } + + @Override + public boolean isReadOnly() { + return false; + } + + @Override + public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, + IndexShardSnapshotStatus snapshotStatus) { + } + + @Override + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + return null; + } + + @Override + public void verify(String verificationToken, DiscoveryNode localNode) { + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java index 8d3c0c3b472aa..1326f0ebc79bb 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java @@ -5,27 +5,46 @@ */ package org.elasticsearch.xpack.ccr.index.engine; +import org.apache.lucene.store.IOContext; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.RestoreOnlyRepository; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrSettings; +import java.io.IOException; import java.util.Collections; import java.util.concurrent.CountDownLatch; +import static java.util.Collections.emptyMap; 
+import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; public class FollowEngineIndexShardTests extends IndexShardTestCase { @@ -76,4 +95,63 @@ public void testDoNotFillGaps() throws Exception { closeShards(indexShard); } + public void testRestoreShard() throws IOException { + final Settings sourceSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + final IndexShard source = newStartedShard(true, sourceSettings); + final Settings targetSettings = Settings.builder() + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + IndexShard target = newStartedShard(true, targetSettings, new FollowingEngineFactory()); + assertThat(IndexShardTestCase.getEngine(target), instanceOf(FollowingEngine.class)); + + indexDoc(source, "_doc", "0"); + EngineTestCase.generateNewSeqNo(IndexShardTestCase.getEngine(source)); + indexDoc(source, "_doc", "2"); + if (randomBoolean()) { + source.refresh("test"); + } + flushShard(source); // only flush source + ShardRouting routing = ShardRoutingHelper.initWithSameId(target.routingEntry(), + RecoverySource.ExistingStoreRecoverySource.INSTANCE); + final Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID())); + routing = ShardRoutingHelper.newWithRestoreSource(routing, + new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test")); + target = reinitShard(target, routing); + Store sourceStore = source.store(); + Store targetStore = target.store(); + + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + target.markAsRecovering("store", new RecoveryState(routing, localNode, null)); + assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") { + @Override + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, + RecoveryState recoveryState) { + try { + cleanLuceneIndex(targetStore.directory()); + for (String file : sourceStore.directory().listAll()) { + if (file.equals("write.lock") || file.startsWith("extra")) { + continue; + } + targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + })); + assertThat(target.getLocalCheckpoint(), equalTo(0L)); + assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L)); + assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); + IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted()); + assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); + + assertDocs(target, "0", "2"); + + closeShard(source, false); + closeShards(target); + } + } From b4c85e2d81117dead55226d84af102e05c636914 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 4 Feb 2019 20:23:19 +0100 Subject: [PATCH 07/13] add tests --- .../elasticsearch/test/BackgroundIndexer.java | 6 + .../ccr/action/TransportPutFollowAction.java | 18 +- .../elasticsearch/xpack/CcrIntegTestCase.java | 69 +++++ .../xpack/ccr/IndexFollowingIT.java | 281 ++++++++---------- 4 files changed, 202 insertions(+), 172 deletions(-) diff --git 
a/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java b/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java index eabb05a537ca7..ed3a836d2c506 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java +++ b/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java @@ -52,6 +52,7 @@ public class BackgroundIndexer implements AutoCloseable { private final Logger logger = LogManager.getLogger(getClass()); final Thread[] writers; + final Client client; final CountDownLatch stopLatch; final CopyOnWriteArrayList failures; final AtomicBoolean stop = new AtomicBoolean(false); @@ -122,6 +123,7 @@ public BackgroundIndexer(final String index, final String type, final Client cli if (random == null) { random = RandomizedTest.getRandom(); } + this.client = client; useAutoGeneratedIDs = random.nextBoolean(); failures = new CopyOnWriteArrayList<>(); writers = new Thread[writerCount]; @@ -316,6 +318,10 @@ public void close() throws Exception { stop(); } + public Client getClient() { + return client; + } + /** * Returns the ID set of all documents indexed by this indexer run */ diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 27f3b60fb5291..4746858041960 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -217,17 +217,13 @@ private void initiateFollowing( final PutFollowAction.Request request, final ActionListener listener) { assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT."; - activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()}, - request.waitForActiveShards(), request.timeout(), result -> { - if (result) { - client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap( - r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())), - listener::onFailure - )); - } else { - listener.onResponse(new PutFollowAction.Response(true, false, false)); - } - }, listener::onFailure); + client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap( + r -> activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()}, + request.waitForActiveShards(), request.timeout(), result -> + listener.onResponse(new PutFollowAction.Response(true, result, r.isAcknowledged())), + listener::onFailure), + listener::onFailure + )); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 2f34315b46e69..2bdbbad6861a7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; @@ 
-58,6 +59,7 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; @@ -94,12 +96,15 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.empty; @@ -553,6 +558,70 @@ protected void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index fol }); } + /** + * Waits until at least a give number of document is visible for searchers + * + * @param numDocs number of documents to wait for + * @param indexer a {@link org.elasticsearch.test.BackgroundIndexer}. Will be first checked for documents indexed. + * This saves on unneeded searches. + * @return the actual number of docs seen. + */ + public long waitForDocs(final long numDocs, final BackgroundIndexer indexer) throws InterruptedException { + // indexing threads can wait for up to ~1m before retrying when they first try to index into a shard which is not STARTED. + return waitForDocs(numDocs, 90, TimeUnit.SECONDS, indexer); + } + + /** + * Waits until at least a give number of document is visible for searchers + * + * @param numDocs number of documents to wait for + * @param maxWaitTime if not progress have been made during this time, fail the test + * @param maxWaitTimeUnit the unit in which maxWaitTime is specified + * @param indexer Will be first checked for documents indexed. + * This saves on unneeded searches. + * @return the actual number of docs seen. + */ + public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, final BackgroundIndexer indexer) + throws InterruptedException { + final AtomicLong lastKnownCount = new AtomicLong(-1); + long lastStartCount = -1; + BooleanSupplier testDocs = () -> { + lastKnownCount.set(indexer.totalIndexedDocs()); + if (lastKnownCount.get() >= numDocs) { + try { + long count = indexer.getClient().prepareSearch() + .setTrackTotalHits(true) + .setSize(0) + .setQuery(matchAllQuery()) + .get() + .getHits().getTotalHits().value; + + if (count == lastKnownCount.get()) { + // no progress - try to refresh for the next time + indexer.getClient().admin().indices().prepareRefresh().get(); + } + lastKnownCount.set(count); + } catch (Exception e) { // count now acts like search and barfs if all shards failed... + logger.debug("failed to executed count", e); + return false; + } + logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount.get(), numDocs); + } else { + logger.debug("[{}] docs indexed. 
waiting for [{}]", lastKnownCount.get(), numDocs); + } + return lastKnownCount.get() >= numDocs; + }; + + while (!awaitBusy(testDocs, maxWaitTime, maxWaitTimeUnit)) { + if (lastStartCount == lastKnownCount.get()) { + // we didn't make any progress + fail("failed to reach " + numDocs + "docs"); + } + lastStartCount = lastKnownCount.get(); + } + return lastKnownCount.get(); + } + static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception { CountDownLatch latch = new CountDownLatch(1); clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index cfa702e325c04..dc8aadb1c4a2f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; @@ -54,6 +53,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; @@ -75,9 +75,6 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -112,69 +109,141 @@ public void testFollowIndex() throws Exception { } else { firstBatchNumDocs = randomIntBetween(10, 64); } - final int flushPoint = (int) (firstBatchNumDocs * 0.75); logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); - BulkRequestBuilder bulkRequestBuilder = leaderClient().prepareBulk(); - for (int i = 0; i < flushPoint; i++) { - final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); - IndexRequest indexRequest = new IndexRequest("index1", "doc", Integer.toString(i)) - .source(source, XContentType.JSON) - .timeout(TimeValue.timeValueSeconds(1)); - bulkRequestBuilder.add(indexRequest); + try (BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs, + randomIntBetween(1, 5))) { + waitForDocs(randomInt(firstBatchNumDocs), indexer); + leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get(); + waitForDocs(firstBatchNumDocs, indexer); + indexer.assertNoFailures(); + + boolean waitOnAll = randomBoolean(); + + final PutFollowAction.Request followRequest; + if (waitOnAll) { + followRequest = putFollow("index1", "index2", ActiveShardCount.ALL); + } else { + followRequest = putFollow("index1", "index2", ActiveShardCount.ONE); + } + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + 
assertTrue(response.isFollowIndexCreated()); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + + ClusterHealthRequest healthRequest = Requests.clusterHealthRequest("index2").waitForNoRelocatingShards(true); + ClusterIndexHealth indexHealth = followerClient().admin().cluster().health(healthRequest).get().getIndices().get("index2"); + for (ClusterShardHealth shardHealth : indexHealth.getShards().values()) { + if (waitOnAll) { + assertTrue(shardHealth.isPrimaryActive()); + assertEquals(1 + numberOfReplicas, shardHealth.getActiveShards()); + } else { + assertTrue(shardHealth.isPrimaryActive()); + } + } + + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); + + for (String docId : indexer.getIds()) { + assertBusy(() -> { + final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get(); + assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists()); + }); + } + + pauseFollow("index2"); + followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get(); + final int secondBatchNumDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); + indexer.continueIndexing(secondBatchNumDocs); + + final Map secondBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] secondBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : secondBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard)); + + for (String docId : indexer.getIds()) { + assertBusy(() -> { + final GetResponse getResponse = followerClient().prepareGet("index2", "_doc", docId).get(); + assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists()); + }); + } + pauseFollow("index2"); + assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); } - bulkRequestBuilder.get(); + } - leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get(); + public void testFollowIndexWithConcurrentMappingChanges() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); - // Index some docs after the flush that might be recovered in the normal index following operations - for (int i = flushPoint; i < firstBatchNumDocs; i++) { + final int firstBatchNumDocs = randomIntBetween(2, 64); + 
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); + for (int i = 0; i < firstBatchNumDocs; i++) { final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); } - boolean waitOnAll = randomBoolean(); - - final PutFollowAction.Request followRequest; - if (waitOnAll) { - followRequest = putFollow("index1", "index2", ActiveShardCount.ALL); - } else { - followRequest = putFollow("index1", "index2", ActiveShardCount.ONE); - } - PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); - assertTrue(response.isFollowIndexCreated()); - assertTrue(response.isFollowIndexShardsAcked()); - assertTrue(response.isIndexFollowingStarted()); + AtomicBoolean isRunning = new AtomicBoolean(true); - ClusterHealthRequest healthRequest = Requests.clusterHealthRequest("index2").waitForNoRelocatingShards(true); - ClusterIndexHealth indexHealth = followerClient().admin().cluster().health(healthRequest).actionGet().getIndices().get("index2"); - for (ClusterShardHealth shardHealth : indexHealth.getShards().values()) { - if (waitOnAll) { - assertTrue(shardHealth.isPrimaryActive()); - assertEquals(1 + numberOfReplicas, shardHealth.getActiveShards()); - } else { - assertTrue(shardHealth.isPrimaryActive()); + // Concurrently index new docs with mapping changes + Thread thread = new Thread(() -> { + int docID = 10000; + char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray(); + for (char c : chars) { + if (isRunning.get() == false) { + break; + } + final String source; + long valueToPutInDoc = randomLongBetween(0, 50000); + if (randomBoolean()) { + source = String.format(Locale.ROOT, "{\"%c\":%d}", c, valueToPutInDoc); + } else { + source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, valueToPutInDoc); + } + for (int i = 1; i < 10; i++) { + if (isRunning.get() == false) { + break; + } + leaderClient().prepareIndex("index1", "doc", Long.toString(docID++)).setSource(source, XContentType.JSON).get(); + if (rarely()) { + leaderClient().admin().indices().prepareFlush("index1").setForce(true).get(); + } + } + leaderClient().admin().indices().prepareFlush("index1").setForce(true).setWaitIfOngoing(true).get(); } - } + }); + thread.start(); - final Map firstBatchNumDocsPerShard = new HashMap<>(); - final ShardStats[] firstBatchShardStats = - leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); - for (final ShardStats shardStats : firstBatchShardStats) { - if (shardStats.getShardRouting().primary()) { - long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; - firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); - } - } + final PutFollowAction.Request followRequest = putFollow("index1", "index2", ActiveShardCount.NONE); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); - assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); + ensureFollowerGreen("index2"); for (int i = 0; i < firstBatchNumDocs; i++) { assertBusy(assertExpectedDocumentRunnable(i)); } - pauseFollow("index2"); - followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get(); final int secondBatchNumDocs = randomIntBetween(2, 64); logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { @@ -182,122 +251,12 @@ public void 
             leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
         }
-        final Map<ShardId, Long> secondBatchNumDocsPerShard = new HashMap<>();
-        final ShardStats[] secondBatchShardStats =
-            leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
-        for (final ShardStats shardStats : secondBatchShardStats) {
-            if (shardStats.getShardRouting().primary()) {
-                final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
-                secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value);
-            }
-        }
-
-        assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard));
-
         for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
             assertBusy(assertExpectedDocumentRunnable(i));
         }
-        pauseFollow("index2");
-        assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards);
-    }
-
-    public void testFollowIndexWithConcurrentMappingChanges() throws Exception {
-        for (int n = 0; n < 5; ++n) {
-            logger.error("Follow Iteration: " + n);
-            final int numberOfPrimaryShards = randomIntBetween(1, 3);
-            final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
-                singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
-            assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
-            ensureLeaderYellow("index1");
-
-            final int firstBatchNumDocs = randomIntBetween(2, 64);
-            logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
-            for (int i = 0; i < firstBatchNumDocs; i++) {
-                final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
-                leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
-            }
-            leaderClient().admin().indices().prepareFlush("index1").setForce(true).get();
-
-            int nThreads = 6;
-            ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
-            AtomicBoolean isRunning = new AtomicBoolean(true);
-            char[] chars = "abcde".toCharArray();
-            int docsPerChar = 200;
-            try {
-                CountDownLatch firstTwoLatch = new CountDownLatch(nThreads);
-                CountDownLatch doneLatch = new CountDownLatch(nThreads);
-                for (int i = 0; i < nThreads; ++i) {
-                    final int initialDocId = 100000 * (i + 1);
-                    executorService.execute(() -> {
-                        int currentDocId = initialDocId;
-                        for (char c : chars) {
-                            if (isRunning.get() == false) {
-                                break;
-                            }
-                            if (c == 'c') {
-                                firstTwoLatch.countDown();
-                            }
-                            for (int j = 0; j < docsPerChar; j++) {
-                                long valueToPutInDoc = randomLongBetween(0, 50000);
-                                final String source;
-                                if (randomBoolean()) {
-                                    source = String.format(Locale.ROOT, "{\"%c\":%d, \"f\": %d}", c, valueToPutInDoc, currentDocId);
-                                } else {
-                                    source = String.format(Locale.ROOT, "{\"%c\":\"%d\", \"f\": %d}", c, valueToPutInDoc, currentDocId);
-                                }
-                                if (isRunning.get() == false) {
-                                    break;
-                                }
-                                leaderClient().prepareIndex("index1", "doc", Long.toString(currentDocId))
-                                    .setSource(source, XContentType.JSON).get();
-                                currentDocId++;
-                            }
-                            if (randomBoolean()) {
-                                leaderClient().admin().indices().prepareFlush("index1").setForce(true).get();
-                            }
-                        }
-                        doneLatch.countDown();
-                    });
-                }
-
-                firstTwoLatch.await();
-
-                final PutFollowAction.Request followRequest = putFollow("index1", "index2", ActiveShardCount.NONE);
-                followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
-
-                ensureFollowerGreen("index2");
-                doneLatch.await();
-
-                final Map<ShardId, Long> firstBatchNumDocsPerShard = new HashMap<>();
-                final ShardStats[] firstBatchShardStats =
-                    leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
-                for (final ShardStats shardStats : firstBatchShardStats) {
-                    if (shardStats.getShardRouting().primary()) {
-                        long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
-                        firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value);
-                    }
-                }
-
-                assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard));
-
-                for (int i = 0; i < firstBatchNumDocs; i++) {
-                    assertBusy(assertExpectedDocumentRunnable(i));
-                }
-
-                for (int i = 0; i < nThreads; ++i) {
-                    int currentDocId = 100000 * (i + 1);
-                    for (int j = 0; j < chars.length * docsPerChar; ++j) {
-                        assertBusy(assertExpectedDocumentRunnable(currentDocId++));
-                    }
-                }
-            } finally {
-                isRunning.set(false);
-                executorService.shutdown();
-                executorService.awaitTermination(30, TimeUnit.SECONDS);
-            }
-            afterTest();
-        }
+        isRunning.set(false);
+        thread.join();
     }
 
     public void testFollowIndexWithoutWaitForComplete() throws Exception {

From 2699d33e62c7931747b41b90f760eb1ac16a1a32 Mon Sep 17 00:00:00 2001
From: Yannick Welsch
Date: Mon, 4 Feb 2019 20:26:05 +0100
Subject: [PATCH 08/13] cleanups

---
 .../main/java/org/elasticsearch/index/shard/StoreRecovery.java | 2 +-
 .../java/org/elasticsearch/index/shard/IndexShardTests.java    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
index f30a31bc9aa9b..d15de54c54e99 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -466,7 +466,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
             final Store store = indexShard.store();
             store.bootstrapNewHistory();
             final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
-            final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
+            final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
             final String translogUUID = Translog.createEmptyTranslog(
                 indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
             store.associateIndexWithNewTranslog(translogUUID);
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index a21fab0d02fdd..d2c16a7ee41e2 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -2151,7 +2151,7 @@ public void testRestoreShard() throws IOException {
         IndexShard target = newStartedShard(true);
 
         indexDoc(source, "_doc", "0");
-        EngineTestCase.generateNewSeqNo(source.getEngine());
+        EngineTestCase.generateNewSeqNo(source.getEngine()); // create a gap in the history
         indexDoc(source, "_doc", "2");
         if (randomBoolean()) {
             source.refresh("test");

From 6b0a2a7ff5dbe9414435d2a780551fd3905b2563 Mon Sep 17 00:00:00 2001
From: Tim Brooks
Date: Mon, 4 Feb 2019 17:04:07 -0600
Subject: [PATCH 09/13] WIP

---
 .../src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java
index 2bdbbad6861a7..01d759383c8b3 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java
@@ -31,7 +31,6 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Strings;

From 681de0e1ae5083fed910d330e90b4cf5682a756e Mon Sep 17 00:00:00 2001
From: Yannick Welsch
Date: Tue, 5 Feb 2019 09:40:19 +0100
Subject: [PATCH 10/13] fix test

---
 .../java/org/elasticsearch/index/shard/IndexShardTests.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index d2c16a7ee41e2..66ac074b5990a 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -2148,7 +2148,8 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc
 
     public void testRestoreShard() throws IOException {
         final IndexShard source = newStartedShard(true);
-        IndexShard target = newStartedShard(true);
+        final IndexShard target = newStartedShard(true, Settings.builder()
+            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), source.indexSettings().isSoftDeleteEnabled()).build());
 
         indexDoc(source, "_doc", "0");
         EngineTestCase.generateNewSeqNo(source.getEngine()); // create a gap in the history

From 6105bd33d98c0d50c2dc186c32b9cd73d1e2337d Mon Sep 17 00:00:00 2001
From: Yannick Welsch
Date: Tue, 5 Feb 2019 09:43:18 +0100
Subject: [PATCH 11/13] no final

---
 .../java/org/elasticsearch/index/shard/IndexShardTests.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 66ac074b5990a..2851f43b1b990 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -2148,7 +2148,7 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc
 
     public void testRestoreShard() throws IOException {
         final IndexShard source = newStartedShard(true);
-        final IndexShard target = newStartedShard(true, Settings.builder()
+        IndexShard target = newStartedShard(true, Settings.builder()
             .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), source.indexSettings().isSoftDeleteEnabled()).build());
 
         indexDoc(source, "_doc", "0");

From 1274346dbafb6cee49b6080b59c7999d91f96097 Mon Sep 17 00:00:00 2001
From: Tim Brooks
Date: Tue, 5 Feb 2019 10:46:42 -0600
Subject: [PATCH 12/13] checkstyle

---
 .../elasticsearch/xpack/ccr/action/TransportPutFollowAction.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java
index 9281a1ad1d6ef..18218656d26a2 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java
@@ -38,7 +38,6 @@ import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
 import org.elasticsearch.xpack.ccr.CcrSettings;
 import org.elasticsearch.xpack.ccr.repository.CcrRepository;
-import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
 import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
 import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

From 49143083da74a455e5f6e9ce8589e6058fb73ba3 Mon Sep 17 00:00:00 2001
From: Tim Brooks
Date: Tue, 5 Feb 2019 11:18:35 -0600
Subject: [PATCH 13/13] Fix

---
 .../xpack/ccr/action/TransportPutFollowAction.java         | 7 ++++++-
 .../java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java | 2 +-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java
index 18218656d26a2..aa94071ac1d35 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java
@@ -38,6 +38,7 @@ import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
 import org.elasticsearch.xpack.ccr.CcrSettings;
 import org.elasticsearch.xpack.ccr.repository.CcrRepository;
+import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
 import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
 import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
@@ -217,7 +218,11 @@ private void initiateFollowing(
         final PutFollowAction.Request request,
         final ActionListener<PutFollowAction.Response> listener) {
         assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT.";
-        client.execute(ResumeFollowAction.INSTANCE, request, ActionListener.wrap(
+        FollowParameters parameters = request.getParameters();
+        ResumeFollowAction.Request resumeFollowRequest = new ResumeFollowAction.Request();
+        resumeFollowRequest.setFollowerIndex(request.getFollowerIndex());
+        resumeFollowRequest.setParameters(new FollowParameters(parameters));
+        client.execute(ResumeFollowAction.INSTANCE, resumeFollowRequest, ActionListener.wrap(
             r -> activeShardsObserver.waitForActiveShards(new String[]{request.getFollowerIndex()},
                 request.waitForActiveShards(), request.timeout(),
                 result -> listener.onResponse(new PutFollowAction.Response(true, result, r.isAcknowledged())),
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java
index 80e4e16c10b2e..80bded6a5d1d3 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java
@@ -171,7 +171,7 @@ public void testFollowIndex() throws Exception {
         }
 
         pauseFollow("index2");
-        followerClient().execute(ResumeFollowAction.INSTANCE, followRequest).get();
+        followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).get();
         final int secondBatchNumDocs = randomIntBetween(2, 64);
         logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
         indexer.continueIndexing(secondBatchNumDocs);