From 8f495be50bb23cf346b64ead49bba0423df390b0 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 26 Jul 2017 10:35:01 +0200 Subject: [PATCH 01/10] wip --- .../elasticsearch/action/ActionListener.java | 29 +++++++- .../index/engine/InternalEngine.java | 5 +- .../index/shard/PrimaryReplicaSyncer.java | 15 +++- .../index/translog/BaseTranslogReader.java | 2 +- .../index/translog/MultiSnapshot.java | 17 ++++- .../index/translog/Translog.java | 73 +++++++++++++------ .../translog/TranslogDeletionPolicy.java | 19 +++-- .../index/translog/TranslogSnapshot.java | 3 +- .../index/translog/TranslogWriter.java | 2 +- .../recovery/RecoverySourceHandler.java | 15 ++-- .../snapshots/SnapshotsService.java | 2 +- .../IndexLevelReplicationTests.java | 5 +- .../index/shard/IndexShardTests.java | 5 ++ .../translog/TranslogDeletionPolicyTests.java | 34 +++++---- .../index/translog/TranslogTests.java | 27 +++---- .../recovery/RecoverySourceHandlerTests.java | 5 ++ 16 files changed, 174 insertions(+), 84 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/ActionListener.java b/core/src/main/java/org/elasticsearch/action/ActionListener.java index e0d91a9036437..e4fe30a636eec 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/ActionListener.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; /** @@ -70,6 +69,34 @@ public void onFailure(Exception e) { }; } + static ActionListener chain(ActionListener first, ActionListener second) { + return new ActionListener() { + @Override + public void onResponse(Response response) { + try { + first.onResponse(response); + } catch (Exception e) { + onFailure(e); + } + try { + second.onResponse(response); + } catch (Exception e) { + second.onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + try { + first.onFailure(e); + } catch (Exception suppressed) { + e.addSuppressed(suppressed); + } + second.onFailure(e); + } + }; + } + /** * Notifies every given listener with the response passed to {@link #onResponse(Object)}. If a listener itself throws an exception * the exception is forwarded to {@link #onFailure(Exception)}. If in turn {@link #onFailure(Exception)} fails all remaining diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bd11afdced962..2edfa3c1299fd 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -238,8 +238,7 @@ public void restoreLocalCheckpointFromTranslog() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); final long localCheckpoint = seqNoService().getLocalCheckpoint(); - try (Translog.View view = getTranslog().newView()) { - final Translog.Snapshot snapshot = view.snapshot(localCheckpoint + 1); + try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() > localCheckpoint) { @@ -327,7 +326,7 @@ private void recoverFromTranslogInternal() throws IOException { final int opsRecovered; try { final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - Translog.Snapshot snapshot = translog.newSnapshot(translogGen); + Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen); opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot); } catch (Exception e) { throw new EngineException(shardId, "failed to recover from translog", e); diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index bec263f54702f..93c28229de9f1 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -49,6 +49,8 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.util.Objects.requireNonNull; +import static org.elasticsearch.action.ActionListener.chain; +import static org.elasticsearch.action.ActionListener.wrap; public class PrimaryReplicaSyncer extends AbstractComponent { @@ -79,9 +81,10 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests } public void resync(IndexShard indexShard, ActionListener listener) throws IOException { - try (Translog.View view = indexShard.acquireTranslogView()) { + try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; - Translog.Snapshot snapshot = view.snapshot(startingSeqNo); + Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); + listener = chain(wrap(r -> snapshot.close(), e -> snapshot.close()), listener); ShardId shardId = indexShard.shardId(); // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. @@ -89,6 +92,11 @@ public void resync(IndexShard indexShard, ActionListener listener) t // Also fail the resync early if the shard is shutting down Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { + @Override + public void close() { + snapshot.close(); + } + @Override public synchronized int totalOperations() { return snapshot.totalOperations(); @@ -103,9 +111,10 @@ public synchronized Translog.Operation next() throws IOException { return snapshot.next(); } }; - resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot, startingSeqNo, listener); + } catch (Exception e) { + listener.onFailure(e); } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java index 7f8b7f3fb2c76..d86c4491b63e9 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -77,7 +77,7 @@ protected final int readSize(ByteBuffer reusableBuffer, long position) throws IO return size; } - public Translog.Snapshot newSnapshot() { + public TranslogSnapshot newSnapshot() { return new TranslogSnapshot(this, sizeInBytes()); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index 7b1a05e1ac1e2..135ae90b08700 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -19,6 +19,8 @@ package org.elasticsearch.index.translog; +import org.elasticsearch.common.lease.Releasable; + import java.io.IOException; import java.util.Arrays; @@ -27,16 +29,18 @@ */ final class MultiSnapshot implements Translog.Snapshot { - private final Translog.Snapshot[] translogs; + private final TranslogSnapshot[] translogs; private final int totalOperations; + private final Releasable onClose; private int index; /** * Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order. */ - MultiSnapshot(Translog.Snapshot[] translogs) { + MultiSnapshot(TranslogSnapshot[] translogs, Releasable onClose) { this.translogs = translogs; - totalOperations = Arrays.stream(translogs).mapToInt(Translog.Snapshot::totalOperations).sum(); + totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum(); + this.onClose = onClose; index = 0; } @@ -49,7 +53,7 @@ public int totalOperations() { @Override public Translog.Operation next() throws IOException { for (; index < translogs.length; index++) { - final Translog.Snapshot current = translogs[index]; + final TranslogSnapshot current = translogs[index]; Translog.Operation op = current.next(); if (op != null) { // if we are null we move to the next snapshot return op; @@ -57,4 +61,9 @@ public Translog.Operation next() throws IOException { } return null; } + + @Override + public void close() { + onClose.close(); + } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 3109af8453bea..97097ceba428e 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -56,6 +56,7 @@ import java.nio.file.StandardCopyOption; import java.nio.file.StandardOpenOption; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -576,21 +577,51 @@ public long getLastSyncedGlobalCheckpoint() { */ public Snapshot newSnapshot() { try (ReleasableLock ignored = readLock.acquire()) { - return newSnapshot(getMinFileGeneration()); + return newSnapshotFromGen(getMinFileGeneration()); } } - public Snapshot newSnapshot(long minGeneration) { + public Snapshot newSnapshotFromGen(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); if (minGeneration < getMinFileGeneration()) { throw new IllegalArgumentException("requested snapshot generation [" + minGeneration + "] is not available. " + "Min referenced generation is [" + getMinFileGeneration() + "]"); } - Snapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current)) + TranslogSnapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current)) .filter(reader -> reader.getGeneration() >= minGeneration) - .map(BaseTranslogReader::newSnapshot).toArray(Snapshot[]::new); - return new MultiSnapshot(snapshots); + .map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new); + return newMultiSnapshot(snapshots); + } + } + + public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + TranslogSnapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot) + .toArray(TranslogSnapshot[]::new); + return newMultiSnapshot(snapshots); + } + } + + private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) { + final Releasable onClose; + if (snapshots.length == 0) { + onClose = () -> {}; + } else { + assert Arrays.stream(snapshots).map(BaseTranslogReader::getGeneration).min(Long::compareTo).get() + == snapshots[0].generation : "first reader generation of " + snapshots[0].generation + " is not the smallest"; + onClose = deletionPolicy.acquireTranslogGen(snapshots[0].generation); + } + boolean success = false; + try { + Snapshot result = new MultiSnapshot(snapshots, onClose); + success = true; + return result; + } finally { + if (success == false) { + onClose.close(); + } } } @@ -605,14 +636,6 @@ private Stream readersAboveMinSeqNo(long minSeqNo) }); } - private Snapshot createSnapshotFromMinSeqNo(long minSeqNo) { - try (ReleasableLock ignored = readLock.acquire()) { - ensureOpen(); - Snapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot).toArray(Snapshot[]::new); - return new MultiSnapshot(snapshots); - } - } - /** * Returns a view into the current translog that is guaranteed to retain all current operations * while receiving future ones as well @@ -621,11 +644,11 @@ public Translog.View newView() { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); final long viewGen = getMinFileGeneration(); - deletionPolicy.acquireTranslogGenForView(viewGen); + Releasable onClose = deletionPolicy.acquireTranslogGen(viewGen); try { - return new View(viewGen); + return new View(viewGen, onClose); } catch (Exception e) { - deletionPolicy.releaseTranslogGenView(viewGen); + onClose.close(); throw e; } } @@ -751,11 +774,13 @@ public TranslogDeletionPolicy getDeletionPolicy() { */ public class View implements Closeable { - AtomicBoolean closed = new AtomicBoolean(); - final long viewGenToRelease; + final AtomicBoolean closed = new AtomicBoolean(); + final long baseTranslogGen; + private final Releasable onClose; - View(long viewGenToRelease) { - this.viewGenToRelease = viewGenToRelease; + View(long baseTranslogGen, Releasable onClose) { + this.baseTranslogGen = baseTranslogGen; + this.onClose = onClose; } /** @@ -781,7 +806,7 @@ public long estimateSizeInBytes(long minSequenceNumber) { * operations from the given sequence number and up (with potentially some more) */ public Snapshot snapshot(long minSequenceNumber) { ensureOpen(); - return Translog.this.createSnapshotFromMinSeqNo(minSequenceNumber); + return Translog.this.newSnapshotFromMinSeqNo(minSequenceNumber); } void ensureOpen() { @@ -793,8 +818,8 @@ void ensureOpen() { @Override public void close() throws IOException { if (closed.getAndSet(true) == false) { - logger.trace("closing view starting at translog [{}]", viewGenToRelease); - deletionPolicy.releaseTranslogGenView(viewGenToRelease); + logger.trace("closing view starting at translog [{}]", baseTranslogGen); + onClose.close(); trimUnreferencedReaders(); closeFilesIfNoPendingViews(); } @@ -859,7 +884,7 @@ public int hashCode() { /** * A snapshot of the transaction log, allows to iterate over all the transaction log operations. */ - public interface Snapshot { + public interface Snapshot extends Releasable { /** * The total number of operations in the translog. diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index e1b1147b8cfb6..48df0b6e52098 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -20,11 +20,13 @@ package org.elasticsearch.index.translog; import org.apache.lucene.util.Counter; +import org.elasticsearch.common.lease.Releasable; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; public class TranslogDeletionPolicy { @@ -66,11 +68,18 @@ public synchronized void setRetentionAgeInMillis(long ageInMillis) { } /** - * acquires the basis generation for a new view. Any translog generation above, and including, the returned generation - * will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called. + * acquires the basis generation for a new view or snapshot. Any translog generation above, and including, the returned generation + * will not be deleted until the returned {@link Releasable} is closed. */ - synchronized void acquireTranslogGenForView(final long genForView) { + synchronized Releasable acquireTranslogGen(final long genForView) { translogRefCounts.computeIfAbsent(genForView, l -> Counter.newCounter(false)).addAndGet(1); + final AtomicBoolean closed = new AtomicBoolean(); + return () -> { + if (closed.compareAndSet(false, true)) { + // TODO add assertions that this is called + releaseTranslogGenView(genForView); + } + }; } /** returns the number of generations that were acquired for views */ @@ -79,9 +88,9 @@ synchronized int pendingViewsCount() { } /** - * releases a generation that was acquired by {@link #acquireTranslogGenForView(long)} + * releases a generation that was acquired by {@link #acquireTranslogGen(long)} */ - synchronized void releaseTranslogGenView(long translogGen) { + private synchronized void releaseTranslogGenView(long translogGen) { Counter current = translogRefCounts.get(translogGen); if (current == null || current.get() <= 0) { throw new IllegalArgumentException("translog gen [" + translogGen + "] wasn't acquired"); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index 312b7fc9db01f..42b8d239a3a60 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; -final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot { +final class TranslogSnapshot extends BaseTranslogReader { private final int totalOperations; private final Checkpoint checkpoint; @@ -59,7 +59,6 @@ Checkpoint getCheckpoint() { return checkpoint; } - @Override public Translog.Operation next() throws IOException { if (readOperations < totalOperations) { return readOperation(); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 928511170938b..9c95471e60e82 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -299,7 +299,7 @@ public TranslogReader closeIntoReader() throws IOException { @Override - public Translog.Snapshot newSnapshot() { + public TranslogSnapshot newSnapshot() { // make sure to acquire the sync lock first, to prevent dead locks with threads calling // syncUpTo() , where the sync lock is acquired first, following by the synchronize(this) synchronized (syncLock) { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index fd2c9d4d306ae..0ea8425165c1a 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -184,8 +184,8 @@ public RecoveryResponse recoverToTarget() throws IOException { logger.trace("snapshot translog for recovery; current size is [{}]", translogView.estimateTotalOperations(startingSeqNo)); final long targetLocalCheckpoint; - try { - targetLocalCheckpoint = phase2(startingSeqNo, translogView.snapshot(startingSeqNo)); + try(Translog.Snapshot snapshot = translogView.snapshot(startingSeqNo)) { + targetLocalCheckpoint = phase2(startingSeqNo, snapshot); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); } @@ -234,11 +234,12 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo); final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1); - final Translog.Snapshot snapshot = translogView.snapshot(startingSeqNo); - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - if (operation.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { - tracker.markSeqNoAsCompleted(operation.seqNo()); + try(Translog.Snapshot snapshot = translogView.snapshot(startingSeqNo)) { + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + if (operation.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + tracker.markSeqNoAsCompleted(operation.seqNo()); + } } } return tracker.getCheckpoint() >= endingSeqNo; diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index c8dfb7732815a..21e0f2c220e50 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -249,7 +249,7 @@ public ClusterState execute(ClusterState currentState) { } SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); if (snapshots == null || snapshots.entries().isEmpty()) { - // Store newSnapshot here to be processed in clusterStateProcessed + // Store newSnapshotFromGen here to be processed in clusterStateProcessed List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request.indicesOptions(), request.indices())); logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); List snapshotIndices = repositoryData.resolveNewIndices(indices); diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 878f4bfce903e..a16e09e110b82 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -326,9 +326,8 @@ private static void assertNoOpTranslogOperationForDocumentFailure( long expectedPrimaryTerm, String failureMessage) throws IOException { for (IndexShard indexShard : replicationGroup) { - try(Translog.View view = indexShard.acquireTranslogView()) { - assertThat(view.estimateTotalOperations(SequenceNumbersService.NO_OPS_PERFORMED), equalTo(expectedOperation)); - final Translog.Snapshot snapshot = view.snapshot(SequenceNumbersService.NO_OPS_PERFORMED); + try(Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(expectedOperation)); long expectedSeqNo = 0L; Translog.Operation op = snapshot.next(); do { diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 924dd3c8b97f8..4683a876b6dc1 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1882,6 +1882,11 @@ public void testRecoverFromTranslog() throws IOException { Iterator iterator = operations.iterator(); Translog.Snapshot snapshot = new Translog.Snapshot() { + @Override + public void close() { + + } + @Override public int totalOperations() { return numTotalEntries; diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 05e05e0557278..b44a174ceaebf 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -23,6 +23,7 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; @@ -136,22 +137,23 @@ public void testRetentionHierarchy() throws IOException { assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize))); long viewGen = randomFrom(allGens).generation; - deletionPolicy.acquireTranslogGenForView(viewGen); - assertMinGenRequired(deletionPolicy, readersAndWriter, - Math.min( - Math.min(committedGen, viewGen), - Math.max(selectedGenerationByAge, selectedGenerationBySize))); - // disable age - deletionPolicy.setRetentionAgeInMillis(-1); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationBySize)); - // disable size - deletionPolicy.setRetentionAgeInMillis(maxAge); - deletionPolicy.setRetentionSizeInBytes(-1); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationByAge)); - // disable both - deletionPolicy.setRetentionAgeInMillis(-1); - deletionPolicy.setRetentionSizeInBytes(-1); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); + try (Releasable ignored = deletionPolicy.acquireTranslogGen(viewGen)) { + assertMinGenRequired(deletionPolicy, readersAndWriter, + Math.min( + Math.min(committedGen, viewGen), + Math.max(selectedGenerationByAge, selectedGenerationBySize))); + // disable age + deletionPolicy.setRetentionAgeInMillis(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationBySize)); + // disable size + deletionPolicy.setRetentionAgeInMillis(maxAge); + deletionPolicy.setRetentionSizeInBytes(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(Math.min(committedGen, viewGen), selectedGenerationByAge)); + // disable both + deletionPolicy.setRetentionAgeInMillis(-1); + deletionPolicy.setRetentionSizeInBytes(-1); + assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); + } } finally { IOUtils.close(readersAndWriter.v1()); IOUtils.close(readersAndWriter.v2()); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 86c23268c926c..d9c87a63885cf 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -324,7 +324,7 @@ public void testSimpleOperations() throws IOException { assertThat(snapshot.totalOperations(), equalTo(ops.size())); markCurrentGenAsCommitted(translog); - snapshot = translog.newSnapshot(firstId + 1); + snapshot = translog.newSnapshotFromGen(firstId + 1); assertThat(snapshot, SnapshotMatchers.size(0)); assertThat(snapshot.totalOperations(), equalTo(0)); } @@ -855,7 +855,7 @@ void newView() throws IOException { // captures the last committed checkpoint, while holding the view, simulating // recovery logic which captures a view and gets a lucene commit committedLocalCheckpointAtView = lastCommittedLocalCheckpoint.get(); - logger.info("--> [{}] opened view from [{}]", threadId, view.viewGenToRelease); + logger.info("--> [{}] opened view from [{}]", threadId, view.baseTranslogGen); } @Override @@ -871,10 +871,11 @@ protected void doRun() throws Exception { // these are what we expect the snapshot to return (and potentially some more). Set expectedOps = new HashSet<>(writtenOps.keySet()); expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView); - Translog.Snapshot snapshot = view.snapshot(committedLocalCheckpointAtView + 1L); - Translog.Operation op; - while ((op = snapshot.next()) != null) { - expectedOps.remove(op); + try (Translog.Snapshot snapshot = view.snapshot(committedLocalCheckpointAtView + 1L)) { + Translog.Operation op; + while ((op = snapshot.next()) != null) { + expectedOps.remove(op); + } } if (expectedOps.isEmpty() == false) { StringBuilder missed = new StringBuilder("missed ").append(expectedOps.size()) @@ -1049,7 +1050,7 @@ public void testBasicCheckpoint() throws IOException { final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)); try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) { assertEquals(lastSynced + 1, reader.totalOperations()); - Translog.Snapshot snapshot = reader.newSnapshot(); + TranslogSnapshot snapshot = reader.newSnapshot(); for (int op = 0; op < translogOperations; op++) { if (op <= lastSynced) { @@ -1191,7 +1192,7 @@ public void testBasicRecovery() throws IOException { translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - Translog.Snapshot snapshot = translog.newSnapshot(translogGeneration.translogFileGeneration); + Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGeneration.translogFileGeneration); for (int i = minUncommittedOp; i < translogOperations; i++) { assertEquals("expected operation" + i + " to be in the previous translog but wasn't", translog.currentFileGeneration() - 1, locations.get(i).generation); Translog.Operation next = snapshot.next(); @@ -1425,7 +1426,7 @@ public void testOpenForeignTranslog() throws IOException { } this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); - Translog.Snapshot snapshot = this.translog.newSnapshot(translogGeneration.translogFileGeneration); + Translog.Snapshot snapshot = this.translog.newSnapshotFromGen(translogGeneration.translogFileGeneration); for (int i = firstUncommitted; i < translogOperations; i++) { Translog.Operation next = snapshot.next(); assertNotNull("" + i, next); @@ -2204,7 +2205,7 @@ public void testWithRandomException() throws IOException { TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { - Translog.Snapshot snapshot = translog.newSnapshot(minGenForRecovery); + Translog.Snapshot snapshot = translog.newSnapshotFromGen(minGenForRecovery); assertEquals(syncedDocs.size(), snapshot.totalOperations()); for (int i = 0; i < syncedDocs.size(); i++) { Translog.Operation next = snapshot.next(); @@ -2425,7 +2426,7 @@ public void testMinGenerationForSeqNo() throws IOException { final Set> generationSeenSeqNos = new HashSet<>(); final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.getCommitCheckpointFileName(g))); try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(g)), checkpoint)) { - Translog.Snapshot snapshot = reader.newSnapshot(); + TranslogSnapshot snapshot = reader.newSnapshot(); Translog.Operation operation; while ((operation = snapshot.next()) != null) { generationSeenSeqNos.add(Tuple.tuple(operation.seqNo(), operation.primaryTerm())); @@ -2473,8 +2474,8 @@ public void testOpenViewIsPassToDeletionPolicy() throws IOException { if (frequently()) { long viewGen; try (Translog.View view = translog.newView()) { - viewGen = view.viewGenToRelease; - assertThat(deletionPolicy.getViewCount(view.viewGenToRelease), equalTo(1L)); + viewGen = view.baseTranslogGen; + assertThat(deletionPolicy.getViewCount(view.baseTranslogGen), equalTo(1L)); } assertThat(deletionPolicy.getViewCount(viewGen), equalTo(0L)); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 77cf93e30bd57..aa87f8f0b1ae1 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -181,6 +181,11 @@ public void testSendSnapshotSendsOps() throws IOException { } operations.add(null); RecoverySourceHandler.SendSnapshotResult result = handler.sendSnapshot(startingSeqNo, new Translog.Snapshot() { + @Override + public void close() { + + } + private int counter = 0; @Override From 8c129f1c65aef9632bf45a864e92706607ad60a7 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 27 Jul 2017 17:30:21 +0200 Subject: [PATCH 02/10] remove views --- .../org/elasticsearch/ExceptionsHelper.java | 3 +- .../index/engine/InternalEngine.java | 5 +- .../elasticsearch/index/shard/IndexShard.java | 5 +- .../index/shard/PrimaryReplicaSyncer.java | 10 +- .../index/translog/MultiSnapshot.java | 9 +- .../index/translog/Translog.java | 109 ++--- .../translog/TranslogDeletionPolicy.java | 37 +- .../recovery/RecoverySourceHandler.java | 41 +- .../index/engine/InternalEngineTests.java | 44 +- .../index/shard/IndexShardTests.java | 19 +- .../index/translog/TranslogTests.java | 442 ++++++++++-------- .../recovery/RecoverySourceHandlerTests.java | 8 +- .../test/InternalTestCluster.java | 20 + 13 files changed, 394 insertions(+), 358 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/ExceptionsHelper.java b/core/src/main/java/org/elasticsearch/ExceptionsHelper.java index e89e04a301da1..4b888ba884003 100644 --- a/core/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/core/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -33,6 +33,7 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -127,7 +128,7 @@ public static String stackTrace(Throwable e) { * If the given list is empty no exception is thrown * */ - public static void rethrowAndSuppress(List exceptions) throws T { + public static void rethrowAndSuppress(Collection exceptions) throws T { T main = null; for (T ex : exceptions) { main = useOrSuppress(main, ex); diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2edfa3c1299fd..5ede2ff872c21 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -324,9 +324,8 @@ public InternalEngine recoverFromTranslog() throws IOException { private void recoverFromTranslogInternal() throws IOException { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); final int opsRecovered; - try { - final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen); + final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); + try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) { opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot); } catch (Exception e) { throw new EngineException(shardId, "failed to recover from translog", e); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cb887e9d19f1e..42a2cdddd7a6f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -131,6 +131,7 @@ import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; import java.io.IOException; import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; @@ -1556,10 +1557,10 @@ public void onSettingsChanged() { } } - public Translog.View acquireTranslogView() { + public Closeable acquireTranslogRetentionLock() { Engine engine = getEngine(); assert engine.getTranslog() != null : "translog must not be null"; - return engine.getTranslog().newView(); + return engine.getTranslog().acquireRetentionLock(); } public List segments(boolean verbose) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index 93c28229de9f1..2740178573646 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -84,7 +84,13 @@ public void resync(IndexShard indexShard, ActionListener listener) t try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); - listener = chain(wrap(r -> snapshot.close(), e -> snapshot.close()), listener); + listener = chain(wrap(r -> snapshot.close(), e -> { + try { + snapshot.close(); + } catch (IOException e1) { + e.addSuppressed(e1); + } + }), listener); ShardId shardId = indexShard.shardId(); // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. @@ -93,7 +99,7 @@ public void resync(IndexShard indexShard, ActionListener listener) t Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { @Override - public void close() { + public void close() throws IOException { snapshot.close(); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index 135ae90b08700..668633e07ef3b 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -19,8 +19,7 @@ package org.elasticsearch.index.translog; -import org.elasticsearch.common.lease.Releasable; - +import java.io.Closeable; import java.io.IOException; import java.util.Arrays; @@ -31,13 +30,13 @@ final class MultiSnapshot implements Translog.Snapshot { private final TranslogSnapshot[] translogs; private final int totalOperations; - private final Releasable onClose; + private final Closeable onClose; private int index; /** * Creates a new point in time snapshot of the given snapshots. Those snapshots are always iterated in-order. */ - MultiSnapshot(TranslogSnapshot[] translogs, Releasable onClose) { + MultiSnapshot(TranslogSnapshot[] translogs, Closeable onClose) { this.translogs = translogs; totalOperations = Arrays.stream(translogs).mapToInt(TranslogSnapshot::totalOperations).sum(); this.onClose = onClose; @@ -63,7 +62,7 @@ public Translog.Operation next() throws IOException { } @Override - public void close() { + public void close() throws IOException { onClose.close(); } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 97097ceba428e..fdb3ffdb8831c 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -327,7 +327,7 @@ public void close() throws IOException { try { current.sync(); } finally { - closeFilesIfNoPendingViews(); + closeFilesIfNoPendingRetentionLocks(); } } finally { logger.debug("translog closed"); @@ -409,9 +409,9 @@ private int totalOperations(long minGeneration) { } /** - * Returns the number of operations in the transaction files that aren't committed to lucene.. + * Returns the number of operations in the transaction files that contain operations with seq# above the given number. */ - private int totalOperationsInGensAboveSeqNo(long minSeqNo) { + public int estimateTotalOperationsFromMinSeq(long minSeqNo) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return readersAboveMinSeqNo(minSeqNo).mapToInt(BaseTranslogReader::totalOperations).sum(); @@ -575,13 +575,13 @@ public long getLastSyncedGlobalCheckpoint() { * Snapshots the current transaction log allowing to safely iterate over the snapshot. * Snapshots are fixed in time and will not be updated with future operations. */ - public Snapshot newSnapshot() { + public Snapshot newSnapshot() throws IOException { try (ReleasableLock ignored = readLock.acquire()) { return newSnapshotFromGen(getMinFileGeneration()); } } - public Snapshot newSnapshotFromGen(long minGeneration) { + public Snapshot newSnapshotFromGen(long minGeneration) throws IOException { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); if (minGeneration < getMinFileGeneration()) { @@ -595,7 +595,7 @@ public Snapshot newSnapshotFromGen(long minGeneration) { } } - public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) { + public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); TranslogSnapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot) @@ -604,14 +604,14 @@ public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) { } } - private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) { - final Releasable onClose; + private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOException { + final Closeable onClose; if (snapshots.length == 0) { onClose = () -> {}; } else { assert Arrays.stream(snapshots).map(BaseTranslogReader::getGeneration).min(Long::compareTo).get() == snapshots[0].generation : "first reader generation of " + snapshots[0].generation + " is not the smallest"; - onClose = deletionPolicy.acquireTranslogGen(snapshots[0].generation); + onClose = acquireTranslogGenFromDeletionPolicy(snapshots[0].generation); } boolean success = false; try { @@ -637,21 +637,26 @@ private Stream readersAboveMinSeqNo(long minSeqNo) } /** - * Returns a view into the current translog that is guaranteed to retain all current operations - * while receiving future ones as well + * Acquires a lock on the translog files, preventing them from being trimmed */ - public Translog.View newView() { + public Closeable acquireRetentionLock() { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); final long viewGen = getMinFileGeneration(); - Releasable onClose = deletionPolicy.acquireTranslogGen(viewGen); + return acquireTranslogGenFromDeletionPolicy(viewGen); + } + } + + private Closeable acquireTranslogGenFromDeletionPolicy(long viewGen) { + Releasable toClose = deletionPolicy.acquireTranslogGen(viewGen); + return () -> { try { - return new View(viewGen, onClose); - } catch (Exception e) { - onClose.close(); - throw e; + toClose.close(); + } finally { + trimUnreferencedReaders(); + closeFilesIfNoPendingRetentionLocks(); } - } + }; } /** @@ -764,68 +769,6 @@ public TranslogDeletionPolicy getDeletionPolicy() { return deletionPolicy; } - /** - * a view into the translog, capturing all translog file at the moment of creation - * and updated with any future translog. - */ - /** - * a view into the translog, capturing all translog file at the moment of creation - * and updated with any future translog. - */ - public class View implements Closeable { - - final AtomicBoolean closed = new AtomicBoolean(); - final long baseTranslogGen; - private final Releasable onClose; - - View(long baseTranslogGen, Releasable onClose) { - this.baseTranslogGen = baseTranslogGen; - this.onClose = onClose; - } - - /** - * The total number of operations in the view files which contain an operation with a sequence number - * above the given min sequence numbers. This will be the number of operations in snapshot taken - * by calling {@link #snapshot(long)} with the same parameter. - */ - public int estimateTotalOperations(long minSequenceNumber) { - return Translog.this.totalOperationsInGensAboveSeqNo(minSequenceNumber); - } - - /** - * The total size of the view files which contain an operation with a sequence number - * above the given min sequence numbers. These are the files that would need to be read by snapshot - * acquired {@link #snapshot(long)} with the same parameter. - */ - public long estimateSizeInBytes(long minSequenceNumber) { - return Translog.this.sizeOfGensAboveSeqNoInBytes(minSequenceNumber); - } - - /** - * create a snapshot from this view, containing all - * operations from the given sequence number and up (with potentially some more) */ - public Snapshot snapshot(long minSequenceNumber) { - ensureOpen(); - return Translog.this.newSnapshotFromMinSeqNo(minSequenceNumber); - } - - void ensureOpen() { - if (closed.get()) { - throw new AlreadyClosedException("View is already closed"); - } - } - - @Override - public void close() throws IOException { - if (closed.getAndSet(true) == false) { - logger.trace("closing view starting at translog [{}]", baseTranslogGen); - onClose.close(); - trimUnreferencedReaders(); - closeFilesIfNoPendingViews(); - } - } - } - public static class Location implements Comparable { @@ -884,7 +827,7 @@ public int hashCode() { /** * A snapshot of the transaction log, allows to iterate over all the transaction log operations. */ - public interface Snapshot extends Releasable { + public interface Snapshot extends Closeable { /** * The total number of operations in the translog. @@ -1652,9 +1595,9 @@ void deleteReaderFiles(TranslogReader reader) { reader.path().resolveSibling(getCommitCheckpointFileName(reader.getGeneration()))); } - void closeFilesIfNoPendingViews() throws IOException { + void closeFilesIfNoPendingRetentionLocks() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { - if (closed.get() && deletionPolicy.pendingViewsCount() == 0) { + if (closed.get() && deletionPolicy.pendingTranslogRefCount() == 0) { logger.trace("closing files. translog is closed and there are no pending views"); ArrayList toClose = new ArrayList<>(readers); toClose.add(current); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 48df0b6e52098..26952dd960610 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -20,7 +20,10 @@ package org.elasticsearch.index.translog; import org.apache.lucene.util.Counter; +import org.elasticsearch.Assertions; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import java.io.IOException; import java.util.HashMap; @@ -30,6 +33,12 @@ public class TranslogDeletionPolicy { + private final Map openTranslogRef; + + public void assertNoOpenTranslogRefs() { + ExceptionsHelper.rethrowAndSuppress(openTranslogRef.values()); + } + /** * Records how many views are held against each * translog generation @@ -49,6 +58,11 @@ public class TranslogDeletionPolicy { public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis) { this.retentionSizeInBytes = retentionSizeInBytes; this.retentionAgeInMillis = retentionAgeInMillis; + if (Assertions.ENABLED) { + openTranslogRef = ConcurrentCollections.newConcurrentMap(); + } else { + openTranslogRef = null; + } } public synchronized void setMinTranslogGenerationForRecovery(long newGen) { @@ -74,16 +88,25 @@ public synchronized void setRetentionAgeInMillis(long ageInMillis) { synchronized Releasable acquireTranslogGen(final long genForView) { translogRefCounts.computeIfAbsent(genForView, l -> Counter.newCounter(false)).addAndGet(1); final AtomicBoolean closed = new AtomicBoolean(); + assert assertAddTranslogRef(closed); return () -> { if (closed.compareAndSet(false, true)) { - // TODO add assertions that this is called releaseTranslogGenView(genForView); + assert assertRemoveTranslogRef(closed); } }; } - /** returns the number of generations that were acquired for views */ - synchronized int pendingViewsCount() { + private boolean assertAddTranslogRef(Object reference) { + return openTranslogRef.put(reference, new RuntimeException()) == null; + } + + private boolean assertRemoveTranslogRef(Object reference) { + return openTranslogRef.remove(reference) != null; + } + + /** returns the number of generations that were acquired for snapshots */ + synchronized int pendingTranslogRefCount() { return translogRefCounts.size(); } @@ -108,7 +131,7 @@ private synchronized void releaseTranslogGenView(long translogGen) { * @param writer current translog writer */ synchronized long minTranslogGenRequired(List readers, TranslogWriter writer) throws IOException { - long minByView = getMinTranslogGenRequiredByViews(); + long minByView = getMinTranslogGenRequiredByLocks(); long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime()); long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes); final long minByAgeAndSize; @@ -154,7 +177,7 @@ protected long currentTime() { return System.currentTimeMillis(); } - private long getMinTranslogGenRequiredByViews() { + private long getMinTranslogGenRequiredByLocks() { return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); } @@ -163,8 +186,8 @@ public synchronized long getMinTranslogGenerationForRecovery() { return minTranslogGenerationForRecovery; } - synchronized long getViewCount(long viewGen) { - final Counter counter = translogRefCounts.get(viewGen); + synchronized long getTranslogRefCount(long gen) { + final Counter counter = translogRefCounts.get(gen); return counter == null ? 0 : counter.get(); } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 0ea8425165c1a..181c6c353e373 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -59,6 +59,7 @@ import org.elasticsearch.transport.RemoteTransportException; import java.io.BufferedOutputStream; +import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -66,6 +67,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.StreamSupport; /** @@ -140,11 +142,13 @@ public RecoveryResponse recoverToTarget() throws IOException { assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; }); - try (Translog.View translogView = shard.acquireTranslogView()) { + try (Closeable translogRetentionLock = shard.acquireTranslogRetentionLock()) { + + final Translog translog = shard.getTranslog(); final long startingSeqNo; boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO && - isTranslogReadyForSequenceNumberBasedRecovery(translogView); + isTranslogReadyForSequenceNumberBasedRecovery(); if (isSequenceNumberBasedRecoveryPossible) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); @@ -154,7 +158,7 @@ public RecoveryResponse recoverToTarget() throws IOException { try { phase1Snapshot = shard.acquireIndexCommit(false); } catch (final Exception e) { - IOUtils.closeWhileHandlingException(translogView); + IOUtils.closeWhileHandlingException(); throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } // we set this to unassigned to create a translog roughly according to the retention policy @@ -162,7 +166,7 @@ public RecoveryResponse recoverToTarget() throws IOException { startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; try { - phase1(phase1Snapshot.getIndexCommit(), translogView, startingSeqNo); + phase1(phase1Snapshot.getIndexCommit(), translog::totalOperations); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e); } finally { @@ -177,14 +181,14 @@ public RecoveryResponse recoverToTarget() throws IOException { runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); try { - prepareTargetForTranslog(translogView.estimateTotalOperations(startingSeqNo)); + prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } - logger.trace("snapshot translog for recovery; current size is [{}]", translogView.estimateTotalOperations(startingSeqNo)); + logger.trace("snapshot translog for recovery; current size is [{}]", translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); final long targetLocalCheckpoint; - try(Translog.Snapshot snapshot = translogView.snapshot(startingSeqNo)) { + try(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) { targetLocalCheckpoint = phase2(startingSeqNo, snapshot); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); @@ -214,11 +218,10 @@ private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) { * Determines if the source translog is ready for a sequence-number-based peer recovery. The main condition here is that the source * translog contains all operations between the local checkpoint on the target and the current maximum sequence number on the source. * - * @param translogView a view of the translog on the source * @return {@code true} if the source is ready for a sequence-number-based recovery * @throws IOException if an I/O exception occurred reading the translog snapshot */ - boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) throws IOException { + boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { final long startingSeqNo = request.startingSeqNo(); assert startingSeqNo >= 0; final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); @@ -234,7 +237,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo); final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1); - try(Translog.Snapshot snapshot = translogView.snapshot(startingSeqNo)) { + try(Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { @@ -260,7 +263,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View transl * segments that are missing. Only segments that have the same size and * checksum can be reused */ - public void phase1(final IndexCommit snapshot, final Translog.View translogView, final long startSeqNo) { + public void phase1(final IndexCommit snapshot, final Supplier translogOps) { cancellableThreads.checkForCancel(); // Total size of segment files that are recovered long totalSize = 0; @@ -338,10 +341,10 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView, new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, - response.phase1ExistingFileSizes, translogView.estimateTotalOperations(startSeqNo))); + response.phase1ExistingFileSizes, translogOps.get())); // How many bytes we've copied since we last called RateLimiter.pause final Function outputStreamFactories = - md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView, startSeqNo), chunkSizeInBytes); + md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogOps), chunkSizeInBytes); sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); // Send the CLEAN_FILES request, which takes all of the files that // were transferred and renames them from their temporary file @@ -353,7 +356,7 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView, // are deleted try { cancellableThreads.executeIO(() -> - recoveryTarget.cleanFiles(translogView.estimateTotalOperations(startSeqNo), recoverySourceMetadata)); + recoveryTarget.cleanFiles(translogOps.get(), recoverySourceMetadata)); } catch (RemoteTransportException | IOException targetException) { final IOException corruptIndexException; // we realized that after the index was copied and we wanted to finalize the recovery @@ -585,14 +588,12 @@ public String toString() { final class RecoveryOutputStream extends OutputStream { private final StoreFileMetaData md; - private final Translog.View translogView; - private final long startSeqNp; + private final Supplier translogOps; private long position = 0; - RecoveryOutputStream(StoreFileMetaData md, Translog.View translogView, long startSeqNp) { + RecoveryOutputStream(StoreFileMetaData md, Supplier translogOps) { this.md = md; - this.translogView = translogView; - this.startSeqNp = startSeqNp; + this.translogOps = translogOps; } @Override @@ -610,7 +611,7 @@ public void write(byte[] b, int offset, int length) throws IOException { private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException { // Actually send the file chunk to the target node, waiting for it to complete cancellableThreads.executeIO(() -> - recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogView.estimateTotalOperations(startSeqNp)) + recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogOps.get()) ); if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us throw new IndexShardClosedException(request.shardId()); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 0cb7dd00aa5e5..9e528b15acba8 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -280,6 +280,12 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, An @After public void tearDown() throws Exception { super.tearDown(); + if (engine != null && engine.isClosed.get() == false) { + engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + } + if (replicaEngine != null && replicaEngine.isClosed.get() == false) { + replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + } IOUtils.close( replicaEngine, storeReplica, engine, store); @@ -2478,7 +2484,7 @@ public void testTranslogReplayWithFailure() throws IOException { // since we rollback the IW we are writing the same segment files again after starting IW but MDW prevents // this so we have to disable the check explicitly boolean started = false; - final int numIters = randomIntBetween(10, 20); + final int numIters = 1;//randomIntBetween(10, 20); for (int i = 0; i < numIters; i++) { directory.setRandomIOExceptionRateOnOpen(randomDouble()); directory.setRandomIOExceptionRate(randomDouble()); @@ -3952,9 +3958,10 @@ public SequenceNumbersService seqNoService() { // skip to the op that we added to the translog Translog.Operation op; Translog.Operation last = null; - final Translog.Snapshot snapshot = noOpEngine.getTranslog().newSnapshot(); - while ((op = snapshot.next()) != null) { - last = op; + try (Translog.Snapshot snapshot = noOpEngine.getTranslog().newSnapshot()) { + while ((op = snapshot.next()) != null) { + last = op; + } } assertNotNull(last); assertThat(last, instanceOf(Translog.NoOp.class)); @@ -4162,21 +4169,22 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2)); // now snapshot the tlog and ensure the primary term is updated - Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot(); - assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations()); - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - if (operation.opType() == Translog.Operation.Type.NO_OP) { - assertEquals(2, operation.primaryTerm()); - } else { - assertEquals(1, operation.primaryTerm()); - } + try (Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot()) { + assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations()); + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + if (operation.opType() == Translog.Operation.Type.NO_OP) { + assertEquals(2, operation.primaryTerm()); + } else { + assertEquals(1, operation.primaryTerm()); + } - } - assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); - assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); - if ((flushed = randomBoolean())) { - recoveringEngine.flush(true, true); + } + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); + if ((flushed = randomBoolean())) { + recoveringEngine.flush(true, true); + } } } finally { IOUtils.close(recoveringEngine); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 314c623e175bd..20411c70f4fa8 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1494,17 +1494,18 @@ public void testRecoverFromStoreWithNoOps() throws IOException { assertEquals(1, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(1, newShard.recoveryState().getTranslog().totalOperationsOnStart()); assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); - Translog.Snapshot snapshot = newShard.getTranslog().newSnapshot(); - Translog.Operation operation; - int numNoops = 0; - while((operation = snapshot.next()) != null) { - if (operation.opType() == Translog.Operation.Type.NO_OP) { - numNoops++; - assertEquals(newShard.getPrimaryTerm(), operation.primaryTerm()); - assertEquals(0, operation.seqNo()); + try (Translog.Snapshot snapshot = newShard.getTranslog().newSnapshot()) { + Translog.Operation operation; + int numNoops = 0; + while ((operation = snapshot.next()) != null) { + if (operation.opType() == Translog.Operation.Type.NO_OP) { + numNoops++; + assertEquals(newShard.getPrimaryTerm(), operation.primaryTerm()); + assertEquals(0, operation.seqNo()); + } } + assertEquals(1, numNoops); } - assertEquals(1, numNoops); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); assertDocCount(newShard, 1); assertDocCount(shard, 2); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index d9c87a63885cf..4d2244870da31 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -73,6 +73,7 @@ import org.junit.After; import org.junit.Before; +import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -178,7 +179,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { try { - assertEquals("there are still open views", 0, translog.getDeletionPolicy().pendingViewsCount()); + translog.getDeletionPolicy().assertNoOpenTranslogRefs(); translog.close(); } finally { super.tearDown(); @@ -279,54 +280,60 @@ private String randomNonTranslogPatternString(int min, int max) { public void testSimpleOperations() throws IOException { ArrayList ops = new ArrayList<>(); - Translog.Snapshot snapshot = translog.newSnapshot(); - assertThat(snapshot, SnapshotMatchers.size(0)); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.size(0)); + } addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1})); - snapshot = translog.newSnapshot(); - assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); - assertThat(snapshot.totalOperations(), equalTo(ops.size())); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot.totalOperations(), equalTo(ops.size())); + } addToTranslogAndList(translog, ops, new Translog.Delete("test", "2", 1, newUid("2"))); - snapshot = translog.newSnapshot(); - assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); - assertThat(snapshot.totalOperations(), equalTo(ops.size())); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot.totalOperations(), equalTo(ops.size())); + } final long seqNo = randomNonNegativeLong(); final long primaryTerm = randomNonNegativeLong(); final String reason = randomAlphaOfLength(16); addToTranslogAndList(translog, ops, new Translog.NoOp(seqNo, primaryTerm, reason)); - snapshot = translog.newSnapshot(); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { - Translog.Index index = (Translog.Index) snapshot.next(); - assertNotNull(index); - assertThat(BytesReference.toBytes(index.source()), equalTo(new byte[]{1})); + Translog.Index index = (Translog.Index) snapshot.next(); + assertNotNull(index); + assertThat(BytesReference.toBytes(index.source()), equalTo(new byte[]{1})); - Translog.Delete delete = (Translog.Delete) snapshot.next(); - assertNotNull(delete); - assertThat(delete.uid(), equalTo(newUid("2"))); + Translog.Delete delete = (Translog.Delete) snapshot.next(); + assertNotNull(delete); + assertThat(delete.uid(), equalTo(newUid("2"))); - Translog.NoOp noOp = (Translog.NoOp) snapshot.next(); - assertNotNull(noOp); - assertThat(noOp.seqNo(), equalTo(seqNo)); - assertThat(noOp.primaryTerm(), equalTo(primaryTerm)); - assertThat(noOp.reason(), equalTo(reason)); + Translog.NoOp noOp = (Translog.NoOp) snapshot.next(); + assertNotNull(noOp); + assertThat(noOp.seqNo(), equalTo(seqNo)); + assertThat(noOp.primaryTerm(), equalTo(primaryTerm)); + assertThat(noOp.reason(), equalTo(reason)); - assertNull(snapshot.next()); + assertNull(snapshot.next()); + } long firstId = translog.currentFileGeneration(); translog.rollGeneration(); assertThat(translog.currentFileGeneration(), Matchers.not(equalTo(firstId))); - snapshot = translog.newSnapshot(); - assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); - assertThat(snapshot.totalOperations(), equalTo(ops.size())); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot.totalOperations(), equalTo(ops.size())); + } markCurrentGenAsCommitted(translog); - snapshot = translog.newSnapshotFromGen(firstId + 1); - assertThat(snapshot, SnapshotMatchers.size(0)); - assertThat(snapshot.totalOperations(), equalTo(0)); + try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(firstId + 1)) { + assertThat(snapshot, SnapshotMatchers.size(0)); + assertThat(snapshot.totalOperations(), equalTo(0)); + } } protected TranslogStats stats() throws IOException { @@ -470,44 +477,53 @@ public void testNegativeSizeInBytes() { public void testSnapshot() throws IOException { ArrayList ops = new ArrayList<>(); - Translog.Snapshot snapshot = translog.newSnapshot(); - assertThat(snapshot, SnapshotMatchers.size(0)); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.size(0)); + } addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1})); - snapshot = translog.newSnapshot(); - assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); - assertThat(snapshot.totalOperations(), equalTo(1)); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot.totalOperations(), equalTo(1)); + } - snapshot = translog.newSnapshot(); - Translog.Snapshot snapshot1 = translog.newSnapshot(); - assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); - assertThat(snapshot.totalOperations(), equalTo(1)); + try (Translog.Snapshot snapshot = translog.newSnapshot(); + Translog.Snapshot snapshot1 = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot.totalOperations(), equalTo(1)); - assertThat(snapshot1, SnapshotMatchers.size(1)); - assertThat(snapshot1.totalOperations(), equalTo(1)); + assertThat(snapshot1, SnapshotMatchers.size(1)); + assertThat(snapshot1.totalOperations(), equalTo(1)); + } } public void testSnapshotWithNewTranslog() throws IOException { - ArrayList ops = new ArrayList<>(); - Translog.Snapshot snapshot = translog.newSnapshot(); - assertThat(snapshot, SnapshotMatchers.size(0)); + List toClose = new ArrayList<>(); + try { + ArrayList ops = new ArrayList<>(); + Translog.Snapshot snapshot = translog.newSnapshot(); + toClose.add(snapshot); + assertThat(snapshot, SnapshotMatchers.size(0)); - addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1})); - Translog.Snapshot snapshot1 = translog.newSnapshot(); + addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1})); + Translog.Snapshot snapshot1 = translog.newSnapshot(); + toClose.add(snapshot1); - addToTranslogAndList(translog, ops, new Translog.Index("test", "2", 1, new byte[]{2})); + addToTranslogAndList(translog, ops, new Translog.Index("test", "2", 1, new byte[]{2})); - assertThat(snapshot1, SnapshotMatchers.equalsTo(ops.get(0))); + assertThat(snapshot1, SnapshotMatchers.equalsTo(ops.get(0))); - translog.rollGeneration(); - addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, new byte[]{3})); + translog.rollGeneration(); + addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, new byte[]{3})); - try (Translog.View view = translog.newView()) { Translog.Snapshot snapshot2 = translog.newSnapshot(); + toClose.add(snapshot2); markCurrentGenAsCommitted(translog); assertThat(snapshot2, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot2.totalOperations(), equalTo(ops.size())); + } finally { + IOUtils.closeWhileHandlingException(toClose); } } @@ -587,43 +603,43 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable { List collect = new ArrayList<>(writtenOperations); Collections.sort(collect); - Translog.Snapshot snapshot = translog.newSnapshot(); - for (LocationOperation locationOperation : collect) { - Translog.Operation op = snapshot.next(); - assertNotNull(op); - Translog.Operation expectedOp = locationOperation.operation; - assertEquals(expectedOp.opType(), op.opType()); - switch (op.opType()) { - case INDEX: - Translog.Index indexOp = (Translog.Index) op; - Translog.Index expIndexOp = (Translog.Index) expectedOp; - assertEquals(expIndexOp.id(), indexOp.id()); - assertEquals(expIndexOp.routing(), indexOp.routing()); - assertEquals(expIndexOp.type(), indexOp.type()); - assertEquals(expIndexOp.source(), indexOp.source()); - assertEquals(expIndexOp.version(), indexOp.version()); - assertEquals(expIndexOp.versionType(), indexOp.versionType()); - break; - case DELETE: - Translog.Delete delOp = (Translog.Delete) op; - Translog.Delete expDelOp = (Translog.Delete) expectedOp; - assertEquals(expDelOp.uid(), delOp.uid()); - assertEquals(expDelOp.version(), delOp.version()); - assertEquals(expDelOp.versionType(), delOp.versionType()); - break; - case NO_OP: - final Translog.NoOp noOp = (Translog.NoOp) op; - final Translog.NoOp expectedNoOp = (Translog.NoOp) expectedOp; - assertThat(noOp.seqNo(), equalTo(expectedNoOp.seqNo())); - assertThat(noOp.primaryTerm(), equalTo(expectedNoOp.primaryTerm())); - assertThat(noOp.reason(), equalTo(expectedNoOp.reason())); - break; - default: - throw new AssertionError("unsupported operation type [" + op.opType() + "]"); - } - - } - assertNull(snapshot.next()); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + for (LocationOperation locationOperation : collect) { + Translog.Operation op = snapshot.next(); + assertNotNull(op); + Translog.Operation expectedOp = locationOperation.operation; + assertEquals(expectedOp.opType(), op.opType()); + switch (op.opType()) { + case INDEX: + Translog.Index indexOp = (Translog.Index) op; + Translog.Index expIndexOp = (Translog.Index) expectedOp; + assertEquals(expIndexOp.id(), indexOp.id()); + assertEquals(expIndexOp.routing(), indexOp.routing()); + assertEquals(expIndexOp.type(), indexOp.type()); + assertEquals(expIndexOp.source(), indexOp.source()); + assertEquals(expIndexOp.version(), indexOp.version()); + assertEquals(expIndexOp.versionType(), indexOp.versionType()); + break; + case DELETE: + Translog.Delete delOp = (Translog.Delete) op; + Translog.Delete expDelOp = (Translog.Delete) expectedOp; + assertEquals(expDelOp.uid(), delOp.uid()); + assertEquals(expDelOp.version(), delOp.version()); + assertEquals(expDelOp.versionType(), delOp.versionType()); + break; + case NO_OP: + final Translog.NoOp noOp = (Translog.NoOp) op; + final Translog.NoOp expectedNoOp = (Translog.NoOp) expectedOp; + assertThat(noOp.seqNo(), equalTo(expectedNoOp.seqNo())); + assertThat(noOp.primaryTerm(), equalTo(expectedNoOp.primaryTerm())); + assertThat(noOp.reason(), equalTo(expectedNoOp.reason())); + break; + default: + throw new AssertionError("unsupported operation type [" + op.opType() + "]"); + } + } + assertNull(snapshot.next()); + } } @@ -640,17 +656,18 @@ public void testTranslogChecksums() throws Exception { corruptTranslogs(translogDir); AtomicInteger corruptionsCaught = new AtomicInteger(0); - Translog.Snapshot snapshot = translog.newSnapshot(); - for (Translog.Location location : locations) { - try { - Translog.Operation next = snapshot.next(); - assertNotNull(next); - } catch (TranslogCorruptedException e) { - corruptionsCaught.incrementAndGet(); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + for (Translog.Location location : locations) { + try { + Translog.Operation next = snapshot.next(); + assertNotNull(next); + } catch (TranslogCorruptedException e) { + corruptionsCaught.incrementAndGet(); + } } + expectThrows(TranslogCorruptedException.class, snapshot::next); + assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1)); } - expectThrows(TranslogCorruptedException.class, snapshot::next); - assertThat("at least one corruption was caused and caught", corruptionsCaught.get(), greaterThanOrEqualTo(1)); } public void testTruncatedTranslogs() throws Exception { @@ -666,12 +683,13 @@ public void testTruncatedTranslogs() throws Exception { truncateTranslogs(translogDir); AtomicInteger truncations = new AtomicInteger(0); - Translog.Snapshot snap = translog.newSnapshot(); - for (Translog.Location location : locations) { - try { - assertNotNull(snap.next()); - } catch (EOFException e) { - truncations.incrementAndGet(); + try (Translog.Snapshot snap = translog.newSnapshot()) { + for (Translog.Location location : locations) { + try { + assertNotNull(snap.next()); + } catch (EOFException e) { + truncations.incrementAndGet(); + } } } assertThat("at least one truncation was caused and caught", truncations.get(), greaterThanOrEqualTo(1)); @@ -723,10 +741,11 @@ private Term newUid(String uid) { public void testVerifyTranslogIsNotDeleted() throws IOException { assertFileIsPresent(translog, 1); translog.add(new Translog.Index("test", "1", 0, new byte[]{1})); - Translog.Snapshot snapshot = translog.newSnapshot(); - assertThat(snapshot, SnapshotMatchers.size(1)); - assertFileIsPresent(translog, 1); - assertThat(snapshot.totalOperations(), equalTo(1)); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.size(1)); + assertFileIsPresent(translog, 1); + assertThat(snapshot.totalOperations(), equalTo(1)); + } translog.close(); assertFileIsPresent(translog, 1); @@ -828,7 +847,7 @@ public void onFailure(Exception e) { for (int i = 0; i < readers.length; i++) { final String threadId = "reader_" + i; readers[i] = new Thread(new AbstractRunnable() { - Translog.View view = null; + Closeable retentionLock = null; long committedLocalCheckpointAtView; @Override @@ -836,26 +855,26 @@ public void onFailure(Exception e) { logger.error((Supplier) () -> new ParameterizedMessage("--> reader [{}] had an error", threadId), e); errors.add(e); try { - closeView(); + closeRetentionLock(); } catch (IOException inner) { inner.addSuppressed(e); logger.error("unexpected error while closing view, after failure", inner); } } - void closeView() throws IOException { - if (view != null) { - view.close(); + void closeRetentionLock() throws IOException { + if (retentionLock != null) { + retentionLock.close(); } } - void newView() throws IOException { - closeView(); - view = translog.newView(); + void acquireRetentionLock() throws IOException { + closeRetentionLock(); + retentionLock = translog.acquireRetentionLock(); // captures the last committed checkpoint, while holding the view, simulating // recovery logic which captures a view and gets a lucene commit committedLocalCheckpointAtView = lastCommittedLocalCheckpoint.get(); - logger.info("--> [{}] opened view from [{}]", threadId, view.baseTranslogGen); + logger.info("--> [{}] min gen after acquiring lock [{}]", threadId, translog.getMinFileGeneration()); } @Override @@ -864,14 +883,14 @@ protected void doRun() throws Exception { int iter = 0; while (idGenerator.get() < maxOps) { if (iter++ % 10 == 0) { - newView(); + acquireRetentionLock(); } // captures al views that are written since the view was created (with a small caveat see bellow) // these are what we expect the snapshot to return (and potentially some more). Set expectedOps = new HashSet<>(writtenOps.keySet()); expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView); - try (Translog.Snapshot snapshot = view.snapshot(committedLocalCheckpointAtView + 1L)) { + try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(committedLocalCheckpointAtView + 1L)) { Translog.Operation op; while ((op = snapshot.next()) != null) { expectedOps.remove(op); @@ -897,7 +916,7 @@ protected void doRun() throws Exception { } } } - closeView(); + closeRetentionLock(); logger.info("--> [{}] done. tested [{}] snapshots", threadId, iter); } }, threadId); @@ -1009,14 +1028,15 @@ public void testLocationComparison() throws IOException { } assertEquals(max.generation, translog.currentFileGeneration()); - Translog.Snapshot snap = translog.newSnapshot(); - Translog.Operation next; - Translog.Operation maxOp = null; - while ((next = snap.next()) != null) { - maxOp = next; + try (Translog.Snapshot snap = translog.newSnapshot()) { + Translog.Operation next; + Translog.Operation maxOp = null; + while ((next = snap.next()) != null) { + maxOp = next; + } + assertNotNull(maxOp); + assertEquals(maxOp.getSource().source.utf8ToString(), Integer.toString(count)); } - assertNotNull(maxOp); - assertEquals(maxOp.getSource().source.utf8ToString(), Integer.toString(count)); } public static Translog.Location max(Translog.Location a, Translog.Location b) { @@ -1186,18 +1206,21 @@ public void testBasicRecovery() throws IOException { assertEquals(0, translog.stats().estimatedNumberOfOperations()); assertEquals(1, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - Translog.Snapshot snapshot = translog.newSnapshot(); - assertNull(snapshot.next()); + try(Translog.Snapshot snapshot = translog.newSnapshot()) { + assertNull(snapshot.next()); + } } else { translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGeneration.translogFileGeneration); - for (int i = minUncommittedOp; i < translogOperations; i++) { - assertEquals("expected operation" + i + " to be in the previous translog but wasn't", translog.currentFileGeneration() - 1, locations.get(i).generation); - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals(i, Integer.parseInt(next.getSource().source.utf8ToString())); + try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGeneration.translogFileGeneration)) { + for (int i = minUncommittedOp; i < translogOperations; i++) { + assertEquals("expected operation" + i + " to be in the previous translog but wasn't", + translog.currentFileGeneration() - 1, locations.get(i).generation); + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals(i, Integer.parseInt(next.getSource().source.utf8ToString())); + } } } } @@ -1229,12 +1252,13 @@ public void testRecoveryUncommitted() throws IOException { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - Translog.Snapshot snapshot = translog.newSnapshot(); - int upTo = sync ? translogOperations : prepareOp; - for (int i = 0; i < upTo; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + int upTo = sync ? translogOperations : prepareOp; + for (int i = 0; i < upTo; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null synced: " + sync, next); + assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + } } } if (randomBoolean()) { // recover twice @@ -1242,12 +1266,13 @@ public void testRecoveryUncommitted() throws IOException { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - Translog.Snapshot snapshot = translog.newSnapshot(); - int upTo = sync ? translogOperations : prepareOp; - for (int i = 0; i < upTo; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + int upTo = sync ? translogOperations : prepareOp; + for (int i = 0; i < upTo; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null synced: " + sync, next); + assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + } } } } @@ -1285,14 +1310,14 @@ public void testRecoveryUncommittedFileExists() throws IOException { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - Translog.Snapshot snapshot = translog.newSnapshot(); - int upTo = sync ? translogOperations : prepareOp; - for (int i = 0; i < upTo; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + int upTo = sync ? translogOperations : prepareOp; + for (int i = 0; i < upTo; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null synced: " + sync, next); + assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + } } - } if (randomBoolean()) { // recover twice @@ -1300,12 +1325,13 @@ public void testRecoveryUncommittedFileExists() throws IOException { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - Translog.Snapshot snapshot = translog.newSnapshot(); - int upTo = sync ? translogOperations : prepareOp; - for (int i = 0; i < upTo; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + int upTo = sync ? translogOperations : prepareOp; + for (int i = 0; i < upTo; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null synced: " + sync, next); + assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + } } } } @@ -1348,12 +1374,13 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - Translog.Snapshot snapshot = translog.newSnapshot(); - int upTo = sync ? translogOperations : prepareOp; - for (int i = 0; i < upTo; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + int upTo = sync ? translogOperations : prepareOp; + for (int i = 0; i < upTo; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null synced: " + sync, next); + assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + } } } } @@ -1426,13 +1453,14 @@ public void testOpenForeignTranslog() throws IOException { } this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); - Translog.Snapshot snapshot = this.translog.newSnapshotFromGen(translogGeneration.translogFileGeneration); - for (int i = firstUncommitted; i < translogOperations; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("" + i, next); - assertEquals(Integer.parseInt(next.getSource().source.utf8ToString()), i); + try (Translog.Snapshot snapshot = this.translog.newSnapshotFromGen(translogGeneration.translogFileGeneration)) { + for (int i = firstUncommitted; i < translogOperations; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("" + i, next); + assertEquals(Integer.parseInt(next.getSource().source.utf8ToString()), i); + } + assertNull(snapshot.next()); } - assertNull(snapshot.next()); } public void testFailOnClosedWrite() throws IOException { @@ -1614,13 +1642,15 @@ public void testFailFlush() throws IOException { assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration()); assertFalse(tlog.syncNeeded()); - Translog.Snapshot snapshot = tlog.newSnapshot(); - assertEquals(opsSynced, snapshot.totalOperations()); - for (int i = 0; i < opsSynced; i++) { - assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, locations.get(i).generation); - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals(i, Integer.parseInt(next.getSource().source.utf8ToString())); + try (Translog.Snapshot snapshot = tlog.newSnapshot()) { + assertEquals(opsSynced, snapshot.totalOperations()); + for (int i = 0; i < opsSynced; i++) { + assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, + locations.get(i).generation); + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals(i, Integer.parseInt(next.getSource().source.utf8ToString())); + } } } } @@ -1632,12 +1662,14 @@ public void testTranslogOpsCountIsCorrect() throws IOException { for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { locations.add(translog.add( new Translog.Index("test", "" + opsAdded, opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))))); - Translog.Snapshot snapshot = this.translog.newSnapshot(); - assertEquals(opsAdded + 1, snapshot.totalOperations()); - for (int i = 0; i < opsAdded; i++) { - assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), locations.get(i).generation); - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); + try (Translog.Snapshot snapshot = this.translog.newSnapshot()) { + assertEquals(opsAdded + 1, snapshot.totalOperations()); + for (int i = 0; i < opsAdded; i++) { + assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), + locations.get(i).generation); + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + } } } } @@ -1706,7 +1738,7 @@ protected void afterAdd() throws IOException { } downLatch.countDown(); added.await(); - try (Translog.View view = translog.newView()) { + try (Closeable ignored = translog.acquireRetentionLock()) { // this holds a reference to the current tlog channel such that it's not closed // if we hit a tragic event. this is important to ensure that asserts inside the Translog#add doesn't trip // otherwise our assertions here are off by one sometimes. @@ -1741,8 +1773,9 @@ protected void afterAdd() throws IOException { iterator.remove(); } } - try (Translog tlog = new Translog(config, translogUUID, createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { - Translog.Snapshot snapshot = tlog.newSnapshot(); + try (Translog tlog = + new Translog(config, translogUUID, createTranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + Translog.Snapshot snapshot = tlog.newSnapshot()) { if (writtenOperations.size() != snapshot.totalOperations()) { for (int i = 0; i < threadCount; i++) { if (threadExceptions[i] != null) { @@ -2036,9 +2069,9 @@ public void testRecoverWithUnbackedNextGen() throws IOException { Checkpoint read = Checkpoint.read(ckp); Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation))); Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); - try (Translog tlog = createTranslog(config, translog.getTranslogUUID())) { + try (Translog tlog = createTranslog(config, translog.getTranslogUUID()); + Translog.Snapshot snapshot = tlog.newSnapshot()) { assertFalse(tlog.syncNeeded()); - Translog.Snapshot snapshot = tlog.newSnapshot(); for (int i = 0; i < 1; i++) { Translog.Operation next = snapshot.next(); assertNotNull("operation " + i + " must be non-null", next); @@ -2046,9 +2079,9 @@ public void testRecoverWithUnbackedNextGen() throws IOException { } tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); } - try (Translog tlog = createTranslog(config, translog.getTranslogUUID())) { + try (Translog tlog = createTranslog(config, translog.getTranslogUUID()); + Translog.Snapshot snapshot = tlog.newSnapshot()) { assertFalse(tlog.syncNeeded()); - Translog.Snapshot snapshot = tlog.newSnapshot(); for (int i = 0; i < 2; i++) { Translog.Operation next = snapshot.next(); assertNotNull("operation " + i + " must be non-null", next); @@ -2091,11 +2124,12 @@ public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog")); try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { assertFalse(tlog.syncNeeded()); - Translog.Snapshot snapshot = tlog.newSnapshot(); - for (int i = 0; i < 1; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); + try (Translog.Snapshot snapshot = tlog.newSnapshot()) { + for (int i = 0; i < 1; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); + } } tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); } @@ -2204,8 +2238,8 @@ public void testWithRandomException() throws IOException { fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); - try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { - Translog.Snapshot snapshot = translog.newSnapshotFromGen(minGenForRecovery); + try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + Translog.Snapshot snapshot = translog.newSnapshotFromGen(minGenForRecovery)) { assertEquals(syncedDocs.size(), snapshot.totalOperations()); for (int i = 0; i < syncedDocs.size(); i++) { Translog.Operation next = snapshot.next(); @@ -2272,10 +2306,10 @@ public void testPendingDelete() throws IOException { translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); translog.add(new Translog.Index("test", "2", 1, new byte[]{2})); translog.rollGeneration(); - Translog.View view = translog.newView(); + Closeable lock = translog.acquireRetentionLock(); translog.add(new Translog.Index("test", "3", 2, new byte[]{3})); translog.close(); - IOUtils.close(view); + IOUtils.close(lock); translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } @@ -2460,7 +2494,7 @@ public void testSimpleCommit() throws IOException { commit(translog, generation); } - public void testOpenViewIsPassToDeletionPolicy() throws IOException { + public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException { final int operations = randomIntBetween(1, 4096); final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); for (int i = 0; i < operations; i++) { @@ -2472,12 +2506,12 @@ public void testOpenViewIsPassToDeletionPolicy() throws IOException { commit(translog, randomLongBetween(deletionPolicy.getMinTranslogGenerationForRecovery(), translog.currentFileGeneration())); } if (frequently()) { - long viewGen; - try (Translog.View view = translog.newView()) { - viewGen = view.baseTranslogGen; - assertThat(deletionPolicy.getViewCount(view.baseTranslogGen), equalTo(1L)); + long minGen; + try (Closeable ignored = translog.acquireRetentionLock()) { + minGen = translog.getMinFileGeneration(); + assertThat(deletionPolicy.getTranslogRefCount(minGen), equalTo(1L)); } - assertThat(deletionPolicy.getViewCount(viewGen), equalTo(0L)); + assertThat(deletionPolicy.getTranslogRefCount(minGen), equalTo(0L)); } } } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index aa87f8f0b1ae1..4f1a2364d184b 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -77,6 +77,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -376,8 +377,6 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE final IndexShard shard = mock(IndexShard.class); when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class)); - final Translog.View translogView = mock(Translog.View.class); - when(shard.acquireTranslogView()).thenReturn(translogView); when(shard.state()).thenReturn(IndexShardState.RELOCATED); when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class)); doAnswer(invocation -> { @@ -394,13 +393,14 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE recoverySettings.getChunkSize().bytesAsInt(), Settings.EMPTY) { + @Override - boolean isTranslogReadyForSequenceNumberBasedRecovery(final Translog.View translogView) { + boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { return randomBoolean(); } @Override - public void phase1(final IndexCommit snapshot, final Translog.View translogView, final long startSeqNo) { + public void phase1(final IndexCommit snapshot, final Supplier translogOps) { phase1Called.set(true); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 33cae7adceb7a..f7ace8d2c58de 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -25,6 +25,7 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; @@ -1124,6 +1125,7 @@ public void beforeIndexDeletion() throws Exception { assertShardIndexCounter(); //check that shards that have same sync id also contain same number of documents assertSameSyncIdSameDocs(); + assertOpenTranslogReferences(); } private void assertSameSyncIdSameDocs() { @@ -1181,6 +1183,24 @@ private void assertShardIndexCounter() throws Exception { }); } + private void assertOpenTranslogReferences() throws Exception { + assertBusy(() -> { + final Collection nodesAndClients = nodes.values(); + for (NodeAndClient nodeAndClient : nodesAndClients) { + IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); + for (IndexService indexService : indexServices) { + for (IndexShard indexShard : indexService) { + try { + indexShard.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + } catch (AlreadyClosedException ok) { + // all good + } + } + } + } + }); + } + private void randomlyResetClients() throws IOException { // only reset the clients on nightly tests, it causes heavy load... if (RandomizedTest.isNightly() && rarely(random)) { From 3104f21c3e9b3d0526dfdeb9176e4271244455e7 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 29 Jul 2017 10:55:34 +0200 Subject: [PATCH 03/10] improve assertion message --- .../index/translog/TranslogDeletionPolicy.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 26952dd960610..0fe5e7c21206b 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -98,7 +98,11 @@ synchronized Releasable acquireTranslogGen(final long genForView) { } private boolean assertAddTranslogRef(Object reference) { - return openTranslogRef.put(reference, new RuntimeException()) == null; + final RuntimeException existing = openTranslogRef.put(reference, new RuntimeException()); + if (existing != null) { + throw new AssertionError("double adding of closing reference", existing); + } + return true; } private boolean assertRemoveTranslogRef(Object reference) { From 6bd6bce57b7f9e51dcca1c819b15ece8dae9b247 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 29 Jul 2017 14:20:04 +0200 Subject: [PATCH 04/10] change to assertion --- .../index/translog/TranslogDeletionPolicy.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 0fe5e7c21206b..5f84d01db47fd 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -21,7 +21,6 @@ import org.apache.lucene.util.Counter; import org.elasticsearch.Assertions; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -36,7 +35,11 @@ public class TranslogDeletionPolicy { private final Map openTranslogRef; public void assertNoOpenTranslogRefs() { - ExceptionsHelper.rethrowAndSuppress(openTranslogRef.values()); + if (openTranslogRef.isEmpty() == false) { + AssertionError e = new AssertionError("not all translog generations have been released"); + openTranslogRef.values().forEach(e::addSuppressed); + throw e; + } } /** From 896161131492a358ae2242a17396e0a3894a7adf Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 29 Jul 2017 18:52:22 +0200 Subject: [PATCH 05/10] roll back unneeded commit --- core/src/main/java/org/elasticsearch/ExceptionsHelper.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/ExceptionsHelper.java b/core/src/main/java/org/elasticsearch/ExceptionsHelper.java index 4b888ba884003..e89e04a301da1 100644 --- a/core/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/core/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -33,7 +33,6 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; -import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -128,7 +127,7 @@ public static String stackTrace(Throwable e) { * If the given list is empty no exception is thrown * */ - public static void rethrowAndSuppress(Collection exceptions) throws T { + public static void rethrowAndSuppress(List exceptions) throws T { T main = null; for (T ex : exceptions) { main = useOrSuppress(main, ex); From 530c33f943fe42f9e8a3fd1f8464056e49e41a45 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 29 Jul 2017 21:15:59 +0200 Subject: [PATCH 06/10] add test for minSeqNo APIs --- .../index/translog/TranslogTests.java | 37 +++++++++++++++---- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 4d2244870da31..ced93e5cd22f9 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -115,6 +115,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThanOrEqualTo; @LuceneTestCase.SuppressFileSystems("ExtrasFS") @@ -2429,8 +2430,8 @@ public void testRollGeneration() throws Exception { assertFileIsPresent(translog, generation + rolls); } - public void testMinGenerationForSeqNo() throws IOException { - final int operations = randomIntBetween(1, 4096); + public void testMinSeqNoBasedAPI() throws IOException { + final int operations = randomIntBetween(1, 512); final List shuffledSeqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList()); Randomness.shuffle(shuffledSeqNos); final List> seqNos = new ArrayList<>(); @@ -2448,30 +2449,50 @@ public void testMinGenerationForSeqNo() throws IOException { } } - Map>> generations = new HashMap<>(); + final Map>> seqNoPerGeneration = new HashMap<>(); + final Map opCountPerGeneration = new HashMap<>(); // one extra roll to make sure that all ops so far are available via a reader and a translog-{gen}.ckp // file in a consistent way, in order to simplify checking code. translog.rollGeneration(); for (long seqNo = 0; seqNo < operations; seqNo++) { final Set> seenSeqNos = new HashSet<>(); final long generation = translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration; + int expectedSnapshotOps = 0; for (long g = generation; g < translog.currentFileGeneration(); g++) { - if (!generations.containsKey(g)) { + if (!seqNoPerGeneration.containsKey(g)) { final Set> generationSeenSeqNos = new HashSet<>(); + int opCount = 0; final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.getCommitCheckpointFileName(g))); try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(g)), checkpoint)) { TranslogSnapshot snapshot = reader.newSnapshot(); Translog.Operation operation; while ((operation = snapshot.next()) != null) { generationSeenSeqNos.add(Tuple.tuple(operation.seqNo(), operation.primaryTerm())); + opCount++; } + assertThat(opCount, equalTo(reader.totalOperations())); + assertThat(opCount, equalTo(checkpoint.numOps)); } - generations.put(g, generationSeenSeqNos); - + opCountPerGeneration.put(g, opCount); + seqNoPerGeneration.put(g, generationSeenSeqNos); + } + final Set> generationSeqNo = seqNoPerGeneration.get(g); + if (generationSeqNo.stream().map(Tuple::v1).max(Long::compareTo).orElse(Long.MIN_VALUE) >= seqNo) { + expectedSnapshotOps += opCountPerGeneration.get(g); + } + seenSeqNos.addAll(generationSeqNo); + } + assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps)); + int readFromSnapshot = 0; + try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) { + assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps)); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + assertThat(Tuple.tuple(op.seqNo(), op.primaryTerm()), isIn(seenSeqNos)); + readFromSnapshot++; } - seenSeqNos.addAll(generations.get(g)); } - + assertThat(readFromSnapshot, equalTo(expectedSnapshotOps)); final long seqNoLowerBound = seqNo; final Set> expected = seqNos.stream().filter(t -> t.v1() >= seqNoLowerBound).collect(Collectors.toSet()); seenSeqNos.retainAll(expected); From d5a97fb8c5e6b06b9bbe5a18ec034b42073f5319 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 30 Jul 2017 09:12:29 +0200 Subject: [PATCH 07/10] remove empty closeWhileHandlingException() --- .../elasticsearch/indices/recovery/RecoverySourceHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 181c6c353e373..688a4253054b8 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -158,7 +158,6 @@ public RecoveryResponse recoverToTarget() throws IOException { try { phase1Snapshot = shard.acquireIndexCommit(false); } catch (final Exception e) { - IOUtils.closeWhileHandlingException(); throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } // we set this to unassigned to create a translog roughly according to the retention policy From 0b391fac732ee62988d730f8ddb8aeca6f9d96e1 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 30 Jul 2017 09:13:59 +0200 Subject: [PATCH 08/10] restore randomization --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 9e528b15acba8..a299503bcaaca 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2484,7 +2484,7 @@ public void testTranslogReplayWithFailure() throws IOException { // since we rollback the IW we are writing the same segment files again after starting IW but MDW prevents // this so we have to disable the check explicitly boolean started = false; - final int numIters = 1;//randomIntBetween(10, 20); + final int numIters = randomIntBetween(10, 20); for (int i = 0; i < numIters; i++) { directory.setRandomIOExceptionRateOnOpen(randomDouble()); directory.setRandomIOExceptionRate(randomDouble()); From a268c4f6701bc1c937eadab971af92f9505dd8e4 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 31 Jul 2017 12:55:53 +0200 Subject: [PATCH 09/10] Jason rocks --- .../elasticsearch/action/ActionListener.java | 28 ------------- .../index/shard/PrimaryReplicaSyncer.java | 40 ++++++++++++++----- 2 files changed, 29 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/ActionListener.java b/core/src/main/java/org/elasticsearch/action/ActionListener.java index e4fe30a636eec..f9fafa9f95a2e 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/ActionListener.java @@ -69,34 +69,6 @@ public void onFailure(Exception e) { }; } - static ActionListener chain(ActionListener first, ActionListener second) { - return new ActionListener() { - @Override - public void onResponse(Response response) { - try { - first.onResponse(response); - } catch (Exception e) { - onFailure(e); - } - try { - second.onResponse(response); - } catch (Exception e) { - second.onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - first.onFailure(e); - } catch (Exception suppressed) { - e.addSuppressed(suppressed); - } - second.onFailure(e); - } - }; - } - /** * Notifies every given listener with the response passed to {@link #onResponse(Object)}. If a listener itself throws an exception * the exception is forwarded to {@link #onFailure(Exception)}. If in turn {@link #onFailure(Exception)} fails all remaining diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index 9313176d9cc3d..ce03b96b1ce41 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -49,8 +49,6 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.util.Objects.requireNonNull; -import static org.elasticsearch.action.ActionListener.chain; -import static org.elasticsearch.action.ActionListener.wrap; public class PrimaryReplicaSyncer extends AbstractComponent { @@ -80,17 +78,33 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests this.chunkSize = chunkSize; } - public void resync(IndexShard indexShard, ActionListener listener) { + public void resync(final IndexShard indexShard, final ActionListener listener) { + ActionListener resyncListener = null; try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); - listener = chain(wrap(r -> snapshot.close(), e -> { - try { - snapshot.close(); - } catch (IOException e1) { - e.addSuppressed(e1); + resyncListener = new ActionListener() { + @Override + public void onResponse(final ResyncTask resyncTask) { + try { + snapshot.close(); + listener.onResponse(resyncTask); + } catch (final Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(final Exception e) { + try { + snapshot.close(); + } catch (final Exception inner) { + e.addSuppressed(inner); + } finally { + listener.onFailure(e); + } } - }), listener); + }; ShardId shardId = indexShard.shardId(); // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. @@ -120,9 +134,13 @@ public synchronized Translog.Operation next() throws IOException { } }; resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot, - startingSeqNo, listener); + startingSeqNo, resyncListener); } catch (Exception e) { - listener.onFailure(e); + if (resyncListener != null) { + resyncListener.onFailure(e); + } else { + listener.onFailure(e); + } } } From 1c426100828ab90f872da958c0d8194e2df768be Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 31 Jul 2017 13:15:36 +0200 Subject: [PATCH 10/10] feedback --- .../elasticsearch/index/shard/IndexShard.java | 1 - .../index/shard/PrimaryReplicaSyncer.java | 2 +- .../index/translog/Translog.java | 8 +++++--- .../translog/TranslogDeletionPolicy.java | 20 +++++++++---------- .../index/translog/TranslogSnapshot.java | 1 + .../recovery/RecoverySourceHandler.java | 4 ++-- .../snapshots/SnapshotsService.java | 2 +- 7 files changed, 20 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 46a8eba39da47..b1644e5f2196f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1558,7 +1558,6 @@ public void onSettingsChanged() { public Closeable acquireTranslogRetentionLock() { Engine engine = getEngine(); - assert engine.getTranslog() != null : "translog must not be null"; return engine.getTranslog().acquireRetentionLock(); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index ce03b96b1ce41..3716dcaff0fc7 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -113,7 +113,7 @@ public void onFailure(final Exception e) { Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { @Override - public void close() throws IOException { + public synchronized void close() throws IOException { snapshot.close(); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index fdb3ffdb8831c..3664b76807818 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -361,6 +361,8 @@ long getMinFileGeneration() { if (readers.isEmpty()) { return current.getGeneration(); } else { + assert readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).get() + .equals(readers.get(0).getGeneration()) : "the first translog isn't the one with the minimum generation:" + readers; return readers.get(0).getGeneration(); } } @@ -610,12 +612,12 @@ private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOExcepti onClose = () -> {}; } else { assert Arrays.stream(snapshots).map(BaseTranslogReader::getGeneration).min(Long::compareTo).get() - == snapshots[0].generation : "first reader generation of " + snapshots[0].generation + " is not the smallest"; + == snapshots[0].generation : "first reader generation of " + snapshots + " is not the smallest"; onClose = acquireTranslogGenFromDeletionPolicy(snapshots[0].generation); } boolean success = false; try { - Snapshot result = new MultiSnapshot(snapshots, onClose); + Snapshot result = new MultiSnapshot(snapshots, onClose); success = true; return result; } finally { @@ -1598,7 +1600,7 @@ void deleteReaderFiles(TranslogReader reader) { void closeFilesIfNoPendingRetentionLocks() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { if (closed.get() && deletionPolicy.pendingTranslogRefCount() == 0) { - logger.trace("closing files. translog is closed and there are no pending views"); + logger.trace("closing files. translog is closed and there are no pending retention locks"); ArrayList toClose = new ArrayList<>(readers); toClose.add(current); IOUtils.close(toClose); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 5f84d01db47fd..adee4bd9fa9a0 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -22,12 +22,12 @@ import org.apache.lucene.util.Counter; import org.elasticsearch.Assertions; import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; public class TranslogDeletionPolicy { @@ -43,7 +43,7 @@ public void assertNoOpenTranslogRefs() { } /** - * Records how many views are held against each + * Records how many retention locks are held against each * translog generation */ private final Map translogRefCounts = new HashMap<>(); @@ -62,7 +62,7 @@ public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMill this.retentionSizeInBytes = retentionSizeInBytes; this.retentionAgeInMillis = retentionAgeInMillis; if (Assertions.ENABLED) { - openTranslogRef = ConcurrentCollections.newConcurrentMap(); + openTranslogRef = new ConcurrentHashMap<>(); } else { openTranslogRef = null; } @@ -85,16 +85,16 @@ public synchronized void setRetentionAgeInMillis(long ageInMillis) { } /** - * acquires the basis generation for a new view or snapshot. Any translog generation above, and including, the returned generation + * acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation * will not be deleted until the returned {@link Releasable} is closed. */ - synchronized Releasable acquireTranslogGen(final long genForView) { - translogRefCounts.computeIfAbsent(genForView, l -> Counter.newCounter(false)).addAndGet(1); + synchronized Releasable acquireTranslogGen(final long translogGen) { + translogRefCounts.computeIfAbsent(translogGen, l -> Counter.newCounter(false)).addAndGet(1); final AtomicBoolean closed = new AtomicBoolean(); assert assertAddTranslogRef(closed); return () -> { if (closed.compareAndSet(false, true)) { - releaseTranslogGenView(genForView); + releaseTranslogGen(translogGen); assert assertRemoveTranslogRef(closed); } }; @@ -120,7 +120,7 @@ synchronized int pendingTranslogRefCount() { /** * releases a generation that was acquired by {@link #acquireTranslogGen(long)} */ - private synchronized void releaseTranslogGenView(long translogGen) { + private synchronized void releaseTranslogGen(long translogGen) { Counter current = translogRefCounts.get(translogGen); if (current == null || current.get() <= 0) { throw new IllegalArgumentException("translog gen [" + translogGen + "] wasn't acquired"); @@ -138,7 +138,7 @@ private synchronized void releaseTranslogGenView(long translogGen) { * @param writer current translog writer */ synchronized long minTranslogGenRequired(List readers, TranslogWriter writer) throws IOException { - long minByView = getMinTranslogGenRequiredByLocks(); + long minByLocks = getMinTranslogGenRequiredByLocks(); long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime()); long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes); final long minByAgeAndSize; @@ -148,7 +148,7 @@ synchronized long minTranslogGenRequired(List readers, TranslogW } else { minByAgeAndSize = Math.max(minByAge, minBySize); } - return Math.min(minByAgeAndSize, Math.min(minByView, minTranslogGenerationForRecovery)); + return Math.min(minByAgeAndSize, Math.min(minByLocks, minTranslogGenerationForRecovery)); } static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index 42b8d239a3a60..656772fa8169d 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -100,6 +100,7 @@ public String toString() { ", position=" + position + ", estimateTotalOperations=" + totalOperations + ", length=" + length + + ", generation=" + generation + ", reusableBuffer=" + reusableBuffer + '}'; } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 688a4253054b8..a5c1d9cf371e2 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -142,7 +142,7 @@ public RecoveryResponse recoverToTarget() throws IOException { assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; }); - try (Closeable translogRetentionLock = shard.acquireTranslogRetentionLock()) { + try (Closeable ignored = shard.acquireTranslogRetentionLock()) { final Translog translog = shard.getTranslog(); @@ -236,7 +236,7 @@ boolean isTranslogReadyForSequenceNumberBasedRecovery() throws IOException { logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo); final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1); - try(Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { + try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 0e10f599462d3..037db4d5caf66 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -249,7 +249,7 @@ public ClusterState execute(ClusterState currentState) { } SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); if (snapshots == null || snapshots.entries().isEmpty()) { - // Store newSnapshotFromGen here to be processed in clusterStateProcessed + // Store newSnapshot here to be processed in clusterStateProcessed List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request.indicesOptions(), request.indices())); logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); List snapshotIndices = repositoryData.resolveNewIndices(indices);