Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,10 @@ public void updatePrimaryTerm(final long newPrimaryTerm) {
indexShardOperationPermits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
latch::await,
() -> {
latch.await();
getEngine().fillSeqNoGaps(newPrimaryTerm);
},
e -> failShard("exception during primary term transition", e));
primaryTerm = newPrimaryTerm;
latch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@
import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.VersionType.EXTERNAL;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.Matchers.containsString;
Expand All @@ -141,6 +143,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

/**
Expand Down Expand Up @@ -388,6 +391,70 @@ public void onFailure(Exception e) {
closeShards(indexShard);
}

public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {
final IndexShard indexShard = newStartedShard(false);

// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
boolean gap = false;
for (int i = 0; i < operations; i++) {
final String id = Integer.toString(i);
final ParsedDocument doc = testParsedDocument(id, "test", null, new ParseContext.Document(), new BytesArray("{}"), null);
if (!rarely()) {
final Term uid = new Term("_id", doc.id());
final Engine.Index index =
new Engine.Index(uid, doc, i, indexShard.getPrimaryTerm(), 1, EXTERNAL, REPLICA, System.nanoTime(), -1, false);
indexShard.index(index);
max = i;
} else {
gap = true;
}
}

final int maxSeqNo = max;
if (gap) {
assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo)));
}

// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting =
TestShardRouting.newShardRouting(
replicaRouting.shardId(),
replicaRouting.currentNodeId(),
null,
true,
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);

/*
* This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
* gaps are filled.
*/
final CountDownLatch latch = new CountDownLatch(1);
indexShard.acquirePrimaryOperationPermit(
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
releasable.close();
latch.countDown();
}

@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
},
ThreadPool.Names.GENERIC);

latch.await();
assertThat(indexShard.getLocalCheckpoint(), equalTo((long) maxSeqNo));
closeShards(indexShard);
}

public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, IOException {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;
Expand Down Expand Up @@ -1172,7 +1239,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException {
test = otherShard.prepareIndexOnReplica(
SourceToParse.source(shard.shardId().getIndexName(), test.type(), test.id(), test.source(),
XContentType.JSON),
1, 1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
1, 1, 1, EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
otherShard.index(test);

final ShardRouting primaryShardRouting = shard.routingEntry();
Expand Down