Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,19 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
}
}

/**
* Resets the checkpoint to the specified value.
*
* @param checkpoint the local checkpoint to reset this tracker to
*/
synchronized void resetCheckpoint(final long checkpoint) {
    // the target must be an assigned value and must not lie beyond the current checkpoint
    assert checkpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO;
    assert checkpoint <= this.checkpoint;

    // move the checkpoint to the requested value
    this.checkpoint = checkpoint;
    // the first tracked sequence number is the one directly after the new checkpoint
    firstProcessedSeqNo = checkpoint + 1;
    // discard everything recorded in processedSeqNo so far
    processedSeqNo.clear();
}

/**
* The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ public void markSeqNoAsCompleted(final long seqNo) {
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
}

/**
* Resets the local checkpoint to the specified value.
*
* @param localCheckpoint the local checkpoint to reset to
*/
public void resetLocalCheckpoint(final long localCheckpoint) {
// pure delegation: the local checkpoint tracker performs the reset
localCheckpointTracker.resetCheckpoint(localCheckpoint);
}

/**
* The current sequence number stats.
*
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2058,6 +2058,19 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
primaryTerm = operationPrimaryTerm;
updateGlobalCheckpointOnReplica(globalCheckpoint);
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long localCheckpoint;
if (currentGlobalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
} else {
localCheckpoint = currentGlobalCheckpoint;
}
logger.trace(
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
operationPrimaryTerm,
getLocalCheckpoint(),
localCheckpoint);
getEngine().seqNoService().resetLocalCheckpoint(localCheckpoint);
getEngine().getTranslog().rollGeneration();
});
globalCheckpointUpdated = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;

Expand Down Expand Up @@ -236,4 +237,23 @@ public void testWaitForOpsToComplete() throws BrokenBarrierException, Interrupte

thread.join();
}

public void testResetCheckpoint() {
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
int maxSeqNo = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat, I did not know about this method

for (int i = 0; i < operations; i++) {
if (!rarely()) {
tracker.markSeqNoAsCompleted(i);
maxSeqNo = i;
}
}

final int localCheckpoint =
randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint()));
tracker.resetCheckpoint(localCheckpoint);
assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
assertThat(tracker.processedSeqNo, empty());
assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
}
}
131 changes: 111 additions & 20 deletions core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -142,7 +143,6 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

/**
Expand Down Expand Up @@ -405,26 +405,10 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {

// this is usually large enough that there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
boolean gap = false;
for (int i = 0; i < operations; i++) {
if (!rarely()) {
final String id = Integer.toString(i);
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
getMappingUpdater(indexShard, sourceToParse.type()));
max = i;
} else {
gap = true;
}
}
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));

final int maxSeqNo = max;
if (gap) {
assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo)));
}
final int maxSeqNo = result.maxSeqNo;
final boolean gap = result.gap;

// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
Expand Down Expand Up @@ -626,6 +610,12 @@ public void onFailure(Exception e) {
}
newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint);
}
final long expectedLocalCheckpoint;
if (newGlobalCheckPoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
expectedLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
} else {
expectedLocalCheckpoint = newGlobalCheckPoint;
}
// but you can not increment with a new primary term until the operations on the older primary term complete
final Thread thread = new Thread(() -> {
try {
Expand All @@ -637,6 +627,7 @@ public void onFailure(Exception e) {
@Override
public void onResponse(Releasable releasable) {
assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
onResponse.set(true);
releasable.close();
Expand Down Expand Up @@ -697,6 +688,7 @@ private void finish() {
assertTrue(onResponse.get());
assertNull(onFailure.get());
assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
}
}
Expand All @@ -707,6 +699,56 @@ private void finish() {
closeShards(indexShard);
}

/**
 * Verifies that acquiring a replica operation permit under a newer primary term throws the local checkpoint
 * back to the global checkpoint known to the replica (or to {@code NO_OPS_PERFORMED} when no global checkpoint
 * is assigned), and that the local checkpoint advances again when indexing resumes afterwards.
 *
 * @throws IOException          if an I/O exception occurs while indexing on the shard
 * @throws InterruptedException if interrupted while waiting for the permit listener to be notified
 */
public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException {
    final IndexShard indexShard = newStartedShard(false);

    // this is usually large enough that there will be at least one gap
    final int operations = 1024 - scaledRandomIntBetween(0, 1024);
    indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));

    final long globalCheckpointOnReplica =
        randomIntBetween(
            Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
            Math.toIntExact(indexShard.getLocalCheckpoint()));
    indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica);

    final int globalCheckpoint =
        randomIntBetween(
            Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
            Math.toIntExact(indexShard.getLocalCheckpoint()));
    final CountDownLatch latch = new CountDownLatch(1);
    // written by the listener before latch.countDown() and read after latch.await(), so the latch orders the accesses
    final Exception[] failure = new Exception[1];
    indexShard.acquireReplicaOperationPermit(
        indexShard.primaryTerm + 1,
        globalCheckpoint,
        new ActionListener<Releasable>() {
            @Override
            public void onResponse(final Releasable releasable) {
                releasable.close();
                latch.countDown();
            }

            @Override
            public void onFailure(final Exception e) {
                // record the failure and release the latch; otherwise the test would block forever on await()
                failure[0] = e;
                latch.countDown();
            }
        },
        ThreadPool.Names.SAME);

    latch.await();
    if (failure[0] != null) {
        throw new AssertionError("failed to acquire replica operation permit", failure[0]);
    }
    if (globalCheckpointOnReplica == SequenceNumbersService.UNASSIGNED_SEQ_NO
        && globalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
        assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
    } else {
        assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica)));
    }

    // ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances
    final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
    assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));

    closeShards(indexShard);
}

public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException {
final IndexShard indexShard = newStartedShard(false);

Expand Down Expand Up @@ -1966,6 +2008,55 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept
closeShards(newShard);
}

/**
 * Immutable holder bundling three values: a local checkpoint, a maximum sequence number,
 * and a flag recording whether a gap was introduced.
 */
class Result {
    /** Whether a gap was introduced. */
    private final boolean gap;
    /** The maximum sequence number. */
    private final int maxSeqNo;
    /** The local checkpoint. */
    private final int localCheckpoint;

    Result(final int localCheckpoint, final int maxSeqNo, final boolean gap) {
        this.gap = gap;
        this.maxSeqNo = maxSeqNo;
        this.localCheckpoint = localCheckpoint;
    }
}

/**
* Index on the specified shard while introducing sequence number gaps.
*
* @param indexShard the shard
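* @param operations the exclusive upper bound on the sequence numbers to index
* @param offset the sequence number immediately preceding the first sequence number to index
* @return the resulting local checkpoint, the maximum sequence number indexed, and whether or not a gap was introduced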
* @throws IOException if an I/O exception occurs while indexing on the shard
*/
private Result indexOnReplicaWithGaps(
        final IndexShard indexShard,
        final int operations,
        final int offset) throws IOException {
    // candidate sequence numbers are offset + 1 through operations - 1; each is either indexed or skipped
    int localCheckpoint = offset;
    int max = offset;
    boolean gap = false;
    for (int i = offset + 1; i < operations; i++) {
        // the last candidate is always indexed: skipping it would set the gap flag without leaving
        // any indexed sequence number above the hole, which would trip the gap assertion below
        if (!rarely() || i == operations - 1) {
            final String id = Integer.toString(i);
            SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
                new BytesArray("{}"), XContentType.JSON);
            indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
                1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
                getMappingUpdater(indexShard, sourceToParse.type()));
            // the expected local checkpoint only advances while no sequence number has been skipped
            if (!gap && i == localCheckpoint + 1) {
                localCheckpoint++;
            }
            max = i;
        } else {
            gap = true;
        }
    }
    // the locally tracked checkpoint must agree with what the shard reports
    assert localCheckpoint == indexShard.getLocalCheckpoint();
    // a gap implies the maximum indexed sequence number lies beyond the local checkpoint
    assert !gap || (localCheckpoint != max);
    return new Result(localCheckpoint, max, gap);
}

/** A dummy repository for testing which just needs restore overridden */
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
private final String indexName;
Expand Down