From 6d1a93bf1977bc12091c137610a4b9271713bae1 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 18 Jun 2017 17:35:51 +0200 Subject: [PATCH 01/21] initial change + translog tests pass --- .../elasticsearch/index/IndexSettings.java | 4 +- .../elasticsearch/index/shard/IndexShard.java | 10 +- .../index/translog/Translog.java | 94 +++++++-- .../translog/TranslogDeletionPolicy.java | 11 +- .../index/translog/TranslogSnapshot.java | 2 +- .../index/translog/TranslogWriter.java | 30 +-- .../recovery/RecoverySourceHandler.java | 44 ++-- .../indices/recovery/RecoveryTarget.java | 4 +- .../recovery/RemoteRecoveryTargetHandler.java | 4 +- .../IndexLevelReplicationTests.java | 21 +- .../RecoveryDuringReplicationTests.java | 22 +- .../seqno/LocalCheckpointTrackerTests.java | 19 +- .../translog/TranslogDeletionPolicyTests.java | 9 +- .../index/translog/TranslogTests.java | 197 ++++++------------ .../recovery/RecoverySourceHandlerTests.java | 14 +- .../indices/recovery/RecoveryTests.java | 81 +++++++ .../org/elasticsearch/test/ESTestCase.java | 2 +- 17 files changed, 311 insertions(+), 257 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 43ddb09e61f0a..537344ca653a4 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -118,7 +118,7 @@ public final class IndexSettings { * the chance of ops based recoveries. **/ public static final Setting INDEX_TRANSLOG_RETENTION_AGE_SETTING = - Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueMillis(-1), TimeValue.timeValueMillis(-1), Property.Dynamic, + Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueHours(12), TimeValue.timeValueMillis(-1), Property.Dynamic, Property.IndexScope); /** @@ -127,7 +127,7 @@ public final class IndexSettings { * the chance of ops based recoveries. **/ public static final Setting INDEX_TRANSLOG_RETENTION_SIZE_SETTING = - Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic, + Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 18f025c27c374..1d42c9025e90b 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -876,13 +876,15 @@ public Engine.CommitId flush(FlushRequest request) { } /** - * Rolls the tranlog generation. + * Rolls the tranlog generation and cleans unneeded. * * @throws IOException if any file operations on the translog throw an I/O exception */ - private void rollTranslogGeneration() throws IOException { + private void rollTranslogAndTrimGeneration() throws IOException { final Engine engine = getEngine(); - engine.getTranslog().rollGeneration(); + final Translog translog = engine.getTranslog(); + translog.rollGeneration(); + translog.trimUnreferencedReaders(); } public void forceMerge(ForceMergeRequest forceMerge) throws IOException { @@ -2071,7 +2073,7 @@ public void onFailure(final Exception e) { @Override protected void doRun() throws Exception { - rollTranslogGeneration(); + rollTranslogAndTrimGeneration(); } @Override diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 66d370c121f2c..4c57d58f7f5a4 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -377,7 +377,7 @@ public int totalOperations() { * Returns the size in bytes of the translog files that aren't committed to lucene. */ public long sizeInBytes() { - return sizeInBytes(deletionPolicy.getMinTranslogGenerationForRecovery()); + return sizeInBytesByMinGen(deletionPolicy.getMinTranslogGenerationForRecovery()); } /** @@ -394,9 +394,19 @@ private int totalOperations(long minGeneration) { } /** - * Returns the size in bytes of the translog files that aren't committed to lucene. + * Returns the number of operations in the transaction files that aren't committed to lucene.. + */ + private int totalOperationsInGensAboveSeqNo(long minSeqNo) { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + return readersAboveMinSeqNo(minSeqNo).mapToInt(BaseTranslogReader::totalOperations).sum(); + } + } + + /** + * Returns the size in bytes of the translog files above the given generation */ - private long sizeInBytes(long minGeneration) { + private long sizeInBytesByMinGen(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return Stream.concat(readers.stream(), Stream.of(current)) @@ -406,6 +416,16 @@ private long sizeInBytes(long minGeneration) { } } + /** + * Returns the size in bytes of the translog files with ops above the given seqNo + */ + private long sizeOfGensAboveSeqNoInBytes(long minSeqNo) { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum(); + } + } + /** * Creates a new translog for the specified generation. * @@ -560,6 +580,24 @@ public Snapshot newSnapshot(long minGeneration) { } } + private Stream readersAboveMinSeqNo(long minSeqNo) { + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); + return Stream.concat(readers.stream(), Stream.of(current)) + .filter(reader -> { + final long maxSeqNo = reader.getCheckpoint().maxSeqNo; + return maxSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || + maxSeqNo >= minSeqNo; + }); + } + + private Snapshot createSnapshotFromMinSeqNo(long minSeqNo) { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + Snapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot).toArray(Snapshot[]::new); + return new MultiSnapshot(snapshots); + } + } + /** * Returns a view into the current translog that is guaranteed to retain all current operations * while receiving future ones as well @@ -567,7 +605,8 @@ public Snapshot newSnapshot(long minGeneration) { public Translog.View newView() { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - final long viewGen = deletionPolicy.acquireTranslogGenForView(); + final long viewGen = getMinFileGeneration(); + deletionPolicy.acquireTranslogGenForView(viewGen); try { return new View(viewGen); } catch (Exception e) { @@ -698,35 +737,36 @@ public TranslogDeletionPolicy getDeletionPolicy() { public class View implements Closeable { AtomicBoolean closed = new AtomicBoolean(); - final long minGeneration; + final long viewGenToRelease; - View(long minGeneration) { - this.minGeneration = minGeneration; - } - - /** this smallest translog generation in this view */ - public long minTranslogGeneration() { - return minGeneration; + View(long viewGenToRelease) { + this.viewGenToRelease = viewGenToRelease; } /** - * The total number of operations in the view. + * The total number of operations in the view files which contain an operation with a sequence number + * above the given min sequence numbers. This will be the number of operations in snapshot taken + * by calling {@link #snapshot(long)} with the same parameter. */ - public int totalOperations() { - return Translog.this.totalOperations(minGeneration); + public int estimateTotalOperations(long minSequenceNumber) { + return Translog.this.totalOperationsInGensAboveSeqNo(minSequenceNumber); } /** - * Returns the size in bytes of the files behind the view. + * The total size of the view files which contain an operation with a sequence number + * above the given min sequence numbers. These are the files that would need to be read by snapshot + * acquired {@link #snapshot(long)} with the same parameter. */ - public long sizeInBytes() { - return Translog.this.sizeInBytes(minGeneration); + public long estimateSizeInBytes(long minSequenceNumber) { + return Translog.this.sizeOfGensAboveSeqNoInBytes(minSequenceNumber); } - /** create a snapshot from this view */ - public Snapshot snapshot() { + /** + * create a snapshot from this view, containing all + * operations from the given sequence number and up (with potentially some more) */ + public Snapshot snapshot(long minSequenceNumber) { ensureOpen(); - return Translog.this.newSnapshot(minGeneration); + return Translog.this.createSnapshotFromMinSeqNo(minSequenceNumber); } void ensureOpen() { @@ -738,8 +778,8 @@ void ensureOpen() { @Override public void close() throws IOException { if (closed.getAndSet(true) == false) { - logger.trace("closing view starting at translog [{}]", minGeneration); - deletionPolicy.releaseTranslogGenView(minGeneration); + logger.trace("closing view starting at translog [{}]", viewGenToRelease); + deletionPolicy.releaseTranslogGenView(viewGenToRelease); trimUnreferencedReaders(); closeFilesIfNoPendingViews(); } @@ -1663,4 +1703,12 @@ public String getTranslogUUID() { return translogUUID; } + + TranslogWriter getCurrent() { + return current; + } + + List getReaders() { + return readers; + } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 732b38fcedfe4..d222bf6f6bd9c 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -69,9 +69,8 @@ public synchronized void setRetentionAgeInMillis(long ageInMillis) { * acquires the basis generation for a new view. Any translog generation above, and including, the returned generation * will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called. */ - synchronized long acquireTranslogGenForView() { - translogRefCounts.computeIfAbsent(minTranslogGenerationForRecovery, l -> Counter.newCounter(false)).addAndGet(1); - return minTranslogGenerationForRecovery; + synchronized void acquireTranslogGenForView(final long genForView) { + translogRefCounts.computeIfAbsent(genForView, l -> Counter.newCounter(false)).addAndGet(1); } /** returns the number of generations that were acquired for views */ @@ -80,7 +79,7 @@ synchronized int pendingViewsCount() { } /** - * releases a generation that was acquired by {@link #acquireTranslogGenForView()} + * releases a generation that was acquired by {@link #acquireTranslogGenForView(long)} */ synchronized void releaseTranslogGenView(long translogGen) { Counter current = translogRefCounts.get(translogGen); @@ -154,4 +153,8 @@ private long getMinTranslogGenRequiredByViews() { public synchronized long getMinTranslogGenerationForRecovery() { return minTranslogGenerationForRecovery; } + + synchronized long getViewCount(long viewGen) { + return translogRefCounts.getOrDefault(viewGen, Counter.newCounter(false)).get(); + } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index 908cf511db03b..312b7fc9db01f 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -99,7 +99,7 @@ public String toString() { return "TranslogSnapshot{" + "readOperations=" + readOperations + ", position=" + position + - ", totalOperations=" + totalOperations + + ", estimateTotalOperations=" + totalOperations + ", length=" + length + ", reusableBuffer=" + reusableBuffer + '}'; diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 2c0bd0c7d89c8..928511170938b 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -255,8 +255,9 @@ public int totalOperations() { } @Override - Checkpoint getCheckpoint() { - return getLastSyncedCheckpoint(); + synchronized Checkpoint getCheckpoint() { + return new Checkpoint(totalOffset, operationCounter, generation, minSeqNo, maxSeqNo, + globalCheckpointSupplier.getAsLong(), minTranslogGenerationSupplier.getAsLong()); } @Override @@ -329,22 +330,12 @@ public boolean syncUpTo(long offset) throws IOException { if (lastSyncedCheckpoint.offset < offset && syncNeeded()) { // double checked locking - we don't want to fsync unless we have to and now that we have // the lock we should check again since if this code is busy we might have fsynced enough already - final long offsetToSync; - final int opsCounter; - final long currentMinSeqNo; - final long currentMaxSeqNo; - final long currentGlobalCheckpoint; - final long currentMinTranslogGeneration; + final Checkpoint checkpointToSync; synchronized (this) { ensureOpen(); try { outputStream.flush(); - offsetToSync = totalOffset; - opsCounter = operationCounter; - currentMinSeqNo = minSeqNo; - currentMaxSeqNo = maxSeqNo; - currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong(); - currentMinTranslogGeneration = minTranslogGenerationSupplier.getAsLong(); + checkpointToSync = getCheckpoint(); } catch (Exception ex) { try { closeWithTragicEvent(ex); @@ -356,12 +347,9 @@ public boolean syncUpTo(long offset) throws IOException { } // now do the actual fsync outside of the synchronized block such that // we can continue writing to the buffer etc. - final Checkpoint checkpoint; try { channel.force(false); - checkpoint = - writeCheckpoint(channelFactory, offsetToSync, opsCounter, currentMinSeqNo, currentMaxSeqNo, - currentGlobalCheckpoint, currentMinTranslogGeneration, path.getParent(), generation); + writeCheckpoint(channelFactory, path.getParent(), checkpointToSync); } catch (Exception ex) { try { closeWithTragicEvent(ex); @@ -370,9 +358,9 @@ public boolean syncUpTo(long offset) throws IOException { } throw ex; } - assert lastSyncedCheckpoint.offset <= offsetToSync : - "illegal state: " + lastSyncedCheckpoint.offset + " <= " + offsetToSync; - lastSyncedCheckpoint = checkpoint; // write protected by syncLock + assert lastSyncedCheckpoint.offset <= checkpointToSync.offset : + "illegal state: " + lastSyncedCheckpoint.offset + " <= " + checkpointToSync.offset; + lastSyncedCheckpoint = checkpointToSync; // write protected by syncLock return true; } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 8abd3a05d8e68..36f71899fa8da 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -58,6 +58,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -128,13 +129,14 @@ public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recov */ public RecoveryResponse recoverToTarget() throws IOException { try (Translog.View translogView = shard.acquireTranslogView()) { - logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration()); + final long startingSeqNo; boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO && isTranslogReadyForSequenceNumberBasedRecovery(translogView); if (isSequenceNumberBasedRecoveryPossible) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); + startingSeqNo = request.startingSeqNo(); } else { final Engine.IndexCommitRef phase1Snapshot; try { @@ -143,8 +145,12 @@ public RecoveryResponse recoverToTarget() throws IOException { IOUtils.closeWhileHandlingException(translogView); throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } + // we set this to unassigned to create a translog roughly according to the retention policy + // on the target + startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + try { - phase1(phase1Snapshot.getIndexCommit(), translogView); + phase1(phase1Snapshot.getIndexCommit(), translogView, startingSeqNo); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e); } finally { @@ -157,7 +163,7 @@ public RecoveryResponse recoverToTarget() throws IOException { } try { - prepareTargetForTranslog(translogView.totalOperations()); + prepareTargetForTranslog(translogView.estimateTotalOperations(startingSeqNo)); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } @@ -180,12 +186,10 @@ public RecoveryResponse recoverToTarget() throws IOException { throw new IndexShardRelocatedException(request.shardId()); } - logger.trace("snapshot translog for recovery; current size is [{}]", translogView.totalOperations()); + logger.trace("snapshot translog for recovery; current size is [{}]", translogView.estimateTotalOperations(startingSeqNo)); final long targetLocalCheckpoint; try { - final long startingSeqNo = - isSequenceNumberBasedRecoveryPossible ? request.startingSeqNo() : SequenceNumbersService.UNASSIGNED_SEQ_NO; - targetLocalCheckpoint = phase2(startingSeqNo, translogView.snapshot()); + targetLocalCheckpoint = phase2(startingSeqNo, translogView.snapshot(startingSeqNo)); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); } @@ -219,7 +223,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo); final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1); - final Translog.Snapshot snapshot = translogView.snapshot(); + final Translog.Snapshot snapshot = translogView.snapshot(startingSeqNo); Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { @@ -244,7 +248,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl * segments that are missing. Only segments that have the same size and * checksum can be reused */ - public void phase1(final IndexCommit snapshot, final Translog.View translogView) { + public void phase1(final IndexCommit snapshot, final Translog.View translogView, final long startSeqNo) { cancellableThreads.checkForCancel(); // Total size of segment files that are recovered long totalSize = 0; @@ -322,10 +326,10 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView) new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, - response.phase1ExistingFileSizes, translogView.totalOperations())); + response.phase1ExistingFileSizes, translogView.estimateTotalOperations(startSeqNo))); // How many bytes we've copied since we last called RateLimiter.pause final Function outputStreamFactories = - md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView), chunkSizeInBytes); + md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView, startSeqNo), chunkSizeInBytes); sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); // Send the CLEAN_FILES request, which takes all of the files that // were transferred and renames them from their temporary file @@ -336,7 +340,8 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView) // related to this recovery (out of date segments, for example) // are deleted try { - cancellableThreads.executeIO(() -> recoveryTarget.cleanFiles(translogView.totalOperations(), recoverySourceMetadata)); + cancellableThreads.executeIO(() -> + recoveryTarget.cleanFiles(translogView.estimateTotalOperations(startSeqNo), recoverySourceMetadata)); } catch (RemoteTransportException | IOException targetException) { final IOException corruptIndexException; // we realized that after the index was copied and we wanted to finalize the recovery @@ -347,11 +352,8 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView) try { final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot); StoreFileMetaData[] metadata = - StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(size -> new - StoreFileMetaData[size]); - ArrayUtil.timSort(metadata, (o1, o2) -> { - return Long.compare(o1.length(), o2.length()); // check small files first - }); + StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(StoreFileMetaData[]::new); + ArrayUtil.timSort(metadata, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first for (StoreFileMetaData md : metadata) { cancellableThreads.checkForCancel(); logger.debug("checking integrity for file {} after remove corruption exception", md); @@ -577,11 +579,13 @@ public String toString() { final class RecoveryOutputStream extends OutputStream { private final StoreFileMetaData md; private final Translog.View translogView; + private final long startSeqNp; private long position = 0; - RecoveryOutputStream(StoreFileMetaData md, Translog.View translogView) { + RecoveryOutputStream(StoreFileMetaData md, Translog.View translogView, long startSeqNp) { this.md = md; this.translogView = translogView; + this.startSeqNp = startSeqNp; } @Override @@ -599,7 +603,7 @@ public void write(byte[] b, int offset, int length) throws IOException { private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException { // Actually send the file chunk to the target node, waiting for it to complete cancellableThreads.executeIO(() -> - recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogView.totalOperations()) + recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogView.estimateTotalOperations(startSeqNp)) ); if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us throw new IndexShardClosedException(request.shardId()); @@ -610,7 +614,7 @@ private void sendNextChunk(long position, BytesArray content, boolean lastChunk) void sendFiles(Store store, StoreFileMetaData[] files, Function outputStreamFactory) throws Exception { store.incRef(); try { - ArrayUtil.timSort(files, (a, b) -> Long.compare(a.length(), b.length())); // send smallest first + ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first for (int i = 0; i < files.length; i++) { final StoreFileMetaData md = files[i]; try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 6a465f111150c..fd5a494932e1f 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -61,8 +61,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; import java.util.function.LongConsumer; +import java.util.stream.Collectors; /** * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of @@ -403,6 +403,8 @@ public long indexTranslogOperations(List operations, int tot translog.incrementRecoveredOperations(); } indexShard().sync(); + // roll over / flush / trim if needed + indexShard().afterWriteOperation(); return indexShard().getLocalCheckpoint(); } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index a4f24b710b2fa..30129780de182 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -28,10 +28,8 @@ import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.FutureTransportResponseHandler; import org.elasticsearch.transport.TransportFuture; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -161,7 +159,7 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(recoveryId, shardId, fileMetaData, position, content, lastChunk, totalTranslogOps, - /* we send totalOperations with every request since we collect stats on the target and that way we can + /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can * see how many translog ops we accumulate while copying files across the network. A future optimization * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. */ diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 9b2200d8be3f7..33a1cfed0b63e 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -18,17 +18,15 @@ */ package org.elasticsearch.index.replication; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexableField; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; @@ -44,7 +42,6 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryTarget; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.hamcrest.Matcher; import java.io.IOException; @@ -272,9 +269,8 @@ public void testRequestFailureReplication() throws Exception { assertThat(response.getFailure().getCause(), instanceOf(VersionConflictEngineException.class)); shards.assertAllEqual(0); for (IndexShard indexShard : shards) { - try(Translog.View view = indexShard.acquireTranslogView()) { - assertThat(view.totalOperations(), equalTo(0)); - } + assertThat(indexShard.routingEntry() + " has the wrong number of ops in the translog", + indexShard.translogStats().estimatedNumberOfOperations(), equalTo(0)); } // add some replicas @@ -292,9 +288,8 @@ public void testRequestFailureReplication() throws Exception { assertThat(response.getFailure().getCause(), instanceOf(VersionConflictEngineException.class)); shards.assertAllEqual(0); for (IndexShard indexShard : shards) { - try(Translog.View view = indexShard.acquireTranslogView()) { - assertThat(view.totalOperations(), equalTo(0)); - } + assertThat(indexShard.routingEntry() + " has the wrong number of ops in the translog", + indexShard.translogStats().estimatedNumberOfOperations(), equalTo(0)); } } } @@ -327,8 +322,8 @@ private static void assertNoOpTranslogOperationForDocumentFailure( String failureMessage) throws IOException { for (IndexShard indexShard : replicationGroup) { try(Translog.View view = indexShard.acquireTranslogView()) { - assertThat(view.totalOperations(), equalTo(expectedOperation)); - final Translog.Snapshot snapshot = view.snapshot(); + assertThat(view.estimateTotalOperations(SequenceNumbersService.NO_OPS_PERFORMED), equalTo(expectedOperation)); + final Translog.Snapshot snapshot = view.snapshot(SequenceNumbersService.NO_OPS_PERFORMED); long expectedSeqNo = 0L; Translog.Operation op = snapshot.next(); do { diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 1c7705d534afb..99879de5bac85 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; @@ -113,18 +112,25 @@ public void testRecoveryOfDisconnectedReplica() throws Exception { docs += missingOnReplica; replicaHasDocsSinceLastFlushedCheckpoint |= missingOnReplica > 0; - final boolean flushPrimary = randomBoolean(); - if (flushPrimary) { + final boolean translogTrimmed; + if (randomBoolean()) { shards.flush(); + translogTrimmed = randomBoolean(); + if (translogTrimmed) { + final Translog translog = shards.getPrimary().getTranslog(); + translog.getDeletionPolicy().setRetentionAgeInMillis(0); + translog.trimUnreferencedReaders(); + } + } else { + translogTrimmed = false; } - originalReplica.close("disconnected", false); IOUtils.close(originalReplica.store()); final IndexShard recoveredReplica = shards.addReplicaWithExistingPath(originalReplica.shardPath(), originalReplica.routingEntry().currentNodeId()); shards.recoverReplica(recoveredReplica); - if (flushPrimary && replicaHasDocsSinceLastFlushedCheckpoint) { - // replica has something to catch up with, but since we flushed the primary, we should fall back to full recovery + if (translogTrimmed && replicaHasDocsSinceLastFlushedCheckpoint) { + // replica has something to catch up with, but since we trimmed the primary translog, we should fall back to full recovery assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), not(empty())); } else { assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), empty()); @@ -407,7 +413,7 @@ public long indexTranslogOperations(final List operations, f } } - private static class BlockingTarget extends RecoveryTarget { + public static class BlockingTarget extends RecoveryTarget { private final CountDownLatch recoveryBlocked; private final CountDownLatch releaseRecovery; @@ -416,7 +422,7 @@ private static class BlockingTarget extends RecoveryTarget { EnumSet.of(RecoveryState.Stage.INDEX, RecoveryState.Stage.TRANSLOG, RecoveryState.Stage.FINALIZE); private final Logger logger; - BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery, IndexShard shard, + public BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery, IndexShard shard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener, Logger logger) { super(shard, sourceNode, listener, version -> {}); this.recoveryBlocked = recoveryBlocked; diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index 74183670ecb1f..3d280b4d28c98 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -45,16 +45,9 @@ public class LocalCheckpointTrackerTests extends ESTestCase { private LocalCheckpointTracker tracker; - private final int SMALL_CHUNK_SIZE = 4; + private static final int SMALL_CHUNK_SIZE = 4; - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - tracker = getTracker(); - } - - private LocalCheckpointTracker getTracker() { + public static LocalCheckpointTracker createEmptyTracker() { return new LocalCheckpointTracker( IndexSettingsModule.newIndexSettings( "test", @@ -67,6 +60,13 @@ private LocalCheckpointTracker getTracker() { ); } + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + tracker = createEmptyTracker(); + } + public void testSimplePrimary() { long seqNo1, seqNo2; assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); @@ -236,5 +236,4 @@ public void testWaitForOpsToComplete() throws BrokenBarrierException, Interrupte thread.join(); } - } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 3ed595543f8ea..05e05e0557278 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -126,20 +126,17 @@ public void testRetentionHierarchy() throws IOException { selectedReader = randomIntBetween(0, allGens.size() - 1); final long selectedGenerationBySize = allGens.get(selectedReader).generation; long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); - selectedReader = randomIntBetween(0, allGens.size() - 1); - long committedGen = allGens.get(selectedReader).generation; deletionPolicy.setRetentionAgeInMillis(maxAge); deletionPolicy.setRetentionSizeInBytes(size); assertMinGenRequired(deletionPolicy, readersAndWriter, Math.max(selectedGenerationByAge, selectedGenerationBySize)); // make a new policy as committed gen can't go backwards (for now) deletionPolicy = new MockDeletionPolicy(now, size, maxAge); + long committedGen = randomFrom(allGens).generation; deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize))); - long viewGen = deletionPolicy.acquireTranslogGenForView(); - selectedReader = randomIntBetween(selectedReader, allGens.size() - 1); - committedGen = allGens.get(selectedReader).generation; - deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); + long viewGen = randomFrom(allGens).generation; + deletionPolicy.acquireTranslogGenForView(viewGen); assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min( Math.min(committedGen, viewGen), diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 21bc1a14bc55f..e1f82583aaab5 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -43,7 +43,6 @@ import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -63,6 +62,8 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; +import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.seqno.LocalCheckpointTrackerTests; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog.Location; @@ -158,7 +159,10 @@ private void rollAndCommit(Translog translog) throws IOException { private void commit(Translog translog, long genToCommit) throws IOException { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(genToCommit); + long minGenRequired = deletionPolicy.minTranslogGenRequired(translog.getReaders(), translog.getCurrent()); translog.trimUnreferencedReaders(); + assertThat(minGenRequired, equalTo(translog.getMinFileGeneration())); + assertFilePresences(translog); } @Override @@ -190,9 +194,12 @@ private Translog create(Path path) throws IOException { private TranslogConfig getTranslogConfig(final Path path) { final Settings settings = Settings - .builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) - .build(); + .builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) + // only randomize between nog age retention and a long one, so failures will have a chance of reproducing + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomBoolean() ? "-1ms" : "1h") + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomIntBetween(-1, 2048) + "b") + .build(); return getTranslogConfig(path, settings); } @@ -317,7 +324,7 @@ public void testSimpleOperations() throws IOException { assertThat(snapshot.totalOperations(), equalTo(ops.size())); markCurrentGenAsCommitted(translog); - snapshot = translog.newSnapshot(); + snapshot = translog.newSnapshot(firstId + 1); assertThat(snapshot, SnapshotMatchers.size(0)); assertThat(snapshot.totalOperations(), equalTo(0)); } @@ -719,7 +726,9 @@ public void testConcurrentWriteViewsAndSnapshot() throws Throwable { final AtomicBoolean run = new AtomicBoolean(true); final Object flushMutex = new Object(); - + final AtomicLong lastCommittedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED); + final LocalCheckpointTracker tracker = LocalCheckpointTrackerTests.createEmptyTracker(); + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); // any errors on threads final List errors = new CopyOnWriteArrayList<>(); logger.debug("using [{}] readers. [{}] writers. flushing every ~[{}] ops.", readers.length, writers.length, flushEveryOps); @@ -732,7 +741,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep barrier.await(); int counter = 0; while (run.get() && idGenerator.get() < maxOps) { - long id = idGenerator.incrementAndGet(); + long id = idGenerator.getAndIncrement(); final Translog.Operation op; final Translog.Operation.Type type = Translog.Operation.Type.values()[((int) (id % Translog.Operation.Type.values().length))]; @@ -751,6 +760,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep throw new AssertionError("unsupported operation type [" + type + "]"); } Translog.Location location = translog.add(op); + tracker.markSeqNoAsCompleted(id); Translog.Location existing = writtenOps.put(op, location); if (existing != null) { fail("duplicate op [" + op + "], old entry at " + location); @@ -762,7 +772,12 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep synchronized (flushMutex) { // we need not do this concurrently as we need to make sure that the generation // we're committing - is still present when we're committing - rollAndCommit(translog); + long localCheckpoint = tracker.getCheckpoint() + 1; + translog.rollGeneration(); + deletionPolicy.setMinTranslogGenerationForRecovery( + translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); + translog.trimUnreferencedReaders(); + lastCommittedLocalCheckpoint.set(localCheckpoint); } } if (id % 7 == 0) { @@ -788,7 +803,7 @@ public void onFailure(Exception e) { final String threadId = "reader_" + i; readers[i] = new Thread(new AbstractRunnable() { Translog.View view = null; - Set writtenOpsAtView; + long committedLocalCheckpointAtView; @Override public void onFailure(Exception e) { @@ -811,9 +826,10 @@ void closeView() throws IOException { void newView() throws IOException { closeView(); view = translog.newView(); - // captures the currently written ops so we know what to expect from the view - writtenOpsAtView = new HashSet<>(writtenOps.keySet()); - logger.debug("--> [{}] opened view from [{}]", threadId, view.minTranslogGeneration()); + // captures the last committed checkpoint, while holding the view, simulating + // recovery logic which captures a view and gets a lucene commit + committedLocalCheckpointAtView = lastCommittedLocalCheckpoint.get(); + logger.debug("--> [{}] opened view from [{}]", threadId, view.viewGenToRelease); } @Override @@ -828,23 +844,18 @@ protected void doRun() throws Exception { // captures al views that are written since the view was created (with a small caveat see bellow) // these are what we expect the snapshot to return (and potentially some more). Set expectedOps = new HashSet<>(writtenOps.keySet()); - expectedOps.removeAll(writtenOpsAtView); - Translog.Snapshot snapshot = view.snapshot(); + expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView); + Translog.Snapshot snapshot = view.snapshot(committedLocalCheckpointAtView + 1L); Translog.Operation op; while ((op = snapshot.next()) != null) { expectedOps.remove(op); } if (expectedOps.isEmpty() == false) { - StringBuilder missed = new StringBuilder("missed ").append(expectedOps.size()).append(" operations"); + StringBuilder missed = new StringBuilder("missed ").append(expectedOps.size()) + .append(" operations from [").append(committedLocalCheckpointAtView + 1L).append("]"); boolean failed = false; for (Translog.Operation expectedOp : expectedOps) { final Translog.Location loc = writtenOps.get(expectedOp); - if (loc.generation < view.minTranslogGeneration()) { - // writtenOps is only updated after the op was written to the translog. This mean - // that ops written to the translog before the view was taken (and will be missing from the view) - // may yet be available in writtenOpsAtView, meaning we will erroneously expect them - continue; - } failed = true; missed.append("\n --> [").append(expectedOp).append("] written at ").append(loc); } @@ -854,7 +865,7 @@ protected void doRun() throws Exception { } // slow down things a bit and spread out testing.. synchronized (signalReaderSomeDataWasIndexed) { - if (idGenerator.get() < maxOps){ + if (idGenerator.get() < maxOps) { signalReaderSomeDataWasIndexed.wait(); } } @@ -1154,7 +1165,7 @@ public void testBasicRecovery() throws IOException { translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - Translog.Snapshot snapshot = translog.newSnapshot(); + Translog.Snapshot snapshot = translog.newSnapshot(translogGeneration.translogFileGeneration); for (int i = minUncommittedOp; i < translogOperations; i++) { assertEquals("expected operation" + i + " to be in the previous translog but wasn't", translog.currentFileGeneration() - 1, locations.get(i).generation); Translog.Operation next = snapshot.next(); @@ -1388,7 +1399,7 @@ public void testOpenForeignTranslog() throws IOException { } this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); - Translog.Snapshot snapshot = this.translog.newSnapshot(); + Translog.Snapshot snapshot = this.translog.newSnapshot(translogGeneration.translogFileGeneration); for (int i = firstUncommitted; i < translogOperations; i++) { Translog.Operation next = snapshot.next(); assertNotNull("" + i, next); @@ -1772,6 +1783,10 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { final long comittedGeneration; final String translogUUID; try (Translog translog = getFailableTranslog(fail, config)) { + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); + // disable retention so we trim things + deletionPolicy.setRetentionSizeInBytes(-1); + deletionPolicy.setRetentionAgeInMillis(-1); translogUUID = translog.getTranslogUUID(); int translogOperations = randomIntBetween(10, 100); for (int op = 0; op < translogOperations / 2; op++) { @@ -1788,9 +1803,10 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { translog.rollGeneration(); } } + deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); fail.failRandomly(); try { - commit(translog, comittedGeneration); + translog.trimUnreferencedReaders(); } catch (Exception e) { // expected... } @@ -2162,7 +2178,7 @@ public void testWithRandomException() throws IOException { TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { - Translog.Snapshot snapshot = translog.newSnapshot(); + Translog.Snapshot snapshot = translog.newSnapshot(minGenForRecovery); assertEquals(syncedDocs.size(), snapshot.totalOperations()); for (int i = 0; i < syncedDocs.size(); i++) { Translog.Operation next = snapshot.next(); @@ -2302,11 +2318,14 @@ public void testTranslogOpSerialization() throws Exception { public void testRollGeneration() throws Exception { // make sure we keep some files around final boolean longRetention = randomBoolean(); + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); if (longRetention) { - translog.getDeletionPolicy().setRetentionAgeInMillis(3600 * 1000); + deletionPolicy.setRetentionAgeInMillis(3600 * 1000); } else { - translog.getDeletionPolicy().setRetentionAgeInMillis(-1); + deletionPolicy.setRetentionAgeInMillis(-1); } + // we control retention via time, disable size based calculations for simplicity + deletionPolicy.setRetentionSizeInBytes(-1); final long generation = translog.currentFileGeneration(); final int rolls = randomIntBetween(1, 16); int totalOperations = 0; @@ -2333,7 +2352,7 @@ public void testRollGeneration() throws Exception { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); } - translog.getDeletionPolicy().setRetentionAgeInMillis(randomBoolean() ? 100 : -1); + deletionPolicy.setRetentionAgeInMillis(randomBoolean() ? 100 : -1); assertBusy(() -> { translog.trimUnreferencedReaders(); for (int i = 0; i < rolls; i++) { @@ -2349,65 +2368,6 @@ public void testRollGeneration() throws Exception { assertFileIsPresent(translog, generation + rolls); } - public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException { - final long generation = translog.currentFileGeneration(); - int seqNo = 0; - - final int rollsBefore = randomIntBetween(0, 16); - for (int r = 1; r <= rollsBefore; r++) { - final int operationsBefore = randomIntBetween(1, 256); - for (int i = 0; i < operationsBefore; i++) { - translog.add(new Translog.NoOp(seqNo++, 0, "test")); - } - - try (Releasable ignored = translog.writeLock.acquire()) { - translog.rollGeneration(); - } - - assertThat(translog.currentFileGeneration(), equalTo(generation + r)); - for (int i = 0; i <= r; i++) { - assertFileIsPresent(translog, generation + r); - } - } - - assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore)); - translog.rollGeneration(); - assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore + 1)); - - for (int i = 0; i <= rollsBefore + 1; i++) { - assertFileIsPresent(translog, generation + i); - } - - final int rollsBetween = randomIntBetween(0, 16); - for (int r = 1; r <= rollsBetween; r++) { - final int operationsBetween = randomIntBetween(1, 256); - for (int i = 0; i < operationsBetween; i++) { - translog.add(new Translog.NoOp(seqNo++, 0, "test")); - } - - try (Releasable ignored = translog.writeLock.acquire()) { - translog.rollGeneration(); - } - - assertThat( - translog.currentFileGeneration(), - equalTo(generation + rollsBefore + 1 + r)); - for (int i = 0; i <= rollsBefore + 1 + r; i++) { - assertFileIsPresent(translog, generation + i); - } - } - - commit(translog, generation + rollsBefore + 1); - - for (int i = 0; i <= rollsBefore; i++) { - assertFileDeleted(translog, generation + i); - } - for (int i = rollsBefore + 1; i <= rollsBefore + 1 + rollsBetween; i++) { - assertFileIsPresent(translog, generation + i); - } - - } - public void testMinGenerationForSeqNo() throws IOException { final int operations = randomIntBetween(1, 4096); final List shuffledSeqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList()); @@ -2471,65 +2431,26 @@ public void testSimpleCommit() throws IOException { final long generation = randomIntBetween(1, Math.toIntExact(translog.currentFileGeneration())); commit(translog, generation); - for (long g = 0; g < generation; g++) { - assertFileDeleted(translog, g); - } - for (long g = generation; g <= translog.currentFileGeneration(); g++) { - assertFileIsPresent(translog, g); - } } - public void testPrepareCommitAndCommit() throws IOException { + public void testOpenViewIsPassToDeletionPolicy() throws IOException { final int operations = randomIntBetween(1, 4096); - long seqNo = 0; - long last = -1; + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); for (int i = 0; i < operations; i++) { - translog.add(new Translog.NoOp(seqNo++, 0, "test")); + translog.add(new Translog.NoOp(i, 0, "test")); if (rarely()) { - final long generation = translog.currentFileGeneration(); translog.rollGeneration(); - if (rarely()) { - // simulate generation filling up and rolling between preparing the commit and committing - translog.rollGeneration(); - } - final int committedGeneration = randomIntBetween(Math.max(1, Math.toIntExact(last)), Math.toIntExact(generation)); - commit(translog, committedGeneration); - last = committedGeneration; - for (long g = 0; g < committedGeneration; g++) { - assertFileDeleted(translog, g); - } - for (long g = committedGeneration; g <= translog.currentFileGeneration(); g++) { - assertFileIsPresent(translog, g); - } } - } - } - - public void testCommitWithOpenView() throws IOException { - final int operations = randomIntBetween(1, 4096); - long seqNo = 0; - long lastCommittedGeneration = -1; - for (int i = 0; i < operations; i++) { - translog.add(new Translog.NoOp(seqNo++, 0, "test")); if (rarely()) { - try (Translog.View ignored = translog.newView()) { - final long viewGeneration = lastCommittedGeneration; - translog.rollGeneration(); - final long committedGeneration = randomIntBetween( - Math.max(1, Math.toIntExact(lastCommittedGeneration)), - Math.toIntExact(translog.currentFileGeneration())); - commit(translog, committedGeneration); - lastCommittedGeneration = committedGeneration; - // with an open view, committing should preserve generations back to the last committed generation - for (long g = 1; g < Math.min(lastCommittedGeneration, viewGeneration); g++) { - assertFileDeleted(translog, g); - } - // the view generation could be -1 if no commit has been performed - final long max = Math.max(1, Math.min(lastCommittedGeneration, viewGeneration)); - for (long g = max; g <= translog.currentFileGeneration(); g++) { - assertFileIsPresent(translog, g); - } + commit(translog, randomLongBetween(deletionPolicy.getMinTranslogGenerationForRecovery(), translog.currentFileGeneration())); + } + if (frequently()) { + long viewGen; + try (Translog.View view = translog.newView()) { + viewGen = view.viewGenToRelease; + assertThat(deletionPolicy.getViewCount(view.viewGenToRelease), equalTo(1L)); } + assertThat(deletionPolicy.getViewCount(viewGen), equalTo(0L)); } } } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 09a787ce0d36f..e73bd8a949748 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -377,6 +377,11 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Completed() throws I when(shard.state()).thenReturn(IndexShardState.RELOCATED); when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class)); final AtomicBoolean phase1Called = new AtomicBoolean(); +// final Engine.IndexCommitRef indexCommitRef = mock(Engine.IndexCommitRef.class); +// when(shard.acquireIndexCommit(anyBoolean())).thenReturn(indexCommitRef); +// final IndexCommit indexCommit = mock(IndexCommit.class); +// when(indexCommitRef.getIndexCommit()).thenReturn(indexCommit); +// when(indexCommit.getUserData()).thenReturn(Collections.emptyMap());final AtomicBoolean phase1Called = new AtomicBoolean(); final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean(); final AtomicBoolean phase2Called = new AtomicBoolean(); final RecoverySourceHandler handler = new RecoverySourceHandler( @@ -394,7 +399,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl } @Override - public void phase1(final IndexCommit snapshot, final Translog.View translogView) { + public void phase1(final IndexCommit snapshot, final Translog.View translogView, final long startSeqNo) { phase1Called.set(true); } @@ -451,6 +456,11 @@ public void testWaitForClusterStateOnPrimaryRelocation() throws IOException, Int }).when(shard).relocated(any(String.class)); when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class)); +// final Engine.IndexCommitRef indexCommitRef = mock(Engine.IndexCommitRef.class); +// when(shard.acquireIndexCommit(anyBoolean())).thenReturn(indexCommitRef); +// final IndexCommit indexCommit = mock(IndexCommit.class); +// when(indexCommitRef.getIndexCommit()).thenReturn(indexCommit); +// when(indexCommit.getUserData()).thenReturn(Collections.emptyMap()); final Supplier currentClusterStateVersionSupplier = () -> { assertFalse(ensureClusterStateVersionCalled.get()); assertTrue(recoveriesDelayed.get()); @@ -487,7 +497,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl } @Override - public void phase1(final IndexCommit snapshot, final Translog.View translogView) { + public void phase1(final IndexCommit snapshot, final Translog.View translogView, final long startSeqNo) { phase1Called.set(true); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java new file mode 100644 index 0000000000000..d898d1a184df4 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -0,0 +1,81 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; +import org.elasticsearch.index.replication.RecoveryDuringReplicationTests; +import org.elasticsearch.index.shard.IndexShard; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; + +import static org.hamcrest.Matchers.equalTo; + +public class RecoveryTests extends ESIndexLevelReplicationTestCase { + + public void testTranslogHistoryTransferred() throws Exception { + try (ReplicationGroup shards = createGroup(0)) { + shards.startPrimary(); + int docs = shards.indexDocs(10); + shards.getPrimary().getTranslog().rollGeneration(); + shards.flush(); + if (randomBoolean()) { + docs += shards.indexDocs(10); + } + shards.addReplica(); + shards.startAll(); + final IndexShard replica = shards.getReplicas().get(0); + assertThat(replica.getTranslog().totalOperations(), equalTo(docs)); + } + } + + + public void testRetentionPolicyChangeDuringRecovery() throws Exception { + try (ReplicationGroup shards = createGroup(0)) { + shards.startPrimary(); + shards.indexDocs(10); + shards.getPrimary().getTranslog().rollGeneration(); + shards.flush(); + shards.indexDocs(10); + final IndexShard replica = shards.addReplica(); + final CountDownLatch recoveryBlocked = new CountDownLatch(1); + final CountDownLatch releaseRecovery = new CountDownLatch(1); + Future future = shards.asyncRecoverReplica(replica, + (indexShard, node) -> new RecoveryDuringReplicationTests.BlockingTarget(RecoveryState.Stage.TRANSLOG, + recoveryBlocked, releaseRecovery, indexShard, node, recoveryListener, logger)); + recoveryBlocked.await(); + IndexMetaData.Builder builder = IndexMetaData.builder(replica.indexSettings().getIndexMetaData()); + builder.settings(Settings.builder().put(replica.indexSettings().getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") + // force a roll and flush + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), "100b") + ); + replica.indexSettings().updateIndexMetaData(builder.build()); + releaseRecovery.countDown(); + future.get(); + // rolling/flushing is async + assertBusy(() -> assertThat(replica.getTranslog().totalOperations(), equalTo(0))); + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 39dde6c14552e..bf8888fa28e39 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -633,7 +633,7 @@ public static String[] generateRandomStringArray(int maxArraySize, int maxString private static final String[] TIME_SUFFIXES = new String[]{"d", "h", "ms", "s", "m", "micros", "nanos"}; - public static String randomTimeValue(int lower, int upper, String[] suffixes) { + public static String randomTimeValue(int lower, int upper, String... suffixes) { return randomIntBetween(lower, upper) + randomFrom(suffixes); } From 2428bb190ce32b674f14c2bc960c27a1b03f3d36 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 18 Jun 2017 20:26:25 +0200 Subject: [PATCH 02/21] add stats to distinguish between committed and uncommitted --- .../index/translog/Translog.java | 22 +++++-- .../index/translog/TranslogStats.java | 52 ++++++++++++---- .../index/translog/TranslogTests.java | 42 ++++++++++--- .../test/indices.stats/20_translog.yml | 62 +++++++++++++++++++ 4 files changed, 152 insertions(+), 26 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 4c57d58f7f5a4..292ab155e1aaf 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -367,19 +367,33 @@ long getMinFileGeneration() { /** - * Returns the number of operations in the transaction files that aren't committed to lucene.. + * Returns the number of operations in the translog files that aren't committed to lucene. */ - public int totalOperations() { + public int uncommittedOperations() { return totalOperations(deletionPolicy.getMinTranslogGenerationForRecovery()); } /** * Returns the size in bytes of the translog files that aren't committed to lucene. */ - public long sizeInBytes() { + public long uncommittedSizeInBytes() { return sizeInBytesByMinGen(deletionPolicy.getMinTranslogGenerationForRecovery()); } + /** + * Returns the number of operations in the translog files + */ + public int totalOperations() { + return totalOperations(-1); + } + + /** + * Returns the size in bytes of the v files + */ + public long sizeInBytes() { + return sizeInBytesByMinGen(-1); + } + /** * Returns the number of operations in the transaction files that aren't committed to lucene.. */ @@ -713,7 +727,7 @@ private void closeOnTragicEvent(Exception ex) { public TranslogStats stats() { // acquire lock to make the two numbers roughly consistent (no file change half way) try (ReleasableLock lock = readLock.acquire()) { - return new TranslogStats(totalOperations(), sizeInBytes()); + return new TranslogStats(totalOperations(), sizeInBytes(), uncommittedOperations(), uncommittedSizeInBytes()); } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java index e60fd2086b93c..d87a1d24bd3f3 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.index.translog; +import org.elasticsearch.Version; import org.elasticsearch.action.support.ToXContentToBytes; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -30,20 +31,29 @@ public class TranslogStats extends ToXContentToBytes implements Streamable { private long translogSizeInBytes; private int numberOfOperations; + private long uncommittedSizeInBytes; + private int uncommittedOperations; public TranslogStats() { } - public TranslogStats(int numberOfOperations, long translogSizeInBytes) { + public TranslogStats(int numberOfOperations, long translogSizeInBytes, int uncommittedOperations, long uncommittedSizeInBytes) { if (numberOfOperations < 0) { throw new IllegalArgumentException("numberOfOperations must be >= 0"); } if (translogSizeInBytes < 0) { throw new IllegalArgumentException("translogSizeInBytes must be >= 0"); } - assert translogSizeInBytes >= 0 : "translogSizeInBytes must be >= 0, got [" + translogSizeInBytes + "]"; + if (uncommittedOperations < 0) { + throw new IllegalArgumentException("uncommittedOperations must be >= 0"); + } + if (uncommittedSizeInBytes < 0) { + throw new IllegalArgumentException("uncommittedSizeInBytes must be >= 0"); + } this.numberOfOperations = numberOfOperations; this.translogSizeInBytes = translogSizeInBytes; + this.uncommittedSizeInBytes = uncommittedSizeInBytes; + this.uncommittedOperations = uncommittedOperations; } public void add(TranslogStats translogStats) { @@ -53,6 +63,8 @@ public void add(TranslogStats translogStats) { this.numberOfOperations += translogStats.numberOfOperations; this.translogSizeInBytes += translogStats.translogSizeInBytes; + this.uncommittedOperations += translogStats.uncommittedOperations; + this.uncommittedSizeInBytes += translogStats.uncommittedSizeInBytes; } public long getTranslogSizeInBytes() { @@ -63,31 +75,47 @@ public long estimatedNumberOfOperations() { return numberOfOperations; } + /** the size of the generations in the translog that weren't yet to comitted to lucene */ + public long getUncommittedSizeInBytes() { + return uncommittedSizeInBytes; + } + + /** the number of operations in generations of the translog that weren't yet to comitted to lucene */ + public int getUncommittedOperations() { + return uncommittedOperations; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(Fields.TRANSLOG); - builder.field(Fields.OPERATIONS, numberOfOperations); - builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, translogSizeInBytes); + builder.startObject("translog"); + builder.field("operations", numberOfOperations); + builder.byteSizeField("size_in_bytes", "size", translogSizeInBytes); + builder.field("uncommitted_operations", uncommittedOperations); + builder.byteSizeField("uncommitted_size_in_bytes", "uncommitted_size", uncommittedSizeInBytes); builder.endObject(); return builder; } - static final class Fields { - static final String TRANSLOG = "translog"; - static final String OPERATIONS = "operations"; - static final String SIZE = "size"; - static final String SIZE_IN_BYTES = "size_in_bytes"; - } - @Override public void readFrom(StreamInput in) throws IOException { numberOfOperations = in.readVInt(); translogSizeInBytes = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha3)) { + uncommittedOperations = in.readVInt(); + uncommittedSizeInBytes = in.readVLong(); + } else { + uncommittedOperations = numberOfOperations; + uncommittedSizeInBytes = translogSizeInBytes; + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(numberOfOperations); out.writeVLong(translogSizeInBytes); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha3)) { + out.writeVInt(uncommittedOperations); + out.writeVLong(uncommittedSizeInBytes); + } } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index e1f82583aaab5..2f7b5385f947a 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -356,6 +356,8 @@ public void testStats() throws IOException { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(1L)); assertThat(stats.getTranslogSizeInBytes(), equalTo(97L)); + assertThat(stats.getUncommittedOperations(), equalTo(1)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(97L)); } translog.add(new Translog.Delete("test", "2", 1, newUid("2"))); @@ -363,6 +365,8 @@ public void testStats() throws IOException { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(2L)); assertThat(stats.getTranslogSizeInBytes(), equalTo(146L)); + assertThat(stats.getUncommittedOperations(), equalTo(2)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(146L)); } translog.add(new Translog.Delete("test", "3", 2, newUid("3"))); @@ -370,6 +374,8 @@ public void testStats() throws IOException { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(3L)); assertThat(stats.getTranslogSizeInBytes(), equalTo(195L)); + assertThat(stats.getUncommittedOperations(), equalTo(3)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(195L)); } translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16))); @@ -377,6 +383,8 @@ public void testStats() throws IOException { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4L)); assertThat(stats.getTranslogSizeInBytes(), equalTo(237L)); + assertThat(stats.getUncommittedOperations(), equalTo(4)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(237L)); } final long expectedSizeInBytes = 280L; @@ -384,9 +392,9 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4L)); - assertThat( - stats.getTranslogSizeInBytes(), - equalTo(expectedSizeInBytes)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); + assertThat(stats.getUncommittedOperations(), equalTo(4)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(expectedSizeInBytes)); } { @@ -403,15 +411,18 @@ public void testStats() throws IOException { builder.startObject(); copy.toXContent(builder, ToXContent.EMPTY_PARAMS); builder.endObject(); - assertThat(builder.string(), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + expectedSizeInBytes + "}}")); + assertThat(builder.string(), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + expectedSizeInBytes + + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + expectedSizeInBytes + "}}")); } } markCurrentGenAsCommitted(translog); { final TranslogStats stats = stats(); - assertThat(stats.estimatedNumberOfOperations(), equalTo(0L)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(firstOperationPosition)); + assertThat(stats.estimatedNumberOfOperations(), equalTo(4L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); + assertThat(stats.getUncommittedOperations(), equalTo(0)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition)); } } @@ -420,7 +431,8 @@ public void testTotalTests() { final int n = randomIntBetween(0, 16); final List statsList = new ArrayList<>(n); for (int i = 0; i < n; i++) { - final TranslogStats stats = new TranslogStats(randomIntBetween(1, 4096), randomIntBetween(1, 1 << 20)); + final TranslogStats stats = new TranslogStats(randomIntBetween(1, 4096), randomIntBetween(1, 1 << 20), + randomIntBetween(1, 1 << 20), randomIntBetween(1, 4096)); statsList.add(stats); total.add(stats); } @@ -431,16 +443,26 @@ public void testTotalTests() { assertThat( total.getTranslogSizeInBytes(), equalTo(statsList.stream().mapToLong(TranslogStats::getTranslogSizeInBytes).sum())); + assertThat( + total.getUncommittedOperations(), + equalTo(statsList.stream().mapToInt(TranslogStats::getUncommittedOperations).sum())); + assertThat( + total.getUncommittedSizeInBytes(), + equalTo(statsList.stream().mapToLong(TranslogStats::getUncommittedSizeInBytes).sum())); } public void testNegativeNumberOfOperations() { - final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(-1, 1)); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(-1, 1, 1, 1)); assertThat(e, hasToString(containsString("numberOfOperations must be >= 0"))); + e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, -1, 1)); + assertThat(e, hasToString(containsString("uncommittedOperations must be >= 0"))); } public void testNegativeSizeInBytes() { - final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, -1)); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, -1, 1, 1)); assertThat(e, hasToString(containsString("translogSizeInBytes must be >= 0"))); + e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, 1, -1)); + assertThat(e, hasToString(containsString("uncommittedSizeInBytes must be >= 0"))); } public void testSnapshot() throws IOException { @@ -2347,7 +2369,7 @@ public void testRollGeneration() throws Exception { } commit(translog, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls )); - assertThat(translog.totalOperations(), equalTo(0)); + assertThat(translog.uncommittedOperations(), equalTo(0)); if (longRetention) { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml new file mode 100644 index 0000000000000..6925c3b0325fe --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml @@ -0,0 +1,62 @@ +--- +setup: + - do: + indices.create: + index: test + +--- +"Translog retention" + - skip: + version: " - 5.99.0" + reason: translog retention was added in 6.0.0 + - do: + indics.stats: + metric: [ translog ] + - set: { indices.test.primaries.translog.size_in_bytes: empty_size } + + - do: + index: + index: test + type: bar + id: 1 + body: { "foo": "bar" } + + - do: + indics.stats: + metric: [ translog ] + - gt: { indices.test.primaries.translog.size_in_bytes: $empty_size } + - match: { indices.test.primaries.translog.operations: 1 } + - gt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $empty_size } + - match: { indices.test.primaries.translog.uncommitted_operations: 1 } + + - do: + flush: + index: test + + - do: + indics.stats: + metric: [ translog ] + - gt: { indices.test.primaries.translog.size_in_bytes: $empty_size } + - match: { indices.test.primaries.translog.operations: 1 } + - match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $empty_size } + - match: { indices.test.primaries.translog.uncommitted_operations: 0 } + + - do: + indices.put_settings: + index: test + body: + index.translog.retention.size: -1 + index.translog.retention.age: -1 + + - do: + flush: + index: test + force: true # force flush as we don't have pending ops + + - do: + indics.stats: + metric: [ translog ] + - match: { indices.test.primaries.translog.size_in_bytes: $empty_size } + - match: { indices.test.primaries.translog.operations: 0 } + - match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $empty_size } + - match: { indices.test.primaries.translog.uncommitted_operations: 0 } From cb17e8340299951f251edf5e0a9ec6a4c53eea42 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 18 Jun 2017 20:41:58 +0200 Subject: [PATCH 03/21] lint --- .../recovery/RemoteRecoveryTargetHandler.java | 14 +++++++------- .../RecoveryDuringReplicationTests.java | 5 +++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 30129780de182..414cbd4ea49eb 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -157,13 +157,13 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR } transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILE_CHUNK, - new RecoveryFileChunkRequest(recoveryId, shardId, fileMetaData, position, content, lastChunk, - totalTranslogOps, - /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can - * see how many translog ops we accumulate while copying files across the network. A future optimization - * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. - */ - throttleTimeInNanos), fileChunkRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + new RecoveryFileChunkRequest(recoveryId, shardId, fileMetaData, position, content, lastChunk, + totalTranslogOps, + /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can + * see how many translog ops we accumulate while copying files across the network. A future optimization + * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. + */ + throttleTimeInNanos), fileChunkRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } } diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 99879de5bac85..feaccd2f204ec 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -422,8 +422,9 @@ public static class BlockingTarget extends RecoveryTarget { EnumSet.of(RecoveryState.Stage.INDEX, RecoveryState.Stage.TRANSLOG, RecoveryState.Stage.FINALIZE); private final Logger logger; - public BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery, IndexShard shard, - DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener, Logger logger) { + public BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery, + IndexShard shard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener, + Logger logger) { super(shard, sourceNode, listener, version -> {}); this.recoveryBlocked = recoveryBlocked; this.releaseRecovery = releaseRecovery; From e1a544b9810bb22cf60883af442f29112ac0c320 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 18 Jun 2017 20:58:58 +0200 Subject: [PATCH 04/21] back to int --- .../java/org/elasticsearch/index/translog/TranslogStats.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java index d87a1d24bd3f3..4b7a092a5ec9e 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogStats.java @@ -71,7 +71,7 @@ public long getTranslogSizeInBytes() { return translogSizeInBytes; } - public long estimatedNumberOfOperations() { + public int estimatedNumberOfOperations() { return numberOfOperations; } From 00f8ae3b2631945f59f60d679d40738f19d2aa8a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 18 Jun 2017 21:09:17 +0200 Subject: [PATCH 05/21] fix tests --- .../index/engine/InternalEngine.java | 2 +- .../index/translog/Translog.java | 2 +- .../index/engine/InternalEngineTests.java | 26 ++++++++++++------- .../index/shard/IndexShardIT.java | 23 ++++++++-------- .../index/translog/TranslogTests.java | 18 ++++++------- .../indices/recovery/RecoveryTests.java | 1 + 6 files changed, 41 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6d10a0290995a..3ef2d3dbfd7ec 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1215,7 +1215,7 @@ final boolean tryRenewSyncCommit() { ensureOpen(); ensureCanFlush(); String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID); - if (syncId != null && translog.totalOperations() == 0 && indexWriter.hasUncommittedChanges()) { + if (syncId != null && translog.uncommittedOperations() == 0 && indexWriter.hasUncommittedChanges()) { logger.trace("start renewing sync commit [{}]", syncId); commitIndexWriter(indexWriter, translog, syncId); logger.debug("successfully sync committed. sync id [{}].", syncId); diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 292ab155e1aaf..6c5421d4c909b 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -527,7 +527,7 @@ public Location add(final Operation operation) throws IOException { * @return {@code true} if the translog should be flushed */ public boolean shouldFlush() { - final long size = this.sizeInBytes(); + final long size = this.uncommittedSizeInBytes(); return size > this.indexSettings.getFlushThresholdSize().getBytes(); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index af18781dfa6e9..a243835171e6b 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -126,7 +126,6 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; -import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.test.DummyShardLock; @@ -864,14 +863,14 @@ public void testTranslogRecoveryDoesNotReplayIntoTranslog() throws IOException { recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { @Override public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { - assertThat(getTranslog().totalOperations(), equalTo(docs)); + assertThat(getTranslog().uncommittedOperations(), equalTo(docs)); final CommitId commitId = super.flush(force, waitIfOngoing); flushed.set(true); return commitId; } }; - assertThat(recoveringEngine.getTranslog().totalOperations(), equalTo(docs)); + assertThat(recoveringEngine.getTranslog().uncommittedOperations(), equalTo(docs)); recoveringEngine.recoverFromTranslog(); assertTrue(flushed.get()); } finally { @@ -2489,10 +2488,19 @@ private static void assertVisibleCount(InternalEngine engine, int numDocs, boole } public void testTranslogCleanUpPostCommitCrash() throws Exception { + IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), defaultSettings.getNodeSettings(), + defaultSettings.getScopedSettings()); + IndexMetaData.Builder builder = IndexMetaData.builder(indexSettings.getIndexMetaData()); + builder.settings(Settings.builder().put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") + ); + indexSettings.updateIndexMetaData(builder.build()); + try (Store store = createStore()) { AtomicBoolean throwErrorOnCommit = new AtomicBoolean(); final Path translogPath = createTempDir(); - try (InternalEngine engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null)) { + try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null)) { @Override protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { super.commitIndexWriter(writer, translog, syncId); @@ -2507,7 +2515,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s FlushFailedEngineException e = expectThrows(FlushFailedEngineException.class, engine::flush); assertThat(e.getCause().getMessage(), equalTo("power's out")); } - try (InternalEngine engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null))) { + try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null))) { engine.recoverFromTranslog(); assertVisibleCount(engine, 1); final long committedGen = Long.valueOf( @@ -2907,7 +2915,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); assertEquals(1, engine.getTranslog().currentFileGeneration()); - assertEquals(0L, engine.getTranslog().totalOperations()); + assertEquals(0L, engine.getTranslog().uncommittedOperations()); } } @@ -3824,7 +3832,7 @@ public SequenceNumbersService seqNoService() { System.nanoTime(), reason)); assertThat(noOpEngine.seqNoService().getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1))); - assertThat(noOpEngine.getTranslog().totalOperations(), equalTo(1 + gapsFilled)); + assertThat(noOpEngine.getTranslog().uncommittedOperations(), equalTo(1 + gapsFilled)); // skip to the op that we added to the translog Translog.Operation op; Translog.Operation last = null; @@ -3970,7 +3978,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo()); assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); - assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().totalOperations()); + assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations()); recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); @@ -4001,7 +4009,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { try { recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); if (flushed) { - assertEquals(0, recoveringEngine.getTranslog().totalOperations()); + assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations()); } recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 174f68da4b75f..7c859c27d7864 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -353,29 +353,30 @@ public void testMaybeFlush() throws Exception { Engine.Index index = new Engine.Index(new Term("_id", doc.id()), doc); shard.index(index); assertTrue(shard.shouldFlush()); - assertEquals(2, shard.getEngine().getTranslog().totalOperations()); + final Translog translog = shard.getEngine().getTranslog(); + assertEquals(2, translog.uncommittedOperations()); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async assertFalse(shard.shouldFlush()); }); - assertEquals(0, shard.getEngine().getTranslog().totalOperations()); - shard.getEngine().getTranslog().sync(); - long size = shard.getEngine().getTranslog().sizeInBytes(); - logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), - shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); + assertEquals(0, translog.uncommittedOperations()); + translog.sync(); + long size = translog.uncommittedSizeInBytes(); + logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), + translog.uncommittedOperations(), translog.getGeneration()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) .build()).get(); client().prepareDelete("test", "test", "2").get(); - logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), - shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); + logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), + translog.uncommittedOperations(), translog.getGeneration()); assertBusy(() -> { // this is async - logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), - shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); + logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), + translog.uncommittedOperations(), translog.getGeneration()); assertFalse(shard.shouldFlush()); }); - assertEquals(0, shard.getEngine().getTranslog().totalOperations()); + assertEquals(0, translog.uncommittedOperations()); } public void testMaybeRollTranslogGeneration() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 2f7b5385f947a..a6acd25b17caf 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -347,14 +347,14 @@ public void testStats() throws IOException { final long firstOperationPosition = translog.getFirstOperationPosition(); { final TranslogStats stats = stats(); - assertThat(stats.estimatedNumberOfOperations(), equalTo(0L)); + assertThat(stats.estimatedNumberOfOperations(), equalTo(0)); } assertThat((int) firstOperationPosition, greaterThan(CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC))); translog.add(new Translog.Index("test", "1", 0, new byte[]{1})); { final TranslogStats stats = stats(); - assertThat(stats.estimatedNumberOfOperations(), equalTo(1L)); + assertThat(stats.estimatedNumberOfOperations(), equalTo(1)); assertThat(stats.getTranslogSizeInBytes(), equalTo(97L)); assertThat(stats.getUncommittedOperations(), equalTo(1)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(97L)); @@ -363,7 +363,7 @@ public void testStats() throws IOException { translog.add(new Translog.Delete("test", "2", 1, newUid("2"))); { final TranslogStats stats = stats(); - assertThat(stats.estimatedNumberOfOperations(), equalTo(2L)); + assertThat(stats.estimatedNumberOfOperations(), equalTo(2)); assertThat(stats.getTranslogSizeInBytes(), equalTo(146L)); assertThat(stats.getUncommittedOperations(), equalTo(2)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(146L)); @@ -372,7 +372,7 @@ public void testStats() throws IOException { translog.add(new Translog.Delete("test", "3", 2, newUid("3"))); { final TranslogStats stats = stats(); - assertThat(stats.estimatedNumberOfOperations(), equalTo(3L)); + assertThat(stats.estimatedNumberOfOperations(), equalTo(3)); assertThat(stats.getTranslogSizeInBytes(), equalTo(195L)); assertThat(stats.getUncommittedOperations(), equalTo(3)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(195L)); @@ -381,7 +381,7 @@ public void testStats() throws IOException { translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16))); { final TranslogStats stats = stats(); - assertThat(stats.estimatedNumberOfOperations(), equalTo(4L)); + assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); assertThat(stats.getTranslogSizeInBytes(), equalTo(237L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(237L)); @@ -391,7 +391,7 @@ public void testStats() throws IOException { translog.rollGeneration(); { final TranslogStats stats = stats(); - assertThat(stats.estimatedNumberOfOperations(), equalTo(4L)); + assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); assertThat(stats.getUncommittedOperations(), equalTo(4)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(expectedSizeInBytes)); @@ -404,7 +404,7 @@ public void testStats() throws IOException { final TranslogStats copy = new TranslogStats(); copy.readFrom(out.bytes().streamInput()); - assertThat(copy.estimatedNumberOfOperations(), equalTo(4L)); + assertThat(copy.estimatedNumberOfOperations(), equalTo(4)); assertThat(copy.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { @@ -419,7 +419,7 @@ public void testStats() throws IOException { markCurrentGenAsCommitted(translog); { final TranslogStats stats = stats(); - assertThat(stats.estimatedNumberOfOperations(), equalTo(4L)); + assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); assertThat(stats.getUncommittedOperations(), equalTo(0)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition)); @@ -439,7 +439,7 @@ public void testTotalTests() { assertThat( total.estimatedNumberOfOperations(), - equalTo(statsList.stream().mapToLong(TranslogStats::estimatedNumberOfOperations).sum())); + equalTo(statsList.stream().mapToInt(TranslogStats::estimatedNumberOfOperations).sum())); assertThat( total.getTranslogSizeInBytes(), equalTo(statsList.stream().mapToLong(TranslogStats::getTranslogSizeInBytes).sum())); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index d898d1a184df4..2a95bf33908d0 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -72,6 +72,7 @@ public void testRetentionPolicyChangeDuringRecovery() throws Exception { .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), "100b") ); replica.indexSettings().updateIndexMetaData(builder.build()); + replica.onSettingsChanged(); releaseRecovery.countDown(); future.get(); // rolling/flushing is async From 889492a5f9d8cf04a20293ad8bf40a7d1be42006 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 18 Jun 2017 22:16:32 +0200 Subject: [PATCH 06/21] fix testReuseInFileBasedPeerRecovery --- .../org/elasticsearch/gateway/RecoveryFromGatewayIT.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 4210f9c32c17f..91f4b9d7eb564 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -33,6 +33,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.shard.ShardId; @@ -432,6 +433,10 @@ public Settings onNodeStopped(String nodeName) throws Exception { } // prevent a sequence-number-based recovery from being possible + client(primaryNode).admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") + ).get(); client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get(); return super.onNodeStopped(nodeName); } From d2f70d4e55d65cc4d582e210a2f63c9defc8ce16 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 18 Jun 2017 22:32:27 +0200 Subject: [PATCH 07/21] fix rest test --- .../resources/rest-api-spec/test/indices.stats/20_translog.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml index 6925c3b0325fe..ac90b825c40a3 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml @@ -5,7 +5,7 @@ setup: index: test --- -"Translog retention" +"Translog retention": - skip: version: " - 5.99.0" reason: translog retention was added in 6.0.0 From 68322d505745066e1fa82a055d4deaaf81b5eb9b Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 18 Jun 2017 22:58:09 +0200 Subject: [PATCH 08/21] another fix to 20_translog.yml --- .../test/indices.stats/20_translog.yml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml index ac90b825c40a3..df0125187d645 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml @@ -10,8 +10,8 @@ setup: version: " - 5.99.0" reason: translog retention was added in 6.0.0 - do: - indics.stats: - metric: [ translog ] + indices.stats: + metric: [ translog ] - set: { indices.test.primaries.translog.size_in_bytes: empty_size } - do: @@ -22,19 +22,19 @@ setup: body: { "foo": "bar" } - do: - indics.stats: - metric: [ translog ] + indices.stats: + metric: [ translog ] - gt: { indices.test.primaries.translog.size_in_bytes: $empty_size } - match: { indices.test.primaries.translog.operations: 1 } - gt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $empty_size } - match: { indices.test.primaries.translog.uncommitted_operations: 1 } - do: - flush: + indices.flush: index: test - do: - indics.stats: + indices.stats: metric: [ translog ] - gt: { indices.test.primaries.translog.size_in_bytes: $empty_size } - match: { indices.test.primaries.translog.operations: 1 } @@ -49,12 +49,12 @@ setup: index.translog.retention.age: -1 - do: - flush: + indices.flush: index: test force: true # force flush as we don't have pending ops - do: - indics.stats: + indices.stats: metric: [ translog ] - match: { indices.test.primaries.translog.size_in_bytes: $empty_size } - match: { indices.test.primaries.translog.operations: 0 } From 8170c2a50a513ef5a7053b866ba390eddc8889ce Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 18 Jun 2017 23:52:26 +0200 Subject: [PATCH 09/21] fix testRecoveryAfterPrimaryPromotion --- .../index/replication/RecoveryDuringReplicationTests.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index feaccd2f204ec..f4e75eb37b8a8 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -183,6 +183,10 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { // index some more totalDocs += shards.indexDocs(randomIntBetween(0, 5)); + if (randomBoolean()) { + shards.flush(); + } + oldPrimary.close("demoted", false); oldPrimary.store().close(); @@ -194,9 +198,10 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs)); } else { assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); - assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs)); + assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs)); } + // roll back the extra ops in the replica shards.removeReplica(replica); replica.close("resync", false); replica.store().close(); From be590595a9e1d21e9bab021f8cba2ab8b0a66eac Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 19 Jun 2017 08:14:32 +0200 Subject: [PATCH 10/21] control retention in testStats --- .../java/org/elasticsearch/index/translog/TranslogTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index a6acd25b17caf..dc78854f27228 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -344,6 +344,9 @@ protected TranslogStats stats() throws IOException { } public void testStats() throws IOException { + // self control cleaning for test + translog.getDeletionPolicy().setRetentionSizeInBytes(1024 * 1024); + translog.getDeletionPolicy().setRetentionAgeInMillis(3600 * 1000); final long firstOperationPosition = translog.getFirstOperationPosition(); { final TranslogStats stats = stats(); From 286d4a3bb6243966fb1ff254ebfcb88b412b327f Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 19 Jun 2017 16:25:48 +0200 Subject: [PATCH 11/21] add time based trimming --- .../common/settings/IndexScopedSettings.java | 3 +- .../org/elasticsearch/index/IndexService.java | 59 +++++++++++++++++++ .../elasticsearch/index/IndexSettings.java | 23 ++++++++ .../elasticsearch/index/engine/Engine.java | 7 +++ .../index/engine/InternalEngine.java | 18 ++++++ .../elasticsearch/index/shard/IndexShard.java | 10 ++++ .../index/translog/Translog.java | 4 ++ .../index/IndexServiceTests.java | 56 +++++++++++++++++- 8 files changed, 176 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index ae4cf6cd41a36..d2e0f9f484e0f 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.common.settings; -import org.elasticsearch.index.IndexSortConfig; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; @@ -27,6 +26,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexSortConfig; import org.elasticsearch.index.IndexingSlowLog; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.MergeSchedulerConfig; @@ -129,6 +129,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, + IndexSettings.INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 523f322da0c1a..a66e33b929666 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -111,6 +111,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final List searchOperationListeners; private volatile AsyncRefreshTask refreshTask; private volatile AsyncTranslogFSync fsyncTask; + private volatile AsyncTrimTranslogTask trimTranslogTask; private final ThreadPool threadPool; private final BigArrays bigArrays; private final ScriptService scriptService; @@ -177,6 +178,7 @@ public IndexService( this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners); // kick off async ops for the first shard in this index this.refreshTask = new AsyncRefreshTask(this); + this.trimTranslogTask = new AsyncTrimTranslogTask(this); rescheduleFsyncTask(indexSettings.getTranslogDurability()); } @@ -606,6 +608,9 @@ public synchronized void updateMetaData(final IndexMetaData metadata) { if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) { rescheduleRefreshTasks(); } + if (trimTranslogTask.getInterval().equals(indexSettings.getTranslogRetentionCheckInterval()) == false) { + rescheduleTrimTranslogcheck(); + } final Translog.Durability durability = indexSettings.getTranslogDurability(); if (durability != oldTranslogDurability) { rescheduleFsyncTask(durability); @@ -629,7 +634,14 @@ private void rescheduleRefreshTasks() { } finally { refreshTask = new AsyncRefreshTask(this); } + } + private void rescheduleTrimTranslogcheck() { + try { + trimTranslogTask.close(); + } finally { + trimTranslogTask = new AsyncTrimTranslogTask(this); + } } public interface ShardStoreDeleter { @@ -693,6 +705,28 @@ private void maybeRefreshEngine() { } } + private void maybeTrimTranslog() { + for (IndexShard shard : this.shards.values()) { + switch (shard.state()) { + case CREATED: + case RECOVERING: + case CLOSED: + continue; + case POST_RECOVERY: + case STARTED: + case RELOCATED: + try { + shard.trimTranslog(); + } catch (IndexShardClosedException | AlreadyClosedException ex) { + // fine - continue; + } + continue; + default: + throw new IllegalStateException("unknown state: " + shard.state()); + } + } + } + abstract static class BaseAsyncTask implements Runnable, Closeable { protected final IndexService indexService; protected final ThreadPool threadPool; @@ -837,6 +871,28 @@ public String toString() { } } + final class AsyncTrimTranslogTask extends BaseAsyncTask { + + AsyncTrimTranslogTask(IndexService indexService) { + super(indexService, indexService.getIndexSettings().getTranslogRetentionCheckInterval()); + } + + @Override + protected void runInternal() { + indexService.maybeTrimTranslog(); + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + + @Override + public String toString() { + return "trim_translog"; + } + } + AsyncRefreshTask getRefreshTask() { // for tests return refreshTask; } @@ -845,4 +901,7 @@ AsyncTranslogFSync getFsyncTask() { // for tests return fsyncTask; } + AsyncTrimTranslogTask getTrimTranslogTask() { + return trimTranslogTask; + } } diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 537344ca653a4..a46c770cf3c3f 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -130,6 +130,14 @@ public final class IndexSettings { Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); + /** + * Most of the translog trimming is done on the flying during indexing. However, once indexing stops, we need to revisit old + * translog files and clean them up. This setting how often we check for those + **/ + public static final Setting INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = + Setting.timeSetting("index.translog.retention.check_interval", new TimeValue(10, TimeUnit.MINUTES), + new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope); + /** * The maximum size of a translog generation. This is independent of the maximum size of * translog operations that have not been flushed. @@ -190,6 +198,7 @@ public final class IndexSettings { private volatile TimeValue translogRetentionAge; private volatile ByteSizeValue translogRetentionSize; private volatile ByteSizeValue generationThresholdSize; + private volatile TimeValue translogRetentionCheckInterval; private final MergeSchedulerConfig mergeSchedulerConfig; private final MergePolicyConfig mergePolicyConfig; private final IndexSortConfig indexSortConfig; @@ -288,6 +297,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); translogRetentionAge = scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING); translogRetentionSize = scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING); + translogRetentionCheckInterval = scopedSettings.get(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); @@ -327,6 +337,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti this::setGenerationThresholdSize); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, this::setTranslogRetentionCheckInterval); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); @@ -348,6 +359,10 @@ private void setGenerationThresholdSize(final ByteSizeValue generationThresholdS this.generationThresholdSize = generationThresholdSize; } + private void setTranslogRetentionCheckInterval(TimeValue interval) { + this.translogRetentionCheckInterval = interval; + } + private void setGCDeletes(TimeValue timeValue) { this.gcDeletesInMillis = timeValue.getMillis(); } @@ -512,6 +527,14 @@ public TimeValue getRefreshInterval() { */ public TimeValue getTranslogRetentionAge() { return translogRetentionAge; } + /** + * Returns how often we need to check if we can clean up translog files (in order to clean up old files after indexing has stopped) + */ + public TimeValue getTranslogRetentionCheckInterval() { + return translogRetentionCheckInterval; + } + + /** * Returns the generation threshold size. As sequence numbers can cause multiple generations to * be preserved for rollback purposes, we want to keep the size of individual generations from diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 6e93d1feed5f8..6c25507a6b7d9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -803,6 +803,13 @@ public final boolean refreshNeeded() { */ public abstract CommitId flush() throws EngineException; + + /** + * checks and removes translog files that no longer need to be retained. See + * {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details + */ + public abstract void trimTranslog() throws EngineException; + /** * Force merges to 1 segment */ diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3ef2d3dbfd7ec..8d9ea96db7b8a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1317,6 +1317,24 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti return new CommitId(newCommitId); } + @Override + public void trimTranslog() throws EngineException { + try (ReleasableLock lock = readLock.acquire()) { + ensureOpen(); + translog.trimUnreferencedReaders(); + } catch (AlreadyClosedException e) { + failOnTragicEvent(e); + throw e; + } catch (Exception e) { + try { + failEngine("translog trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new EngineException(shardId, "failed to trim translog", e); + } + } + private void pruneDeletedTombstones() { long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 1d42c9025e90b..fc48195c2783f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -875,6 +875,16 @@ public Engine.CommitId flush(FlushRequest request) { return commitId; } + /** + * checks and removes translog files that no longer need to be retained. See + * {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details + */ + public void trimTranslog() { + verifyNotClosed(); + final Engine engine = getEngine(); + engine.trimTranslog(); + } + /** * Rolls the tranlog generation and cleans unneeded. * diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 6c5421d4c909b..995f326bbd16f 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1576,6 +1576,10 @@ public void trimUnreferencedReaders() throws IOException { // we're shutdown potentially on some tragic event, don't delete anything return; } + if (readers.isEmpty()) { + // nothing to clean + return; + } long minReferencedGen = deletionPolicy.minTranslogGenRequired(readers, current); assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" diff --git a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 6a6f726230882..870c04de1e1d8 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -21,7 +21,6 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TopDocs; -import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; @@ -42,8 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.hamcrest.core.IsEqual.equalTo; /** Unit test(s) for IndexService */ public class IndexServiceTests extends ESSingleNodeTestCase { @@ -163,6 +161,36 @@ public void testRefreshTaskIsUpdated() throws IOException { assertTrue(refreshTask.isClosed()); } + public void testTrimTranslogTaskIsUpdated() throws IOException { + IndexService indexService = createIndex("test", Settings.EMPTY); + IndexService.AsyncTrimTranslogTask trimTask = indexService.getTrimTranslogTask(); + assertEquals(10, trimTask.getInterval().minutes()); + assertTrue(indexService.getTrimTranslogTask().mustReschedule()); + + // now disable + IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder() + .put(indexService.getMetaData().getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), -1)).build(); + indexService.updateMetaData(metaData); + assertNotSame(trimTask, indexService.getTrimTranslogTask()); + assertTrue(trimTask.isClosed()); + assertFalse(trimTask.isScheduled()); + assertFalse(indexService.getTrimTranslogTask().mustReschedule()); + + // set it to 100m + metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder() + .put(indexService.getMetaData().getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100m")).build(); + indexService.updateMetaData(metaData); + assertNotSame(trimTask, indexService.getTrimTranslogTask()); + assertTrue(trimTask.isClosed()); + + trimTask = indexService.getTrimTranslogTask(); + assertTrue(trimTask.mustReschedule()); + assertTrue(trimTask.isScheduled()); + assertEquals(100, trimTask.getInterval().minutes()); + } + public void testFsyncTaskIsRunning() throws IOException { IndexService indexService = createIndex("test", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC).build()); IndexService.AsyncTranslogFSync fsyncTask = indexService.getFsyncTask(); @@ -263,6 +291,28 @@ public void testRescheduleAsyncFsync() throws Exception { assertNotNull(indexService.getFsyncTask()); } + public void testAsyncTranslogTrimActuallyWorks() throws Exception { + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms") // very often :) + .build(); + IndexService indexService = createIndex("test", settings); + ensureGreen("test"); + assertTrue(indexService.getRefreshTask().mustReschedule()); + client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); + client().admin().indices().prepareFlush("test").get(); + IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder() + .put(indexService.getMetaData().getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), -1) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), -1)) + .build(); + indexService.updateMetaData(metaData); + + IndexShard shard = indexService.getShard(0); + assertBusy(() -> assertThat(shard.getTranslog().totalOperations(), equalTo(0))); + } + + + public void testIllegalFsyncInterval() { Settings settings = Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable From be0192e68af775163388225c7855dc98983e667f Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 20 Jun 2017 12:18:38 +0200 Subject: [PATCH 12/21] some docs and a migration note --- .../reference/index-modules/translog.asciidoc | 30 ++++++++++++------- .../migration/migrate_6_0/indices.asciidoc | 5 ++++ 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/docs/reference/index-modules/translog.asciidoc b/docs/reference/index-modules/translog.asciidoc index 9889d11206875..66919597d2c37 100644 --- a/docs/reference/index-modules/translog.asciidoc +++ b/docs/reference/index-modules/translog.asciidoc @@ -20,16 +20,6 @@ replaying its operations take a considerable amount of time during recovery. It is also exposed through an API, though its rarely needed to be performed manually. -[float] -=== Flush settings - -The following <> settings -control how often the in-memory buffer is flushed to disk: - -`index.translog.flush_threshold_size`:: - -Once the translog hits this size, a flush will happen. Defaults to `512mb`. - [float] === Translog settings @@ -72,6 +62,26 @@ update, or bulk request. This setting accepts the following parameters: automatic commit will be discarded. -- +`index.translog.flush_threshold_size`:: + +The translog stores all operations that are not yet safely persisted in Lucene (i.e., are +not part of a lucene commit point). Although these operations are available for reads, they will +need to be reindexed if the shard was to shutdown and has to be recovered. This settings controls +the maximum total size of these operations, to prevent recoveries from taking too long. Once the +maximum size has been reached a flush will happen, generating a new Lucene commit. Defaults to `512mb`. + +`index.translog.retention.size`:: + +The total size of translog files to keep. Keeping more translog files increases the chance of performing +an operation based sync when recovering replicas. If the translog files are not sufficient, replica recovery +will fall back to a file based sync. Defaults to `512mb` + + +`index.translog.retention.age`:: + +The maximum duration for which translog files will be kept. Defaults to `12h`. + + [float] [[corrupt-translog-truncation]] === What to do if the translog becomes corrupted? diff --git a/docs/reference/migration/migrate_6_0/indices.asciidoc b/docs/reference/migration/migrate_6_0/indices.asciidoc index 922ff5b17d48c..b52a403ca2a60 100644 --- a/docs/reference/migration/migrate_6_0/indices.asciidoc +++ b/docs/reference/migration/migrate_6_0/indices.asciidoc @@ -68,3 +68,8 @@ matching indices). Omitting the `+` has the same effect as specifying it, hence support for `+` has been removed in index expressions. +==== Translog retention + +Translog files are now kept for up to 12 hours (by default), with a maximum size of `512mb` (default), and +are no longer deleted on `flush`. This is to increase the chance of doing an operation based recovery when +bringing up replicas up to speed. From 36e0a4eada1e04cd95bc57c262d3001288c0d48b Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 21 Jun 2017 14:32:08 +0200 Subject: [PATCH 13/21] Revert "add time based trimming" This reverts commit 286d4a3bb6243966fb1ff254ebfcb88b412b327f. --- .../common/settings/IndexScopedSettings.java | 3 +- .../org/elasticsearch/index/IndexService.java | 59 ------------------- .../elasticsearch/index/IndexSettings.java | 23 -------- .../elasticsearch/index/engine/Engine.java | 7 --- .../index/engine/InternalEngine.java | 18 ------ .../elasticsearch/index/shard/IndexShard.java | 10 ---- .../index/translog/Translog.java | 4 -- .../index/IndexServiceTests.java | 56 +----------------- 8 files changed, 4 insertions(+), 176 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index d633b075b919f..890a43107c53a 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.common.settings; +import org.elasticsearch.index.IndexSortConfig; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; @@ -26,7 +27,6 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.IndexSortConfig; import org.elasticsearch.index.IndexingSlowLog; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.MergeSchedulerConfig; @@ -130,7 +130,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, - IndexSettings.INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY, FieldMapper.IGNORE_MALFORMED_SETTING, FieldMapper.COERCE_SETTING, diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index a66e33b929666..523f322da0c1a 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -111,7 +111,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final List searchOperationListeners; private volatile AsyncRefreshTask refreshTask; private volatile AsyncTranslogFSync fsyncTask; - private volatile AsyncTrimTranslogTask trimTranslogTask; private final ThreadPool threadPool; private final BigArrays bigArrays; private final ScriptService scriptService; @@ -178,7 +177,6 @@ public IndexService( this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners); // kick off async ops for the first shard in this index this.refreshTask = new AsyncRefreshTask(this); - this.trimTranslogTask = new AsyncTrimTranslogTask(this); rescheduleFsyncTask(indexSettings.getTranslogDurability()); } @@ -608,9 +606,6 @@ public synchronized void updateMetaData(final IndexMetaData metadata) { if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) { rescheduleRefreshTasks(); } - if (trimTranslogTask.getInterval().equals(indexSettings.getTranslogRetentionCheckInterval()) == false) { - rescheduleTrimTranslogcheck(); - } final Translog.Durability durability = indexSettings.getTranslogDurability(); if (durability != oldTranslogDurability) { rescheduleFsyncTask(durability); @@ -634,14 +629,7 @@ private void rescheduleRefreshTasks() { } finally { refreshTask = new AsyncRefreshTask(this); } - } - private void rescheduleTrimTranslogcheck() { - try { - trimTranslogTask.close(); - } finally { - trimTranslogTask = new AsyncTrimTranslogTask(this); - } } public interface ShardStoreDeleter { @@ -705,28 +693,6 @@ private void maybeRefreshEngine() { } } - private void maybeTrimTranslog() { - for (IndexShard shard : this.shards.values()) { - switch (shard.state()) { - case CREATED: - case RECOVERING: - case CLOSED: - continue; - case POST_RECOVERY: - case STARTED: - case RELOCATED: - try { - shard.trimTranslog(); - } catch (IndexShardClosedException | AlreadyClosedException ex) { - // fine - continue; - } - continue; - default: - throw new IllegalStateException("unknown state: " + shard.state()); - } - } - } - abstract static class BaseAsyncTask implements Runnable, Closeable { protected final IndexService indexService; protected final ThreadPool threadPool; @@ -871,28 +837,6 @@ public String toString() { } } - final class AsyncTrimTranslogTask extends BaseAsyncTask { - - AsyncTrimTranslogTask(IndexService indexService) { - super(indexService, indexService.getIndexSettings().getTranslogRetentionCheckInterval()); - } - - @Override - protected void runInternal() { - indexService.maybeTrimTranslog(); - } - - @Override - protected String getThreadPool() { - return ThreadPool.Names.GENERIC; - } - - @Override - public String toString() { - return "trim_translog"; - } - } - AsyncRefreshTask getRefreshTask() { // for tests return refreshTask; } @@ -901,7 +845,4 @@ AsyncTranslogFSync getFsyncTask() { // for tests return fsyncTask; } - AsyncTrimTranslogTask getTrimTranslogTask() { - return trimTranslogTask; - } } diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index a46c770cf3c3f..537344ca653a4 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -130,14 +130,6 @@ public final class IndexSettings { Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic, Property.IndexScope); - /** - * Most of the translog trimming is done on the flying during indexing. However, once indexing stops, we need to revisit old - * translog files and clean them up. This setting how often we check for those - **/ - public static final Setting INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = - Setting.timeSetting("index.translog.retention.check_interval", new TimeValue(10, TimeUnit.MINUTES), - new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope); - /** * The maximum size of a translog generation. This is independent of the maximum size of * translog operations that have not been flushed. @@ -198,7 +190,6 @@ public final class IndexSettings { private volatile TimeValue translogRetentionAge; private volatile ByteSizeValue translogRetentionSize; private volatile ByteSizeValue generationThresholdSize; - private volatile TimeValue translogRetentionCheckInterval; private final MergeSchedulerConfig mergeSchedulerConfig; private final MergePolicyConfig mergePolicyConfig; private final IndexSortConfig indexSortConfig; @@ -297,7 +288,6 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); translogRetentionAge = scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING); translogRetentionSize = scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING); - translogRetentionCheckInterval = scopedSettings.get(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); @@ -337,7 +327,6 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti this::setGenerationThresholdSize); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize); - scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, this::setTranslogRetentionCheckInterval); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); @@ -359,10 +348,6 @@ private void setGenerationThresholdSize(final ByteSizeValue generationThresholdS this.generationThresholdSize = generationThresholdSize; } - private void setTranslogRetentionCheckInterval(TimeValue interval) { - this.translogRetentionCheckInterval = interval; - } - private void setGCDeletes(TimeValue timeValue) { this.gcDeletesInMillis = timeValue.getMillis(); } @@ -527,14 +512,6 @@ public TimeValue getRefreshInterval() { */ public TimeValue getTranslogRetentionAge() { return translogRetentionAge; } - /** - * Returns how often we need to check if we can clean up translog files (in order to clean up old files after indexing has stopped) - */ - public TimeValue getTranslogRetentionCheckInterval() { - return translogRetentionCheckInterval; - } - - /** * Returns the generation threshold size. As sequence numbers can cause multiple generations to * be preserved for rollback purposes, we want to keep the size of individual generations from diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 6c25507a6b7d9..6e93d1feed5f8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -803,13 +803,6 @@ public final boolean refreshNeeded() { */ public abstract CommitId flush() throws EngineException; - - /** - * checks and removes translog files that no longer need to be retained. See - * {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details - */ - public abstract void trimTranslog() throws EngineException; - /** * Force merges to 1 segment */ diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8d9ea96db7b8a..3ef2d3dbfd7ec 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1317,24 +1317,6 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti return new CommitId(newCommitId); } - @Override - public void trimTranslog() throws EngineException { - try (ReleasableLock lock = readLock.acquire()) { - ensureOpen(); - translog.trimUnreferencedReaders(); - } catch (AlreadyClosedException e) { - failOnTragicEvent(e); - throw e; - } catch (Exception e) { - try { - failEngine("translog trimming failed", e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw new EngineException(shardId, "failed to trim translog", e); - } - } - private void pruneDeletedTombstones() { long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3ab1281cb03c8..1beff0c9d4239 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -884,16 +884,6 @@ public Engine.CommitId flush(FlushRequest request) { return commitId; } - /** - * checks and removes translog files that no longer need to be retained. See - * {@link org.elasticsearch.index.translog.TranslogDeletionPolicy} for details - */ - public void trimTranslog() { - verifyNotClosed(); - final Engine engine = getEngine(); - engine.trimTranslog(); - } - /** * Rolls the tranlog generation and cleans unneeded. * diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 995f326bbd16f..6c5421d4c909b 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1576,10 +1576,6 @@ public void trimUnreferencedReaders() throws IOException { // we're shutdown potentially on some tragic event, don't delete anything return; } - if (readers.isEmpty()) { - // nothing to clean - return; - } long minReferencedGen = deletionPolicy.minTranslogGenRequired(readers, current); assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" diff --git a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 870c04de1e1d8..6a6f726230882 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -21,6 +21,7 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TopDocs; +import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; @@ -41,7 +42,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static org.hamcrest.core.IsEqual.equalTo; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; /** Unit test(s) for IndexService */ public class IndexServiceTests extends ESSingleNodeTestCase { @@ -161,36 +163,6 @@ public void testRefreshTaskIsUpdated() throws IOException { assertTrue(refreshTask.isClosed()); } - public void testTrimTranslogTaskIsUpdated() throws IOException { - IndexService indexService = createIndex("test", Settings.EMPTY); - IndexService.AsyncTrimTranslogTask trimTask = indexService.getTrimTranslogTask(); - assertEquals(10, trimTask.getInterval().minutes()); - assertTrue(indexService.getTrimTranslogTask().mustReschedule()); - - // now disable - IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder() - .put(indexService.getMetaData().getSettings()) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), -1)).build(); - indexService.updateMetaData(metaData); - assertNotSame(trimTask, indexService.getTrimTranslogTask()); - assertTrue(trimTask.isClosed()); - assertFalse(trimTask.isScheduled()); - assertFalse(indexService.getTrimTranslogTask().mustReschedule()); - - // set it to 100m - metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder() - .put(indexService.getMetaData().getSettings()) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100m")).build(); - indexService.updateMetaData(metaData); - assertNotSame(trimTask, indexService.getTrimTranslogTask()); - assertTrue(trimTask.isClosed()); - - trimTask = indexService.getTrimTranslogTask(); - assertTrue(trimTask.mustReschedule()); - assertTrue(trimTask.isScheduled()); - assertEquals(100, trimTask.getInterval().minutes()); - } - public void testFsyncTaskIsRunning() throws IOException { IndexService indexService = createIndex("test", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC).build()); IndexService.AsyncTranslogFSync fsyncTask = indexService.getFsyncTask(); @@ -291,28 +263,6 @@ public void testRescheduleAsyncFsync() throws Exception { assertNotNull(indexService.getFsyncTask()); } - public void testAsyncTranslogTrimActuallyWorks() throws Exception { - Settings settings = Settings.builder() - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms") // very often :) - .build(); - IndexService indexService = createIndex("test", settings); - ensureGreen("test"); - assertTrue(indexService.getRefreshTask().mustReschedule()); - client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); - client().admin().indices().prepareFlush("test").get(); - IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder() - .put(indexService.getMetaData().getSettings()) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), -1) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), -1)) - .build(); - indexService.updateMetaData(metaData); - - IndexShard shard = indexService.getShard(0); - assertBusy(() -> assertThat(shard.getTranslog().totalOperations(), equalTo(0))); - } - - - public void testIllegalFsyncInterval() { Settings settings = Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable From df0fb638e5407bafbd64e3a4d22c8dbd5c6812be Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 21 Jun 2017 14:43:55 +0200 Subject: [PATCH 14/21] inline rolling and trimming into engine --- .../elasticsearch/index/engine/Engine.java | 8 ++++++++ .../index/engine/InternalEngine.java | 19 +++++++++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 10 +++------- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 6e93d1feed5f8..5713b6d03b984 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -803,6 +803,14 @@ public final boolean refreshNeeded() { */ public abstract CommitId flush() throws EngineException; + /** + * Rolls the tranlog generation and cleans unneeded. + * + * @throws IOException if any file operations on the translog throw an I/O exception + */ + public abstract void rollTranslogGeneration() throws EngineException; + + /** * Force merges to 1 segment */ diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3ef2d3dbfd7ec..fd1d8f3843358 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1317,6 +1317,25 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti return new CommitId(newCommitId); } + @Override + public void rollTranslogGeneration() throws EngineException { + try (ReleasableLock lock = readLock.acquire()) { + ensureOpen(); + translog.rollGeneration(); + translog.trimUnreferencedReaders(); + } catch (AlreadyClosedException e) { + failOnTragicEvent(e); + throw e; + } catch (Exception e) { + try { + failEngine("translog trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new EngineException(shardId, "failed to roll translog", e); + } + } + private void pruneDeletedTombstones() { long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 1beff0c9d4239..c1a526487c665 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -886,14 +886,10 @@ public Engine.CommitId flush(FlushRequest request) { /** * Rolls the tranlog generation and cleans unneeded. - * - * @throws IOException if any file operations on the translog throw an I/O exception */ - private void rollTranslogAndTrimGeneration() throws IOException { + private void rollTranslogGeneration() { final Engine engine = getEngine(); - final Translog translog = engine.getTranslog(); - translog.rollGeneration(); - translog.trimUnreferencedReaders(); + engine.rollTranslogGeneration(); } public void forceMerge(ForceMergeRequest forceMerge) throws IOException { @@ -2109,7 +2105,7 @@ public void onFailure(final Exception e) { @Override protected void doRun() throws Exception { - rollTranslogAndTrimGeneration(); + rollTranslogGeneration(); } @Override From 1042768dd7ca19259db2e595b9c0d8f0fdf6eb4b Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 21 Jun 2017 14:47:14 +0200 Subject: [PATCH 15/21] feedback --- .../main/java/org/elasticsearch/index/translog/Translog.java | 4 +++- .../elasticsearch/index/translog/TranslogDeletionPolicy.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 6c5421d4c909b..cc10b73fc9ba8 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -595,7 +595,9 @@ public Snapshot newSnapshot(long minGeneration) { } private Stream readersAboveMinSeqNo(long minSeqNo) { - assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() : + "callers of readersAboveMinSeqNo must hold a lock: readLock [" + + readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]"; return Stream.concat(readers.stream(), Stream.of(current)) .filter(reader -> { final long maxSeqNo = reader.getCheckpoint().maxSeqNo; diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index d222bf6f6bd9c..5c1d0ac40bbb9 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -155,6 +155,6 @@ public synchronized long getMinTranslogGenerationForRecovery() { } synchronized long getViewCount(long viewGen) { - return translogRefCounts.getOrDefault(viewGen, Counter.newCounter(false)).get(); + return translogRefCounts.computeIfAbsent(viewGen, ignored -> Counter.newCounter(false)).get(); } } From f77ac5b16a263177c5828e4cc66ac4143917224c Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 21 Jun 2017 14:55:42 +0200 Subject: [PATCH 16/21] javadocs --- core/src/main/java/org/elasticsearch/index/engine/Engine.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 5713b6d03b984..d30f9629dc2b3 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -804,9 +804,7 @@ public final boolean refreshNeeded() { public abstract CommitId flush() throws EngineException; /** - * Rolls the tranlog generation and cleans unneeded. - * - * @throws IOException if any file operations on the translog throw an I/O exception + * Rolls the translog generation and cleans unneeded. */ public abstract void rollTranslogGeneration() throws EngineException; From 9c567297a30cd841e8159ca8de07add9a14b0126 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 21 Jun 2017 15:15:15 +0200 Subject: [PATCH 17/21] roll back computeIfAbsent --- .../elasticsearch/index/translog/TranslogDeletionPolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 5c1d0ac40bbb9..d222bf6f6bd9c 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -155,6 +155,6 @@ public synchronized long getMinTranslogGenerationForRecovery() { } synchronized long getViewCount(long viewGen) { - return translogRefCounts.computeIfAbsent(viewGen, ignored -> Counter.newCounter(false)).get(); + return translogRefCounts.getOrDefault(viewGen, Counter.newCounter(false)).get(); } } From 375376ba058d83380c1f0aed05e4727341e6fffc Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 21 Jun 2017 15:18:20 +0200 Subject: [PATCH 18/21] a different approach --- .../elasticsearch/index/translog/TranslogDeletionPolicy.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index d222bf6f6bd9c..e1b1147b8cfb6 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -155,6 +155,7 @@ public synchronized long getMinTranslogGenerationForRecovery() { } synchronized long getViewCount(long viewGen) { - return translogRefCounts.getOrDefault(viewGen, Counter.newCounter(false)).get(); + final Counter counter = translogRefCounts.get(viewGen); + return counter == null ? 0 : counter.get(); } } From 18b7feef8b8d6cf41a7c252699907b7e2594f996 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 21 Jun 2017 15:23:15 +0200 Subject: [PATCH 19/21] jason's feedback --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- .../main/java/org/elasticsearch/index/translog/Translog.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fd1d8f3843358..a8f0759c1bbf9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1319,7 +1319,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti @Override public void rollTranslogGeneration() throws EngineException { - try (ReleasableLock lock = readLock.acquire()) { + try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); translog.rollGeneration(); translog.trimUnreferencedReaders(); diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index cc10b73fc9ba8..89338934ec88a 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -601,8 +601,7 @@ private Stream readersAboveMinSeqNo(long minSeqNo) return Stream.concat(readers.stream(), Stream.of(current)) .filter(reader -> { final long maxSeqNo = reader.getCheckpoint().maxSeqNo; - return maxSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || - maxSeqNo >= minSeqNo; + return maxSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo; }); } From d9ca199136d914375227e852d50b84a8f98ee6eb Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 22 Jun 2017 14:09:21 +0200 Subject: [PATCH 20/21] fix compilation --- .../org/elasticsearch/index/shard/PrimaryReplicaSyncer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index 9ad9b82e25767..4641675afef0c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -80,7 +80,8 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests public void resync(IndexShard indexShard, ActionListener listener) throws IOException { try (Translog.View view = indexShard.acquireTranslogView()) { - Translog.Snapshot snapshot = view.snapshot(); + final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; + Translog.Snapshot snapshot = view.snapshot(startingSeqNo); ShardId shardId = indexShard.shardId(); // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. @@ -104,7 +105,7 @@ public synchronized Translog.Operation next() throws IOException { }; resync(shardId, indexShard.routingEntry().allocationId().getId(), wrappedSnapshot, - indexShard.getGlobalCheckpoint() + 1, listener); + startingSeqNo, listener); } } From a924d30cc33b57d7538000711d56ca5eb616f9f7 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 22 Jun 2017 15:25:38 +0200 Subject: [PATCH 21/21] fix testSyncerSendsOffCorrectDocuments --- .../elasticsearch/index/shard/PrimaryReplicaSyncerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index a4a38beb6e26a..d19a51e6271db 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -75,7 +75,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { if (syncNeeded) { assertTrue("Sync action was not called", syncActionCalled.get()); } - assertEquals(numDocs, fut.get().getTotalOperations()); + assertEquals(globalCheckPoint == numDocs - 1 ? 0 : numDocs, fut.get().getTotalOperations()); if (syncNeeded) { long skippedOps = globalCheckPoint + 1; // everything up to global checkpoint included assertEquals(skippedOps, fut.get().getSkippedOperations());