Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/101184.yaml
@@ -0,0 +1,6 @@
pr: 101184
summary: More robust timeout for repo analysis
area: Snapshot/Restore
type: bug
issues:
- 101182
@@ -176,4 +176,5 @@ setup:
- match: { status: 500 }
- match: { error.type: repository_verification_exception }
- match: { error.reason: "/.*test_repo_slow..analysis.failed.*/" }
- match: { error.root_cause.0.type: receive_timeout_transport_exception }
- match: { error.root_cause.0.type: repository_verification_exception }
- match: { error.root_cause.0.reason: "/.*test_repo_slow..analysis.timed.out.after..1s.*/" }
@@ -27,6 +27,7 @@
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
@@ -61,6 +62,7 @@
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

@@ -347,8 +349,26 @@ public BytesReference onCompareAndExchange(BytesRegister register, BytesReference
}
}

private RepositoryAnalyzeAction.Response analyseRepository(RepositoryAnalyzeAction.Request request) {
return client().execute(RepositoryAnalyzeAction.INSTANCE, request).actionGet(30L, TimeUnit.SECONDS);
public void testTimesOutSpinningRegisterAnalysis() {
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
request.timeout(TimeValue.timeValueMillis(between(1, 1000)));

blobStore.setDisruption(new Disruption() {
@Override
public boolean compareAndExchangeReturnsWitness() {
return false;
}
});
final var exception = expectThrows(RepositoryVerificationException.class, () -> analyseRepository(request));
assertThat(exception.getMessage(), containsString("analysis failed"));
assertThat(
asInstanceOf(RepositoryVerificationException.class, exception.getCause()).getMessage(),
containsString("analysis timed out")
);
}

private void analyseRepository(RepositoryAnalyzeAction.Request request) {
client().execute(RepositoryAnalyzeAction.INSTANCE, request).actionGet(30L, TimeUnit.SECONDS);
}

private static void assertPurpose(OperationPurpose purpose) {
@@ -464,6 +484,10 @@ default boolean createBlobOnAbort() {
return false;
}

default boolean compareAndExchangeReturnsWitness() {
return true;
}

default BytesReference onCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) {
return register.compareAndExchange(expected, updated);
}
@@ -637,8 +661,12 @@ public void compareAndExchangeRegister(
ActionListener<OptionalBytesReference> listener
) {
assertPurpose(purpose);
final var register = registers.computeIfAbsent(key, ignored -> new BytesRegister());
listener.onResponse(OptionalBytesReference.of(disruption.onCompareAndExchange(register, expected, updated)));
if (disruption.compareAndExchangeReturnsWitness()) {
final var register = registers.computeIfAbsent(key, ignored -> new BytesRegister());
listener.onResponse(OptionalBytesReference.of(disruption.onCompareAndExchange(register, expected, updated)));
} else {
listener.onResponse(OptionalBytesReference.MISSING);
}
}
}

@@ -9,6 +9,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.Version;
@@ -22,6 +23,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -364,6 +366,7 @@ public static class AsyncAction {
private final DiscoveryNodes discoveryNodes;
private final LongSupplier currentTimeMillisSupplier;
private final ActionListener<Response> listener;
private final SubscribableListener<Void> cancellationListener;
private final long timeoutTimeMillis;

// choose the blob path nondeterministically to avoid clashes, assuming that the actual path doesn't matter for reproduction
@@ -394,15 +397,24 @@ public AsyncAction(
this.discoveryNodes = discoveryNodes;
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
this.timeoutTimeMillis = currentTimeMillisSupplier.getAsLong() + request.getTimeout().millis();
this.listener = listener;

this.cancellationListener = new SubscribableListener<>();
this.listener = ActionListener.runBefore(listener, () -> cancellationListener.onResponse(null));
Member:

I am trying to think through the different scenarios in which cancellationListener can already be completed before we invoke onResponse(null) here. I think they are all OK. I am writing it down to be explicit, and maybe you can double-check it as well.

  1. The request fails due to timeout, i.e. cancellationListener has already timed out and this runs right before we call listener.onFailure. This is fine because SubscribableListener accepts only the first completion and silently ignores all later results.
  2. We are about to call listener.onResponse for success while cancellationListener times out concurrently. The timeout will set the failure and try to cancel the tasks. This is fine because we no longer check the failure object at that point, and cancelling completed or non-existent tasks seems to be a no-op.
  3. The timeout can fire even after the listener is completed. This is fine since the timeout will come after cancellationListener.onResponse(null), and completing a SubscribableListener more than once is ignored.

Contributor Author:

👍 sounds about right, yes.


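A JDK-only sketch of the first-completion-wins behaviour these scenarios rely on; CompletableFuture here is merely a stand-in for SubscribableListener, and the class name and message strings are illustrative, not part of this PR:

```java
import java.util.concurrent.CompletableFuture;

// Stand-in for SubscribableListener semantics: only the first completion takes effect,
// later completions are silently ignored, so racing the timeout against the normal
// completion path is harmless.
public class FirstCompletionWinsDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> cancellationListener = new CompletableFuture<>();

        // Scenario 1: the timeout fires first and completes the listener exceptionally...
        boolean timeoutWon = cancellationListener.completeExceptionally(
            new RuntimeException("analysis timed out after [1s]")
        );

        // ...so the later onResponse(null) from the runBefore hook is a no-op.
        boolean laterCompletionApplied = cancellationListener.complete(null);

        System.out.println("timeout won the race: " + timeoutWon);                 // true
        System.out.println("later completion applied: " + laterCompletionApplied); // false
    }
}
```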
responses = new ArrayList<>(request.blobCount);
}

private void fail(Exception e) {
private boolean setFirstFailure(Exception e) {
if (failure.compareAndSet(null, e)) {
transportService.getTaskManager().cancelTaskAndDescendants(task, "task failed", false, ActionListener.noop());
return true;
} else {
return false;
}
}

private void fail(Exception e) {
if (setFirstFailure(e) == false) {
if (innerFailures.tryAcquire()) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof TaskCancelledException || cause instanceof ReceiveTimeoutTransportException) {
@@ -424,24 +436,34 @@ private boolean isRunning() {
}

if (task.isCancelled()) {
failure.compareAndSet(null, new RepositoryVerificationException(request.repositoryName, "verification cancelled"));
setFirstFailure(new RepositoryVerificationException(request.repositoryName, "verification cancelled"));
// if this CAS failed then we're failing for some other reason, nbd; also if the task is cancelled then its descendants are
// also cancelled, so no further action is needed either way.
return false;
}

if (timeoutTimeMillis < currentTimeMillisSupplier.getAsLong()) {
if (failure.compareAndSet(
null,
new RepositoryVerificationException(request.repositoryName, "analysis timed out after [" + request.getTimeout() + "]")
)) {
transportService.getTaskManager().cancelTaskAndDescendants(task, "timed out", false, ActionListener.noop());
}
// if this CAS failed then we're already failing for some other reason, nbd
return false;
return true;
}

private class CheckForCancelListener implements ActionListener<Void> {
@Override
public void onResponse(Void unused) {
// task complete, nothing to do
}

return true;
@Override
public void onFailure(Exception e) {
assert e instanceof ElasticsearchTimeoutException : e;
if (isRunning()) {
// if this CAS fails then we're already failing for some other reason, nbd
setFirstFailure(
new RepositoryVerificationException(
request.repositoryName,
"analysis timed out after [" + request.getTimeout() + "]"
)
);
}
}
}

public void run() {
@@ -450,6 +472,9 @@ public void run() {

logger.info("running analysis of repository [{}] using path [{}]", request.getRepositoryName(), blobPath);

cancellationListener.addTimeout(request.getTimeout(), repository.threadPool(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
Member:

If the cluster has many nodes and the repo analysis is configured for high concurrency, would it be expensive to cancel the tasks on the scheduler thread?

Contributor Author:

Perhaps, but I wouldn't expect it to be a problem: (a) I don't see folks changing the concurrency very much, and (b) even at 1000 nodes I don't think it'd be a huge deal, since the cancel messages are tiny. We cancel things for other reasons on low-latency threads, e.g. RestCancellableNodeClient.

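A JDK-only sketch of the trade-off under discussion; the ScheduledExecutorService and CompletableFuture stand in for the threadpool scheduler and cancellationListener, and all names are hypothetical. With a direct executor, whatever work the listener does runs on the scheduler thread that fired the timeout:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Stand-ins for the real machinery: the scheduled executor plays the role of the
// threadpool scheduler, the CompletableFuture plays the role of cancellationListener.
public class TimeoutOnSchedulerThreadDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<Void> cancellationListener = new CompletableFuture<>();

        // Roughly analogous to addListener(...) with a direct executor: the callback runs
        // on whichever thread completes the future, i.e. the scheduler thread on timeout.
        cancellationListener.exceptionally(e -> {
            System.out.println("cancelling tasks on thread: " + Thread.currentThread().getName());
            return null;
        });

        // Roughly analogous to addTimeout(...): fail the listener once the timeout elapses.
        scheduler.schedule(
            () -> cancellationListener.completeExceptionally(new RuntimeException("timed out")),
            100,
            TimeUnit.MILLISECONDS
        );

        Thread.sleep(500); // let the timeout fire before shutting down
        scheduler.shutdown();
    }
}
```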
cancellationListener.addListener(new CheckForCancelListener());

final Random random = new Random(request.getSeed());
final List<DiscoveryNode> nodes = getSnapshotNodes(discoveryNodes);

@@ -536,7 +561,7 @@ private void runBlobAnalysis(Releasable ref, final BlobAnalyzeAction.Request req
BlobAnalyzeAction.NAME,
request,
task,
TransportRequestOptions.timeout(TimeValue.timeValueMillis(timeoutTimeMillis - currentTimeMillisSupplier.getAsLong())),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(BlobAnalyzeAction.Response response) {