Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedSupplier;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -180,4 +181,16 @@ protected void innerOnFailure(Exception e) {
}
};
}

/**
* Completes the given listener with the result from the provided supplier accordingly.
* This method is mainly used to complete a listener with a block of synchronous code.
*/
static <Response> void completeWith(ActionListener<Response> listener, CheckedSupplier<Response, ? extends Exception> supplier) {
try {
listener.onResponse(supplier.get());
} catch (Exception e) {
listener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@ public class ActionListenerResponseHandler<Response extends TransportResponse> i

private final ActionListener<? super Response> listener;
private final Writeable.Reader<Response> reader;
private final String executor;

public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader) {
public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader, String executor) {
this.listener = Objects.requireNonNull(listener);
this.reader = Objects.requireNonNull(reader);
this.executor = Objects.requireNonNull(executor);
}

public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader) {
this(listener, reader, ThreadPool.Names.SAME);
}

@Override
Expand All @@ -55,7 +61,7 @@ public void handleException(TransportException e) {

@Override
public String executor() {
return ThreadPool.Names.SAME;
return executor;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,12 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<Recovery

@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef =
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
recoveryRef.target().finalizeRecovery(request.globalCheckpoint());
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener =
new HandledTransportAction.ChannelActionListener<>(channel, Actions.FINALIZE, request);
recoveryRef.target().finalizeRecovery(request.globalCheckpoint(),
ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.StopWatch;
Expand Down Expand Up @@ -71,6 +72,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -137,6 +139,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
throw e;
});
final Consumer<Exception> onFailure = e ->
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));

runUnderPrimaryPermit(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
Expand Down Expand Up @@ -235,16 +240,21 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}

finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint);
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
IOUtils.close(resources);
wrappedListener.onResponse(
new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
final StepListener<Void> finalizeStep = new StepListener<>();
finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint, finalizeStep);
finalizeStep.whenComplete(r -> {
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, prepareEngineTime.millis(),
sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis())
);
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
prepareEngineTime.millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
try {
wrappedListener.onResponse(response);
} finally {
IOUtils.close(resources);
}
}, onFailure);
} catch (Exception e) {
IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
}
Expand Down Expand Up @@ -585,10 +595,7 @@ SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long
return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps, tookTime);
}

/*
* finalizes the recovery process
*/
public void finalizeRecovery(final long targetLocalCheckpoint) throws IOException {
void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
Expand All @@ -604,21 +611,26 @@ public void finalizeRecovery(final long targetLocalCheckpoint) throws IOExceptio
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
final long globalCheckpoint = shard.getGlobalCheckpoint();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);

if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
/*
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
}
stopWatch.stop();
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
final StepListener<Void> finalizeListener = new StepListener<>();
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, finalizeListener));
finalizeListener.whenComplete(r -> {
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);

if (request.isPrimaryRelocation()) {
logger.trace("performing relocation hand-off");
// TODO: make relocated async
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
/*
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
}
stopWatch.stop();
logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
listener.onResponse(null);
}, listener::onFailure);
}

static final class SendSnapshotResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,12 +372,15 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
}

@Override
public void finalizeRecovery(final long globalCheckpoint) throws IOException {
final IndexShard indexShard = indexShard();
indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
// Persist the global checkpoint.
indexShard.sync();
indexShard.finalizeRecovery();
public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
final IndexShard indexShard = indexShard();
indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
// Persist the global checkpoint.
indexShard.sync();
indexShard.finalizeRecovery();
return null;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ public interface RecoveryTargetHandler {
* updates the global checkpoint.
*
* @param globalCheckpoint the global checkpoint on the recovery source
* @param listener the listener which will be notified when this method is completed
*/
void finalizeRecovery(long globalCheckpoint) throws IOException;
void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener);

/**
* Blockingly waits for cluster state with at least clusterStateVersion to be available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportFuture;
import org.elasticsearch.transport.TransportRequestOptions;
Expand Down Expand Up @@ -85,11 +86,12 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
}

@Override
public void finalizeRecovery(final long globalCheckpoint) {
public void finalizeRecovery(final long globalCheckpoint, final ActionListener<Void> listener) {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@
*/
package org.elasticsearch.action;

import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class ActionListenerTests extends ESTestCase {

Expand Down Expand Up @@ -201,4 +205,16 @@ public void onFailure(Exception e) {
assertThat(onFailureTimes.get(), equalTo(1));
}
}

public void testCompleteWith() {
PlainActionFuture<Integer> onResponseListener = new PlainActionFuture<>();
ActionListener.completeWith(onResponseListener, () -> 100);
assertThat(onResponseListener.isDone(), equalTo(true));
assertThat(onResponseListener.actionGet(), equalTo(100));

PlainActionFuture<Integer> onFailureListener = new PlainActionFuture<>();
ActionListener.completeWith(onFailureListener, () -> { throw new IOException("not found"); });
assertThat(onFailureListener.isDone(), equalTo(true));
assertThat(expectThrows(ExecutionException.class, onFailureListener::get).getCause(), instanceOf(IOException.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
Expand Down Expand Up @@ -847,13 +848,13 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
}

@Override
public void finalizeRecovery(long globalCheckpoint) throws IOException {
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
if (hasBlocked() == false) {
// it maybe that not ops have been transferred, block now
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
blockIfNeeded(RecoveryState.Stage.FINALIZE);
super.finalizeRecovery(globalCheckpoint);
super.finalizeRecovery(globalCheckpoint, listener);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2524,9 +2524,8 @@ public long indexTranslogOperations(List<Translog.Operation> operations, int tot
}

@Override
public void finalizeRecovery(long globalCheckpoint) throws IOException {
super.finalizeRecovery(globalCheckpoint);
assertListenerCalled.accept(replica);
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
super.finalizeRecovery(globalCheckpoint, ActionListener.runAfter(listener, () -> assertListenerCalled.accept(replica)));
}
}, false, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
}

@Override
public void finalizeRecovery(long globalCheckpoint) {
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
}

@Override
Expand Down