Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ public void restoreLocalCheckpointFromTranslog() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = seqNoService().getLocalCheckpoint();
try (Translog.View view = getTranslog().newView()) {
final Translog.Snapshot snapshot = view.snapshot(localCheckpoint + 1);
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() > localCheckpoint) {
Expand Down Expand Up @@ -325,9 +324,8 @@ public InternalEngine recoverFromTranslog() throws IOException {
private void recoverFromTranslogInternal() throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
try {
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
Translog.Snapshot snapshot = translog.newSnapshot(translogGen);
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) {
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
Expand Down Expand Up @@ -1555,10 +1556,9 @@ public void onSettingsChanged() {
}
}

public Translog.View acquireTranslogView() {
public Closeable acquireTranslogRetentionLock() {
Engine engine = getEngine();
assert engine.getTranslog() != null : "translog must not be null";
return engine.getTranslog().newView();
return engine.getTranslog().acquireRetentionLock();
}

public List<Segment> segments(boolean verbose) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,39 +78,45 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
this.chunkSize = chunkSize;
}

public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) {
final Translog.View view = indexShard.acquireTranslogView();
ActionListener<ResyncTask> wrappedListener = new ActionListener<ResyncTask>() {
@Override
public void onResponse(ResyncTask resyncTask) {
try {
view.close();
} catch (IOException e) {
onFailure(e);
public void resync(final IndexShard indexShard, final ActionListener<ResyncTask> listener) {
ActionListener<ResyncTask> resyncListener = null;
try {
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
resyncListener = new ActionListener<ResyncTask>() {
@Override
public void onResponse(final ResyncTask resyncTask) {
try {
snapshot.close();
listener.onResponse(resyncTask);
} catch (final Exception e) {
onFailure(e);
}
}
listener.onResponse(resyncTask);
}

@Override
public void onFailure(Exception e) {
try {
view.close();
} catch (IOException inner) {
e.addSuppressed(inner);
@Override
public void onFailure(final Exception e) {
try {
snapshot.close();
} catch (final Exception inner) {
e.addSuppressed(inner);
} finally {
listener.onFailure(e);
}
}
listener.onFailure(e);
}
};
try {
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
Translog.Snapshot snapshot = view.snapshot(startingSeqNo);
};
ShardId shardId = indexShard.shardId();

// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
// Also fail the resync early if the shard is shutting down
Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {

@Override
public synchronized void close() throws IOException {
snapshot.close();
}

@Override
public synchronized int totalOperations() {
return snapshot.totalOperations();
Expand All @@ -127,11 +133,14 @@ public synchronized Translog.Operation next() throws IOException {
return snapshot.next();
}
};

resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
startingSeqNo, wrappedListener);
startingSeqNo, resyncListener);
} catch (Exception e) {
wrappedListener.onFailure(e);
if (resyncListener != null) {
resyncListener.onFailure(e);
} else {
listener.onFailure(e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected final int readSize(ByteBuffer reusableBuffer, long position) throws IO
return size;
}

public Translog.Snapshot newSnapshot() {
public TranslogSnapshot newSnapshot() {
return new TranslogSnapshot(this, sizeInBytes());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.translog;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;

Expand All @@ -27,16 +28,18 @@
*/
final class MultiSnapshot implements Translog.Snapshot {

private final Translog.Snapshot[] translogs;
private final TranslogSnapshot[] translogs;
private final int totalOperations;
private final Closeable onClose;
private int index;

/**
* Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order.
*/
MultiSnapshot(Translog.Snapshot[] translogs) {
MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) {
this.translogs = translogs;
totalOperations = Arrays.stream(translogs).mapToInt(Translog.Snapshot::totalOperations).sum();
totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum();
this.onClose = onClose;
index = 0;
}

Expand All @@ -49,12 +52,17 @@ public int totalOperations() {
@Override
public Translog.Operation next() throws IOException {
for (; index < translogs.length; index++) {
final Translog.Snapshot current = translogs[index];
final TranslogSnapshot current = translogs[index];
Translog.Operation op = current.next();
if (op != null) { // if we are null we move to the next snapshot
return op;
}
}
return null;
}

@Override
public void close() throws IOException {
onClose.close();
}
}
Loading