Conversation

@bleskes bleskes (Contributor) commented Jul 30, 2017

During peer recoveries, we need to copy over lucene files and replay the operations they miss from the source translog. Guaranteeing that translog files are not cleaned up has seen many iterations over time. Back in the old 1.0 days, recoveries went through the Engine and actively prevented both translog cleaning and lucene commits. We then moved to a notion called Translog Views, which allowed the recovery code to "acquire" a view into the translog which was then guaranteed to be kept around until the view was closed. The Engine code was free to commit lucene and do whatever it wanted without coordinating with recoveries. Translog file deletion logic was based on reference counting at the file level. Those counters were incremented when a view was acquired but also when the view was used to create a Snapshot that allowed you to read operations from the files. At some point we removed the file-based counting complexity in favor of constructs on the Translog level that just keep track of "open" views and the minimum translog generation they refer to. To do so, Views had to be kept around until the last snapshot that was made from them was consumed. This was fine in recovery code but led to a subtle bug in the Primary Replica Resyncer (see #25862).

Concurrently, we have developed the notion of a TranslogDeletionPolicy which is responsible for the liveness aspect of translog files. This class makes it very simple to take translog Snapshots into account when deciding which translog files to keep around, allowing people that just need a snapshot to take a snapshot and not worry about views and such. Recovery code, which actually does need a view, can now prevent trimming by acquiring a simple retention lock (a Closeable). This removes the need for the notion of a View.
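
As a rough sketch of the two resulting usage patterns (newSnapshotFromMinSeqNo comes from the actual change; acquireRetentionLock is an illustrative name for the retention-lock accessor, not necessarily the final API):

```java
// 1. Code that only needs to read operations takes a snapshot; the deletion policy
//    keeps the underlying generations alive until the snapshot is closed.
try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo)) {
    Translog.Operation op;
    while ((op = snapshot.next()) != null) {
        // replay op
    }
}

// 2. Recovery, which needs translog files retained across several steps, holds a
//    retention lock (a plain Closeable) instead of acquiring a View.
try (Closeable ignored = translog.acquireRetentionLock()) {
    // files from the minimum required generation onwards are not trimmed
    // until the lock is released
}
```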

@bleskes bleskes added :Distributed Indexing/Recovery, :Translog, >enhancement, v6.0.0, v6.1.0, v7.0.0 labels Jul 30, 2017
@bleskes bleskes requested a review from jasontedor July 30, 2017 11:46
@jasontedor jasontedor (Member) left a comment

In general it looks good. I think the main point of contention is the abstraction for the chained listeners in the primary/replica resync code. I think it's unnecessary and makes the code hard to follow; how it was (and what I propose) is clearer. Also, I left a comment about the InternalEngineTests, although I'm not sure if anything can be done (I leave that to you to figure out). The rest are, I think, minor comments.

};
}

static <Response> ActionListener<Response> chain(ActionListener<Response> first, ActionListener<Response> second) {
jasontedor (Member):

Javadocs please (although I'm unsure if this is really needed, see also my comment at the call site).
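
A rough sketch of what such a chain helper could look like (illustrative only, not necessarily the exact code under review):

```java
/**
 * Returns a listener that forwards the same response or failure to {@code first}
 * and then to {@code second}. Sketch for discussion purposes.
 */
static <Response> ActionListener<Response> chain(ActionListener<Response> first, ActionListener<Response> second) {
    return new ActionListener<Response>() {
        @Override
        public void onResponse(Response response) {
            first.onResponse(response);
            second.onResponse(response);
        }

        @Override
        public void onFailure(Exception e) {
            first.onFailure(e);
            second.onFailure(e);
        }
    };
}
```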

final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
-Translog.Snapshot snapshot = view.snapshot(startingSeqNo);
+Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
+listener = chain(wrap(r -> snapshot.close(), e -> {
jasontedor (Member):

I'm not sure; I recoil when I see this wrapping and wrapping, and I find it harder than necessary to follow. I'd like to avoid adding an abstraction used in exactly one place; we can add it later if we see the same pattern arise a few more times. Right now, I think what is already there is more straightforward and it makes it immediately clear what is going on:

diff --git a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
index 9313176d9c..ce0d36c7cc 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
@@ -80,17 +80,33 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
         this.chunkSize = chunkSize;
     }
 
-    public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) {
+    public void resync(IndexShard indexShard, final ActionListener<ResyncTask> listener) {
+        ActionListener<ResyncTask> resyncListener = null;
         try {
             final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
             Translog.Snapshot snapshot = indexShard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
-            listener = chain(wrap(r -> snapshot.close(), e -> {
-                try {
-                    snapshot.close();
-                } catch (IOException e1) {
-                    e.addSuppressed(e1);
+            resyncListener = new ActionListener<ResyncTask>() {
+                @Override
+                public void onResponse(final ResyncTask resyncTask) {
+                    try {
+                        snapshot.close();
+                        listener.onResponse(resyncTask);
+                    } catch (final Exception e) {
+                        onFailure(e);
+                    }
+                }
+
+                @Override
+                public void onFailure(final Exception e) {
+                    try {
+                        snapshot.close();
+                    } catch (final IOException inner) {
+                        e.addSuppressed(inner);
+                    } finally {
+                        listener.onFailure(e);
+                    }
                 }
-            }), listener);
+            };
             ShardId shardId = indexShard.shardId();
 
             // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
@@ -120,9 +136,13 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
                 }
             };
             resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
-                startingSeqNo, listener);
+                startingSeqNo, resyncListener);
         } catch (Exception e) {
-            listener.onFailure(e);
+            if (resyncListener != null) {
+                resyncListener.onFailure(e);
+            } else {
+                listener.onFailure(e);
+            }
         }
     }

This is what I would prefer to see, basically undoing the change that you're proposing here. It makes it straightforward to see what is happening here.

Note also that I do not like the reassignment to listener; that also makes the code hard to follow.

bleskes (Contributor, Author):

These things are subjective. I personally prefer the wrapping because it allows us to ignore the crud of wrapping and focus on the functionality. Same goes for not re-assigning the listener (I prefer my version as the complexities are dealt with in the same place, when we wrap). I don't feel strongly about it and will happily go along with your version.

jasontedor (Member):

I disagree that it allows focusing on the functionality: to chase down what is really being executed here you have to run off and grok two methods; it's really not straightforward at all. With the listener defined front and center you can immediately see what is happening, with no need to chase anything down. Sorry, I feel very strongly about this one.

// Also fail the resync early if the shard is shutting down
Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {

@Override
jasontedor (Member):

Should this be synchronized too?

bleskes (Contributor, Author):

sure
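
For context, the wrapper under discussion delegates every Snapshot method under the same monitor. A minimal sketch, assuming Translog.Snapshot exposes totalOperations(), next() and close() (the shard-state check from the real code is elided):

```java
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
    @Override
    public synchronized int totalOperations() {
        return snapshot.totalOperations();
    }

    @Override
    public synchronized Translog.Operation next() throws IOException {
        // the real code also fails the resync early here if the shard is shutting down
        return snapshot.next();
    }

    @Override
    public synchronized void close() throws IOException {
        snapshot.close();
    }
};
```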

if (snapshots.length == 0) {
onClose = () -> {};
} else {
assert Arrays.stream(snapshots).map(BaseTranslogReader::getGeneration).min(Long::compareTo).get()
jasontedor (Member):

Do we want a stronger condition here? That the snapshot generations are in sorted order?

bleskes (Contributor, Author):

I don't think so? Acquiring the min gen from the translog deletion policy will keep all the other ones around, which is what we care about here.

onClose = () -> {};
} else {
assert Arrays.stream(snapshots).map(BaseTranslogReader::getGeneration).min(Long::compareTo).get()
== snapshots[0].generation : "first reader generation of " + snapshots[0].generation + " is not the smallest";
jasontedor (Member):

Instead output the full array in the assertion message?

bleskes (Contributor, Author):

++
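
That is, roughly (a sketch, assuming java.util.Arrays and java.util.stream.Collectors are available):

```java
assert Arrays.stream(snapshots).map(BaseTranslogReader::getGeneration).min(Long::compareTo).get()
        == snapshots[0].generation
    : "first reader generation of " + snapshots[0].generation + " is not the smallest in: "
        + Arrays.stream(snapshots).map(BaseTranslogReader::getGeneration).collect(Collectors.toList());
```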

*/
synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException {
-long minByView = getMinTranslogGenRequiredByViews();
+long minByView = getMinTranslogGenRequiredByLocks();
jasontedor (Member):

Remove mention of views?

bleskes (Contributor, Author):

++
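
The agreed change is essentially renaming the local variable to match; roughly (a sketch: how the value is combined with the generation required by the last commit is an assumption about the surrounding class, not taken from the diff):

```java
synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException {
    long minByLocks = getMinTranslogGenRequiredByLocks();
    // minTranslogGenerationForRecovery is an assumed field name for the generation the
    // last lucene commit still needs; the real method may combine further constraints.
    return Math.min(minByLocks, minTranslogGenerationForRecovery);
}
```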

try (ReleasableLock ignored = writeLock.acquire()) {
-if (closed.get() && deletionPolicy.pendingViewsCount() == 0) {
+if (closed.get() && deletionPolicy.pendingTranslogRefCount() == 0) {
logger.trace("closing files. translog is closed and there are no pending views");
jasontedor (Member):

Remove mention of views from this trace message: closing files; translog is closed and there are no pending retention locks

bleskes (Contributor, Author):

good catch.

SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots == null || snapshots.entries().isEmpty()) {
-// Store newSnapshot here to be processed in clusterStateProcessed
+// Store newSnapshotFromGen here to be processed in clusterStateProcessed
jasontedor (Member):

I don't think so. 😄

bleskes (Contributor, Author):

:(


try {
-prepareTargetForTranslog(translogView.estimateTotalOperations(startingSeqNo));
+prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
jasontedor (Member):

Let's avoid invoking Translog#estimateTotalOperationsFromMinSeq twice here?

bleskes (Contributor, Author):

This could have changed? prepareTargetForTranslog has a network call in it, so the estimate may be different by the time we take it again.
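
To illustrate the point (method names follow the diff; the second call site is illustrative):

```java
// The first estimate is passed to the target before the network round trip ...
prepareTargetForTranslog(translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
// ... and a fresh estimate is taken afterwards, since more operations may have
// arrived while the request was in flight.
final int totalTranslogOps = translog.estimateTotalOperationsFromMinSeq(startingSeqNo);
```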

while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
tracker.markSeqNoAsCompleted(operation.seqNo());
try(Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
jasontedor (Member):

Nit: space between try and (.

@bleskes bleskes (Contributor, Author) commented Jul 31, 2017

@jasontedor thx. I addressed all your feedback. Can you take another look?

@jasontedor jasontedor (Member) left a comment

LGTM.

@bleskes bleskes merged commit 9d10ffd into elastic:master Jul 31, 2017
@bleskes bleskes deleted the translog_closable_snaps branch July 31, 2017 15:30
@bleskes bleskes (Contributor, Author) commented Jul 31, 2017

Thanks @jasontedor

bleskes added a commit that referenced this pull request Aug 1, 2017
bleskes added a commit that referenced this pull request Aug 1, 2017
@lcawl lcawl removed the v6.1.0 label Dec 12, 2017
@clintongormley clintongormley added :Distributed Indexing/Distributed, :Distributed Indexing/Engine and removed :Translog, :Distributed Indexing/Distributed labels Feb 13, 2018
@jpountz jpountz removed :Distributed Indexing/Engine, v7.0.0 labels Jan 29, 2019