Skip to content

Commit 1c71393

Browse files
committed
This is not the commit message you're looking for
May the git push --force be with you if you require a more elaborative commit message.
1 parent 3164819 commit 1c71393

File tree

6 files changed

+25
-11
lines changed

6 files changed

+25
-11
lines changed

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.elasticsearch.index.engine.EngineClosedException;
5151
import org.elasticsearch.index.engine.VersionConflictEngineException;
5252
import org.elasticsearch.index.mapper.MapperParsingException;
53+
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
5354
import org.elasticsearch.index.seqno.SequenceNumbersService;
5455
import org.elasticsearch.index.shard.IndexShard;
5556
import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -184,6 +185,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
184185
break;
185186
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
186187
}
188+
187189
// update the bulk item request because update request execution can mutate the bulk item request
188190
request.items()[requestIndex] = replicaRequest;
189191
if (operationResult == null) { // in case of noop update operation
@@ -346,6 +348,10 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
346348
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
347349
break;
348350
}
351+
assert (replicaRequest.request() instanceof IndexRequest
352+
&& ((IndexRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) ||
353+
(replicaRequest.request() instanceof DeleteRequest
354+
&& ((DeleteRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO);
349355
// successful operation
350356
break; // out of retry loop
351357
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
@@ -368,10 +374,10 @@ protected WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, I
368374
switch (docWriteRequest.opType()) {
369375
case CREATE:
370376
case INDEX:
371-
operationResult = executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica);
377+
operationResult = executeIndexRequestOnReplica((IndexRequest) docWriteRequest, replica);
372378
break;
373379
case DELETE:
374-
operationResult = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica);
380+
operationResult = executeDeleteRequestOnReplica((DeleteRequest) docWriteRequest, replica);
375381
break;
376382
default:
377383
throw new IllegalStateException("Unexpected request operation type on replica: "

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,9 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
178178
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
179179
// norelease
180180
/*
181-
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit. These means that we there
182-
* might be operations greater than the local checkpoint that will not be replayed. Here we force the local checkpoint to
183-
* the maximum sequence number in the commit (at the potential expense of correctness).
181+
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means
182+
* that we there might be operations greater than the local checkpoint that will not be replayed. Here we force the local
183+
* checkpoint to the maximum sequence number in the commit (at the potential expense of correctness).
184184
*/
185185
while (seqNoService.getLocalCheckpoint() < seqNoService.getMaxSeqNo()) {
186186
final long next = seqNoService.getLocalCheckpoint() + 1;
@@ -795,7 +795,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
795795
seqNo = delete.seqNo();
796796
}
797797

798-
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) {
798+
if (conflict) {
799799
// skip executing delete because of version conflict on recovery
800800
deleteResult = new DeleteResult(expectedVersion, seqNo, true);
801801
} else {

core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,11 @@ public synchronized long getCheckpoint() {
149149
* updates the global checkpoint on a replica shard (after it has been updated by the primary).
150150
*/
151151
synchronized void updateCheckpointOnReplica(long globalCheckpoint) {
152+
/*
153+
* The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary
154+
* information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other
155+
* replica shards). In these cases, the local knowledge of the global checkpoint could be higher than sync from the lagging primary.
156+
*/
152157
if (this.globalCheckpoint <= globalCheckpoint) {
153158
this.globalCheckpoint = globalCheckpoint;
154159
logger.trace("global checkpoint updated from primary to [{}]", globalCheckpoint);

core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,8 +391,8 @@ public void finalizeRecovery() {
391391
StopWatch stopWatch = new StopWatch().start();
392392
logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode());
393393
cancellableThreads.execute(() -> {
394-
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
395394
shard.markAllocationIdAsInSync(recoveryTarget.getTargetAllocationId());
395+
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint());
396396
});
397397

398398
if (request.isPrimaryRelocation()) {

core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ public interface RecoveryTargetHandler {
3939
void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException;
4040

4141
/**
42-
* The finalize request clears unreferenced translog files, refreshes the engine now that new segments are available, enables garbage
43-
* collection of tombstone files, and updates the global checkpoint.
42+
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
43+
* updates the global checkpoint.
4444
*
4545
* @param globalCheckpoint the global checkpoint on the recovery source
4646
*/

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2990,6 +2990,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() {
29902990
}
29912991
}
29922992

2993+
final boolean exists = operations.get(operations.size() - 1) instanceof Engine.Index;
29932994
Randomness.shuffle(operations);
29942995

29952996
for (final Engine.Operation operation : operations) {
@@ -3001,6 +3002,8 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() {
30013002
}
30023003

30033004
assertThat(engine.seqNoService().getLocalCheckpoint(), equalTo((long) (numberOfOperations - 1)));
3005+
final Engine.GetResult result = engine.get(new Engine.Get(true, uid));
3006+
assertThat(result.exists(), equalTo(exists));
30043007
}
30053008

30063009
private Engine.Index indexOperation(final Term uid, final ParsedDocument doc, final int seqNo, final int version) {
@@ -3010,7 +3013,7 @@ private Engine.Index indexOperation(final Term uid, final ParsedDocument doc, fi
30103013
seqNo,
30113014
1,
30123015
version,
3013-
VersionType.INTERNAL,
3016+
VersionType.EXTERNAL,
30143017
Engine.Operation.Origin.PEER_RECOVERY,
30153018
System.nanoTime(),
30163019
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
@@ -3025,7 +3028,7 @@ private Engine.Delete deleteOperation(final String type, final String id, final
30253028
seqNo,
30263029
1,
30273030
version,
3028-
VersionType.INTERNAL,
3031+
VersionType.EXTERNAL,
30293032
Engine.Operation.Origin.PEER_RECOVERY,
30303033
System.nanoTime());
30313034
}

0 commit comments

Comments
 (0)