From 8f7b6184b332c83e5c2aad809d5f30e0fcc53396 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 29 May 2017 19:55:44 -0400 Subject: [PATCH 1/2] Fill gaps on primary promotion When a primary is promoted, it could have gaps in its history due to concurrency and in-flight operations when it was serving as a replica. This commit fills the gaps in the history of the promoted shard after all operations from the previous term have drained, and future operations are blocked. This commit does not handle replicating the no-ops that fill the gaps to any remaining replicas; that is the responsibility of the primary/replica sync that we are laying the groundwork for. --- .../elasticsearch/index/shard/IndexShard.java | 5 +- .../index/shard/IndexShardTests.java | 68 ++++++++++++++++++- 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6a79026125243..13abf5537859a 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -369,7 +369,10 @@ public void updatePrimaryTerm(final long newPrimaryTerm) { indexShardOperationPermits.asyncBlockOperations( 30, TimeUnit.MINUTES, - latch::await, + () -> { + latch.await(); + getEngine().fillSeqNoGaps(newPrimaryTerm); + }, e -> failShard("exception during primary term transition", e)); primaryTerm = newPrimaryTerm; latch.countDown(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index e7aa3c61b4e81..12e57f824e8ca 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -131,7 +131,9 @@ import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static 
org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.VersionType.EXTERNAL; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; +import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.Matchers.containsString; @@ -141,6 +143,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; /** @@ -388,6 +391,69 @@ public void onFailure(Exception e) { closeShards(indexShard); } + public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception { + final IndexShard indexShard = newStartedShard(false); + + // this is usually large enough that there will be at least one gap + final int operations = 1024 - scaledRandomIntBetween(0, 1024); + int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED); + boolean gap = false; + for (int i = 0; i < operations; i++) { + final String id = Integer.toString(i); + final ParsedDocument doc = testParsedDocument(id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null); + if (rarely()) { + gap = true; + final Term uid = new Term("_id", doc.id()); + final Engine.Index index = + new Engine.Index(uid, doc, i, indexShard.getPrimaryTerm(), 1, EXTERNAL, REPLICA, System.nanoTime(), -1, false); + indexShard.index(index); + max = i; + } + } + + final int maxSeqNo = max; + if (gap) { + assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo))); + } + + // promote the replica + final ShardRouting replicaRouting = indexShard.routingEntry(); + final ShardRouting primaryRouting = + 
TestShardRouting.newShardRouting( + replicaRouting.shardId(), + replicaRouting.currentNodeId(), + null, + true, + ShardRoutingState.STARTED, + replicaRouting.allocationId()); + indexShard.updateRoutingEntry(primaryRouting); + indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); + + /* + * This operation completing means that the delay operation executed as part of increasing the primary term has completed and the + * gaps are filled. + */ + final CountDownLatch latch = new CountDownLatch(1); + indexShard.acquirePrimaryOperationPermit( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + releasable.close(); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.GENERIC); + + latch.await(); + assertThat(indexShard.getLocalCheckpoint(), equalTo((long) maxSeqNo)); + closeShards(indexShard); + } + public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; @@ -1172,7 +1238,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException { test = otherShard.prepareIndexOnReplica( SourceToParse.source(shard.shardId().getIndexName(), test.type(), test.id(), test.source(), XContentType.JSON), - 1, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); + 1, 1, 1, EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); otherShard.index(test); final ShardRouting primaryShardRouting = shard.routingEntry(); From 184fabaa88b23d145f9fc9ff3497dc43580b92ea Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 30 May 2017 12:16:30 -0400 Subject: [PATCH 2/2] Not! 
--- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 12e57f824e8ca..38cac70b5e3e7 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -401,13 +401,14 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception { for (int i = 0; i < operations; i++) { final String id = Integer.toString(i); final ParsedDocument doc = testParsedDocument(id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null); - if (rarely()) { - gap = true; + if (!rarely()) { final Term uid = new Term("_id", doc.id()); final Engine.Index index = new Engine.Index(uid, doc, i, indexShard.getPrimaryTerm(), 1, EXTERNAL, REPLICA, System.nanoTime(), -1, false); indexShard.index(index); max = i; + } else { + gap = true; } }