diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index a800411cc630d..d5f08cb6a9adf 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -167,7 +167,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastResponse.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastResponse.java index 54d6220fd342c..e608e8e0ab7d6 100644 --- a/core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastResponse.java +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastResponse.java @@ -23,6 +23,8 @@ import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.index.shard.ShardNotFoundException; import java.io.IOException; import java.util.List; @@ -42,11 +44,22 @@ public class BroadcastResponse extends ActionResponse { public BroadcastResponse() { } - public BroadcastResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + public BroadcastResponse(int totalShards, int successfulShards, int failedShards, + List shardFailures) { + assertNoShardNotAvailableFailures(shardFailures); this.totalShards = totalShards; this.successfulShards = successfulShards; this.failedShards = failedShards; - this.shardFailures = shardFailures == null ? EMPTY : shardFailures.toArray(new ShardOperationFailedException[shardFailures.size()]); + this.shardFailures = shardFailures == null ? 
EMPTY : + shardFailures.toArray(new ShardOperationFailedException[shardFailures.size()]); + } + + private void assertNoShardNotAvailableFailures(List shardFailures) { + if (shardFailures != null) { + for (Object e : shardFailures) { + assert (e instanceof ShardNotFoundException) == false : "expected no ShardNotFoundException failures, but got " + e; + } + } } /** @@ -70,6 +83,17 @@ public int getFailedShards() { return failedShards; } + /** + * The REST status that should be used for the response + */ + public RestStatus getStatus() { + if (failedShards > 0) { + return shardFailures[0].status(); + } else { + return RestStatus.OK; + } + } + /** * The list of shard failures exception. */ diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 6a3d217fcf6c9..0d1a632ad928f 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -147,7 +147,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set decPendingAndFinishIfNeeded() @@ -209,14 +209,9 @@ public void onFailure(Exception replicaException) { shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); - logger.warn( - (org.apache.logging.log4j.util.Supplier) - () -> new ParameterizedMessage("[{}] {}", shard.shardId(), message), replicaException); - replicasProxy.failShard(shard, replicaRequest.primaryTerm(), message, replicaException, - ReplicationOperation.this::decPendingAndFinishIfNeeded, - ReplicationOperation.this::onPrimaryDemoted, - throwable -> decPendingAndFinishIfNeeded() - ); + 
replicasProxy.failShardIfNeeded(shard, replicaRequest.primaryTerm(), message, + replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, + ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); } } }); @@ -314,10 +309,13 @@ private void finishAsFailed(Exception exception) { } } + /** + * An encapsulation of an operation that is to be performed on the primary shard + */ public interface Primary< - Request extends ReplicationRequest, - ReplicaRequest extends ReplicationRequest, - PrimaryResultT extends PrimaryResult + RequestT extends ReplicationRequest, + ReplicaRequestT extends ReplicationRequest, + PrimaryResultT extends PrimaryResult > { /** @@ -338,7 +336,7 @@ public interface Primary< * @param request the request to perform * @return the request to send to the repicas */ - PrimaryResultT perform(Request request) throws Exception; + PrimaryResultT perform(RequestT request) throws Exception; /** @@ -355,7 +353,10 @@ public interface Primary< long localCheckpoint(); } - public interface Replicas> { + /** + * An encapsulation of an operation that will be executed on the replica shards, if present. + */ + public interface Replicas> { /** * performs the the given request on the specified replica @@ -364,24 +365,29 @@ public interface Replicas listener); + void performOn(ShardRouting replica, RequestT replicaRequest, ActionListener listener); /** - * Fail the specified shard, removing it from the current set of active shards + * Fail the specified shard if needed, removing it from the current set + * of active shards. Whether a failure is needed is left up to the + * implementation. 
+ * * @param replica shard to fail * @param primaryTerm the primary term of the primary shard when requesting the failure * @param message a (short) description of the reason * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed * @param onSuccess a callback to call when the shard has been successfully removed from the active set. * @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted -* by the master. + * by the master. * @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the */ - void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure); + void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure); /** - * Marks shard copy as stale, removing its allocation id from the set of in-sync allocation ids. + * Marks shard copy as stale if needed, removing its allocation id from + * the set of in-sync allocation ids. Whether marking as stale is needed + * is left up to the implementation. * * @param shardId shard id * @param allocationId allocation id to remove from the set of in-sync allocation ids @@ -391,8 +397,8 @@ void failShard(ShardRouting replica, long primaryTerm, String message, Exception * by the master. * @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored. 
*/ - void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure); + void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure); } /** @@ -422,13 +428,13 @@ public RetryOnPrimaryException(StreamInput in) throws IOException { } } - public interface PrimaryResult> { + public interface PrimaryResult> { /** * @return null if no operation needs to be sent to a replica * (for example when the operation failed on the primary due to a parsing exception) */ - @Nullable R replicaRequest(); + @Nullable RequestT replicaRequest(); void setShardInfo(ReplicationResponse.ShardInfo shardInfo); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 32f5b8f0f2cb7..7190879976811 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -99,15 +99,15 @@ public abstract class TransportReplicationAction< private final TransportService transportService; protected final ClusterService clusterService; + protected final ShardStateAction shardStateAction; private final IndicesService indicesService; - private final ShardStateAction shardStateAction; private final TransportRequestOptions transportOptions; private final String executor; // package private for testing private final String transportReplicaAction; private final String transportPrimaryAction; - private final ReplicasProxy replicasProxy; + private final ReplicationOperation.Replicas replicasProxy; protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, ClusterService 
clusterService, IndicesService indicesService, @@ -135,7 +135,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.transportOptions = transportOptions(); - this.replicasProxy = new ReplicasProxy(); + this.replicasProxy = newReplicasProxy(); } @Override @@ -148,6 +148,10 @@ protected void doExecute(Task task, Request request, ActionListener li new ReroutePhase((ReplicationTask) task, request, listener).run(); } + protected ReplicationOperation.Replicas newReplicasProxy() { + return new ReplicasProxy(); + } + protected abstract Response newResponseInstance(); /** @@ -369,8 +373,7 @@ protected ReplicationOperation> listener, PrimaryShardReference primaryShardReference, boolean executeOnReplicas) { return new ReplicationOperation<>(request, primaryShardReference, listener, - executeOnReplicas, replicasProxy, clusterService::state, logger, actionName - ); + executeOnReplicas, replicasProxy, clusterService::state, logger, actionName); } } @@ -1030,7 +1033,13 @@ public String allocationId() { } } - final class ReplicasProxy implements ReplicationOperation.Replicas { + /** + * The {@code ReplicasProxy} is an implementation of the {@code Replicas} + * interface that performs the actual {@code ReplicaRequest} on the replica + * shards. It also encapsulates the logic required for failing the replica + * if deemed necessary as well as marking it as stale when needed. 
+ */ + class ReplicasProxy implements ReplicationOperation.Replicas { @Override public void performOn(ShardRouting replica, ReplicaRequest request, ActionListener listener) { @@ -1041,45 +1050,28 @@ public void performOn(ShardRouting replica, ReplicaRequest request, ActionListen return; } final ConcreteShardRequest concreteShardRequest = - new ConcreteShardRequest<>(request, replica.allocationId().getId()); + new ConcreteShardRequest<>(request, replica.allocationId().getId()); sendReplicaRequest(concreteShardRequest, node, listener); } @Override - public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception, - createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + // This does not need to fail the shard. The idea is that this + // is a non-write operation (something like a refresh or a global + // checkpoint sync) and therefore the replica should still be + // considered "alive" even if the operation failed on it. 
+ onSuccess.run(); } @Override - public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { - shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null, - createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); - } - - private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer onPrimaryDemoted, - final Consumer onIgnoredFailure) { - return new ShardStateAction.Listener() { - @Override - public void onSuccess() { - onSuccess.run(); - } - - @Override - public void onFailure(Exception shardFailedError) { - if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { - onPrimaryDemoted.accept(shardFailedError); - } else { - // these can occur if the node is shutting down and are okay - // any other exception here is not expected and merits investigation - assert shardFailedError instanceof TransportException || - shardFailedError instanceof NodeClosedException : shardFailedError; - onIgnoredFailure.accept(shardFailedError); - } - } - }; + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + // This does not need to make the shard stale. The idea is that this + // is a non-write operation (something like a refresh or a global + // checkpoint sync) and therefore the replica copy should still be + // considered in sync rather than being marked as stale. 
+ onSuccess.run(); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 8569b28257f9f..10f8741ecccb6 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -20,29 +20,38 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteResponse; +import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog.Location; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; +import 
org.apache.logging.log4j.core.pattern.ConverterKeys; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -63,6 +72,11 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe indexNameExpressionResolver, request, replicaRequest, executor); } + @Override + protected ReplicationOperation.Replicas newReplicasProxy() { + return new WriteActionReplicasProxy(); + } + /** * Called on the primary with a reference to the primary {@linkplain IndexShard} to modify. * @@ -311,4 +325,55 @@ void run() { } } } + + /** + * A proxy for write operations that need to be performed on the + * replicas, where a failure to execute the operation should fail + * the replica shard and/or mark the replica as stale. + * + * This extends {@code TransportReplicationAction.ReplicasProxy} to do the + * failing and stale-ing. 
+ */ + class WriteActionReplicasProxy extends ReplicasProxy { + + @Override + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + + logger.warn((org.apache.logging.log4j.util.Supplier) + () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception); + shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception, + createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + } + + @Override + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, "mark copy as stale", null, + createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); + } + + public ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer onPrimaryDemoted, + final Consumer onIgnoredFailure) { + return new ShardStateAction.Listener() { + @Override + public void onSuccess() { + onSuccess.run(); + } + + @Override + public void onFailure(Exception shardFailedError) { + if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { + onPrimaryDemoted.accept(shardFailedError); + } else { + // these can occur if the node is shutting down and are okay + // any other exception here is not expected and merits investigation + assert shardFailedError instanceof TransportException || + shardFailedError instanceof NodeClosedException : shardFailedError; + onIgnoredFailure.accept(shardFailedError); + } + } + }; + } + } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRefreshAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRefreshAction.java index c213f3be18b6b..7b317ab404e48 100644 --- 
a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRefreshAction.java @@ -60,7 +60,7 @@ public RestResponse buildResponse(RefreshResponse response, XContentBuilder buil builder.startObject(); buildBroadcastShardsHeader(builder, request, response); builder.endObject(); - return new BytesRestResponse(OK, builder); + return new BytesRestResponse(response.getStatus(), builder); } }); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index a701193d22042..459bafd3af2f1 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -190,11 +190,11 @@ public void testDemotedPrimary() throws Exception { final boolean testPrimaryDemotedOnStaleShardCopies = randomBoolean(); final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures) { @Override - public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, - Runnable onSuccess, Consumer onPrimaryDemoted, - Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + Runnable onSuccess, Consumer onPrimaryDemoted, + Consumer onIgnoredFailure) { if (testPrimaryDemotedOnStaleShardCopies) { - super.failShard(replica, primaryTerm, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure); + super.failShardIfNeeded(replica, primaryTerm, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure); } else { assertThat(replica, equalTo(failedReplica)); onPrimaryDemoted.accept(new ElasticsearchException("the king is dead")); @@ -202,12 +202,12 @@ public void failShard(ShardRouting 
replica, long primaryTerm, String message, Ex } @Override - public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { if (testPrimaryDemotedOnStaleShardCopies) { onPrimaryDemoted.accept(new ElasticsearchException("the king is dead")); } else { - super.markShardCopyAsStale(shardId, allocationId, primaryTerm, onSuccess, onPrimaryDemoted, onIgnoredFailure); + super.markShardCopyAsStaleIfNeeded(shardId, allocationId, primaryTerm, onSuccess, onPrimaryDemoted, onIgnoredFailure); } } }; @@ -486,8 +486,8 @@ public void performOn(ShardRouting replica, Request request, ActionListener onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { if (failedReplicas.add(replica) == false) { fail("replica [" + replica + "] was failed twice"); } @@ -503,8 +503,8 @@ public void failShard(ShardRouting replica, long primaryTerm, String message, Ex } @Override - public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { if (markedAsStaleCopies.add(allocationId) == false) { fail("replica [" + allocationId + "] was marked as stale twice"); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java 
index c2c484fe3e62c..1bd943704707d 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -135,7 +135,7 @@ public static R resolveRequest(TransportRequest r private ClusterService clusterService; private TransportService transportService; private CapturingTransport transport; - private Action action; + private TestAction action; private ShardStateAction shardStateAction; /* * @@ -159,7 +159,7 @@ public void setUp() throws Exception { transportService.start(); transportService.acceptIncomingRequests(); shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); - action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, shardStateAction, threadPool); + action = new TestAction(Settings.EMPTY, "testAction", transportService, clusterService, shardStateAction, threadPool); } @After @@ -185,9 +185,10 @@ void assertListenerThrows(String msg, PlainActionFuture listener, Class listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); - Action action = new Action(Settings.EMPTY, "testActionWithBlocks", transportService, clusterService, shardStateAction, threadPool) { + TestAction action = new TestAction(Settings.EMPTY, "testActionWithBlocks", + transportService, clusterService, shardStateAction, threadPool) { @Override protected ClusterBlockLevel globalBlockLevel() { return ClusterBlockLevel.WRITE; @@ -197,7 +198,7 @@ protected ClusterBlockLevel globalBlockLevel() { ClusterBlocks.Builder block = ClusterBlocks.builder() .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block)); - Action.ReroutePhase 
reroutePhase = action.new ReroutePhase(task, request, listener); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class); assertPhase(task, "failed"); @@ -226,7 +227,7 @@ protected ClusterBlockLevel globalBlockLevel() { ClusterBlockException.class); assertIndexShardUninitialized(); - action = new Action(Settings.EMPTY, "testActionWithNoBlocks", transportService, clusterService, shardStateAction, threadPool) { + action = new TestAction(Settings.EMPTY, "testActionWithNoBlocks", transportService, clusterService, shardStateAction, threadPool) { @Override protected ClusterBlockLevel globalBlockLevel() { return null; @@ -253,8 +254,8 @@ public void testNotStartedPrimary() throws InterruptedException, ExecutionExcept logger.debug("--> using initial state:\n{}", clusterService.state()); Request request = new Request(shardId).timeout("1ms"); - PlainActionFuture listener = new PlainActionFuture<>(); - Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); + PlainActionFuture listener = new PlainActionFuture<>(); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); assertListenerThrows("unassigned primary didn't cause a timeout", listener, UnavailableShardsException.class); assertPhase(task, "failed"); @@ -301,8 +302,8 @@ public void testNoRerouteOnStaleClusterState() throws InterruptedException, Exec logger.debug("--> relocation ongoing state:\n{}", clusterService.state()); Request request = new Request(shardId).timeout("1ms").routedBasedOnClusterVersion(clusterService.state().version() + 1); - PlainActionFuture listener = new PlainActionFuture<>(); - Action.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener); + PlainActionFuture listener = new PlainActionFuture<>(); + TestAction.ReroutePhase reroutePhase = 
action.new ReroutePhase(null, request, listener); reroutePhase.run(); assertListenerThrows("cluster state too old didn't cause a timeout", listener, UnavailableShardsException.class); assertTrue(request.isRetrySet.compareAndSet(true, false)); @@ -340,10 +341,10 @@ public void testUnknownIndexOrShardOnReroute() throws InterruptedException { randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED)); logger.debug("--> using initial state:\n{}", clusterService.state()); Request request = new Request(new ShardId("unknown_index", "_na_", 0)).timeout("1ms"); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); - Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); assertListenerThrows("must throw index not found exception", listener, IndexNotFoundException.class); assertPhase(task, "failed"); @@ -364,17 +365,18 @@ public void testClosedIndexOnReroute() throws InterruptedException { ShardRoutingState.UNASSIGNED), new CloseIndexRequest(index))); logger.debug("--> using initial state:\n{}", clusterService.state()); Request request = new Request(new ShardId("test", "_na_", 0)).timeout("1ms"); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); ClusterBlockLevel indexBlockLevel = randomBoolean() ? 
ClusterBlockLevel.WRITE : null; - Action action = new Action(Settings.EMPTY, "testActionWithBlocks", transportService, clusterService, shardStateAction, threadPool) { + TestAction action = new TestAction(Settings.EMPTY, "testActionWithBlocks", transportService, + clusterService, shardStateAction, threadPool) { @Override protected ClusterBlockLevel indexBlockLevel() { return indexBlockLevel; } }; - Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); if (indexBlockLevel == ClusterBlockLevel.WRITE) { assertListenerThrows("must throw block exception", listener, ClusterBlockException.class); @@ -398,10 +400,10 @@ public void testStalePrimaryShardOnReroute() throws InterruptedException { } else { request.timeout("1h"); } - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); - Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests, arrayWithSize(1)); @@ -452,9 +454,9 @@ public void testRoutePhaseExecutesRequest() { final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId(); Request request = new Request(shardId); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); - Action.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); 
assertThat(request.shardId(), equalTo(shardId)); logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId); @@ -479,7 +481,7 @@ public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throw ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); setState(clusterService, state); Request request = new Request(shardId).timeout("1ms"); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); AtomicBoolean executed = new AtomicBoolean(); @@ -492,11 +494,12 @@ public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throw } action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) { @Override - protected ReplicationOperation> - createReplicatedOperation(Request request, - ActionListener> actionListener, - TransportReplicationAction.PrimaryShardReference primaryShardReference, - boolean executeOnReplicas) { + protected ReplicationOperation> + createReplicatedOperation( + Request request, + ActionListener> actionListener, + TransportReplicationAction.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { return new NoopReplicationOperation(request, actionListener) { public void execute() throws Exception { assertPhase(task, "primary"); @@ -521,7 +524,7 @@ public void execute() throws Exception { assertThat(requests.size(), equalTo(1)); assertThat("primary request was not delegated to relocation target", requests.get(0).action, equalTo("testAction[p]")); assertPhase(task, "primary_delegation"); - transport.handleResponse(requests.get(0).requestId, new Response()); + transport.handleResponse(requests.get(0).requestId, new TestResponse()); assertTrue(listener.isDone()); listener.get(); assertPhase(task, "finished"); @@ -539,16 +542,17 @@ public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws state 
= ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryTargetNodeId)).build(); setState(clusterService, state); Request request = new Request(shardId).timeout("1ms"); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); AtomicBoolean executed = new AtomicBoolean(); action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), createTransportChannel(listener), task) { @Override - protected ReplicationOperation> - createReplicatedOperation(Request request, - ActionListener> actionListener, - TransportReplicationAction.PrimaryShardReference primaryShardReference, - boolean executeOnReplicas) { + protected ReplicationOperation> + createReplicatedOperation( + Request request, + ActionListener> actionListener, + TransportReplicationAction.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { return new NoopReplicationOperation(request, actionListener) { public void execute() throws Exception { assertPhase(task, "primary"); @@ -579,7 +583,7 @@ public void testPrimaryReference() throws Exception { fail("releasable is closed twice"); } }; - Action.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); + TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); final Request request = new Request(); Request replicaRequest = (Request) primary.perform(request).replicaRequest; @@ -596,7 +600,7 @@ public void testPrimaryReference() throws Exception { } public void testReplicaProxy() throws InterruptedException, ExecutionException { - Action.ReplicasProxy proxy = action.new ReplicasProxy(); + ReplicationOperation.Replicas proxy = action.newReplicasProxy(); final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); ClusterState state = stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2)); @@ 
-636,43 +640,15 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { assertListenerThrows("listener should reflect remote error", listener, TransportException.class); } - AtomicReference failure = new AtomicReference<>(); - AtomicReference ignoredFailure = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + AtomicReference ignoredFailure = new AtomicReference<>(); AtomicBoolean success = new AtomicBoolean(); - proxy.failShard(replica, randomIntBetween(1, 10), "test", new ElasticsearchException("simulated"), - () -> success.set(true), failure::set, ignoredFailure::set + proxy.failShardIfNeeded(replica, randomIntBetween(1, 10), "test", new ElasticsearchException("simulated"), + () -> success.set(true), failure::set, ignoredFailure::set ); CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); - assertEquals(1, shardFailedRequests.length); - CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0]; - ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request; - // the shard the request was sent to and the shard to be failed should be the same - assertEquals(shardEntry.getShardId(), replica.shardId()); - assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId()); - if (randomBoolean()) { - // simulate success - transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); - assertTrue(success.get()); - assertNull(failure.get()); - assertNull(ignoredFailure.get()); - - } else if (randomBoolean()) { - // simulate the primary has been demoted - transport.handleRemoteError(shardFailedRequest.requestId, - new ShardStateAction.NoLongerPrimaryShardException(replica.shardId(), - "shard-failed-test")); - assertFalse(success.get()); - assertNotNull(failure.get()); - assertNull(ignoredFailure.get()); - - } else { - // simulated an "ignored" exception - 
transport.handleRemoteError(shardFailedRequest.requestId, - new NodeClosedException(state.nodes().getLocalNode())); - assertFalse(success.get()); - assertNull(failure.get()); - assertNotNull(ignoredFailure.get()); - } + // A plain replication action proxy does not send a shard-failed request + assertEquals(0, shardFailedRequests.length); } public void testShadowIndexDisablesReplication() throws Exception { @@ -691,9 +667,9 @@ public void testShadowIndexDisablesReplication() throws Exception { action.new AsyncPrimaryAction(new Request(shardId), primaryShard.allocationId().getId(), createTransportChannel(new PlainActionFuture<>()), null) { @Override - protected ReplicationOperation> createReplicatedOperation( - Request request, ActionListener> actionListener, - TransportReplicationAction.PrimaryShardReference primaryShardReference, + protected ReplicationOperation> createReplicatedOperation( + Request request, ActionListener> actionListener, + TransportReplicationAction.PrimaryShardReference primaryShardReference, boolean executeOnReplicas) { assertFalse(executeOnReplicas); assertFalse(executed.getAndSet(true)); @@ -715,7 +691,7 @@ public void testSeqNoIsSetOnPrimary() throws Exception { Request request = new Request(shardId); TransportReplicationAction.ConcreteShardRequest concreteShardRequest = new TransportReplicationAction.ConcreteShardRequest<>(request, routingEntry.allocationId().getId()); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); final IndexShard shard = mock(IndexShard.class); @@ -730,10 +706,10 @@ public void testSeqNoIsSetOnPrimary() throws Exception { } }; - Action action = - new Action(Settings.EMPTY, "testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, threadPool); + TestAction action = + new TestAction(Settings.EMPTY, "testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, threadPool); - TransportReplicationAction.PrimaryOperationTransportHandler 
primaryPhase = + TransportReplicationAction.PrimaryOperationTransportHandler primaryPhase = action.new PrimaryOperationTransportHandler(); primaryPhase.messageReceived(concreteShardRequest, createTransportChannel(listener), null); CapturingTransport.CapturedRequest[] requestsToReplicas = transport.capturedRequests(); @@ -751,7 +727,7 @@ public void testCounterOnPrimary() throws Exception { logger.debug("--> using initial state:\n{}", clusterService.state()); final ShardRouting primaryShard = state.routingTable().shardRoutingTable(shardId).primaryShard(); Request request = new Request(shardId); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); int i = randomInt(3); final boolean throwExceptionOnCreation = i == 1; @@ -759,11 +735,12 @@ public void testCounterOnPrimary() throws Exception { final boolean respondWithError = i == 3; action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) { @Override - protected ReplicationOperation> - createReplicatedOperation(Request request, - ActionListener> actionListener, - TransportReplicationAction.PrimaryShardReference primaryShardReference, - boolean executeOnReplicas) { + protected ReplicationOperation> + createReplicatedOperation( + Request request, + ActionListener> actionListener, + TransportReplicationAction.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { assertIndexShardCounter(1); if (throwExceptionOnCreation) { throw new ElasticsearchException("simulated exception, during createReplicatedOperation"); @@ -808,7 +785,7 @@ public void testReplicasCounter() throws Exception { final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); boolean throwException = randomBoolean(); final ReplicationTask task = maybeTask(); - Action action = new Action(Settings.EMPTY, "testActionWithExceptions", 
transportService, clusterService, shardStateAction, + TestAction action = new TestAction(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction, threadPool) { @Override protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { @@ -820,7 +797,7 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl return new ReplicaResult(); } }; - final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); + final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); try { replicaOperationTransportHandler.messageReceived( new TransportReplicationAction.ConcreteShardRequest<>( @@ -871,7 +848,7 @@ public void testPrimaryActionRejectsWrongAid() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); setState(clusterService, state(index, true, ShardRoutingState.STARTED)); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); Request request = new Request(shardId).timeout("1ms"); action.new PrimaryOperationTransportHandler().messageReceived( new TransportReplicationAction.ConcreteShardRequest<>(request, "_not_a_valid_aid_"), @@ -897,7 +874,7 @@ public void testReplicaActionRejectsWrongAid() throws Exception { state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(replica.currentNodeId())).build(); setState(clusterService, state); - PlainActionFuture listener = new PlainActionFuture<>(); + PlainActionFuture listener = new PlainActionFuture<>(); Request request = new Request(shardId).timeout("1ms"); action.new ReplicaOperationTransportHandler().messageReceived( new TransportReplicationAction.ConcreteShardRequest<>(request, "_not_a_valid_aid_"), @@ -928,7 +905,7 @@ public void testRetryOnReplica() throws 
Exception { setState(clusterService, state); AtomicBoolean throwException = new AtomicBoolean(true); final ReplicationTask task = maybeTask(); - Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction, + TestAction action = new TestAction(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction, threadPool) { @Override protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { @@ -939,8 +916,8 @@ protected ReplicaResult shardOperationOnReplica(Request request, IndexShard repl return new ReplicaResult(); } }; - final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); - final PlainActionFuture listener = new PlainActionFuture<>(); + final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); + final PlainActionFuture listener = new PlainActionFuture<>(); final Request request = new Request().setShardId(shardId); request.primaryTerm(state.metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id())); replicaOperationTransportHandler.messageReceived( @@ -1047,31 +1024,46 @@ public String toString() { } } - static class Response extends ReplicationResponse { + static class TestResponse extends ReplicationResponse { } - class Action extends TransportReplicationAction { + private class TestAction extends TransportReplicationAction { - Action(Settings settings, String actionName, TransportService transportService, - ClusterService clusterService, - ShardStateAction shardStateAction, - ThreadPool threadPool) { + private final boolean withDocumentFailureOnPrimary; + private final boolean withDocumentFailureOnReplica; + + TestAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, ShardStateAction shardStateAction, + ThreadPool threadPool) { 
+ super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, + shardStateAction, + new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), + Request::new, Request::new, ThreadPool.Names.SAME); + this.withDocumentFailureOnPrimary = false; + this.withDocumentFailureOnReplica = false; + } + + TestAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, ShardStateAction shardStateAction, + ThreadPool threadPool, boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) { super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, shardStateAction, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME); + this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; + this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @Override - protected Response newResponseInstance() { - return new Response(); + protected TestResponse newResponseInstance() { + return new TestResponse(); } @Override protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has already been executed on the primary"; - return new PrimaryResult(shardRequest, new Response()); + return new PrimaryResult(shardRequest, new TestResponse()); } @Override @@ -1156,22 +1148,23 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService return indexShard; } - class NoopReplicationOperation extends ReplicationOperation> { - NoopReplicationOperation(Request request, ActionListener> listener) { + class NoopReplicationOperation extends ReplicationOperation> { + + NoopReplicationOperation(Request request, ActionListener> listener) 
{ super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop"); } @Override public void execute() throws Exception { // Using the diamond operator (<>) prevents Eclipse from being able to compile this code - this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult(null, new Response())); + this.resultListener.onResponse(new TransportReplicationAction.PrimaryResult(null, new TestResponse())); } } /** * Transport channel that is needed for replica operation testing. */ - public TransportChannel createTransportChannel(final PlainActionFuture listener) { + public TransportChannel createTransportChannel(final PlainActionFuture listener) { return new TransportChannel() { @Override @@ -1186,12 +1179,12 @@ public String getProfileName() { @Override public void sendResponse(TransportResponse response) throws IOException { - listener.onResponse(((Response) response)); + listener.onResponse(((TestResponse) response)); } @Override public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { - listener.onResponse(((Response) response)); + listener.onResponse(((TestResponse) response)); } @Override diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 82cd013c1954e..781059fd85945 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -19,38 +19,117 @@ package org.elasticsearch.action.support.replication; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; import 
org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteResponse; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.replication.ReplicationOperation.ReplicaResponse; +import org.elasticsearch.client.transport.NoNodeAvailableException; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import 
org.elasticsearch.transport.TransportService; +import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.mockito.ArgumentCaptor; import java.util.HashSet; +import java.util.Locale; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Collectors; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TransportWriteActionTests extends ESTestCase { + + private static ThreadPool threadPool; + + private ClusterService clusterService; private IndexShard indexShard; private Translog.Location location; + @BeforeClass + public static void beforeClass() { + threadPool = new TestThreadPool("ShardReplicationTests"); + } + @Before public void initCommonMocks() { indexShard = mock(IndexShard.class); location = mock(Translog.Location.class); + clusterService = createClusterService(threadPool); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + } + + void assertListenerThrows(String 
msg, PlainActionFuture listener, Class klass) throws InterruptedException { + try { + listener.get(); + fail(msg); + } catch (ExecutionException ex) { + assertThat(ex.getCause(), instanceOf(klass)); + } } public void testPrimaryNoRefreshCall() throws Exception { @@ -176,6 +255,95 @@ public void testDocumentFailureInShardOperationOnReplica() throws Exception { assertNotNull(listener.failure); } + public void testReplicaProxy() throws InterruptedException, ExecutionException { + CapturingTransport transport = new CapturingTransport(); + TransportService transportService = new TransportService(clusterService.getSettings(), transport, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null); + transportService.start(); + transportService.acceptIncomingRequests(); + ShardStateAction shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); + TestAction action = action = new TestAction(Settings.EMPTY, "testAction", transportService, + clusterService, shardStateAction, threadPool); + ReplicationOperation.Replicas proxy = action.newReplicasProxy(); + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + ClusterState state = ClusterStateCreationUtils.stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2)); + logger.info("using state: {}", state); + ClusterServiceUtils.setState(clusterService, state); + + // check that performing on an unknown node fails + PlainActionFuture listener = new PlainActionFuture<>(); + proxy.performOn( + TestShardRouting.newShardRouting(shardId, "NOT THERE", false, randomFrom(ShardRoutingState.values())), + new TestRequest(), listener); + assertTrue(listener.isDone()); + assertListenerThrows("non existent node should throw a NoNodeAvailableException", listener, NoNodeAvailableException.class); + + final IndexShardRoutingTable shardRoutings = state.routingTable().shardRoutingTable(shardId); + final ShardRouting replica = 
randomFrom(shardRoutings.replicaShards().stream() + .filter(ShardRouting::assignedToNode).collect(Collectors.toList())); + listener = new PlainActionFuture<>(); + proxy.performOn(replica, new TestRequest(), listener); + assertFalse(listener.isDone()); + + CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear(); + assertThat(captures, arrayWithSize(1)); + if (randomBoolean()) { + final TransportReplicationAction.ReplicaResponse response = + new TransportReplicationAction.ReplicaResponse(randomAsciiOfLength(10), randomLong()); + transport.handleResponse(captures[0].requestId, response); + assertTrue(listener.isDone()); + assertThat(listener.get(), equalTo(response)); + } else if (randomBoolean()) { + transport.handleRemoteError(captures[0].requestId, new ElasticsearchException("simulated")); + assertTrue(listener.isDone()); + assertListenerThrows("listener should reflect remote error", listener, ElasticsearchException.class); + } else { + transport.handleError(captures[0].requestId, new TransportException("simulated")); + assertTrue(listener.isDone()); + assertListenerThrows("listener should reflect remote error", listener, TransportException.class); + } + + AtomicReference failure = new AtomicReference<>(); + AtomicReference ignoredFailure = new AtomicReference<>(); + AtomicBoolean success = new AtomicBoolean(); + proxy.failShardIfNeeded(replica, randomIntBetween(1, 10), "test", new ElasticsearchException("simulated"), + () -> success.set(true), failure::set, ignoredFailure::set + ); + CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); + // A write replication action proxy should fail the shard + assertEquals(1, shardFailedRequests.length); + CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0]; + ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request; + // the shard the request was sent to and the shard to be failed 
should be the same + assertEquals(shardEntry.getShardId(), replica.shardId()); + assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId()); + if (randomBoolean()) { + // simulate success + transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); + assertTrue(success.get()); + assertNull(failure.get()); + assertNull(ignoredFailure.get()); + + } else if (randomBoolean()) { + // simulate the primary has been demoted + transport.handleRemoteError(shardFailedRequest.requestId, + new ShardStateAction.NoLongerPrimaryShardException(replica.shardId(), + "shard-failed-test")); + assertFalse(success.get()); + assertNotNull(failure.get()); + assertNull(ignoredFailure.get()); + + } else { + // simulated an "ignored" exception + transport.handleRemoteError(shardFailedRequest.requestId, + new NodeClosedException(state.nodes().getLocalNode())); + assertFalse(success.get()); + assertNull(failure.get()); + assertNotNull(ignoredFailure.get()); + } + } + private class TestAction extends TransportWriteAction { private final boolean withDocumentFailureOnPrimary; @@ -184,6 +352,7 @@ private class TestAction extends TransportWriteAction null, null), null, @@ -193,6 +362,17 @@ null, null, null, new ActionFilters(new HashSet<>()), new IndexNameExpressionRes this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } + protected TestAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, ShardStateAction shardStateAction, ThreadPool threadPool) { + super(settings, actionName, transportService, clusterService, + mockIndicesService(clusterService), threadPool, shardStateAction, + new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), + TestRequest::new, TestRequest::new, ThreadPool.Names.SAME); + this.withDocumentFailureOnPrimary = false; + this.withDocumentFailureOnReplica = false; + } + + @Override protected TestResponse newResponseInstance() { 
return new TestResponse(); @@ -222,6 +402,80 @@ protected WriteReplicaResult shardOperationOnReplica(TestRequest re } } + final IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) { + final IndexService indexService = mock(IndexService.class); + when(indexService.getShard(anyInt())).then(invocation -> { + int shard = (Integer) invocation.getArguments()[0]; + final ShardId shardId = new ShardId(indexMetaData.getIndex(), shard); + if (shard > indexMetaData.getNumberOfShards()) { + throw new ShardNotFoundException(shardId); + } + return mockIndexShard(shardId, clusterService); + }); + return indexService; + } + + final IndicesService mockIndicesService(ClusterService clusterService) { + final IndicesService indicesService = mock(IndicesService.class); + when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> { + Index index = (Index)invocation.getArguments()[0]; + final ClusterState state = clusterService.state(); + final IndexMetaData indexSafe = state.metaData().getIndexSafe(index); + return mockIndexService(indexSafe, clusterService); + }); + when(indicesService.indexService(any(Index.class))).then(invocation -> { + Index index = (Index) invocation.getArguments()[0]; + final ClusterState state = clusterService.state(); + if (state.metaData().hasIndex(index.getName())) { + final IndexMetaData indexSafe = state.metaData().getIndexSafe(index); + return mockIndexService(clusterService.state().metaData().getIndexSafe(index), clusterService); + } else { + return null; + } + }); + return indicesService; + } + + private final AtomicInteger count = new AtomicInteger(0); + + private final AtomicBoolean isRelocated = new AtomicBoolean(false); + + private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) { + final IndexShard indexShard = mock(IndexShard.class); + doAnswer(invocation -> { + ActionListener callback = (ActionListener) invocation.getArguments()[0]; + count.incrementAndGet(); + 
callback.onResponse(count::decrementAndGet); + return null; + }).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString()); + doAnswer(invocation -> { + long term = (Long)invocation.getArguments()[0]; + ActionListener callback = (ActionListener) invocation.getArguments()[1]; + final long primaryTerm = indexShard.getPrimaryTerm(); + if (term < primaryTerm) { + throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])", + shardId, term, primaryTerm)); + } + count.incrementAndGet(); + callback.onResponse(count::decrementAndGet); + return null; + }).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString()); + when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { + final ClusterState state = clusterService.state(); + final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); + final ShardRouting routing = node.getByShardId(shardId); + if (routing == null) { + throw new ShardNotFoundException(shardId, "shard is no longer assigned to current node"); + } + return routing; + }); + when(indexShard.state()).thenAnswer(invocationOnMock -> isRelocated.get() ? 
IndexShardState.RELOCATED : IndexShardState.STARTED); + doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class)); + when(indexShard.getPrimaryTerm()).thenAnswer(i -> + clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id())); + return indexShard; + } + private static class TestRequest extends ReplicatedWriteRequest { TestRequest() { setShardId(new ShardId("test", "test", 1)); diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index c97c0982ebbd6..bc341d339ce39 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -339,9 +339,11 @@ public void testLatestVersionLoaded() throws Exception { client().prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet(); // TODO: remove once refresh doesn't fail immediately if there a master block: // https://github.com/elastic/elasticsearch/issues/9997 - client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get(); + // client().admin().cluster().prepareHealth("test").setWaitForYellowStatus().get(); + logger.info("--> refreshing all indices after indexing is complete"); client().admin().indices().prepareRefresh().execute().actionGet(); + logger.info("--> checking if documents exist, there should be 3"); for (int i = 0; i < 10; i++) { assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 3); } diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index b33b082f5c455..4477320b95680 100644 --- 
a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -429,14 +429,15 @@ public void performOn( } @Override - public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void failShardIfNeeded(ShardRouting replica, long primaryTerm, String message, Exception exception, + Runnable onSuccess, Consumer onPrimaryDemoted, + Consumer onIgnoredFailure) { throw new UnsupportedOperationException(); } @Override - public void markShardCopyAsStale(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, - Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, long primaryTerm, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { throw new UnsupportedOperationException(); } } @@ -519,9 +520,8 @@ protected void indexOnReplica(IndexRequest request, IndexShard replica) throws I TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger); } - - class GlobalCheckpointSync extends - ReplicationAction { + class GlobalCheckpointSync extends ReplicationAction { GlobalCheckpointSync(ActionListener listener, ReplicationGroup replicationGroup) { super(new GlobalCheckpointSyncAction.PrimaryRequest(replicationGroup.getPrimary().shardId()), listener, @@ -529,7 +529,8 @@ class GlobalCheckpointSync extends } @Override - protected PrimaryResult performOnPrimary(IndexShard primary, GlobalCheckpointSyncAction.PrimaryRequest request) throws Exception { + protected PrimaryResult performOnPrimary(IndexShard primary, + GlobalCheckpointSyncAction.PrimaryRequest request) throws Exception { return new PrimaryResult(new GlobalCheckpointSyncAction.ReplicaRequest(request, 
primary.getGlobalCheckpoint()), new ReplicationResponse()); } diff --git a/docs/reference/migration/migrate_6_0/rest.asciidoc b/docs/reference/migration/migrate_6_0/rest.asciidoc index f7b52b914daf3..cad56618fe010 100644 --- a/docs/reference/migration/migrate_6_0/rest.asciidoc +++ b/docs/reference/migration/migrate_6_0/rest.asciidoc @@ -34,7 +34,13 @@ The `ignore_unavailable` and `allow_no_indices` options are no longer accepted as they could cause undesired results when their values differed from their defaults. -=== `timestamp` and `ttl` in index requests +==== `timestamp` and `ttl` in index requests `timestamp` and `ttl` are not accepted anymore as parameters of index/update -requests. \ No newline at end of file +requests. + +==== Refresh requests with one or more shard failures return HTTP 500 response instead of 200 + +Refresh requests are broadcast to multiple shards, and one or more of those +shards can fail during the request. Such requests now return a 500 response +instead of a 200 response when at least one shard fails.