@@ -32,6 +32,7 @@
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -65,6 +66,7 @@
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteTransportException;
+import org.elasticsearch.transport.Transports;

 import java.io.Closeable;
 import java.io.IOException;
@@ -144,8 +146,10 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
             IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
             throw e;
         });
-        final Consumer<Exception> onFailure = e ->
+        final Consumer<Exception> onFailure = e -> {
+            Transports.assertNotTransportThread("failure of recovery from " + shard.routingEntry() + " to " + request.targetNode());

Review comment (Member) on the line above: I think this should be assert Transports.assertNotTransportThread(...).

             IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
+        };

         runUnderPrimaryPermit(() -> {
             final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
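
The review comment above suggests wrapping the check in a Java assert statement. A minimal sketch of that suggestion (not the committed code), assuming the usual Elasticsearch convention that Transports.assertNotTransportThread returns a boolean so the whole call, including its message concatenation, is skipped when assertions are disabled:

    // Sketch of the reviewer's suggestion; everything else mirrors the hunk above.
    final Consumer<Exception> onFailure = e -> {
        assert Transports.assertNotTransportThread(
            "failure of recovery from " + shard.routingEntry() + " to " + request.targetNode());
        IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
    };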
@@ -206,7 +210,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
             // If the target previously had a copy of this shard then a file-based recovery might move its global
             // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a
             // new one later on in the recovery.
-            shard.removePeerRecoveryRetentionLease(request.targetNode().getId(), deleteRetentionLeaseStep);
+            shard.removePeerRecoveryRetentionLease(request.targetNode().getId(),
+                new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,
+                    deleteRetentionLeaseStep, false));
         } catch (RetentionLeaseNotFoundException e) {
             logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
             deleteRetentionLeaseStep.onResponse(null);
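
The ThreadedActionListener wrapping above is the heart of this hunk: without it the retention-lease response would complete deleteRetentionLeaseStep on the transport thread that delivered it, and the phase1 file-copy work registered on that step would then run on the transport thread. A small illustrative helper (the name forkToGeneric and the standalone method are hypothetical, not part of this PR; the constructor arguments mirror the call site above):

    // Hypothetical helper: re-dispatch a listener's completion onto the GENERIC pool
    // so the calling (possibly transport) thread is released immediately.
    static <T> ActionListener<T> forkToGeneric(Logger logger, ThreadPool threadPool, ActionListener<T> delegate) {
        // forceExecution = false: the forked task may be rejected if the pool is shutting down.
        return new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, delegate, false);
    }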
@@ -218,6 +224,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
         }

         deleteRetentionLeaseStep.whenComplete(ignored -> {
+            Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");
             phase1(safeCommitRef.getIndexCommit(), shard.getLastKnownGlobalCheckpoint(), () -> estimateNumOps, sendFileStep);
         }, onFailure);

@@ -235,8 +242,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
             // conservative estimate of the GCP for creating the lease. TODO use the actual GCP once it's appropriate
             final long globalCheckpoint = startingSeqNo - 1;
             // blindly create the lease. TODO integrate this with the recovery process
-            shard.addPeerRecoveryRetentionLease(
-                request.targetNode().getId(), globalCheckpoint, establishRetentionLeaseStep);
+            shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), globalCheckpoint,
+                new ThreadedActionListener<>(logger, shard.getThreadPool(),
+                    ThreadPool.Names.GENERIC, establishRetentionLeaseStep, false));
         } catch (RetentionLeaseAlreadyExistsException e) {
             logger.debug("peer-recovery retention lease already exists", e);
             establishRetentionLeaseStep.onResponse(null);
@@ -249,12 +257,14 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
         }, onFailure);

         establishRetentionLeaseStep.whenComplete(r -> {
+            Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
             // For a sequence based recovery, the target can keep its local translog
             prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
                 shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
         }, onFailure);

         prepareEngineStep.whenComplete(prepareEngineTime -> {
+            Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]");
             /*
              * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
              * This means that any document indexed into the primary after this will be replicated to this replica as well
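
As a reading aid, here is a minimal sketch (not taken from this PR; doStepTwo and finish are hypothetical) of the StepListener chaining that these assertions attach to. Each step's whenComplete registers the next recovery phase and shares a single onFailure handler, so whichever thread completes a step is the thread that starts the next phase; that is why the retention-lease listeners are forked to GENERIC and why each phase asserts it is not on a transport thread:

    final StepListener<Void> stepOne = new StepListener<>();
    final StepListener<Void> stepTwo = new StepListener<>();
    final Consumer<Exception> onFailure = e -> { /* release resources and fail the recovery */ };

    stepOne.whenComplete(ignored -> doStepTwo(stepTwo), onFailure);  // next phase runs on the completing thread
    stepTwo.whenComplete(ignored -> finish(), onFailure);            // hypothetical final step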