diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java index c3950e1012acc..885fdfc9e6553 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java @@ -57,4 +57,52 @@ public static SeqNoStats loadSeqNoStatsFromLuceneCommit( return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); } + /** + * Compute the minimum of the given current minimum sequence number and the specified sequence number, accounting for the fact that the + * current minimum sequence number could be {@link SequenceNumbersService#NO_OPS_PERFORMED} or + * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. When the current minimum sequence number is not + * {@link SequenceNumbersService#NO_OPS_PERFORMED} nor {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}, the specified sequence number + * must not be {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. + * + * @param minSeqNo the current minimum sequence number + * @param seqNo the specified sequence number + * @return the new minimum sequence number + */ + public static long min(final long minSeqNo, final long seqNo) { + if (minSeqNo == SequenceNumbersService.NO_OPS_PERFORMED) { + return seqNo; + } else if (minSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + return seqNo; + } else { + if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("sequence number must be assigned"); + } + return Math.min(minSeqNo, seqNo); + } + } + + /** + * Compute the maximum of the given current maximum sequence number and the specified sequence number, accounting for the fact that the + * current maximum sequence number could be {@link SequenceNumbersService#NO_OPS_PERFORMED} or + * {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. When the current maximum sequence number is not + * {@link SequenceNumbersService#NO_OPS_PERFORMED} nor {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}, the specified sequence number + * must not be {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}. + * + * @param maxSeqNo the current maximum sequence number + * @param seqNo the specified sequence number + * @return the new maximum sequence number + */ + public static long max(final long maxSeqNo, final long seqNo) { + if (maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED) { + return seqNo; + } else if (maxSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + return seqNo; + } else { + if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("sequence number must be assigned"); + } + return Math.max(maxSeqNo, seqNo); + } + } + } diff --git a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java index bf61febb741eb..6f392c195fd19 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.translog; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import java.io.IOException; @@ -52,7 +51,9 @@ public long getGeneration() { public abstract long sizeInBytes(); - public abstract int totalOperations(); + public abstract int totalOperations(); + + abstract Checkpoint getCheckpoint(); public final long getFirstOperationOffset() { return firstOperationOffset; @@ -76,7 +77,7 @@ protected final int readSize(ByteBuffer reusableBuffer, long position) throws IO } public Translog.Snapshot newSnapshot() { - return new TranslogSnapshot(generation, channel, path, firstOperationOffset, sizeInBytes(), totalOperations()); + return new TranslogSnapshot(this, sizeInBytes()); } /** diff --git a/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java b/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java index 6f5d7fa841449..ce5cc8e76010b 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.elasticsearch.index.translog; import org.apache.lucene.codecs.CodecUtil; @@ -35,11 +36,13 @@ import java.nio.file.OpenOption; import java.nio.file.Path; -class Checkpoint { +final class Checkpoint { final long offset; final int numOps; final long generation; + final long minSeqNo; + final long maxSeqNo; final long globalCheckpoint; private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before @@ -52,6 +55,8 @@ class Checkpoint { + Integer.BYTES // ops + Long.BYTES // offset + Long.BYTES // generation + + Long.BYTES // minimum sequence number, introduced in 6.0.0 + + Long.BYTES // maximum sequence number, introduced in 6.0.0 + Long.BYTES // global checkpoint, introduced in 6.0.0 + CodecUtil.footerLength(); @@ -62,14 +67,23 @@ class Checkpoint { + Long.BYTES // generation + CodecUtil.footerLength(); - static final int LEGACY_NON_CHECKSUMMED_FILE_LENGTH = Integer.BYTES // ops - + Long.BYTES // offset - + Long.BYTES; // generation - - Checkpoint(long offset, int numOps, long generation, long globalCheckpoint) { + /** + * Create a new translog checkpoint. + * + * @param offset the current offset in the translog + * @param numOps the current number of operations in the translog + * @param generation the current translog generation + * @param minSeqNo the current minimum sequence number of all operations in the translog + * @param maxSeqNo the current maximum sequence number of all operations in the translog + * @param globalCheckpoint the last-known global checkpoint + */ + Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint) { + assert minSeqNo <= maxSeqNo; this.offset = offset; this.numOps = numOps; this.generation = generation; + this.minSeqNo = minSeqNo; + this.maxSeqNo = maxSeqNo; this.globalCheckpoint = globalCheckpoint; } @@ -77,21 +91,27 @@ private void write(DataOutput out) throws IOException { out.writeLong(offset); out.writeInt(numOps); out.writeLong(generation); + out.writeLong(minSeqNo); + out.writeLong(maxSeqNo); out.writeLong(globalCheckpoint); } - static Checkpoint readChecksummedV2(DataInput in) throws IOException { - return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong()); + static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint) { + final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint); } - // reads a checksummed checkpoint introduced in ES 5.0.0 - static Checkpoint readChecksummedV1(DataInput in) throws IOException { - return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), SequenceNumbersService.UNASSIGNED_SEQ_NO); + static Checkpoint readCheckpointV6_0_0(final DataInput in) throws IOException { + return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong()); } - // reads checkpoint from ES < 5.0.0 - static Checkpoint readNonChecksummed(DataInput in) throws IOException { - return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), SequenceNumbersService.UNASSIGNED_SEQ_NO); + // reads a checksummed checkpoint introduced in ES 5.0.0 + static Checkpoint readCheckpointV5_0_0(final DataInput in) throws IOException { + final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + final long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; + return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint); } @Override @@ -99,7 +119,9 @@ public String toString() { return "Checkpoint{" + "offset=" + offset + ", numOps=" + numOps + - ", translogFileGeneration=" + generation + + ", generation=" + generation + + ", minSeqNo=" + minSeqNo + + ", maxSeqNo=" + maxSeqNo + ", globalCheckpoint=" + globalCheckpoint + '}'; } @@ -107,21 +129,16 @@ public String toString() { public static Checkpoint read(Path path) throws IOException { try (Directory dir = new SimpleFSDirectory(path.getParent())) { try (IndexInput indexInput = dir.openInput(path.getFileName().toString(), IOContext.DEFAULT)) { - if (indexInput.length() == LEGACY_NON_CHECKSUMMED_FILE_LENGTH) { - // OLD unchecksummed file that was written < ES 5.0.0 - return Checkpoint.readNonChecksummed(indexInput); + // We checksum the entire file before we even go and parse it. If it's corrupted we barf right here. + CodecUtil.checksumEntireFile(indexInput); + final int fileVersion = CodecUtil.checkHeader(indexInput, CHECKPOINT_CODEC, INITIAL_VERSION, CURRENT_VERSION); + if (fileVersion == INITIAL_VERSION) { + assert indexInput.length() == V1_FILE_SIZE : indexInput.length(); + return Checkpoint.readCheckpointV5_0_0(indexInput); } else { - // We checksum the entire file before we even go and parse it. If it's corrupted we barf right here. - CodecUtil.checksumEntireFile(indexInput); - final int fileVersion = CodecUtil.checkHeader(indexInput, CHECKPOINT_CODEC, INITIAL_VERSION, CURRENT_VERSION); - if (fileVersion == INITIAL_VERSION) { - assert indexInput.length() == V1_FILE_SIZE; - return Checkpoint.readChecksummedV1(indexInput); - } else { - assert fileVersion == CURRENT_VERSION; - assert indexInput.length() == FILE_SIZE; - return Checkpoint.readChecksummedV2(indexInput); - } + assert fileVersion == CURRENT_VERSION : fileVersion; + assert indexInput.length() == FILE_SIZE : indexInput.length(); + return Checkpoint.readCheckpointV6_0_0(indexInput); } } } @@ -159,23 +176,17 @@ public synchronized byte[] toByteArray() { @Override public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; Checkpoint that = (Checkpoint) o; - if (offset != that.offset) { - return false; - } - if (numOps != that.numOps) { - return false; - } - return generation == that.generation; - + if (offset != that.offset) return false; + if (numOps != that.numOps) return false; + if (generation != that.generation) return false; + if (minSeqNo != that.minSeqNo) return false; + if (maxSeqNo != that.maxSeqNo) return false; + return globalCheckpoint == that.globalCheckpoint; } @Override @@ -183,6 +194,10 @@ public int hashCode() { int result = Long.hashCode(offset); result = 31 * result + numOps; result = 31 * result + Long.hashCode(generation); + result = 31 * result + Long.hashCode(minSeqNo); + result = 31 * result + Long.hashCode(maxSeqNo); + result = 31 * result + Long.hashCode(globalCheckpoint); return result; } + } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index d78c27ee653fe..a818e2c4634a1 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -198,7 +198,7 @@ public Translog( logger.debug("wipe translog location - creating new translog"); Files.createDirectories(location); final long generation = 1; - Checkpoint checkpoint = new Checkpoint(0, 0, generation, globalCheckpointSupplier.getAsLong()); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, globalCheckpointSupplier.getAsLong()); final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME); Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); IOUtils.fsync(checkpointFile, false); @@ -400,13 +400,13 @@ TranslogWriter createWriter(long fileGeneration) throws IOException { } /** - * Adds a delete / index operations to the transaction log. + * Adds an operation to the transaction log. * - * @see org.elasticsearch.index.translog.Translog.Operation - * @see Index - * @see org.elasticsearch.index.translog.Translog.Delete + * @param operation the operation to add + * @return the location of the operation in the translog + * @throws IOException if adding the operation to the translog resulted in an I/O exception */ - public Location add(Operation operation) throws IOException { + public Location add(final Operation operation) throws IOException { final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); try { final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out); @@ -419,22 +419,21 @@ public Location add(Operation operation) throws IOException { out.writeInt(operationSize); out.seek(end); final ReleasablePagedBytesReference bytes = out.bytes(); - try (ReleasableLock lock = readLock.acquire()) { + try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - Location location = current.add(bytes); - return location; + return current.add(bytes, operation.seqNo()); } - } catch (AlreadyClosedException | IOException ex) { + } catch (final AlreadyClosedException | IOException ex) { try { closeOnTragicEvent(ex); - } catch (Exception inner) { + } catch (final Exception inner) { ex.addSuppressed(inner); } throw ex; - } catch (Exception e) { + } catch (final Exception e) { try { closeOnTragicEvent(e); - } catch (Exception inner) { + } catch (final Exception inner) { e.addSuppressed(inner); } throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); @@ -1222,6 +1221,7 @@ public int hashCode() { public enum Durability { + /** * Async durability - translogs are synced based on a time interval. */ @@ -1229,7 +1229,7 @@ public enum Durability { /** * Request durability - translogs are synced for each high level request (bulk, index, delete) */ - REQUEST; + REQUEST } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index d696b815b9764..9057207501cdf 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -41,28 +41,42 @@ * an immutable translog filereader */ public class TranslogReader extends BaseTranslogReader implements Closeable { + private static final byte LUCENE_CODEC_HEADER_BYTE = 0x3f; private static final byte UNVERSIONED_TRANSLOG_HEADER_BYTE = 0x00; - - private final int totalOperations; protected final long length; + private final int totalOperations; + private final Checkpoint checkpoint; protected final AtomicBoolean closed = new AtomicBoolean(false); /** - * Create a reader of translog file channel. The length parameter should be consistent with totalOperations and point - * at the end of the last operation in this snapshot. + * Create a translog writer against the specified translog file channel. + * + * @param checkpoint the translog checkpoint + * @param channel the translog file channel to open a translog reader against + * @param path the path to the translog + * @param firstOperationOffset the offset to the first operation */ - public TranslogReader(long generation, FileChannel channel, Path path, long firstOperationOffset, long length, int totalOperations) { - super(generation, channel, path, firstOperationOffset); - this.length = length; - this.totalOperations = totalOperations; + TranslogReader(final Checkpoint checkpoint, final FileChannel channel, final Path path, final long firstOperationOffset) { + super(checkpoint.generation, channel, path, firstOperationOffset); + this.length = checkpoint.offset; + this.totalOperations = checkpoint.numOps; + this.checkpoint = checkpoint; } /** - * Given a file, opens an {@link TranslogReader}, taking of checking and validating the file header. + * Given a file channel, opens a {@link TranslogReader}, taking care of checking and validating the file header. + * + * @param channel the translog file channel + * @param path the path to the translog + * @param checkpoint the translog checkpoint + * @param translogUUID the tranlog UUID + * @return a new TranslogReader + * @throws IOException if any of the file operations resulted in an I/O exception */ - public static TranslogReader open(FileChannel channel, Path path, Checkpoint checkpoint, String translogUUID) throws IOException { + public static TranslogReader open( + final FileChannel channel, final Path path, final Checkpoint checkpoint, final String translogUUID) throws IOException { try { InputStreamStreamInput headerStream = new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel)); // don't close @@ -116,7 +130,10 @@ public static TranslogReader open(FileChannel channel, Path path, Checkpoint che throw new TranslogCorruptedException("expected shard UUID " + uuidBytes + " but got: " + ref + " this translog file belongs to a different translog. path:" + path); } - return new TranslogReader(checkpoint.generation, channel, path, ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES, checkpoint.offset, checkpoint.numOps); + final long firstOperationOffset = + ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES; + return new TranslogReader(checkpoint, channel, path, firstOperationOffset); + default: throw new TranslogCorruptedException("No known translog stream version: " + version + " path:" + path); } @@ -138,6 +155,11 @@ public int totalOperations() { return totalOperations; } + @Override + final Checkpoint getCheckpoint() { + return checkpoint; + } + /** * reads an operation at the given position into the given buffer. */ diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index ffbe1002eb146..908cf511db03b 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -23,12 +23,11 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Path; final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot { private final int totalOperations; + private final Checkpoint checkpoint; protected final long length; private final ByteBuffer reusableBuffer; @@ -37,13 +36,13 @@ final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snap private BufferedChecksumStreamInput reuse; /** - * Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point - * at the end of the last operation in this snapshot. + * Create a snapshot of translog file channel. */ - TranslogSnapshot(long generation, FileChannel channel, Path path, long firstOperationOffset, long length, int totalOperations) { - super(generation, channel, path, firstOperationOffset); + TranslogSnapshot(final BaseTranslogReader reader, final long length) { + super(reader.generation, reader.channel, reader.path, reader.firstOperationOffset); this.length = length; - this.totalOperations = totalOperations; + this.totalOperations = reader.totalOperations(); + this.checkpoint = reader.getCheckpoint(); this.reusableBuffer = ByteBuffer.allocate(1024); readOperations = 0; position = firstOperationOffset; @@ -55,6 +54,11 @@ public int totalOperations() { return totalOperations; } + @Override + Checkpoint getCheckpoint() { + return checkpoint; + } + @Override public Translog.Operation next() throws IOException { if (readOperations < totalOperations) { diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 785566b976c23..b4400f60b812c 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -27,6 +27,8 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ShardId; import java.io.BufferedOutputStream; @@ -60,13 +62,16 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { /* the total offset of this file including the bytes written to the file as well as into the buffer */ private volatile long totalOffset; + private volatile long minSeqNo; + private volatile long maxSeqNo; + private final LongSupplier globalCheckpointSupplier; protected final AtomicBoolean closed = new AtomicBoolean(false); // lock order synchronized(syncLock) -> synchronized(this) private final Object syncLock = new Object(); - public TranslogWriter( + private TranslogWriter( final ChannelFactory channelFactory, final ShardId shardId, final Checkpoint initialCheckpoint, @@ -80,6 +85,10 @@ public TranslogWriter( this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt()); this.lastSyncedCheckpoint = initialCheckpoint; this.totalOffset = initialCheckpoint.offset; + assert initialCheckpoint.minSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : initialCheckpoint.minSeqNo; + this.minSeqNo = initialCheckpoint.minSeqNo; + assert initialCheckpoint.maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : initialCheckpoint.maxSeqNo; + this.maxSeqNo = initialCheckpoint.maxSeqNo; this.globalCheckpointSupplier = globalCheckpointSupplier; } @@ -115,10 +124,9 @@ public static TranslogWriter create( writeHeader(out, ref); channel.force(true); final Checkpoint checkpoint = - writeCheckpoint(channelFactory, headerLength, 0, globalCheckpointSupplier.getAsLong(), file.getParent(), fileGeneration); - final TranslogWriter writer = - new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier); - return writer; + Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong()); + writeCheckpoint(channelFactory, file.getParent(), checkpoint); + return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition @@ -151,21 +159,42 @@ private synchronized void closeWithTragicEvent(Exception exception) throws IOExc /** * add the given bytes to the translog and return the location they were written at */ - public synchronized Translog.Location add(BytesReference data) throws IOException { + + /** + * Add the given bytes to the translog with the specified sequence number; returns the location the bytes were written to. + * + * @param data the bytes to write + * @param seqNo the sequence number associated with the operation + * @return the location the bytes were written to + * @throws IOException if writing to the translog resulted in an I/O exception + */ + public synchronized Translog.Location add(final BytesReference data, final long seqNo) throws IOException { ensureOpen(); final long offset = totalOffset; try { data.writeTo(outputStream); - } catch (Exception ex) { + } catch (final Exception ex) { try { closeWithTragicEvent(ex); - } catch (Exception inner) { + } catch (final Exception inner) { ex.addSuppressed(inner); } throw ex; } totalOffset += data.length(); + + if (minSeqNo == SequenceNumbersService.NO_OPS_PERFORMED) { + assert operationCounter == 0; + } + if (maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED) { + assert operationCounter == 0; + } + + minSeqNo = SequenceNumbers.min(minSeqNo, seqNo); + maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo); + operationCounter++; + return new Translog.Location(generation, offset, data.length()); } @@ -191,13 +220,20 @@ public int totalOperations() { return operationCounter; } + @Override + Checkpoint getCheckpoint() { + return getLastSyncedCheckpoint(); + } + @Override public long sizeInBytes() { return totalOffset; } /** - * closes this writer and transfers it's underlying file channel to a new immutable reader + * Closes this writer and transfers its underlying file channel to a new immutable {@link TranslogReader} + * @return a new {@link TranslogReader} + * @throws IOException if any of the file operations resulted in an I/O exception */ public TranslogReader closeIntoReader() throws IOException { // make sure to acquire the sync lock first, to prevent dead locks with threads calling @@ -218,18 +254,7 @@ public TranslogReader closeIntoReader() throws IOException { throw e; } if (closed.compareAndSet(false, true)) { - boolean success = false; - try { - final TranslogReader reader = - new TranslogReader(generation, channel, path, firstOperationOffset, getWrittenOffset(), operationCounter); - success = true; - return reader; - } finally { - if (success == false) { - // close the channel, as we are closed and failed to create a new reader - IOUtils.closeWhileHandlingException(channel); - } - } + return new TranslogReader(getLastSyncedCheckpoint(), channel, path, getFirstOperationOffset()); } else { throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy); } @@ -272,14 +297,18 @@ public boolean syncUpTo(long offset) throws IOException { // the lock we should check again since if this code is busy we might have fsynced enough already final long offsetToSync; final int opsCounter; - final long globalCheckpoint; + final long currentMinSeqNo; + final long currentMaxSeqNo; + final long currentGlobalCheckpoint; synchronized (this) { ensureOpen(); try { outputStream.flush(); offsetToSync = totalOffset; opsCounter = operationCounter; - globalCheckpoint = globalCheckpointSupplier.getAsLong(); + currentMinSeqNo = minSeqNo; + currentMaxSeqNo = maxSeqNo; + currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong(); } catch (Exception ex) { try { closeWithTragicEvent(ex); @@ -295,7 +324,7 @@ public boolean syncUpTo(long offset) throws IOException { try { channel.force(false); checkpoint = - writeCheckpoint(channelFactory, offsetToSync, opsCounter, globalCheckpoint, path.getParent(), generation); + writeCheckpoint(channelFactory, offsetToSync, opsCounter, currentMinSeqNo, currentMaxSeqNo, currentGlobalCheckpoint, path.getParent(), generation); } catch (Exception ex) { try { closeWithTragicEvent(ex); @@ -333,24 +362,32 @@ protected void readBytes(ByteBuffer targetBuffer, long position) throws IOExcept } private static Checkpoint writeCheckpoint( - ChannelFactory channelFactory, - long syncPosition, - int numOperations, - long globalCheckpoint, - Path translogFile, - long generation) throws IOException { - final Path checkpointFile = translogFile.resolve(Translog.CHECKPOINT_FILE_NAME); - final Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation, globalCheckpoint); - Checkpoint.write(channelFactory::open, checkpointFile, checkpoint, StandardOpenOption.WRITE); + ChannelFactory channelFactory, + long syncPosition, + int numOperations, + long minSeqNo, + long maxSeqNo, + long globalCheckpoint, + Path translogFile, + long generation) throws IOException { + final Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation, minSeqNo, maxSeqNo, globalCheckpoint); + writeCheckpoint(channelFactory, translogFile, checkpoint); return checkpoint; } + private static void writeCheckpoint( + final ChannelFactory channelFactory, + final Path translogFile, + final Checkpoint checkpoint) throws IOException { + Checkpoint.write(channelFactory, translogFile.resolve(Translog.CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE); + } + /** * The last synced checkpoint for this translog. * * @return the last synced checkpoint */ - public Checkpoint getLastSyncedCheckpoint() { + Checkpoint getLastSyncedCheckpoint() { return lastSyncedCheckpoint; } @@ -402,4 +439,5 @@ public void close() throws IOException { throw new IllegalStateException("never close this stream"); } } + } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java index 40a75c16370b7..ea1f4c13dfd6a 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java @@ -168,7 +168,8 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th /** Write a checkpoint file to the given location with the given generation */ public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException { - Checkpoint emptyCheckpoint = new Checkpoint(translogLength, 0, translogGeneration, SequenceNumbersService.UNASSIGNED_SEQ_NO); + Checkpoint emptyCheckpoint = + Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration, SequenceNumbersService.UNASSIGNED_SEQ_NO); Checkpoint.write(FileChannel::open, filename, emptyCheckpoint, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); // fsync with metadata here to make sure. diff --git a/core/src/test/java/org/elasticsearch/index/seqno/SequenceNumbersTests.java b/core/src/test/java/org/elasticsearch/index/seqno/SequenceNumbersTests.java new file mode 100644 index 0000000000000..23eac18377017 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/seqno/SequenceNumbersTests.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; + +public class SequenceNumbersTests extends ESTestCase { + + public void testMin() { + final long seqNo = randomNonNegativeLong(); + assertThat(SequenceNumbers.min(SequenceNumbersService.NO_OPS_PERFORMED, seqNo), equalTo(seqNo)); + assertThat( + SequenceNumbers.min(SequenceNumbersService.NO_OPS_PERFORMED, SequenceNumbersService.UNASSIGNED_SEQ_NO), + equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertThat(SequenceNumbers.min(SequenceNumbersService.UNASSIGNED_SEQ_NO, seqNo), equalTo(seqNo)); + final long minSeqNo = randomNonNegativeLong(); + assertThat(SequenceNumbers.min(minSeqNo, seqNo), equalTo(Math.min(minSeqNo, seqNo))); + + final IllegalArgumentException e = + expectThrows(IllegalArgumentException.class, () -> SequenceNumbers.min(minSeqNo, SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertThat(e, hasToString(containsString("sequence number must be assigned"))); + } + + public void testMax() { + final long seqNo = randomNonNegativeLong(); + assertThat(SequenceNumbers.max(SequenceNumbersService.NO_OPS_PERFORMED, seqNo), equalTo(seqNo)); + assertThat( + SequenceNumbers.max(SequenceNumbersService.NO_OPS_PERFORMED, SequenceNumbersService.UNASSIGNED_SEQ_NO), + equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertThat(SequenceNumbers.max(SequenceNumbersService.UNASSIGNED_SEQ_NO, seqNo), equalTo(seqNo)); + final long maxSeqNo = randomNonNegativeLong(); + assertThat(SequenceNumbers.min(maxSeqNo, seqNo), equalTo(Math.min(maxSeqNo, seqNo))); + + final IllegalArgumentException e = + expectThrows(IllegalArgumentException.class, () -> SequenceNumbers.min(maxSeqNo, SequenceNumbersService.UNASSIGNED_SEQ_NO)); + assertThat(e, hasToString(containsString("sequence number must be assigned"))); + } + +} diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 33f6f8b8a25ba..e47a5652b2431 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -964,13 +964,23 @@ public void testBasicCheckpoint() throws IOException { public void testTranslogWriter() throws IOException { final TranslogWriter writer = translog.createWriter(0); - final int numOps = randomIntBetween(10, 100); + final int numOps = randomIntBetween(8, 128); byte[] bytes = new byte[4]; ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); + final Set seenSeqNos = new HashSet<>(); + boolean opsHaveValidSequenceNumbers = randomBoolean(); for (int i = 0; i < numOps; i++) { out.reset(bytes); out.writeInt(i); - writer.add(new BytesArray(bytes)); + long seqNo; + do { + seqNo = opsHaveValidSequenceNumbers ? randomNonNegativeLong() : SequenceNumbersService.UNASSIGNED_SEQ_NO; + opsHaveValidSequenceNumbers = opsHaveValidSequenceNumbers || !rarely(); + } while (seenSeqNos.contains(seqNo)); + if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + seenSeqNos.add(seqNo); + } + writer.add(new BytesArray(bytes), seqNo); } writer.sync(); @@ -982,10 +992,14 @@ public void testTranslogWriter() throws IOException { final int value = buffer.getInt(); assertEquals(i, value); } + final long minSeqNo = seenSeqNos.stream().min(Long::compareTo).orElse(SequenceNumbersService.NO_OPS_PERFORMED); + final long maxSeqNo = seenSeqNos.stream().max(Long::compareTo).orElse(SequenceNumbersService.NO_OPS_PERFORMED); + assertThat(reader.getCheckpoint().minSeqNo, equalTo(minSeqNo)); + assertThat(reader.getCheckpoint().maxSeqNo, equalTo(maxSeqNo)); out.reset(bytes); out.writeInt(2048); - writer.add(new BytesArray(bytes)); + writer.add(new BytesArray(bytes), randomNonNegativeLong()); if (reader instanceof TranslogReader) { ByteBuffer buffer = ByteBuffer.allocate(4); @@ -1008,40 +1022,30 @@ public void testTranslogWriter() throws IOException { IOUtils.close(writer); } - public void testFailWriterWhileClosing() throws IOException { - Path tempDir = createTempDir(); - final FailSwitch fail = new FailSwitch(); - fail.failNever(); - TranslogConfig config = getTranslogConfig(tempDir); - try (Translog translog = getFailableTranslog(fail, config)) { - final TranslogWriter writer = translog.createWriter(0); - final int numOps = randomIntBetween(10, 100); - byte[] bytes = new byte[4]; - ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); + public void testCloseIntoReader() throws IOException { + try (TranslogWriter writer = translog.createWriter(0)) { + final int numOps = randomIntBetween(8, 128); + final byte[] bytes = new byte[4]; + final ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); for (int i = 0; i < numOps; i++) { out.reset(bytes); out.writeInt(i); - writer.add(new BytesArray(bytes)); + writer.add(new BytesArray(bytes), randomNonNegativeLong()); } writer.sync(); - try { - fail.failAlways(); - writer.closeIntoReader(); - fail(); - } catch (MockDirectoryWrapper.FakeIOException ex) { - } - try (TranslogReader reader = translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)))) { + final Checkpoint writerCheckpoint = writer.getCheckpoint(); + try (TranslogReader reader = writer.closeIntoReader()) { for (int i = 0; i < numOps; i++) { - ByteBuffer buffer = ByteBuffer.allocate(4); + final ByteBuffer buffer = ByteBuffer.allocate(4); reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i); buffer.flip(); final int value = buffer.getInt(); assertEquals(i, value); } + final Checkpoint readerCheckpoint = reader.getCheckpoint(); + assertThat(readerCheckpoint, equalTo(writerCheckpoint)); } - } - } public void testBasicRecovery() throws IOException { @@ -1209,12 +1213,12 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { TranslogConfig config = translog.getConfig(); Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); Checkpoint read = Checkpoint.read(ckp); - Checkpoint corrupted = new Checkpoint(0, 0, 0, SequenceNumbersService.UNASSIGNED_SEQ_NO); + Checkpoint corrupted = Checkpoint.emptyTranslogCheckpoint(0, 0, SequenceNumbersService.UNASSIGNED_SEQ_NO); Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); try (Translog ignored = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { fail("corrupted"); } catch (IllegalStateException ex) { - assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, numOps=55, translogFileGeneration=2, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration=0, globalCheckpoint=-2}"); + assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, numOps=55, generation=2, minSeqNo=0, maxSeqNo=0, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2}", ex.getMessage()); } Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { @@ -1663,7 +1667,7 @@ ChannelFactory getChannelFactory() { FileChannel channel = factory.open(file, openOption); boolean success = false; try { - final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints we rely on the fact that the 20bytes are written as an atomic operation + final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, isCkpFile ? false : paritalWrites, throwUnknownException, channel); success = true; return throwingFileChannel; @@ -1962,11 +1966,26 @@ public void testWithRandomException() throws IOException { } } + private Checkpoint randomCheckpoint() { + final long a = randomNonNegativeLong(); + final long b = randomNonNegativeLong(); + final long minSeqNo; + final long maxSeqNo; + if (a <= b) { + minSeqNo = a; + maxSeqNo = b; + } else { + minSeqNo = b; + maxSeqNo = a; + } + return new Checkpoint(randomLong(), randomInt(), randomLong(), minSeqNo, maxSeqNo, randomNonNegativeLong()); + } + public void testCheckpointOnDiskFull() throws IOException { - Checkpoint checkpoint = new Checkpoint(randomLong(), randomInt(), randomLong(), randomLong()); + final Checkpoint checkpoint = randomCheckpoint(); Path tempDir = createTempDir(); Checkpoint.write(FileChannel::open, tempDir.resolve("foo.cpk"), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); - Checkpoint checkpoint2 = new Checkpoint(randomLong(), randomInt(), randomLong(), randomLong()); + final Checkpoint checkpoint2 = randomCheckpoint(); try { Checkpoint.write((p, o) -> { if (randomBoolean()) { diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java index 9a9ba438a1bb6..d008749506161 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java @@ -19,7 +19,7 @@ package org.elasticsearch.index.translog; -import org.apache.lucene.util.IOUtils; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -84,14 +84,13 @@ public void testTruncatedTranslog() throws Exception { checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1-truncated.binary", "pre-2.0 translog"); } - public TranslogReader openReader(Path path, long id) throws IOException { - FileChannel channel = FileChannel.open(path, StandardOpenOption.READ); - try { - TranslogReader reader = TranslogReader.open(channel, path, new Checkpoint(Files.size(path), 1, id, 0), null); - channel = null; - return reader; - } finally { - IOUtils.close(channel); + public TranslogReader openReader(final Path path, final long id) throws IOException { + try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) { + final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + final Checkpoint checkpoint = + new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbersService.UNASSIGNED_SEQ_NO); + return TranslogReader.open(channel, path, checkpoint, null); } } }