From 39e7c3091244ac1d77d8c482ef870e95c428d946 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 16 Jan 2017 10:46:12 -0700 Subject: [PATCH] Change certain replica failures not to fail the replica shard This changes the way that replica failures are handled such that not all failures will cause the replica shard to be failed or marked as stale. In some cases such as refresh operations, or global checkpoint syncs, it is "okay" for the operation to fail without the shard being failed (because no data is out of sync). In these cases, instead of failing the shard we should simply fail the operation, and, in the event it is a user-facing operation, return a 5xx response code including the shard-specific failures. This was accomplished by having two forms of the `Replicas` proxy, one that is for non-write operations that does not fail the shard, and one that is for write operations that will fail the shard when an operation fails. Relates to #10708 --- .../resources/checkstyle_suppressions.xml | 1 - .../support/broadcast/BroadcastResponse.java | 28 +- .../replication/ReplicationOperation.java | 54 ++-- .../TransportReplicationAction.java | 68 +++-- .../replication/TransportWriteAction.java | 65 +++++ .../admin/indices/RestRefreshAction.java | 2 +- .../ReplicationOperationTests.java | 22 +- .../TransportReplicationActionTests.java | 199 +++++++------- .../TransportWriteActionTests.java | 254 ++++++++++++++++++ .../gateway/RecoveryFromGatewayIT.java | 4 +- .../ESIndexLevelReplicationTestCase.java | 17 +- .../migration/migrate_6_0/rest.asciidoc | 10 +- 12 files changed, 533 insertions(+), 191 deletions(-) diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index a800411cc630d..d5f08cb6a9adf 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -167,7 +167,6 @@ - diff --git 
a/core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastResponse.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastResponse.java index 54d6220fd342c..e608e8e0ab7d6 100644 --- a/core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastResponse.java +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastResponse.java @@ -23,6 +23,8 @@ import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.index.shard.ShardNotFoundException; import java.io.IOException; import java.util.List; @@ -42,11 +44,22 @@ public class BroadcastResponse extends ActionResponse { public BroadcastResponse() { } - public BroadcastResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + public BroadcastResponse(int totalShards, int successfulShards, int failedShards, + List shardFailures) { + assertNoShardNotAvailableFailures(shardFailures); this.totalShards = totalShards; this.successfulShards = successfulShards; this.failedShards = failedShards; - this.shardFailures = shardFailures == null ? EMPTY : shardFailures.toArray(new ShardOperationFailedException[shardFailures.size()]); + this.shardFailures = shardFailures == null ? 
EMPTY : + shardFailures.toArray(new ShardOperationFailedException[shardFailures.size()]); + } + + private void assertNoShardNotAvailableFailures(List shardFailures) { + if (shardFailures != null) { + for (Object e : shardFailures) { + assert (e instanceof ShardNotFoundException) == false : "expected no ShardNotFoundException failures, but got " + e; + } + } } /** @@ -70,6 +83,17 @@ public int getFailedShards() { return failedShards; } + /** + * The REST status that should be used for the response + */ + public RestStatus getStatus() { + if (failedShards > 0) { + return shardFailures[0].status(); + } else { + return RestStatus.OK; + } + } + /** * The list of shard failures exception. */ diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 6a3d217fcf6c9..0d1a632ad928f 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -147,7 +147,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set decPendingAndFinishIfNeeded() @@ -209,14 +209,9 @@ public void onFailure(Exception replicaException) { shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); - logger.warn( - (org.apache.logging.log4j.util.Supplier) - () -> new ParameterizedMessage("[{}] {}", shard.shardId(), message), replicaException); - replicasProxy.failShard(shard, replicaRequest.primaryTerm(), message, replicaException, - ReplicationOperation.this::decPendingAndFinishIfNeeded, - ReplicationOperation.this::onPrimaryDemoted, - throwable -> decPendingAndFinishIfNeeded() - ); + 
replicasProxy.failShardIfNeeded(shard, replicaRequest.primaryTerm(), message, + replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, + ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); } } }); @@ -314,10 +309,13 @@ private void finishAsFailed(Exception exception) { } } + /** + * An encapsulation of an operation that is to be performed on the primary shard + */ public interface Primary< - Request extends ReplicationRequest, - ReplicaRequest extends ReplicationRequest, - PrimaryResultT extends PrimaryResult + RequestT extends ReplicationRequest, + ReplicaRequestT extends ReplicationRequest, + PrimaryResultT extends PrimaryResult > { /** @@ -338,7 +336,7 @@ public interface Primary< * @param request the request to perform * @return the request to send to the repicas */ - PrimaryResultT perform(Request request) throws Exception; + PrimaryResultT perform(RequestT request) throws Exception; /** @@ -355,7 +353,10 @@ public interface Primary< long localCheckpoint(); } - public interface Replicas> { + /** + * An encapsulation of an operation that will be executed on the replica shards, if present. + */ + public interface Replicas> { /** * performs the the given request on the specified replica @@ -364,24 +365,29 @@ public interface Replicas listener); + void performOn(ShardRouting replica, RequestT replicaRequest, ActionListener listener); /** - * Fail the specified shard, removing it from the current set of active shards + * Fail the specified shard if needed, removing it from the current set + * of active shards. Whether a failure is needed is left up to the + * implementation. 
+ * * @param replica shard to fail * @param primaryTerm the primary term of the primary shard when requesting the failure * @param message a (short) description of the reason * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed * @param onSuccess a callback to call when the shard has been successfully removed from the active set. * @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted -* by the master. + * by the master. * @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the */ - void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure); + void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure); /** - * Marks shard copy as stale, removing its allocation id from the set of in-sync allocation ids. + * Marks shard copy as stale if needed, removing its allocation id from + * the set of in-sync allocation ids. Whether marking as stale is needed + * is left up to the implementation. * * @param shardId shard id * @param allocationId allocation id to remove from the set of in-sync allocation ids @@ -391,8 +397,8 @@ void failShard(ShardRouting replica, long primaryTerm, String message, Exception * by the master. * @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored. 
*/ - void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure); + void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure); } /** @@ -422,13 +428,13 @@ public RetryOnPrimaryException(StreamInput in) throws IOException { } } - public interface PrimaryResult> { + public interface PrimaryResult> { /** * @return null if no operation needs to be sent to a replica * (for example when the operation failed on the primary due to a parsing exception) */ - @Nullable R replicaRequest(); + @Nullable RequestT replicaRequest(); void setShardInfo(ReplicationResponse.ShardInfo shardInfo); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 32f5b8f0f2cb7..7190879976811 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -99,15 +99,15 @@ public abstract class TransportReplicationAction< private final TransportService transportService; protected final ClusterService clusterService; + protected final ShardStateAction shardStateAction; private final IndicesService indicesService; - private final ShardStateAction shardStateAction; private final TransportRequestOptions transportOptions; private final String executor; // package private for testing private final String transportReplicaAction; private final String transportPrimaryAction; - private final ReplicasProxy replicasProxy; + private final ReplicationOperation.Replicas replicasProxy; protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, ClusterService 
clusterService, IndicesService indicesService, @@ -135,7 +135,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.transportOptions = transportOptions(); - this.replicasProxy = new ReplicasProxy(); + this.replicasProxy = newReplicasProxy(); } @Override @@ -148,6 +148,10 @@ protected void doExecute(Task task, Request request, ActionListener li new ReroutePhase((ReplicationTask) task, request, listener).run(); } + protected ReplicationOperation.Replicas newReplicasProxy() { + return new ReplicasProxy(); + } + protected abstract Response newResponseInstance(); /** @@ -369,8 +373,7 @@ protected ReplicationOperation> listener, PrimaryShardReference primaryShardReference, boolean executeOnReplicas) { return new ReplicationOperation<>(request, primaryShardReference, listener, - executeOnReplicas, replicasProxy, clusterService::state, logger, actionName - ); + executeOnReplicas, replicasProxy, clusterService::state, logger, actionName); } } @@ -1030,7 +1033,13 @@ public String allocationId() { } } - final class ReplicasProxy implements ReplicationOperation.Replicas { + /** + * The {@code ReplicasProxy} is an implementation of the {@code Replicas} + * interface that performs the actual {@code ReplicaRequest} on the replica + * shards. It also encapsulates the logic required for failing the replica + * if deemed necessary as well as marking it as stale when needed. 
+ */ + class ReplicasProxy implements ReplicationOperation.Replicas { @Override public void performOn(ShardRouting replica, ReplicaRequest request, ActionListener listener) { @@ -1041,45 +1050,28 @@ public void performOn(ShardRouting replica, ReplicaRequest request, ActionListen return; } final ConcreteShardRequest concreteShardRequest = - new ConcreteShardRequest<>(request, replica.allocationId().getId()); + new ConcreteShardRequest<>(request, replica.allocationId().getId()); sendReplicaRequest(concreteShardRequest, node, listener); } @Override - public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception, - createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + // This does not need to fail the shard. The idea is that this + // is a non-write operation (something like a refresh or a global + // checkpoint sync), so no data is out of sync and the replica + // should stay "alive" even if the operation fails on it. 
+ onSuccess.run(); } @Override - public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null, - createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); - } - - private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer onPrimaryDemoted, - final Consumer onIgnoredFailure) { - return new ShardStateAction.Listener() { - @Override - public void onSuccess() { - onSuccess.run(); - } - - @Override - public void onFailure(Exception shardFailedError) { - if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { - onPrimaryDemoted.accept(shardFailedError); - } else { - // these can occur if the node is shutting down and are okay - // any other exception here is not expected and merits investigation - assert shardFailedError instanceof TransportException || - shardFailedError instanceof NodeClosedException : shardFailedError; - onIgnoredFailure.accept(shardFailedError); - } - } - }; + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + // This does not need to mark the shard copy as stale. The idea is + // that this is a non-write operation (something like a refresh or a + // global checkpoint sync), so no data is out of sync and the copy + // can stay in the set of in-sync allocation ids. 
+ onSuccess.run(); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 8569b28257f9f..10f8741ecccb6 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -20,29 +20,38 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteResponse; +import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog.Location; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; +import 
org.apache.logging.log4j.core.pattern.ConverterKeys; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -63,6 +72,11 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe indexNameExpressionResolver, request, replicaRequest, executor); } + @Override + protected ReplicationOperation.Replicas newReplicasProxy() { + return new WriteActionReplicasProxy(); + } + /** * Called on the primary with a reference to the primary {@linkplain IndexShard} to modify. * @@ -311,4 +325,55 @@ void run() { } } } + + /** + * A proxy for write operations that need to be performed on the + * replicas, where a failure to execute the operation should fail + * the replica shard and/or mark the replica as stale. + * + * This extends {@code TransportReplicationAction.ReplicasProxy} to do the + * failing and stale-ing. 
+ */ + class WriteActionReplicasProxy extends ReplicasProxy { + + @Override + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + + logger.warn((org.apache.logging.log4j.util.Supplier) + () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception); + shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception, + createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + } + + @Override + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null, + createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + } + + public ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer onPrimaryDemoted, + final Consumer onIgnoredFailure) { + return new ShardStateAction.Listener() { + @Override + public void onSuccess() { + onSuccess.run(); + } + + @Override + public void onFailure(Exception shardFailedError) { + if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { + onPrimaryDemoted.accept(shardFailedError); + } else { + // these can occur if the node is shutting down and are okay + // any other exception here is not expected and merits investigation + assert shardFailedError instanceof TransportException || + shardFailedError instanceof NodeClosedException : shardFailedError; + onIgnoredFailure.accept(shardFailedError); + } + } + }; + } + } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRefreshAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRefreshAction.java index c213f3be18b6b..7b317ab404e48 100644 --- 
a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRefreshAction.java @@ -60,7 +60,7 @@ public RestResponse buildResponse(RefreshResponse response, XContentBuilder buil builder.startObject(); buildBroadcastShardsHeader(builder, request, response); builder.endObject(); - return new BytesRestResponse(OK, builder); + return new BytesRestResponse(response.getStatus(), builder); } }); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index a701193d22042..459bafd3af2f1 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -190,11 +190,11 @@ public void testDemotedPrimary() throws Exception { final boolean testPrimaryDemotedOnStaleShardCopies = randomBoolean(); final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures) { @Override - public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, - Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + Runnable onSuccess, Consumer onPrimaryDemoted, + Consumer onIgnoredFailure) { if (testPrimaryDemotedOnStaleShardCopies) { - super.failShard(replica, primaryTerm, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure); + super.failShardIfNeeded(replica, primaryTerm, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure); } else { assertThat(replica, equalTo(failedReplica)); onPrimaryDemoted.accept(new ElasticsearchException("the king is dead")); @@ -202,12 +202,12 @@ public void failShard(ShardRouting 
replica, long primaryTerm, String message, Ex } @Override - public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { if (testPrimaryDemotedOnStaleShardCopies) { onPrimaryDemoted.accept(new ElasticsearchException("the king is dead")); } else { - super.markShardCopyAsStale(shardId, allocationId, primaryTerm, onSuccess, onPrimaryDemoted, onIgnoredFailure); + super.markShardCopyAsStaleIfNeeded(shardId, allocationId, primaryTerm, onSuccess, onPrimaryDemoted, onIgnoredFailure); } } }; @@ -486,8 +486,8 @@ public void performOn(ShardRouting replica, Request request, ActionListener onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { if (failedReplicas.add(replica) == false) { fail("replica [" + replica + "] was failed twice"); } @@ -503,8 +503,8 @@ public void failShard(ShardRouting replica, long primaryTerm, String message, Ex } @Override - public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { if (markedAsStaleCopies.add(allocationId) == false) { fail("replica [" + allocationId + "] was marked as stale twice"); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java 
index c2c484fe3e62c..1bd943704707d 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -135,7 +135,7 @@ public static R resolveRequest(TransportRequest r private ClusterService clusterService; private TransportService transportService; private CapturingTransport transport; - private Action action; + private TestAction action; private ShardStateAction shardStateAction; /* * @@ -159,7 +159,7 @@ public void setUp() throws Exception { transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); - action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, shardStateAction, threadPool); + action = new TestAction(Settings.EMPTY, "testAction", transportService, clusterService, shardStateAction, threadPool); } @After @@ -185,9 +185,10 @@ void assertListenerThrows(String msg, PlainActionFuture listener, Class listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); - Action action = new Action(Settings.EMPTY, "testActionWithBlocks", transportService, clusterService, shardStateAction, threadPool) { + TestAction action = new TestAction(Settings.EMPTY, "testActionWithBlocks", + transportService, clusterService, shardStateAction, threadPool) { @Override protected ClusterBlockLevel globalBlockLevel() { return ClusterBlockLevel.WRITE; @@ -197,7 +198,7 @@ protected ClusterBlockLevel globalBlockLevel() { ClusterBlocks.Builder block = ClusterBlocks.builder() .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block)); - Action.ReroutePhase 
reroutePhase = action.new ReroutePhase(task, request, listener); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class); assertPhase(task, "failed"); @@ -226,7 +227,7 @@ protected ClusterBlockLevel globalBlockLevel() { ClusterBlockException.class); assertIndexShardUninitialized(); - action = new Action(Settings.EMPTY, "testActionWithNoBlocks", transportService, clusterService, shardStateAction, threadPool) { + action = new TestAction(Settings.EMPTY, "testActionWithNoBlocks", transportService, clusterService, shardStateAction, threadPool) { @Override protected ClusterBlockLevel globalBlockLevel() { return null; @@ -253,8 +254,8 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept logger.debug("--> using initial state:\n{}", clusterService.state()); Request request = new Request(shardId).timeout("1ms"); - PlainActionFuture listener = new PlainActionFuture<>(); - Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); + PlainActionFuture listener = new PlainActionFuture<>(); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); assertListenerThrows("unassigned primary didn't cause a timeout", listener, UnavailableShardsException.class); assertPhase(task, "failed"); @@ -301,8 +302,8 @@ public void testNoRerouteOnStaleClusterState() throws InterruptedException, Exec logger.debug("--> relocation ongoing state:\n{}", clusterService.state()); Request request = new Request(shardId).timeout("1ms").routedBasedOnClusterVersion(clusterService.state().version() + 1); - PlainActionFuture listener = new PlainActionFuture<>(); - Action.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener); + PlainActionFuture listener = new PlainActionFuture<>(); + TestAction.ReroutePhase reroutePhase = 
action.new ReroutePhase(null, request, listener); reroutePhase.run(); assertListenerThrows("cluster state too old didn't cause a timeout", listener, UnavailableShardsException.class); assertTrue(request.isRetrySet.compareAndSet(true, false)); @@ -340,10 +341,10 @@ public void testUnknownIndexOrShardOnReroute() throws InterruptedException { randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED)); logger.debug("--> using initial state:\n{}", clusterService.state()); Request request = new Request(new ShardId("unknown_index", "_na_", 0)).timeout("1ms"); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); - Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); assertListenerThrows("must throw index not found exception", listener, IndexNotFoundException.class); assertPhase(task, "failed"); @@ -364,17 +365,18 @@ public void testClosedIndexOnReroute() throws InterruptedException { ShardRoutingState.UNASSIGNED), new CloseIndexRequest(index))); logger.debug("--> using initial state:\n{}", clusterService.state()); Request request = new Request(new ShardId("test", "_na_", 0)).timeout("1ms"); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); ClusterBlockLevel indexBlockLevel = randomBoolean() ? 
ClusterBlockLevel.WRITE : null; - Action action = new Action(Settings.EMPTY, "testActionWithBlocks", transportService, clusterService, shardStateAction, threadPool) { + TestAction action = new TestAction(Settings.EMPTY, "testActionWithBlocks", transportService, + clusterService, shardStateAction, threadPool) { @Override protected ClusterBlockLevel indexBlockLevel() { return indexBlockLevel; } }; - Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); if (indexBlockLevel == ClusterBlockLevel.WRITE) { assertListenerThrows("must throw block exception", listener, ClusterBlockException.class); @@ -398,10 +400,10 @@ public void testStalePrimaryShardOnReroute() throws InterruptedException { } else { request.timeout("1h"); } - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); - Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests, arrayWithSize(1)); @@ -452,9 +454,9 @@ public void testRoutePhaseExecutesRequest() { final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId(); Request request = new Request(shardId); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); - Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); 
assertThat(request.shardId(), equalTo(shardId)); logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId); @@ -479,7 +481,7 @@ public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throw ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); setState(clusterService, state); Request request = new Request(shardId).timeout("1ms"); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); AtomicBoolean executed = new AtomicBoolean(); @@ -492,11 +494,12 @@ public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throw } action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) { @Override - protected ReplicationOperation> - createReplicatedOperation(Request request, - ActionListener> actionListener, - TransportReplicationAction.PrimaryShardReference primaryShardReference, - boolean executeOnReplicas) { + protected ReplicationOperation> + createReplicatedOperation( + Request request, + ActionListener> actionListener, + TransportReplicationAction.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { return new NoopReplicationOperation(request, actionListener) { public void execute() throws Exception { assertPhase(task, "primary"); @@ -521,7 +524,7 @@ public void execute() throws Exception { assertThat(requests.size(), equalTo(1)); assertThat("primary request was not delegated to relocation target", requests.get(0).action, equalTo("testAction[p]")); assertPhase(task, "primary_delegation"); - transport.handleResponse(requests.get(0).requestId, new Response()); + transport.handleResponse(requests.get(0).requestId, new TestResponse()); assertTrue(listener.isDone()); listener.get(); assertPhase(task, "finished"); @@ -539,16 +542,17 @@ public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws state 
= ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryTargetNodeId)).build(); setState(clusterService, state); Request request = new Request(shardId).timeout("1ms"); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); AtomicBoolean executed = new AtomicBoolean(); action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), createTransportChannel(listener), task) { @Override - protected ReplicationOperation> - createReplicatedOperation(Request request, - ActionListener> actionListener, - TransportReplicationAction.PrimaryShardReference primaryShardReference, - boolean executeOnReplicas) { + protected ReplicationOperation> + createReplicatedOperation( + Request request, + ActionListener> actionListener, + TransportReplicationAction.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { return new NoopReplicationOperation(request, actionListener) { public void execute() throws Exception { assertPhase(task, "primary"); @@ -579,7 +583,7 @@ public void testPrimaryReference() throws Exception { fail("releasable is closed twice"); } }; - Action.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); + TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); final Request request = new Request(); Request replicaRequest = (Request) primary.perform(request).replicaRequest; @@ -596,7 +600,7 @@ public void testPrimaryReference() throws Exception { } public void testReplicaProxy() throws InterruptedException, ExecutionException { - Action.ReplicasProxy proxy = action.new ReplicasProxy(); + ReplicationOperation.Replicas proxy = action.newReplicasProxy(); final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); ClusterState state = stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2)); @@ 
-636,43 +640,15 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { assertListenerThrows("listener should reflect remote error", listener, TransportException.class); } - AtomicReference failure = new AtomicReference<>(); - AtomicReference ignoredFailure = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + AtomicReference ignoredFailure = new AtomicReference<>(); AtomicBoolean success = new AtomicBoolean(); - proxy.failShard(replica, randomIntBetween(1, 10), "test", new ElasticsearchException("simulated"), - () -> success.set(true), failure::set, ignoredFailure::set + proxy.failShardIfNeeded(replica, randomIntBetween(1, 10), "test", new ElasticsearchException("simulated"), + () -> success.set(true), failure::set, ignoredFailure::set ); CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); - assertEquals(1, shardFailedRequests.length); - CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0]; - ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request; - // the shard the request was sent to and the shard to be failed should be the same - assertEquals(shardEntry.getShardId(), replica.shardId()); - assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId()); - if (randomBoolean()) { - // simulate success - transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); - assertTrue(success.get()); - assertNull(failure.get()); - assertNull(ignoredFailure.get()); - - } else if (randomBoolean()) { - // simulate the primary has been demoted - transport.handleRemoteError(shardFailedRequest.requestId, - new ShardStateAction.NoLongerPrimaryShardException(replica.shardId(), - "shard-failed-test")); - assertFalse(success.get()); - assertNotNull(failure.get()); - assertNull(ignoredFailure.get()); - - } else { - // simulated an "ignored" exception - 
transport.handleRemoteError(shardFailedRequest.requestId, - new NodeClosedException(state.nodes().getLocalNode())); - assertFalse(success.get()); - assertNull(failure.get()); - assertNotNull(ignoredFailure.get()); - } + // A plain (non-write) replication action proxy should not fail the shard + assertEquals(0, shardFailedRequests.length); } public void testShadowIndexDisablesReplication() throws Exception { @@ -691,9 +667,9 @@ public void testShadowIndexDisablesReplication() throws Exception { action.new AsyncPrimaryAction(new Request(shardId), primaryShard.allocationId().getId(), createTransportChannel(new PlainActionFuture<>()), null) { @Override - protected ReplicationOperation> createReplicatedOperation( - Request request, ActionListener> actionListener, - TransportReplicationAction.PrimaryShardReference primaryShardReference, + protected ReplicationOperation> createReplicatedOperation( + Request request, ActionListener> actionListener, + TransportReplicationAction.PrimaryShardReference primaryShardReference, boolean executeOnReplicas) { assertFalse(executeOnReplicas); assertFalse(executed.getAndSet(true)); @@ -715,7 +691,7 @@ public void testSeqNoIsSetOnPrimary() throws Exception { Request request = new Request(shardId); TransportReplicationAction.ConcreteShardRequest concreteShardRequest = new TransportReplicationAction.ConcreteShardRequest<>(request, routingEntry.allocationId().getId()); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); final IndexShard shard = mock(IndexShard.class); @@ -730,10 +706,10 @@ public void testSeqNoIsSetOnPrimary() throws Exception { } }; - Action action = - new Action(Settings.EMPTY, "testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, threadPool); + TestAction action = + new TestAction(Settings.EMPTY, "testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, threadPool); - TransportReplicationAction.PrimaryOperationTransportHandler
primaryPhase = + TransportReplicationAction.PrimaryOperationTransportHandler primaryPhase = action.new PrimaryOperationTransportHandler(); primaryPhase.messageReceived(concreteShardRequest, createTransportChannel(listener), null); CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests(); @@ -751,7 +727,7 @@ public void testCounterOnPrimary() throws Exception { logger.debug("--> using initial state:\n{}", clusterService.state()); final ShardRouting primaryShard = state.routingTable().shardRoutingTable(shardId).primaryShard(); Request request = new Request(shardId); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); int i = randomInt(3); final boolean throwExceptionOnCreation = i == 1; @@ -759,11 +735,12 @@ public void testCounterOnPrimary() throws Exception { final boolean respondWithError = i == 3; action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) { @Override - protected ReplicationOperation> - createReplicatedOperation(Request request, - ActionListener> actionListener, - TransportReplicationAction.PrimaryShardReference primaryShardReference, - boolean executeOnReplicas) { + protected ReplicationOperation> + createReplicatedOperation( + Request request, + ActionListener> actionListener, + TransportReplicationAction.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { assertIndexShardCounter(1); if (throwExceptionOnCreation) { throw new ElasticsearchException("simulated exception, during createReplicatedOperation"); @@ -808,7 +785,7 @@ public void testReplicasCounter() throws Exception { final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); boolean throwException = randomBoolean(); final ReplicationTask task = maybeTask(); - Action action = new Action(Settings.EMPTY, "testActionWithExceptions", 
transportService, clusterService, shardStateAction, + TestAction action = new TestAction(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction, threadPool) { @Override protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { @@ -820,7 +797,7 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl return new ReplicaResult(); } }; - final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); + final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); try { replicaOperationTransportHandler.messageReceived( new TransportReplicationAction.ConcreteShardRequest<>( @@ -871,7 +848,7 @@ public void testPrimaryActionRejectsWrongAid() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); setState(clusterService, state(index, true, ShardRoutingState.STARTED)); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); Request request = new Request(shardId).timeout("1ms"); action.new PrimaryOperationTransportHandler().messageReceived( new TransportReplicationAction.ConcreteShardRequest<>(request, "_not_a_valid_aid_"), @@ -897,7 +874,7 @@ public void testReplicaActionRejectsWrongAid() throws Exception { state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(replica.currentNodeId())).build(); setState(clusterService, state); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); Request request = new Request(shardId).timeout("1ms"); action.new ReplicaOperationTransportHandler().messageReceived( new TransportReplicationAction.ConcreteShardRequest<>(request, "_not_a_valid_aid_"), @@ -928,7 +905,7 @@ public void testRetryOnReplica() throws 
Exception { setState(clusterService, state); AtomicBoolean throwException = new AtomicBoolean(true); final ReplicationTask task = maybeTask(); - Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction, + TestAction action = new TestAction(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction, threadPool) { @Override protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { @@ -939,8 +916,8 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl return new ReplicaResult(); } }; - final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); - final PlainActionFuture listener = new PlainActionFuture<>(); + final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); + final PlainActionFuture listener = new PlainActionFuture<>(); final Request request = new Request().setShardId(shardId); request.primaryTerm(state.metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id())); replicaOperationTransportHandler.messageReceived( @@ -1047,31 +1024,46 @@ public String toString() { } } - static class Response extends ReplicationResponse { + static class TestResponse extends ReplicationResponse { } - class Action extends TransportReplicationAction { + private class TestAction extends TransportReplicationAction { - Action(Settings settings, String actionName, TransportService transportService, - ClusterService clusterService, - ShardStateAction shardStateAction, - ThreadPool threadPool) { + private final boolean withDocumentFailureOnPrimary; + private final boolean withDocumentFailureOnReplica; + + TestAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, ShardStateAction shardStateAction, + ThreadPool threadPool) { 
+ super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, + shardStateAction, + new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), + Request::new, Request::new, ThreadPool.Names.SAME); + this.withDocumentFailureOnPrimary = false; + this.withDocumentFailureOnReplica = false; + } + + TestAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, ShardStateAction shardStateAction, + ThreadPool threadPool, boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) { super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, shardStateAction, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME); + this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; + this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @Override - protected Response newResponseInstance() { - return new Response(); + protected TestResponse newResponseInstance() { + return new TestResponse(); } @Override protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has already been executed on the primary"; - return new PrimaryResult(shardRequest, new Response()); + return new PrimaryResult(shardRequest, new TestResponse()); } @Override @@ -1156,22 +1148,23 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService return indexShard; } - class NoopReplicationOperation extends ReplicationOperation> { - NoopReplicationOperation(Request request, ActionListener> listener) { + class NoopReplicationOperation extends ReplicationOperation> { + + NoopReplicationOperation(Request request, ActionListener> listener) 
{ super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop"); } @Override public void execute() throws Exception { // Using the diamond operator (<>) prevents Eclipse from being able to compile this code - this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult(null, new Response())); + this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult(null, new TestResponse())); } } /** * Transport channel that is needed for replica operation testing. */ - public TransportChannel createTransportChannel(final PlainActionFuture listener) { + public TransportChannel createTransportChannel(final PlainActionFuture listener) { return new TransportChannel() { @Override @@ -1186,12 +1179,12 @@ public String getProfileName() { @Override public void sendResponse(TransportResponse response) throws IOException { - listener.onResponse(((Response) response)); + listener.onResponse(((TestResponse) response)); } @Override public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { - listener.onResponse(((Response) response)); + listener.onResponse(((TestResponse) response)); } @Override diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 82cd013c1954e..781059fd85945 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -19,38 +19,117 @@ package org.elasticsearch.action.support.replication; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; import 
org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteResponse; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.replication.ReplicationOperation.ReplicaResponse; +import org.elasticsearch.client.transport.NoNodeAvailableException; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import 
org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.mockito.ArgumentCaptor; import java.util.HashSet; +import java.util.Locale; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Collectors; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TransportWriteActionTests extends ESTestCase { + + private static ThreadPool threadPool; + + private ClusterService clusterService; private IndexShard indexShard; private Translog.Location location; + @BeforeClass + public static void beforeClass() { + threadPool = new TestThreadPool("ShardReplicationTests"); + } + @Before public void initCommonMocks() { indexShard = mock(IndexShard.class); location = mock(Translog.Location.class); + clusterService = createClusterService(threadPool); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + + void assertListenerThrows(String 
msg, PlainActionFuture listener, Class klass) throws InterruptedException { + try { + listener.get(); + fail(msg); + } catch (ExecutionException ex) { + assertThat(ex.getCause(), instanceOf(klass)); + } } public void testPrimaryNoRefreshCall() throws Exception { @@ -176,6 +255,95 @@ public void testDocumentFailureInShardOperationOnReplica() throws Exception { assertNotNull(listener.failure); } + public void testReplicaProxy() throws InterruptedException, ExecutionException { + CapturingTransport transport = new CapturingTransport(); + TransportService transportService = new TransportService(clusterService.getSettings(), transport, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + transportService.start(); + transportService.acceptIncomingRequests(); + ShardStateAction shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); + TestAction action = action = new TestAction(Settings.EMPTY, "testAction", transportService, + clusterService, shardStateAction, threadPool); + ReplicationOperation.Replicas proxy = action.newReplicasProxy(); + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2)); + logger.info("using state: {}", state); + ClusterServiceUtils.setState(clusterService, state); + + // check that at unknown node fails + PlainActionFuture listener = new PlainActionFuture<>(); + proxy.performOn( + TestShardRouting.newShardRouting(shardId, "NOT THERE", false, randomFrom(ShardRoutingState.values())), + new TestRequest(), listener); + assertTrue(listener.isDone()); + assertListenerThrows("non existent node should throw a NoNodeAvailableException", listener, NoNodeAvailableException.class); + + final IndexShardRoutingTable shardRoutings = state.routingTable().shardRoutingTable(shardId); + final ShardRouting replica = 
randomFrom(shardRoutings.replicaShards().stream() + .filter(ShardRouting::assignedToNode).collect(Collectors.toList())); + listener = new PlainActionFuture<>(); + proxy.performOn(replica, new TestRequest(), listener); + assertFalse(listener.isDone()); + + CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear(); + assertThat(captures, arrayWithSize(1)); + if (randomBoolean()) { + final TransportReplicationAction.ReplicaResponse response = + new TransportReplicationAction.ReplicaResponse(randomAsciiOfLength(10), randomLong()); + transport.handleResponse(captures[0].requestId, response); + assertTrue(listener.isDone()); + assertThat(listener.get(), equalTo(response)); + } else if (randomBoolean()) { + transport.handleRemoteError(captures[0].requestId, new ElasticsearchException("simulated")); + assertTrue(listener.isDone()); + assertListenerThrows("listener should reflect remote error", listener, ElasticsearchException.class); + } else { + transport.handleError(captures[0].requestId, new TransportException("simulated")); + assertTrue(listener.isDone()); + assertListenerThrows("listener should reflect remote error", listener, TransportException.class); + } + + AtomicReference failure = new AtomicReference<>(); + AtomicReference ignoredFailure = new AtomicReference<>(); + AtomicBoolean success = new AtomicBoolean(); + proxy.failShardIfNeeded(replica, randomIntBetween(1, 10), "test", new ElasticsearchException("simulated"), + () -> success.set(true), failure::set, ignoredFailure::set + ); + CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); + // A write replication action proxy should fail the shard + assertEquals(1, shardFailedRequests.length); + CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0]; + ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request; + // the shard the request was sent to and the shard to be failed 
should be the same + assertEquals(shardEntry.getShardId(), replica.shardId()); + assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId()); + if (randomBoolean()) { + // simulate success + transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); + assertTrue(success.get()); + assertNull(failure.get()); + assertNull(ignoredFailure.get()); + + } else if (randomBoolean()) { + // simulate the primary has been demoted + transport.handleRemoteError(shardFailedRequest.requestId, + new ShardStateAction.NoLongerPrimaryShardException(replica.shardId(), + "shard-failed-test")); + assertFalse(success.get()); + assertNotNull(failure.get()); + assertNull(ignoredFailure.get()); + + } else { + // simulated an "ignored" exception + transport.handleRemoteError(shardFailedRequest.requestId, + new NodeClosedException(state.nodes().getLocalNode())); + assertFalse(success.get()); + assertNull(failure.get()); + assertNotNull(ignoredFailure.get()); + } + } + private class TestAction extends TransportWriteAction { private final boolean withDocumentFailureOnPrimary; @@ -184,6 +352,7 @@ private class TestAction extends TransportWriteAction null, null), null, @@ -193,6 +362,17 @@ null, null, null, new ActionFilters(new HashSet<>()), new IndexNameExpressionRes this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } + protected TestAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) { + super(settings, actionName, transportService, clusterService, + mockIndicesService(clusterService), threadPool, shardStateAction, + new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), + TestRequest::new, TestRequest::new, ThreadPool.Names.SAME); + this.withDocumentFailureOnPrimary = false; + this.withDocumentFailureOnReplica = false; + } + + @Override protected TestResponse newResponseInstance() { 
return new TestResponse(); @@ -222,6 +402,80 @@ protected WriteReplicaResult shardOperationOnReplica(TestRequest re } } + final IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) { + final IndexService indexService = mock(IndexService.class); + when(indexService.getShard(anyInt())).then(invocation -> { + int shard = (Integer) invocation.getArguments()[0]; + final ShardId shardId = new ShardId(indexMetaData.getIndex(), shard); + if (shard > indexMetaData.getNumberOfShards()) { + throw new ShardNotFoundException(shardId); + } + return mockIndexShard(shardId, clusterService); + }); + return indexService; + } + + final IndicesService mockIndicesService(ClusterService clusterService) { + final IndicesService indicesService = mock(IndicesService.class); + when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> { + Index index = (Index)invocation.getArguments()[0]; + final ClusterState state = clusterService.state(); + final IndexMetaData indexSafe = state.metaData().getIndexSafe(index); + return mockIndexService(indexSafe, clusterService); + }); + when(indicesService.indexService(any(Index.class))).then(invocation -> { + Index index = (Index) invocation.getArguments()[0]; + final ClusterState state = clusterService.state(); + if (state.metaData().hasIndex(index.getName())) { + final IndexMetaData indexSafe = state.metaData().getIndexSafe(index); + return mockIndexService(clusterService.state().metaData().getIndexSafe(index), clusterService); + } else { + return null; + } + }); + return indicesService; + } + + private final AtomicInteger count = new AtomicInteger(0); + + private final AtomicBoolean isRelocated = new AtomicBoolean(false); + + private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) { + final IndexShard indexShard = mock(IndexShard.class); + doAnswer(invocation -> { + ActionListener callback = (ActionListener) invocation.getArguments()[0]; + count.incrementAndGet(); + 
callback.onResponse(count::decrementAndGet); + return null; + }).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString()); + doAnswer(invocation -> { + long term = (Long)invocation.getArguments()[0]; + ActionListener callback = (ActionListener) invocation.getArguments()[1]; + final long primaryTerm = indexShard.getPrimaryTerm(); + if (term < primaryTerm) { + throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])", + shardId, term, primaryTerm)); + } + count.incrementAndGet(); + callback.onResponse(count::decrementAndGet); + return null; + }).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString()); + when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { + final ClusterState state = clusterService.state(); + final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); + final ShardRouting routing = node.getByShardId(shardId); + if (routing == null) { + throw new ShardNotFoundException(shardId, "shard is no longer assigned to current node"); + } + return routing; + }); + when(indexShard.state()).thenAnswer(invocationOnMock -> isRelocated.get() ? 
IndexShardState.RELOCATED : IndexShardState.STARTED); + doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class)); + when(indexShard.getPrimaryTerm()).thenAnswer(i -> + clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id())); + return indexShard; + } + private static class TestRequest extends ReplicatedWriteRequest { TestRequest() { setShardId(new ShardId("test", "test", 1)); diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index c97c0982ebbd6..bc341d339ce39 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -339,9 +339,11 @@ public void testLatestVersionLoaded() throws Exception { client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet(); // TODO: remove once refresh doesn't fail immediately if there a master block: // https://github.com/elastic/elasticsearch/issues/9997 - client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get(); + // client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get(); + logger.info("--> refreshing all indices after indexing is complete"); client().admin().indices().prepareRefresh().execute().actionGet(); + logger.info("--> checking if documents exist, there should be 3"); for (int i = 0; i < 10; i++) { assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3); } diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index b33b082f5c455..4477320b95680 100644 --- 
a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -429,14 +429,15 @@ public void performOn( } @Override - public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + Runnable onSuccess, Consumer onPrimaryDemoted, + Consumer onIgnoredFailure) { throw new UnsupportedOperationException(); } @Override - public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { throw new UnsupportedOperationException(); } } @@ -519,9 +520,8 @@ protected void indexOnReplica(IndexRequest request, IndexShard replica) throws I TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger); } - - class GlobalCheckpointSync extends - ReplicationAction { + class GlobalCheckpointSync extends ReplicationAction { GlobalCheckpointSync(ActionListener listener, ReplicationGroup replicationGroup) { super(new GlobalCheckpointSyncAction.PrimaryRequest(replicationGroup.getPrimary().shardId()), listener, @@ -529,7 +529,8 @@ class GlobalCheckpointSync extends } @Override - protected PrimaryResult performOnPrimary(IndexShard primary, GlobalCheckpointSyncAction.PrimaryRequest request) throws Exception { + protected PrimaryResult performOnPrimary(IndexShard primary, + GlobalCheckpointSyncAction.PrimaryRequest request) throws Exception { return new PrimaryResult(new GlobalCheckpointSyncAction.ReplicaRequest(request, 
primary.getGlobalCheckpoint()), new ReplicationResponse()); } diff --git a/docs/reference/migration/migrate_6_0/rest.asciidoc b/docs/reference/migration/migrate_6_0/rest.asciidoc index f7b52b914daf3..cad56618fe010 100644 --- a/docs/reference/migration/migrate_6_0/rest.asciidoc +++ b/docs/reference/migration/migrate_6_0/rest.asciidoc @@ -34,7 +34,13 @@ The `ignore_unavailable` and `allow_no_indices` options are no longer accepted as they could cause undesired results when their values differed from their defaults. -=== `timestamp` and `ttl` in index requests +==== `timestamp` and `ttl` in index requests `timestamp` and `ttl` are not accepted anymore as parameters of index/update -requests. \ No newline at end of file +requests. + +==== Refresh requests with one or more shard failures return HTTP 500 response instead of 200 + +Refresh requests are broadcast to multiple shards, and one or more of those +shards can fail during the request. Such a request now returns a 500 response +instead of a 200 response if at least one shard fails.