Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1623,10 +1623,12 @@ public interface Warmer {
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;

/**
* Performs recovery from the transaction log.
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
* This operation will close the engine if the recovery fails.
*
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
*/
public abstract Engine recoverFromTranslog() throws IOException;
public abstract Engine recoverFromTranslog(long recoverUpToSeqNo) throws IOException;

/**
* Do not replay translog operations, but make the engine be ready.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,15 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
}

@Override
public InternalEngine recoverFromTranslog() throws IOException {
public InternalEngine recoverFromTranslog(long recoverUpToSeqNo) throws IOException {
flushLock.lock();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslogInternal();
recoverFromTranslogInternal(recoverUpToSeqNo);
} catch (Exception e) {
try {
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
Expand All @@ -394,11 +394,12 @@ public void skipTranslogRecovery() {
pendingTranslogRecovery.set(false); // we are good - now we can commit
}

private void recoverFromTranslogInternal() throws IOException {
private void recoverFromTranslogInternal(long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) {
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1305,7 +1305,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog();
getEngine().recoverFromTranslog(Long.MAX_VALUE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,21 +577,27 @@ public long getLastSyncedGlobalCheckpoint() {
*/
public Snapshot newSnapshot() throws IOException {
try (ReleasableLock ignored = readLock.acquire()) {
return newSnapshotFromGen(getMinFileGeneration());
return newSnapshotFromGen(new TranslogGeneration(translogUUID, getMinFileGeneration()), Long.MAX_VALUE);
}
}

public Snapshot newSnapshotFromGen(long minGeneration) throws IOException {
public Snapshot newSnapshotFromGen(TranslogGeneration fromGeneration, long upToSeqNo) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you change this to take a TranslogGeneration with the uuid instead of just the long minGeneration? It's not using that uuid anywhere here AFAICS.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method might be interpreted as a range of translog generations or a range of sequence numbers if the parameter is a tuple of Longs. I changed to TranslogGeneration to avoid this issue. I will revert this change if you don't like it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's good though. less likely to misuse.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, makes sense.

try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
if (minGeneration < getMinFileGeneration()) {
throw new IllegalArgumentException("requested snapshot generation [" + minGeneration + "] is not available. " +
final long fromFileGen = fromGeneration.translogFileGeneration;
if (fromFileGen < getMinFileGeneration()) {
throw new IllegalArgumentException("requested snapshot generation [" + fromFileGen + "] is not available. " +
"Min referenced generation is [" + getMinFileGeneration() + "]");
}
TranslogSnapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current))
.filter(reader -> reader.getGeneration() >= minGeneration)
.filter(reader -> reader.getGeneration() >= fromFileGen && reader.getCheckpoint().minSeqNo <= upToSeqNo)
.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
return newMultiSnapshot(snapshots);
final Snapshot snapshot = newMultiSnapshot(snapshots);
if (upToSeqNo == Long.MAX_VALUE) {
return snapshot;
} else {
return new SeqNoFilterSnapshot(snapshot, Long.MIN_VALUE, upToSeqNo);
}
}
}

Expand Down Expand Up @@ -926,7 +932,59 @@ default int overriddenOperations() {
* Returns the next operation in the snapshot or <code>null</code> if we reached the end.
*/
Translog.Operation next() throws IOException;
}

/**
* A filtered snapshot consisting of only operations whose sequence numbers are in the given range
* between {@code fromSeqNo} (inclusive) and {@code toSeqNo} (inclusive). This filtered snapshot
* shares the same underlying resources with the {@code delegate} snapshot, therefore we should not
* use the {@code delegate} after passing it to this filtered snapshot.
*/
static final class SeqNoFilterSnapshot implements Snapshot {
private final Snapshot delegate;
private int filteredOpsCount;
private final long fromSeqNo; // inclusive
private final long toSeqNo; // inclusive

SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo) {
assert fromSeqNo <= toSeqNo : "from_seq_no[" + fromSeqNo + "] > to_seq_no[" + toSeqNo + "]";
this.delegate = delegate;
this.fromSeqNo = fromSeqNo;
this.toSeqNo = toSeqNo;
}

@Override
public int totalOperations() {
return delegate.totalOperations();
}

@Override
public int skippedOperations() {
return filteredOpsCount + delegate.skippedOperations();
}

@Override
public int overriddenOperations() {
return delegate.overriddenOperations();
}

@Override
public Operation next() throws IOException {
Translog.Operation op;
while ((op = delegate.next()) != null) {
if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo) {
return op;
} else {
filteredOpsCount++;
}
}
return null;
}

@Override
public void close() throws IOException {
delegate.close();
}
}

/**
Expand Down
Loading