Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,52 @@ public static SeqNoStats loadSeqNoStatsFromLuceneCommit(
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
}

/**
 * Folds a sequence number into a running minimum.
 *
 * <p>If the running minimum is still one of the sentinel values {@link SequenceNumbersService#NO_OPS_PERFORMED} or
 * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}, there is nothing to compare against and the specified sequence number is
 * returned as-is. Otherwise the specified sequence number must not be {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}, and the
 * smaller of the two values is returned.
 *
 * @param minSeqNo the current minimum sequence number
 * @param seqNo    the specified sequence number
 * @return the new minimum sequence number
 * @throws IllegalArgumentException if the current minimum is a real value and the specified sequence number is
 *                                  {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
 */
public static long min(final long minSeqNo, final long seqNo) {
    final boolean noCurrentMinimum =
        minSeqNo == SequenceNumbersService.NO_OPS_PERFORMED || minSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO;
    if (noCurrentMinimum) {
        // nothing tracked yet: the incoming value becomes the minimum unconditionally
        return seqNo;
    }
    if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
        throw new IllegalArgumentException("sequence number must be assigned");
    }
    return seqNo < minSeqNo ? seqNo : minSeqNo;
}

/**
 * Folds a sequence number into a running maximum.
 *
 * <p>If the running maximum is still one of the sentinel values {@link SequenceNumbersService#NO_OPS_PERFORMED} or
 * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}, there is nothing to compare against and the specified sequence number is
 * returned as-is. Otherwise the specified sequence number must not be {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}, and the
 * larger of the two values is returned.
 *
 * @param maxSeqNo the current maximum sequence number
 * @param seqNo    the specified sequence number
 * @return the new maximum sequence number
 * @throws IllegalArgumentException if the current maximum is a real value and the specified sequence number is
 *                                  {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
 */
public static long max(final long maxSeqNo, final long seqNo) {
    final boolean noCurrentMaximum =
        maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED || maxSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO;
    if (noCurrentMaximum) {
        // nothing tracked yet: the incoming value becomes the maximum unconditionally
        return seqNo;
    }
    if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
        throw new IllegalArgumentException("sequence number must be assigned");
    }
    return seqNo > maxSeqNo ? seqNo : maxSeqNo;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.index.translog;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;

import java.io.IOException;
Expand Down Expand Up @@ -52,7 +51,9 @@ public long getGeneration() {

public abstract long sizeInBytes();

public abstract int totalOperations();
public abstract int totalOperations();

abstract Checkpoint getCheckpoint();

public final long getFirstOperationOffset() {
return firstOperationOffset;
Expand All @@ -76,7 +77,7 @@ protected final int readSize(ByteBuffer reusableBuffer, long position) throws IO
}

public Translog.Snapshot newSnapshot() {
return new TranslogSnapshot(generation, channel, path, firstOperationOffset, sizeInBytes(), totalOperations());
return new TranslogSnapshot(this, sizeInBytes());
}

/**
Expand Down
101 changes: 58 additions & 43 deletions core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.translog;

import org.apache.lucene.codecs.CodecUtil;
Expand All @@ -35,11 +36,13 @@
import java.nio.file.OpenOption;
import java.nio.file.Path;

class Checkpoint {
final class Checkpoint {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++


final long offset;
final int numOps;
final long generation;
final long minSeqNo;
final long maxSeqNo;
final long globalCheckpoint;

private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before
Expand All @@ -52,6 +55,8 @@ class Checkpoint {
+ Integer.BYTES // ops
+ Long.BYTES // offset
+ Long.BYTES // generation
+ Long.BYTES // minimum sequence number, introduced in 6.0.0
+ Long.BYTES // maximum sequence number, introduced in 6.0.0
+ Long.BYTES // global checkpoint, introduced in 6.0.0
+ CodecUtil.footerLength();

Expand All @@ -62,66 +67,78 @@ class Checkpoint {
+ Long.BYTES // generation
+ CodecUtil.footerLength();

static final int LEGACY_NON_CHECKSUMMED_FILE_LENGTH = Integer.BYTES // ops
+ Long.BYTES // offset
+ Long.BYTES; // generation

Checkpoint(long offset, int numOps, long generation, long globalCheckpoint) {
/**
* Create a new translog checkpoint.
*
* @param offset the current offset in the translog
* @param numOps the current number of operations in the translog
* @param generation the current translog generation
* @param minSeqNo the current minimum sequence number of all operations in the translog
* @param maxSeqNo the current maximum sequence number of all operations in the translog
* @param globalCheckpoint the last-known global checkpoint
*/
Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint) {
assert minSeqNo <= maxSeqNo;
this.offset = offset;
this.numOps = numOps;
this.generation = generation;
this.minSeqNo = minSeqNo;
this.maxSeqNo = maxSeqNo;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth checking that minSeqNo <= maxSeqNo?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe something like minSeqNo <= maxSeqNo || minSeqNo == Long.MAX_VALUE && maxSeqNo == Long.MIN_VALUE if we want to keep the merging easier.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for assertions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed e9370c84925e00d11d74e6785004c9de7f9feefc.

this.globalCheckpoint = globalCheckpoint;
}

/**
 * Serializes the fields of this checkpoint to the given output.
 *
 * The fields are written in a fixed order: offset (long), number of operations (int), generation (long), minimum sequence
 * number (long), maximum sequence number (long), global checkpoint (long). This is the same order in which
 * {@code readCheckpointV6_0_0} reads them back.
 *
 * @param out the output to write the checkpoint fields to
 * @throws IOException if writing to the output fails
 */
private void write(DataOutput out) throws IOException {
out.writeLong(offset);
out.writeInt(numOps);
out.writeLong(generation);
out.writeLong(minSeqNo);
out.writeLong(maxSeqNo);
out.writeLong(globalCheckpoint);
}

static Checkpoint readChecksummedV2(DataInput in) throws IOException {
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong());
/**
 * Create a checkpoint describing a translog that contains no operations: the operation count is zero and both the minimum and
 * maximum sequence numbers are {@link SequenceNumbersService#NO_OPS_PERFORMED}.
 *
 * @param offset           the current offset in the translog
 * @param generation       the current translog generation
 * @param globalCheckpoint the last-known global checkpoint
 * @return a checkpoint with zero operations and no sequence-number range
 */
static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint) {
    return new Checkpoint(
        offset,
        0, // no operations
        generation,
        SequenceNumbersService.NO_OPS_PERFORMED, // minimum sequence number
        SequenceNumbersService.NO_OPS_PERFORMED, // maximum sequence number
        globalCheckpoint);
}

// reads a checksummed checkpoint introduced in ES 5.0.0
static Checkpoint readChecksummedV1(DataInput in) throws IOException {
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
/**
 * Reads a checkpoint in the layout that carries the minimum sequence number, maximum sequence number, and global checkpoint.
 *
 * The values are consumed from the input in this order: offset (long), number of operations (int), generation (long), minimum
 * sequence number (long), maximum sequence number (long), global checkpoint (long).
 *
 * @param in the input positioned at the start of the checkpoint fields
 * @return the checkpoint that was read
 * @throws IOException if reading from the input fails
 */
static Checkpoint readCheckpointV6_0_0(final DataInput in) throws IOException {
    // the reads must stay in exactly this sequence; it mirrors the on-disk field order
    final long offset = in.readLong();
    final int numOps = in.readInt();
    final long generation = in.readLong();
    final long minSeqNo = in.readLong();
    final long maxSeqNo = in.readLong();
    final long globalCheckpoint = in.readLong();
    return new Checkpoint(offset, numOps, generation, minSeqNo, maxSeqNo, globalCheckpoint);
}

// reads checkpoint from ES < 5.0.0
static Checkpoint readNonChecksummed(DataInput in) throws IOException {
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
/**
 * Reads a checksummed checkpoint in the layout introduced in ES 5.0.0.
 *
 * Only the offset (long), number of operations (int), and generation (long) are consumed from the input, in that order. The
 * sequence-number fields are not read; the minimum and maximum sequence numbers are set to
 * {@link SequenceNumbersService#NO_OPS_PERFORMED} and the global checkpoint to
 * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
 *
 * @param in the input positioned at the start of the checkpoint fields
 * @return the checkpoint that was read
 * @throws IOException if reading from the input fails
 */
static Checkpoint readCheckpointV5_0_0(final DataInput in) throws IOException {
    final long offset = in.readLong();
    final int numOps = in.readInt();
    final long generation = in.readLong();
    return new Checkpoint(
        offset,
        numOps,
        generation,
        SequenceNumbersService.NO_OPS_PERFORMED, // minimum sequence number
        SequenceNumbersService.NO_OPS_PERFORMED, // maximum sequence number
        SequenceNumbersService.UNASSIGNED_SEQ_NO); // global checkpoint
}

@Override
public String toString() {
return "Checkpoint{" +
"offset=" + offset +
", numOps=" + numOps +
", translogFileGeneration=" + generation +
", generation=" + generation +
", minSeqNo=" + minSeqNo +
", maxSeqNo=" + maxSeqNo +
", globalCheckpoint=" + globalCheckpoint +
'}';
}

public static Checkpoint read(Path path) throws IOException {
try (Directory dir = new SimpleFSDirectory(path.getParent())) {
try (IndexInput indexInput = dir.openInput(path.getFileName().toString(), IOContext.DEFAULT)) {
if (indexInput.length() == LEGACY_NON_CHECKSUMMED_FILE_LENGTH) {
// OLD unchecksummed file that was written < ES 5.0.0
return Checkpoint.readNonChecksummed(indexInput);
// We checksum the entire file before we even go and parse it. If it's corrupted we barf right here.
CodecUtil.checksumEntireFile(indexInput);
final int fileVersion = CodecUtil.checkHeader(indexInput, CHECKPOINT_CODEC, INITIAL_VERSION, CURRENT_VERSION);
if (fileVersion == INITIAL_VERSION) {
assert indexInput.length() == V1_FILE_SIZE : indexInput.length();
return Checkpoint.readCheckpointV5_0_0(indexInput);
} else {
// We checksum the entire file before we even go and parse it. If it's corrupted we barf right here.
CodecUtil.checksumEntireFile(indexInput);
final int fileVersion = CodecUtil.checkHeader(indexInput, CHECKPOINT_CODEC, INITIAL_VERSION, CURRENT_VERSION);
if (fileVersion == INITIAL_VERSION) {
assert indexInput.length() == V1_FILE_SIZE;
return Checkpoint.readChecksummedV1(indexInput);
} else {
assert fileVersion == CURRENT_VERSION;
assert indexInput.length() == FILE_SIZE;
return Checkpoint.readChecksummedV2(indexInput);
}
assert fileVersion == CURRENT_VERSION : fileVersion;
assert indexInput.length() == FILE_SIZE : indexInput.length();
return Checkpoint.readCheckpointV6_0_0(indexInput);
}
}
}
Expand Down Expand Up @@ -159,30 +176,28 @@ public synchronized byte[] toByteArray() {

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

Checkpoint that = (Checkpoint) o;

if (offset != that.offset) {
return false;
}
if (numOps != that.numOps) {
return false;
}
return generation == that.generation;

if (offset != that.offset) return false;
if (numOps != that.numOps) return false;
if (generation != that.generation) return false;
if (minSeqNo != that.minSeqNo) return false;
if (maxSeqNo != that.maxSeqNo) return false;
return globalCheckpoint == that.globalCheckpoint;
}

/**
 * Computes a hash over every field of the checkpoint: offset, number of operations, generation, minimum sequence number,
 * maximum sequence number, and global checkpoint, combined with the conventional multiplier of 31.
 *
 * @return the hash code of this checkpoint
 */
@Override
public int hashCode() {
    int hash = Long.hashCode(offset);
    // Integer.hashCode(int) is the value itself, so this contributes numOps unchanged
    hash = hash * 31 + Integer.hashCode(numOps);
    hash = hash * 31 + Long.hashCode(generation);
    hash = hash * 31 + Long.hashCode(minSeqNo);
    hash = hash * 31 + Long.hashCode(maxSeqNo);
    hash = hash * 31 + Long.hashCode(globalCheckpoint);
    return hash;
}

}
28 changes: 14 additions & 14 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public Translog(
logger.debug("wipe translog location - creating new translog");
Files.createDirectories(location);
final long generation = 1;
Checkpoint checkpoint = new Checkpoint(0, 0, generation, globalCheckpointSupplier.getAsLong());
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, globalCheckpointSupplier.getAsLong());
final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
IOUtils.fsync(checkpointFile, false);
Expand Down Expand Up @@ -400,13 +400,13 @@ TranslogWriter createWriter(long fileGeneration) throws IOException {
}

/**
* Adds a delete / index operations to the transaction log.
* Adds an operation to the transaction log.
*
* @see org.elasticsearch.index.translog.Translog.Operation
* @see Index
* @see org.elasticsearch.index.translog.Translog.Delete
* @param operation the operation to add
* @return the location of the operation in the translog
* @throws IOException if adding the operation to the translog resulted in an I/O exception
*/
public Location add(Operation operation) throws IOException {
public Location add(final Operation operation) throws IOException {
final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
try {
final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out);
Expand All @@ -419,22 +419,21 @@ public Location add(Operation operation) throws IOException {
out.writeInt(operationSize);
out.seek(end);
final ReleasablePagedBytesReference bytes = out.bytes();
try (ReleasableLock lock = readLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
Location location = current.add(bytes);
return location;
return current.add(bytes, operation.seqNo());
}
} catch (AlreadyClosedException | IOException ex) {
} catch (final AlreadyClosedException | IOException ex) {
try {
closeOnTragicEvent(ex);
} catch (Exception inner) {
} catch (final Exception inner) {
ex.addSuppressed(inner);
}
throw ex;
} catch (Exception e) {
} catch (final Exception e) {
try {
closeOnTragicEvent(e);
} catch (Exception inner) {
} catch (final Exception inner) {
e.addSuppressed(inner);
}
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
Expand Down Expand Up @@ -1222,14 +1221,15 @@ public int hashCode() {


public enum Durability {

/**
* Async durability - translogs are synced based on a time interval.
*/
ASYNC,
/**
* Request durability - translogs are synced for each high level request (bulk, index, delete)
*/
REQUEST;
REQUEST

}

Expand Down
Loading