Skip to content

Commit 9d10ffd

Browse files
authored
Goodbye, Translog Views (#25962)
During peer recoveries, we need to copy over lucene files and replay the operations they miss from the source translog. Guaranteeing that translog files are not cleaned up has seen many iterations over time. Back in the old 1.0 days, recoveries went through the Engine and actively prevented both translog cleaning and lucene commits. We then moved to a notion called Translog Views, which allowed the recovery code to "acquire" a view into the translog which is then guaranteed to be kept around until the view is closed. The Engine code was free to commit lucene and do whatever it wanted without coordinating with recoveries. Translog file deletion logic was based on reference counting on the file level. Those counters were incremented when a view was acquired but also when the view was used to create a `Snapshot` that allowed you to read operations from the files. At some point we removed the file based counting complexity in favor of constructs on the Translog level that just keep track of "open" views and the minimum translog generation they refer to. To do so, Views had to be kept around until the last snapshot that was made from them was consumed. This was fine in recovery code but led to [a subtle bug](#25862) in the [Primary Replica Resyncer](#25862). Concurrently, we have developed the notion of a `TranslogDeletionPolicy` which is responsible for the liveness aspect of translog files. This class makes it very simple to take translog snapshots into account when keeping translog files around, allowing people that just need a snapshot to just take a snapshot and not worry about views and such. Recovery code which actually does need a view can now prevent trimming by acquiring a simple retention lock (a `Closeable`). This removes the need for the notion of a View.
1 parent 0848ffd commit 9d10ffd

File tree

18 files changed

+558
-438
lines changed

18 files changed

+558
-438
lines changed

core/src/main/java/org/elasticsearch/action/ActionListener.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424

2525
import java.util.ArrayList;
2626
import java.util.List;
27-
import java.util.concurrent.atomic.AtomicBoolean;
2827
import java.util.function.Consumer;
2928

3029
/**

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,7 @@ public void restoreLocalCheckpointFromTranslog() throws IOException {
238238
try (ReleasableLock ignored = writeLock.acquire()) {
239239
ensureOpen();
240240
final long localCheckpoint = seqNoService().getLocalCheckpoint();
241-
try (Translog.View view = getTranslog().newView()) {
242-
final Translog.Snapshot snapshot = view.snapshot(localCheckpoint + 1);
241+
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
243242
Translog.Operation operation;
244243
while ((operation = snapshot.next()) != null) {
245244
if (operation.seqNo() > localCheckpoint) {
@@ -325,9 +324,8 @@ public InternalEngine recoverFromTranslog() throws IOException {
325324
private void recoverFromTranslogInternal() throws IOException {
326325
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
327326
final int opsRecovered;
328-
try {
329-
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
330-
Translog.Snapshot snapshot = translog.newSnapshot(translogGen);
327+
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
328+
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) {
331329
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
332330
} catch (Exception e) {
333331
throw new EngineException(shardId, "failed to recover from translog", e);

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
import org.elasticsearch.search.suggest.completion.CompletionStats;
131131
import org.elasticsearch.threadpool.ThreadPool;
132132

133+
import java.io.Closeable;
133134
import java.io.IOException;
134135
import java.io.PrintStream;
135136
import java.nio.channels.ClosedByInterruptException;
@@ -1555,10 +1556,9 @@ public void onSettingsChanged() {
15551556
}
15561557
}
15571558

1558-
public Translog.View acquireTranslogView() {
1559+
public Closeable acquireTranslogRetentionLock() {
15591560
Engine engine = getEngine();
1560-
assert engine.getTranslog() != null : "translog must not be null";
1561-
return engine.getTranslog().newView();
1561+
return engine.getTranslog().acquireRetentionLock();
15621562
}
15631563

15641564
public List<Segment> segments(boolean verbose) {

core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -78,39 +78,45 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
7878
this.chunkSize = chunkSize;
7979
}
8080

81-
public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) {
82-
final Translog.View view = indexShard.acquireTranslogView();
83-
ActionListener<ResyncTask> wrappedListener = new ActionListener<ResyncTask>() {
84-
@Override
85-
public void onResponse(ResyncTask resyncTask) {
86-
try {
87-
view.close();
88-
} catch (IOException e) {
89-
onFailure(e);
81+
public void resync(final IndexShard indexShard, final ActionListener<ResyncTask> listener) {
82+
ActionListener<ResyncTask> resyncListener = null;
83+
try {
84+
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
85+
Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
86+
resyncListener = new ActionListener<ResyncTask>() {
87+
@Override
88+
public void onResponse(final ResyncTask resyncTask) {
89+
try {
90+
snapshot.close();
91+
listener.onResponse(resyncTask);
92+
} catch (final Exception e) {
93+
onFailure(e);
94+
}
9095
}
91-
listener.onResponse(resyncTask);
92-
}
9396

94-
@Override
95-
public void onFailure(Exception e) {
96-
try {
97-
view.close();
98-
} catch (IOException inner) {
99-
e.addSuppressed(inner);
97+
@Override
98+
public void onFailure(final Exception e) {
99+
try {
100+
snapshot.close();
101+
} catch (final Exception inner) {
102+
e.addSuppressed(inner);
103+
} finally {
104+
listener.onFailure(e);
105+
}
100106
}
101-
listener.onFailure(e);
102-
}
103-
};
104-
try {
105-
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
106-
Translog.Snapshot snapshot = view.snapshot(startingSeqNo);
107+
};
107108
ShardId shardId = indexShard.shardId();
108109

109110
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
110111
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
111112
// Also fail the resync early if the shard is shutting down
112113
Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
113114

115+
@Override
116+
public synchronized void close() throws IOException {
117+
snapshot.close();
118+
}
119+
114120
@Override
115121
public synchronized int totalOperations() {
116122
return snapshot.totalOperations();
@@ -127,11 +133,14 @@ public synchronized Translog.Operation next() throws IOException {
127133
return snapshot.next();
128134
}
129135
};
130-
131136
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
132-
startingSeqNo, wrappedListener);
137+
startingSeqNo, resyncListener);
133138
} catch (Exception e) {
134-
wrappedListener.onFailure(e);
139+
if (resyncListener != null) {
140+
resyncListener.onFailure(e);
141+
} else {
142+
listener.onFailure(e);
143+
}
135144
}
136145
}
137146

core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ protected final int readSize(ByteBuffer reusableBuffer, long position) throws IO
7777
return size;
7878
}
7979

80-
public Translog.Snapshot newSnapshot() {
80+
public TranslogSnapshot newSnapshot() {
8181
return new TranslogSnapshot(this, sizeInBytes());
8282
}
8383

core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.index.translog;
2121

22+
import java.io.Closeable;
2223
import java.io.IOException;
2324
import java.util.Arrays;
2425

@@ -27,16 +28,18 @@
2728
*/
2829
final class MultiSnapshot implements Translog.Snapshot {
2930

30-
private final Translog.Snapshot[] translogs;
31+
private final TranslogSnapshot[] translogs;
3132
private final int totalOperations;
33+
private final Closeable onClose;
3234
private int index;
3335

3436
/**
3537
* Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order.
3638
*/
37-
MultiSnapshot(Translog.Snapshot[] translogs) {
39+
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
3840
this.translogs = translogs;
39-
totalOperations = Arrays.stream(translogs).mapToInt(Translog.Snapshot::totalOperations).sum();
41+
totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
42+
this.onClose = onClose;
4043
index = 0;
4144
}
4245

@@ -49,12 +52,17 @@ public int totalOperations() {
4952
@Override
5053
public Translog.Operation next() throws IOException {
5154
for (; index < translogs.length; index++) {
52-
final Translog.Snapshot current = translogs[index];
55+
final TranslogSnapshot current = translogs[index];
5356
Translog.Operation op = current.next();
5457
if (op != null) { // if we are null we move to the next snapshot
5558
return op;
5659
}
5760
}
5861
return null;
5962
}
63+
64+
@Override
65+
public void close() throws IOException {
66+
onClose.close();
67+
}
6068
}

0 commit comments

Comments
 (0)