Skip to content

Commit 36b9edb

Browse files
committed
Introduce translog no-op
As the translog evolves towards a full operations log as part of the sequence numbers push, there is a need for the translog to be able to represent operations for which a sequence number was assigned, but the operation did not mutate the index. Examples of how this can arise are operations that fail after the sequence number is assigned, and gaps in the history that arise when an operation is assigned a sequence number but never completes (e.g., a node crash). It is important that these operations appear in the history so that they can be replicated and replayed during recovery, as otherwise the history will be incomplete and local checkpoints will not be able to advance. This commit introduces a no-op to the translog to set the stage for these efforts.
1 parent 5e68b63 commit 36b9edb

File tree

37 files changed

+485
-180
lines changed

37 files changed

+485
-180
lines changed

core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,11 @@ public void writeVLong(long i) throws IOException {
223223
writeByte((byte) i);
224224
}
225225

226+
public static int lengthVLong(long i) {
227+
assert i >= 0;
228+
return 1 + (int) Math.floor(Math.log(i) / Math.log(2)) / 7;
229+
}
230+
226231
/**
227232
* Writes a long in a variable-length format. Writes between one and ten bytes.
228233
* Values are remapped by sliding the sign bit into the lsb and then encoded as an unsigned number

core/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.elasticsearch.common.Nullable;
4848
import org.elasticsearch.common.bytes.BytesReference;
4949
import org.elasticsearch.common.collect.ImmutableOpenMap;
50-
import org.elasticsearch.common.collect.Tuple;
5150
import org.elasticsearch.common.io.stream.StreamInput;
5251
import org.elasticsearch.common.io.stream.StreamOutput;
5352
import org.elasticsearch.common.io.stream.Writeable;
@@ -296,6 +295,8 @@ public Condition newCondition() {
296295
*/
297296
public abstract DeleteResult delete(final Delete delete);
298297

298+
/**
 * Performs the given no-op operation on this engine.
 *
 * @param noOp the no-op operation to perform
 * @return the result of performing the no-op
 */
public abstract NoOpResult noOp(final NoOp noOp);
299+
299300
/**
300301
* Base class for index, delete, and no-op operation results
301302
* Holds result meta data (e.g. translog location, updated version)
@@ -382,6 +383,7 @@ void freeze() {
382383
}
383384

384385
public static class IndexResult extends Result {
386+
385387
private final boolean created;
386388

387389
public IndexResult(long version, long seqNo, boolean created) {
@@ -397,9 +399,11 @@ public IndexResult(Exception failure, long version, long seqNo) {
397399
public boolean isCreated() {
398400
return created;
399401
}
402+
400403
}
401404

402405
public static class DeleteResult extends Result {
406+
403407
private final boolean found;
404408

405409
public DeleteResult(long version, long seqNo, boolean found) {
@@ -415,6 +419,19 @@ public DeleteResult(Exception failure, long version, long seqNo) {
415419
public boolean isFound() {
416420
return found;
417421
}
422+
423+
}
424+
425+
static class NoOpResult extends Result {
426+
427+
NoOpResult(long seqNo) {
428+
super(Operation.TYPE.NO_OP, 0, seqNo);
429+
}
430+
431+
NoOpResult(long seqNo, Exception failure) {
432+
super(Operation.TYPE.NO_OP, failure, 0, seqNo);
433+
}
434+
418435
}
419436

420437
/**
@@ -910,7 +927,7 @@ public abstract static class Operation {
910927

911928
/** type of operation (index, delete, no-op), subclasses use static types */
912929
public enum TYPE {
913-
INDEX, DELETE;
930+
INDEX, DELETE, NO_OP;
914931

915932
private final String lowercase;
916933

@@ -1114,6 +1131,50 @@ TYPE operationType() {
11141131
public int estimatedSizeInBytes() {
11151132
return (uid().field().length() + uid().text().length()) * 2 + 20;
11161133
}
1134+
1135+
}
1136+
1137+
public static class NoOp extends Operation {
1138+
1139+
private final String reason;
1140+
1141+
public String reason() {
1142+
return reason;
1143+
}
1144+
1145+
public NoOp(
1146+
final Term uid,
1147+
final long seqNo,
1148+
final long primaryTerm,
1149+
final long version,
1150+
final VersionType versionType,
1151+
final Origin origin,
1152+
final long startTime,
1153+
final String reason) {
1154+
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
1155+
this.reason = reason;
1156+
}
1157+
1158+
@Override
1159+
public String type() {
1160+
throw new UnsupportedOperationException();
1161+
}
1162+
1163+
@Override
1164+
String id() {
1165+
throw new UnsupportedOperationException();
1166+
}
1167+
1168+
@Override
1169+
TYPE operationType() {
1170+
return TYPE.NO_OP;
1171+
}
1172+
1173+
@Override
1174+
public int estimatedSizeInBytes() {
1175+
return 2 * reason.length() + StreamOutput.lengthVLong(seqNo()) + StreamOutput.lengthVLong(primaryTerm());
1176+
}
1177+
11171178
}
11181179

11191180
public static class Get {

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ private void recoverFromTranslog(TranslogRecoveryPerformer handler) throws IOExc
277277
}
278278
// flush if we recovered something or if we have references to older translogs
279279
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
280-
assert pendingTranslogRecovery.get(): "translogRecovery is not pending but should be";
280+
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
281281
pendingTranslogRecovery.set(false); // we are good - now we can commit
282282
if (opsRecovered > 0) {
283283
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
@@ -375,7 +375,7 @@ private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog(
375375
* specified global checkpoint.
376376
*
377377
* @param globalCheckpoint the global checkpoint to use
378-
* @param indexWriter the index writer (for the Lucene commit point)
378+
* @param indexWriter the index writer (for the Lucene commit point)
379379
* @return the sequence number stats
380380
*/
381381
private static SeqNoStats loadSeqNoStatsFromLucene(final long globalCheckpoint, final IndexWriter indexWriter) {
@@ -434,7 +434,7 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
434434
if (get.versionType().isVersionConflictForReads(versionValue.version(), get.version())) {
435435
Uid uid = Uid.createUid(get.uid().text());
436436
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
437-
get.versionType().explainConflictForReads(versionValue.version(), get.version()));
437+
get.versionType().explainConflictForReads(versionValue.version(), get.version()));
438438
}
439439
refresh("realtime_get");
440440
}
@@ -532,7 +532,7 @@ public IndexResult index(Index index) {
532532
*
533533
* @return failure if the failure is a document specific failure (e.g. analysis chain failure)
534534
* or throws Exception if the failure caused the engine to fail (e.g. out of disk, lucene tragic event)
535-
*
535+
* <p>
536536
* Note: pkg-private for testing
537537
*/
538538
final Exception checkIfDocumentFailureOrThrow(final Operation operation, final Exception failure) {
@@ -577,7 +577,7 @@ private boolean canOptimizeAddDocument(Index index) {
577577
case PEER_RECOVERY:
578578
case REPLICA:
579579
assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL
580-
: "version: " + index.version() + " type: " + index.versionType();
580+
: "version: " + index.version() + " type: " + index.versionType();
581581
return true;
582582
case LOCAL_TRANSLOG_RECOVERY:
583583
assert index.isRetry();
@@ -596,10 +596,10 @@ private boolean assertSequenceNumber(final Engine.Operation.Origin origin, final
596596
" index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ". seq no: " + seqNo;
597597
} else if (origin == Operation.Origin.PRIMARY) {
598598
// sequence number should not be set when operation origin is primary
599-
assert seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO : "primary ops should never an assigned seq no. got: " + seqNo;
599+
assert seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO : "primary ops should never have an assigned seq no. got: " + seqNo;
600600
} else {
601601
// sequence number should be set when operation origin is not primary
602-
assert seqNo >= 0 : "replica ops should an assigned seq no. origin: " + origin +
602+
assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no. origin: " + origin +
603603
" index version: " + engineConfig.getIndexSettings().getIndexVersionCreated();
604604
}
605605
return true;
@@ -651,7 +651,7 @@ private IndexResult innerIndex(Index index) throws IOException {
651651
if (deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp()) {
652652
break;
653653
}
654-
} while(maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp,
654+
} while (maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp,
655655
index.getAutoGeneratedIdTimestamp()) == false);
656656
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
657657
} else {
@@ -859,6 +859,34 @@ private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, Ve
859859
return found;
860860
}
861861

862+
@Override
863+
public NoOpResult noOp(final NoOp noOp) {
864+
NoOpResult noOpResult;
865+
try (final ReleasableLock ignored = readLock.acquire()) {
866+
noOpResult = innerNoOp(noOp);
867+
} catch (final Exception e) {
868+
noOpResult = new NoOpResult(noOp.seqNo(), e);
869+
}
870+
return noOpResult;
871+
}
872+
873+
private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
874+
assert noOp.seqNo() > SequenceNumbersService.NO_OPS_PERFORMED;
875+
final long seqNo = noOp.seqNo();
876+
try {
877+
final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
878+
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
879+
noOpResult.setTranslogLocation(location);
880+
noOpResult.setTook(System.nanoTime() - noOp.startTime());
881+
noOpResult.freeze();
882+
return noOpResult;
883+
} finally {
884+
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
885+
seqNoService().markSeqNoAsCompleted(seqNo);
886+
}
887+
}
888+
}
889+
862890
@Override
863891
public void refresh(String source) throws EngineException {
864892
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing

core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ public DeleteResult delete(Delete delete) {
116116
throw new UnsupportedOperationException(shardId + " delete operation not allowed on shadow engine");
117117
}
118118

119+
@Override
120+
public NoOpResult noOp(NoOp noOp) {
121+
throw new UnsupportedOperationException(shardId + " no-op operation not allowed on shadow engine");
122+
}
123+
119124
@Override
120125
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) {
121126
throw new UnsupportedOperationException(shardId + " sync commit operation not allowed on shadow engine");

core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.common.io.stream.StreamInput;
2424
import org.elasticsearch.common.io.stream.StreamOutput;
25+
import org.elasticsearch.index.VersionType;
2526
import org.elasticsearch.index.engine.Engine;
2627
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
2728
import org.elasticsearch.index.mapper.DocumentMapperForType;
2829
import org.elasticsearch.index.mapper.MapperException;
2930
import org.elasticsearch.index.mapper.MapperService;
3031
import org.elasticsearch.index.mapper.Mapping;
3132
import org.elasticsearch.index.mapper.Uid;
33+
import org.elasticsearch.index.seqno.SequenceNumbersService;
3234
import org.elasticsearch.index.translog.Translog;
3335
import org.elasticsearch.rest.RestStatus;
3436

@@ -94,6 +96,7 @@ public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throw
9496
}
9597
}
9698
}
99+
97100
return opsRecovered;
98101
}
99102

@@ -158,22 +161,28 @@ private void performRecoveryOperation(Engine engine, Translog.Operation operatio
158161
.routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
159162
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true);
160163
maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
161-
if (logger.isTraceEnabled()) {
162-
logger.trace("[translog] recover [index] op of [{}][{}]", index.type(), index.id());
163-
}
164+
logger.trace("[translog] recover [index] op [({}, {})] of [{}][{}]", index.seqNo(), index.primaryTerm(), index.type(), index.id());
164165
index(engine, engineIndex);
165166
break;
166167
case DELETE:
167168
Translog.Delete delete = (Translog.Delete) operation;
168169
Uid uid = Uid.createUid(delete.uid().text());
169-
if (logger.isTraceEnabled()) {
170-
logger.trace("[translog] recover [delete] op of [{}][{}]", uid.type(), uid.id());
171-
}
170+
logger.trace("[translog] recover [delete] op [({}, {})] of [{}][{}]", delete.seqNo(), delete.primaryTerm(), uid.type(), uid.id());
172171
final Engine.Delete engineDelete = new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.seqNo(),
173172
delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
174173
origin, System.nanoTime());
175174
delete(engine, engineDelete);
176175
break;
176+
case NO_OP:
177+
final Translog.NoOp noOp = (Translog.NoOp) operation;
178+
final long seqNo = noOp.seqNo();
179+
final long primaryTerm = noOp.primaryTerm();
180+
final String reason = noOp.reason();
181+
logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason);
182+
final Engine.NoOp engineNoOp =
183+
new Engine.NoOp(null, seqNo, primaryTerm, 0, VersionType.INTERNAL, origin, System.nanoTime(), reason);
184+
noOp(engine, engineNoOp);
185+
break;
177186
default:
178187
throw new IllegalStateException("No operation defined for [" + operation + "]");
179188
}
@@ -206,6 +215,9 @@ protected void delete(Engine engine, Engine.Delete engineDelete) {
206215
engine.delete(engineDelete);
207216
}
208217

218+
protected void noOp(Engine engine, Engine.NoOp engineNoOp) {
219+
engine.noOp(engineNoOp);
220+
}
209221

210222
/**
211223
* Called once for every processed operation by this recovery performer.

0 commit comments

Comments
 (0)