From 9b064859f9fd3c682d47fdf6f85c798da4ea1d12 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 22 Jul 2015 12:41:15 +0200 Subject: [PATCH 01/10] Split the actual primary operation from the primary phase into a dedicated AsyncPrimaryAction class (similar to AsyncReplicaAction) Removed the threaded_operation option from replication-based transport actions. Let the threading be handled by the transport service and drop forking of new threads from the transport replication action. --- .../action/bulk/TransportShardBulkAction.java | 2 +- .../action/index/TransportIndexAction.java | 2 +- .../replication/ReplicationRequest.java | 28 +- .../ReplicationRequestBuilder.java | 10 - .../TransportReplicationAction.java | 468 +++++++++--------- .../action/update/UpdateHelper.java | 3 - .../rest/action/delete/RestDeleteAction.java | 3 - .../rest/action/index/RestIndexAction.java | 1 - .../transport/TransportService.java | 2 +- .../action/IndicesRequestIT.java | 12 +- .../replication/ShardReplicationTests.java | 70 ++- .../WriteConsistencyLevelIT.java | 4 +- .../indices/state/RareClusterStateIT.java | 1 + .../test/cluster/TestClusterService.java | 5 +- .../test/transport/CapturingTransport.java | 4 +- 15 files changed, 308 insertions(+), 307 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 2ca2dfe142ac2..25428eda262c8 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -471,7 +471,7 @@ protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request } Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { - throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); + throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: [{}]", update); } operation.execute(indexShard); location = locationToSync(location, operation.getTranslogLocation()); diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 97620829f3a48..0f684e66d63eb 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -195,7 +195,7 @@ protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) { } Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { - throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); + throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: [{}]", update); } operation.execute(indexShard); processAfter(request, indexShard, operation.getTranslogLocation()); diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 39a3dae75698b..c74597afcdd2f 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -24,10 +24,10
@@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.shard.ShardId; import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -41,12 +41,11 @@ public abstract class ReplicationRequest extends A public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); - ShardId internalShardId; + ShardRouting internalShardRouting; protected TimeValue timeout = DEFAULT_TIMEOUT; protected String index; - private boolean threadedOperation = true; private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; private volatile boolean canHaveDuplicates = false; @@ -76,7 +75,6 @@ protected ReplicationRequest(T request, ActionRequest originalRequest) { super(originalRequest); this.timeout = request.timeout(); this.index = request.index(); - this.threadedOperation = request.operationThreaded(); this.consistencyLevel = request.consistencyLevel(); } @@ -91,23 +89,6 @@ public boolean canHaveDuplicates() { return canHaveDuplicates; } - /** - * Controls if the operation will be executed on a separate thread when executed locally. - */ - public final boolean operationThreaded() { - return threadedOperation; - } - - /** - * Controls if the operation will be executed on a separate thread when executed locally. Defaults - * to true when running in embedded mode. - */ - @SuppressWarnings("unchecked") - public final T operationThreaded(boolean threadedOperation) { - this.threadedOperation = threadedOperation; - return (T) this; - } - /** * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. */ @@ -174,19 +155,18 @@ public ActionRequestValidationException validate() { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); if (in.readBoolean()) { - internalShardId = ShardId.readShardId(in); + internalShardRouting = ShardRouting.readShardRoutingEntry(in); } consistencyLevel = WriteConsistencyLevel.fromId(in.readByte()); timeout = TimeValue.readTimeValue(in); index = in.readString(); canHaveDuplicates = in.readBoolean(); - // no need to serialize threaded* parameters, since they only matter locally } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeOptionalStreamable(internalShardId); + out.writeOptionalStreamable(internalShardRouting); out.writeByte(consistencyLevel.id()); timeout.writeTo(out); out.writeString(index); diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java index 7762eaee7dc0e..bafb33be6a73e 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java @@ -35,16 +35,6 @@ protected ReplicationRequestBuilder(ElasticsearchClient client, Actiontrue when running in embedded mode. 
- */ - @SuppressWarnings("unchecked") - public final RequestBuilder setOperationThreaded(boolean threadedOperation) { - request.operationThreaded(threadedOperation); - return (RequestBuilder) this; - } - /** * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. */ diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index bc4094acc54f4..c65908e77de04 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -48,6 +48,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.logging.support.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -85,6 +86,7 @@ public abstract class TransportReplicationAction T response() { class OperationTransportHandler implements TransportRequestHandler { @Override public void messageReceived(final Request request, final TransportChannel channel) throws Exception { - // if we have a local operation, execute it on a thread since we don't spawn - request.operationThreaded(true); execute(request, new ActionListener() { @Override public void onResponse(Response result) { @@ -242,10 +244,174 @@ public void messageReceived(final ReplicaRequest request, final TransportChannel } } + class PrimaryOperationTransportHandler implements TransportRequestHandler { + + @Override + public void messageReceived(Request request, final TransportChannel channel) throws Exception { + ActionListener actionListener = new ActionListener() { + @Override + public void onResponse(Response response) { + try { + channel.sendResponse(response); + } catch (Throwable t) { + onFailure(t); + } + } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(e); + } catch (Throwable t) { + logger.warn("failed to send response for get", t); + } + } + }; + new AsyncPrimaryAction(request, actionListener).run(); + } + } + + class AsyncPrimaryAction extends AbstractRunnable { + + private final ClusterStateObserver observer; + private final ActionListener listener; + private final InternalRequest internalRequest; + private final Request request; + private final AtomicBoolean finished = new AtomicBoolean(false); + private volatile Releasable indexShardReference; + + public AsyncPrimaryAction(Request request, ActionListener listener) { + this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger); + this.listener = listener; + this.request = request; + this.internalRequest = new InternalRequest(request); + this.internalRequest.concreteIndex(request.internalShardRouting.getIndex()); + } + + @Override + public void onFailure(Throwable t) { + finishAsFailed(t); + } + + @Override + protected void doRun() throws Exception { + final ShardRouting primary = request.internalShardRouting; + if (clusterService.localNode().id().equals(primary.currentNodeId()) == false) { + throw new RetryOnPrimaryException(primary.shardId(), "shard [{}] not assigned to this node [{}], but node [{}]", primary.shardId(), clusterService.localNode().id(), primary.currentNodeId()); 
+ } + final ShardIterator shardIt = shards(observer.observedState(), internalRequest); + performOnPrimary(primary, shardIt); + } + + /** + * perform the operation on the node holding the primary + */ + void performOnPrimary(ShardRouting primary, final ShardIterator shardsIt) { + final String writeConsistencyFailure = checkWriteConsistency(primary); + if (writeConsistencyFailure != null) { + throw new UnavailableShardsException(primary.shardId(), writeConsistencyFailure); + } + final ReplicationPhase replicationPhase; + try { + indexShardReference = getIndexShardOperationsCounter(primary.shardId()); + PrimaryOperationRequest por = new PrimaryOperationRequest(primary.shardId(), request); + Tuple primaryResponse = shardOperationOnPrimary(observer.observedState(), por); + logger.trace("operation completed on primary [{}]", primary); + replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference); + } catch (Throwable e) { + if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) { + if (logger.isTraceEnabled()) { + logger.trace(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e); + } + } + finishAsFailed(e); + return; + } + finishAndMoveToReplication(replicationPhase); + } + + /** + * checks whether we can perform a write based on the write consistency setting + * returns **null* if OK to proceed, or a string describing the reason to stop + */ + String checkWriteConsistency(ShardRouting shard) { + if (checkWriteConsistency == false) { + return null; + } + + final WriteConsistencyLevel consistencyLevel; + if (internalRequest.request().consistencyLevel() != WriteConsistencyLevel.DEFAULT) { + consistencyLevel = internalRequest.request().consistencyLevel(); + } else { + consistencyLevel = defaultWriteConsistencyLevel; + } + final int sizeActive; + final int requiredNumber; + IndexRoutingTable indexRoutingTable = observer.observedState().getRoutingTable().index(shard.index()); + if (indexRoutingTable != null) { + IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shard.getId()); + if (shardRoutingTable != null) { + sizeActive = shardRoutingTable.activeShards().size(); + if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) { + // only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to) + requiredNumber = (shardRoutingTable.getSize() / 2) + 1; + } else if (consistencyLevel == WriteConsistencyLevel.ALL) { + requiredNumber = shardRoutingTable.getSize(); + } else { + requiredNumber = 1; + } + } else { + sizeActive = 0; + requiredNumber = 1; + } + } else { + sizeActive = 0; + requiredNumber = 1; + } + + if (sizeActive < requiredNumber) { + String message = LoggerMessageFormat.format( + "not enough active copies to meet write consistency of [{}] (have [{}], needed [{}])", + consistencyLevel, sizeActive, requiredNumber + ); + logger.trace(message + ", scheduling a retry"); + return message; + } else { + return null; + } + } + + void finishAsFailed(Throwable failure) { + if (finished.compareAndSet(false, true)) { + Releasables.close(indexShardReference); + logger.trace("operation failed", failure); + listener.onFailure(failure); + } else { + assert false : "finishAsFailed called but 
operation is already finished"; + } + } + + /** + * upon success, finish the first phase and transfer responsibility to the {@link ReplicationPhase} + */ + void finishAndMoveToReplication(ReplicationPhase replicationPhase) { + if (finished.compareAndSet(false, true)) { + replicationPhase.run(); + } else { + assert false : "finishAndMoveToReplication called but operation is already finished"; + } + } + + } + public static class RetryOnReplicaException extends ElasticsearchException { - public RetryOnReplicaException(ShardId shardId, String msg) { - super(msg); + public RetryOnReplicaException(ShardId shardId, String msg, Object... args) { + super(msg, args); setShard(shardId); } @@ -289,9 +455,9 @@ public void onTimeout(TimeValue timeout) { }); } else { try { - failReplicaIfNeeded(request.internalShardId.getIndex(), request.internalShardId.id(), t); + failReplicaIfNeeded(request.internalShardRouting.getIndex(), request.internalShardRouting.id(), t); } catch (Throwable unexpected) { - logger.error("{} unexpected error while failing replica", request.internalShardId.id(), unexpected); + logger.error("{} unexpected error while failing replica", request.internalShardRouting.id(), unexpected); } finally { responseWithFailure(t); } @@ -309,8 +475,8 @@ protected void responseWithFailure(Throwable t) { @Override protected void doRun() throws Exception { - try (Releasable shardReference = getIndexShardOperationsCounter(request.internalShardId)) { - shardOperationOnReplica(request.internalShardId, request); + try (Releasable shardReference = getIndexShardOperationsCounter(request.internalShardRouting.shardId())) { + shardOperationOnReplica(request.internalShardRouting.shardId(), request); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } @@ -320,15 +486,16 @@ protected class PrimaryOperationRequest { public final ShardId shardId; public final Request request; - public PrimaryOperationRequest(int shardId, String index, Request request) { - this.shardId = new ShardId(index, shardId); + public PrimaryOperationRequest(ShardId shardId, Request request) { + this.shardId = shardId; this.request = request; } } public static class RetryOnPrimaryException extends ElasticsearchException { - public RetryOnPrimaryException(ShardId shardId, String msg) { - super(msg); + + public RetryOnPrimaryException(ShardId shardId, String msg, Object... 
args) { + super(msg, args); setShard(shardId); } @@ -348,8 +515,6 @@ final class PrimaryPhase extends AbstractRunnable { private final InternalRequest internalRequest; private final ClusterStateObserver observer; private final AtomicBoolean finished = new AtomicBoolean(false); - private volatile Releasable indexShardReference; - PrimaryPhase(Request request, ActionListener listener) { this.internalRequest = new InternalRequest(request); @@ -382,7 +547,7 @@ protected void doRun() { retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node."); return; } - routeRequestOrPerformLocally(primary, shardIt); + moveToPrimaryAction(primary); } /** @@ -437,66 +602,52 @@ protected ShardRouting resolvePrimary(ShardIterator shardIt) { /** * send the request to the node holding the primary or execute if local */ - protected void routeRequestOrPerformLocally(final ShardRouting primary, final ShardIterator shardsIt) { - if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) { - try { - if (internalRequest.request().operationThreaded()) { - threadPool.executor(executor).execute(new AbstractRunnable() { - @Override - public void onFailure(Throwable t) { - finishAsFailed(t); - } + protected void moveToPrimaryAction(final ShardRouting primary) { + DiscoveryNode node = observer.observedState().nodes().get(primary.currentNodeId()); + Request request = internalRequest.request(); + request.internalShardRouting = primary; + transportService.sendRequest(node, transportPrimaryAction, request, transportOptions, new BaseTransportResponseHandler() { - @Override - protected void doRun() throws Exception { - performOnPrimary(primary, shardsIt); - } - }); - } else { - performOnPrimary(primary, shardsIt); - } - } catch (Throwable t) { - finishAsFailed(t); + @Override + public Response newInstance() { + return newResponseInstance(); } - } else { - DiscoveryNode node = observer.observedState().nodes().get(primary.currentNodeId()); - transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler() { - - @Override - public Response newInstance() { - return newResponseInstance(); - } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + @Override + public String executor() { + return ThreadPool.Names.SAME; + } - @Override - public void handleResponse(Response response) { - finishOnRemoteSuccess(response); - } + @Override + public void handleResponse(Response response) { + finishOnRemoteSuccess(response); + } - @Override - public void handleException(TransportException exp) { - try { - // if we got disconnected from the node, or the node / shard is not in the right state (being closed) - if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException || - retryPrimaryException(exp)) { - internalRequest.request().setCanHaveDuplicates(); - // we already marked it as started when we executed it (removed the listener) so pass false - // to re-add to the cluster listener - logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage()); - retry(exp); + @Override + public void handleException(TransportException exp) { + try { + Throwable cause = exp.getCause(); + // if we got disconnected from the node, or the node / shard is not in the right state (being closed) + if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException || + cause instanceof 
UnavailableShardsException || retryPrimaryException(cause)) { + internalRequest.request().setCanHaveDuplicates(); + // we already marked it as started when we executed it (removed the listener) so pass false + // to re-add to the cluster listener + logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage()); + if (cause instanceof UnavailableShardsException) { + UnavailableShardsException use = (UnavailableShardsException) cause; + retryBecauseUnavailable(use.getShardId(), use.getMessage()); } else { - finishAsFailed(exp); + retry(exp); } - } catch (Throwable t) { - finishWithUnexpectedFailure(t); + } else { + finishAsFailed(exp); } + } catch (Throwable t) { + finishWithUnexpectedFailure(t); } - }); - } + } + }); } void retry(Throwable failure) { @@ -506,9 +657,6 @@ void retry(Throwable failure) { finishAsFailed(failure); return; } - // make it threaded operation so we fork on the discovery listener thread - internalRequest.request().operationThreaded(true); - observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { @@ -528,21 +676,8 @@ public void onTimeout(TimeValue timeout) { }); } - /** - * upon success, finish the first phase and transfer responsibility to the {@link ReplicationPhase} - */ - void finishAndMoveToReplication(ReplicationPhase replicationPhase) { - if (finished.compareAndSet(false, true)) { - replicationPhase.run(); - } else { - assert false : "finishAndMoveToReplication called but operation is already finished"; - } - } - - void finishAsFailed(Throwable failure) { if (finished.compareAndSet(false, true)) { - Releasables.close(indexShardReference); logger.trace("operation failed", failure); listener.onFailure(failure); } else { @@ -553,7 +688,6 @@ void finishAsFailed(Throwable failure) { void finishWithUnexpectedFailure(Throwable failure) { logger.warn("unexpected error during the primary phase for action [{}]", failure, actionName); if (finished.compareAndSet(false, true)) { - Releasables.close(indexShardReference); listener.onFailure(failure); } else { assert false : "finishWithUnexpectedFailure called but operation is already finished"; @@ -569,99 +703,6 @@ void finishOnRemoteSuccess(Response response) { } } - /** - * perform the operation on the node holding the primary - */ - void performOnPrimary(final ShardRouting primary, final ShardIterator shardsIt) { - final String writeConsistencyFailure = checkWriteConsistency(primary); - if (writeConsistencyFailure != null) { - retryBecauseUnavailable(primary.shardId(), writeConsistencyFailure); - return; - } - final ReplicationPhase replicationPhase; - try { - indexShardReference = getIndexShardOperationsCounter(primary.shardId()); - PrimaryOperationRequest por = new PrimaryOperationRequest(primary.id(), internalRequest.concreteIndex(), internalRequest.request()); - Tuple primaryResponse = shardOperationOnPrimary(observer.observedState(), por); - logger.trace("operation completed on primary [{}]", primary); - replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference); - } catch (Throwable e) { - internalRequest.request.setCanHaveDuplicates(); - // shard has not been allocated yet, retry it here - if (retryPrimaryException(e)) { - logger.trace("had an error while performing operation on primary ({}), scheduling a retry.", e.getMessage()); - // We have to close here because when we retry we will 
increment get a new reference on index shard again and we do not want to - // increment twice. - Releasables.close(indexShardReference); - // We have to reset to null here because whe we retry it might be that we never get to the point where we assign a new reference - // (for example, in case the operation was rejected because queue is full). In this case we would release again once one of the finish methods is called. - indexShardReference = null; - retry(e); - return; - } - if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) { - if (logger.isTraceEnabled()) { - logger.trace(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e); - } - } - finishAsFailed(e); - return; - } - finishAndMoveToReplication(replicationPhase); - } - - /** - * checks whether we can perform a write based on the write consistency setting - * returns **null* if OK to proceed, or a string describing the reason to stop - */ - String checkWriteConsistency(ShardRouting shard) { - if (checkWriteConsistency == false) { - return null; - } - - final WriteConsistencyLevel consistencyLevel; - if (internalRequest.request().consistencyLevel() != WriteConsistencyLevel.DEFAULT) { - consistencyLevel = internalRequest.request().consistencyLevel(); - } else { - consistencyLevel = defaultWriteConsistencyLevel; - } - final int sizeActive; - final int requiredNumber; - IndexRoutingTable indexRoutingTable = observer.observedState().getRoutingTable().index(shard.index()); - if (indexRoutingTable != null) { - IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shard.getId()); - if (shardRoutingTable != null) { - sizeActive = shardRoutingTable.activeShards().size(); - if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) { - // only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to) - requiredNumber = (shardRoutingTable.getSize() / 2) + 1; - } else if (consistencyLevel == WriteConsistencyLevel.ALL) { - requiredNumber = shardRoutingTable.getSize(); - } else { - requiredNumber = 1; - } - } else { - sizeActive = 0; - requiredNumber = 1; - } - } else { - sizeActive = 0; - requiredNumber = 1; - } - - if (sizeActive < requiredNumber) { - logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.", - shard.shardId(), consistencyLevel, sizeActive, requiredNumber); - return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed " + requiredNumber + ")."; - } else { - return null; - } - } - void retryBecauseUnavailable(ShardId shardId, String message) { retry(new UnavailableShardsException(shardId, message + " Timeout: [" + internalRequest.request().timeout() + "], request: " + internalRequest.request().toString())); } @@ -881,68 +922,25 @@ void performOnReplica(final ShardRouting shard, final String nodeId) { return; } - replicaRequest.internalShardId = shardIt.shardId(); - - if (!nodeId.equals(observer.observedState().nodes().localNodeId())) { - final DiscoveryNode node = observer.observedState().nodes().get(nodeId); - transportService.sendRequest(node, transportReplicaAction, replicaRequest, - transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void 
handleResponse(TransportResponse.Empty vResponse) { - onReplicaSuccess(); - } - - @Override - public void handleException(TransportException exp) { - onReplicaFailure(nodeId, exp); - logger.trace("[{}] transport failure during replica request [{}] ", exp, node, replicaRequest); - if (ignoreReplicaException(exp) == false) { - logger.warn("{} failed to perform {} on node {}", exp, shardIt.shardId(), actionName, node); - shardStateAction.shardFailed(shard, indexMetaData.getIndexUUID(), "failed to perform " + actionName + " on replica on node " + node, exp); - } - } - - }); - } else { - if (replicaRequest.operationThreaded()) { - try { - threadPool.executor(executor).execute(new AbstractRunnable() { - @Override - protected void doRun() { - try { - shardOperationOnReplica(shard.shardId(), replicaRequest); - onReplicaSuccess(); - } catch (Throwable e) { - onReplicaFailure(nodeId, e); - failReplicaIfNeeded(shard.index(), shard.id(), e); - } - } - - // we must never reject on because of thread pool capacity on replicas - @Override - public boolean isForceExecution() { - return true; - } + replicaRequest.internalShardRouting = shard; + final DiscoveryNode node = observer.observedState().nodes().get(nodeId); + transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleResponse(TransportResponse.Empty vResponse) { + onReplicaSuccess(); + } - @Override - public void onFailure(Throwable t) { - onReplicaFailure(nodeId, t); - } - }); - } catch (Throwable e) { - failReplicaIfNeeded(shard.index(), shard.id(), e); - onReplicaFailure(nodeId, e); - } - } else { - try { - shardOperationOnReplica(shard.shardId(), replicaRequest); - onReplicaSuccess(); - } catch (Throwable e) { - failReplicaIfNeeded(shard.index(), shard.id(), e); - onReplicaFailure(nodeId, e); + @Override + public void handleException(TransportException exp) { + onReplicaFailure(nodeId, exp); + logger.trace("[{}] transport failure during replica request [{}] ", exp, node, replicaRequest); + if (ignoreReplicaException(exp) == false) { + logger.warn("{} failed to perform {} on node {}", exp, shardIt.shardId(), actionName, node); + shardStateAction.shardFailed(shard, indexMetaData.getIndexUUID(), "failed to perform " + actionName + " on replica on node " + node, exp); } } - } + + }); } diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index c9401ca5392ec..fba2f23852e15 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -131,7 +131,6 @@ protected Result prepare(UpdateRequest request, final GetResult getResult) { .routing(request.routing()) .parent(request.parent()) .consistencyLevel(request.consistencyLevel()); - indexRequest.operationThreaded(false); if (request.versionType() != VersionType.INTERNAL) { // in all but the internal versioning mode, we want to create the new document using the given version. 
indexRequest.version(request.version()).versionType(request.versionType()); @@ -227,13 +226,11 @@ protected Result prepare(UpdateRequest request, final GetResult getResult) { .consistencyLevel(request.consistencyLevel()) .timestamp(timestamp).ttl(ttl) .refresh(request.refresh()); - indexRequest.operationThreaded(false); return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType); } else if ("delete".equals(operation)) { DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) .version(updateVersion).versionType(request.versionType()) .consistencyLevel(request.consistencyLevel()); - deleteRequest.operationThreaded(false); return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType); } else if ("none".equals(operation)) { UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false); diff --git a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java index 69f06cef1f129..209ab686ce5b3 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java @@ -50,9 +50,6 @@ public RestDeleteAction(Settings settings, RestController controller, Client cli @Override public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id")); - - deleteRequest.operationThreaded(true); - deleteRequest.routing(request.param("routing")); deleteRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT)); diff --git a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java index 3d3ecdfa880ed..d0d0fe68a13c1 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java @@ -70,7 +70,6 @@ public void handleRequest(RestRequest request, RestChannel channel, final Client @Override public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id")); - indexRequest.operationThreaded(true); indexRequest.routing(request.param("routing")); indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing indexRequest.timestamp(request.param("timestamp")); diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index b70589ce52d9a..c5ee29de64d20 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -77,7 +77,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { } }); - private final TransportService.Adapter adapter; + protected final TransportService.Adapter adapter; // tracer log diff --git 
a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index 10b344933348f..e3df5792dc4e5 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -194,7 +194,7 @@ public void testAnalyze() { @Test public void testIndex() { - String[] indexShardActions = new String[]{IndexAction.NAME, IndexAction.NAME + "[r]"}; + String[] indexShardActions = new String[]{IndexAction.NAME + "[p]", IndexAction.NAME + "[r]"}; interceptTransportActions(indexShardActions); IndexRequest indexRequest = new IndexRequest(randomIndexOrAlias(), "type", "id").source("field", "value"); @@ -206,7 +206,7 @@ public void testIndex() { @Test public void testDelete() { - String[] deleteShardActions = new String[]{DeleteAction.NAME, DeleteAction.NAME + "[r]"}; + String[] deleteShardActions = new String[]{DeleteAction.NAME + "[p]", DeleteAction.NAME + "[r]"}; interceptTransportActions(deleteShardActions); DeleteRequest deleteRequest = new DeleteRequest(randomIndexOrAlias(), "type", "id"); @@ -219,7 +219,7 @@ public void testDelete() { @Test public void testUpdate() { //update action goes to the primary, index op gets executed locally, then replicated - String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"}; + String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[p]", IndexAction.NAME + "[r]"}; interceptTransportActions(updateShardActions); String indexOrAlias = randomIndexOrAlias(); @@ -235,7 +235,7 @@ public void testUpdate() { @Test public void testUpdateUpsert() { //update action goes to the primary, index op gets executed locally, then replicated - String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"}; + String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[p]", IndexAction.NAME + "[r]"}; interceptTransportActions(updateShardActions); String indexOrAlias = randomIndexOrAlias(); @@ -250,7 +250,7 @@ public void testUpdateUpsert() { @Test public void testUpdateDelete() { //update action goes to the primary, delete op gets executed locally, then replicated - String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", DeleteAction.NAME + "[r]"}; + String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", DeleteAction.NAME + "[p]", DeleteAction.NAME + "[r]"}; interceptTransportActions(updateShardActions); String indexOrAlias = randomIndexOrAlias(); @@ -265,7 +265,7 @@ public void testUpdateDelete() { @Test public void testBulk() { - String[] bulkShardActions = new String[]{BulkAction.NAME + "[s]", BulkAction.NAME + "[s][r]"}; + String[] bulkShardActions = new String[]{BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"}; interceptTransportActions(bulkShardActions); List indices = new ArrayList<>(); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java index d08840b077184..bd6a7b93e260c 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java @@ -58,10 +58,7 @@ import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; import 
org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponseOptions; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.*; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -69,7 +66,6 @@ import java.io.IOException; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -103,7 +99,48 @@ public void setUp() throws Exception { super.setUp(); transport = new CapturingTransport(); clusterService = new TestClusterService(threadPool); - transportService = new TransportService(transport, threadPool); + transportService = new TransportService(transport, threadPool) { + + @Override + @SuppressWarnings("unchecked") + public void sendRequest(DiscoveryNode node, final String action, TransportRequest request, TransportRequestOptions options, final TransportResponseHandler handler) { + if (action.endsWith("[p]")) { + try { + adapter.getRequestHandler(action).getHandler().messageReceived(request, new TransportChannel() { + @Override + public String action() { + return action; + } + + @Override + public String getProfileName() { + return "default"; + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + handler.handleResponse((T) response); + } + + @Override + public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { + sendResponse(response); + } + + @Override + public void sendResponse(Throwable error) throws IOException { + handler.handleException(new TransportException("error", error)); + } + }); + } catch (Exception e) { + handler.handleException(new TransportException("error", e)); + } + } else { + super.sendRequest(node, action, request, options, handler); + } + } + + }; transportService.start(); action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool); count.set(1); @@ -304,9 +341,9 @@ public void testRoutingToPrimary() { TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); assertTrue(primaryPhase.checkBlocks()); - primaryPhase.routeRequestOrPerformLocally(shardRoutingTable.primaryShard(), shardRoutingTable.shardsIt()); if (primaryNodeId.equals(clusterService.localNode().id())) { logger.info("--> primary is assigned locally, testing for execution"); + primaryPhase.moveToPrimaryAction(shardRoutingTable.primaryShard()); assertTrue("request failed to be processed on a local primary", request.processedOnPrimary.get()); if (transport.capturedRequests().length > 0) { assertIndexShardCounter(2); @@ -314,11 +351,10 @@ public void testRoutingToPrimary() { assertIndexShardCounter(1); } } else { - logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId); - final List capturedRequests = transport.capturedRequestsByTargetNode().get(primaryNodeId); - assertThat(capturedRequests, notNullValue()); - assertThat(capturedRequests.size(), equalTo(1)); - assertThat(capturedRequests.get(0).action, equalTo("testAction")); + logger.info("--> primary is assigned to [{}], checking request is going to be retried at some point", primaryNodeId); + assertThat(clusterService.getListeners().size(), equalTo(0)); + primaryPhase.moveToPrimaryAction(shardRoutingTable.primaryShard()); + 
assertThat(clusterService.getListeners().size(), equalTo(1)); assertIndexShardUninitialized(); } } @@ -333,6 +369,7 @@ public void testWriteConsistency() throws ExecutionException, InterruptedExcepti final int totalShards = 1 + assignedReplicas + unassignedReplicas; final boolean passesWriteConsistency; Request request = new Request(shardId).consistencyLevel(randomFrom(WriteConsistencyLevel.values())); + request.internalShardRouting = ShardRouting.newUnassigned(index, 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); switch (request.consistencyLevel()) { case ONE: passesWriteConsistency = true; @@ -368,8 +405,9 @@ public void testWriteConsistency() throws ExecutionException, InterruptedExcepti PlainActionFuture listener = new PlainActionFuture<>(); TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + TransportReplicationAction.AsyncPrimaryAction primaryAction = action.new AsyncPrimaryAction(request, listener); if (passesWriteConsistency) { - assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), nullValue()); + assertThat(primaryAction.checkWriteConsistency(shardRoutingTable.primaryShard()), nullValue()); primaryPhase.run(); assertTrue("operations should have been perform, consistency level is met", request.processedOnPrimary.get()); if (assignedReplicas > 0) { @@ -378,7 +416,7 @@ public void testWriteConsistency() throws ExecutionException, InterruptedExcepti assertIndexShardCounter(1); } } else { - assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), notNullValue()); + assertThat(primaryAction.checkWriteConsistency(shardRoutingTable.primaryShard()), notNullValue()); primaryPhase.run(); assertFalse("operations should not have been perform, consistency level is *NOT* met", request.processedOnPrimary.get()); assertIndexShardUninitialized(); @@ -661,11 +699,9 @@ static class Request extends ReplicationRequest { public AtomicInteger processedOnReplicas = new AtomicInteger(); Request() { - this.operationThreaded(randomBoolean()); } Request(ShardId shardId) { - this(); this.shardId = shardId.id(); this.index(shardId.index().name()); // keep things simple @@ -784,7 +820,7 @@ private Tuple throwException(ShardId shardId) { @Override protected void shardOperationOnReplica(ShardId shardId, Request shardRequest) { - throwException(shardRequest.internalShardId); + throwException(shardRequest.internalShardRouting.shardId()); } } diff --git a/core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java b/core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java index 50637cb801aa1..bb2fd17b0de36 100644 --- a/core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java +++ b/core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java @@ -57,7 +57,7 @@ public void testWriteConsistencyLevelReplication2() throws Exception { fail("can't index, does not match consistency"); } catch (UnavailableShardsException e) { assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); - assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). 
Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}")); + assertThat(e.getMessage(), equalTo("[test][0] not enough active copies to meet write consistency of [QUORUM] (have [1], needed [2]) Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}")); // but really, all is well } @@ -80,7 +80,7 @@ public void testWriteConsistencyLevelReplication2() throws Exception { fail("can't index, does not match consistency"); } catch (UnavailableShardsException e) { assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); - assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [ALL] (have 2, needed 3). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}")); + assertThat(e.getMessage(), equalTo("[test][0] not enough active copies to meet write consistency of [ALL] (have [2], needed [3]) Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}")); // but really, all is well } diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index a1b250875706b..91e7700bed64f 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -285,6 +285,7 @@ public void onResponse(IndexResponse response) { @Override public void onFailure(Throwable e) { + logger.error("Error during indexing", e); docIndexResponse.set(e); } }); diff --git a/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java b/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java index 6a55fbd2577b3..aa652dca1e228 100644 --- a/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java +++ b/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; @@ -255,6 +254,10 @@ public void close() throws ElasticsearchException { throw new UnsupportedOperationException(); } + public Collection getListeners() { + return listeners; + } + class NotifyTimeout implements Runnable { final TimeoutClusterStateListener listener; final TimeValue timeout; diff --git a/core/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java b/core/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java index 476b89aa1a903..3ae2b5f38fda5 100644 --- a/core/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/core/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.test.transport; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; @@ -33,7 +32,8 @@ /** A transport class that doesn't send anything but rather captures all requests for inspection 
from tests */ public class CapturingTransport implements Transport { - private TransportServiceAdapter adapter; + + protected TransportServiceAdapter adapter; static public class CapturedRequest { final public DiscoveryNode node; From d5b551c26324efd30ebcbe9bacd60ec01aa47606 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 23 Jul 2015 10:19:10 +0200 Subject: [PATCH 02/10] don't fall back to the coordinating node if RetryOnPrimaryException occurs but retry on the node holding the primary shard --- .../NoShardAvailableActionException.java | 8 ++-- .../TransportReplicationAction.java | 39 +++++++++++++++---- .../indices/state/RareClusterStateIT.java | 1 - 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java b/core/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java index 00562af99c5ce..c4b3e2ad738b9 100644 --- a/core/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java +++ b/core/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java @@ -35,12 +35,12 @@ public NoShardAvailableActionException(ShardId shardId) { this(shardId, null); } - public NoShardAvailableActionException(ShardId shardId, String msg) { - this(shardId, msg, null); + public NoShardAvailableActionException(ShardId shardId, String msg, Object... args) { + this(shardId, msg, null, args); } - public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause) { - super(msg, cause); + public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause, Object... args) { + super(msg, cause, args); setShard(shardId); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index c65908e77de04..096c6f6d54a2e 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -21,10 +21,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionWriteResponse; -import org.elasticsearch.action.UnavailableShardsException; -import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.*; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest.OpType; @@ -290,14 +287,39 @@ public AsyncPrimaryAction(Request request, ActionListener listener) { @Override public void onFailure(Throwable t) { - finishAsFailed(t); + if (t instanceof RetryOnPrimaryException) { + if (observer.isTimedOut()) { + // we running as a last attempt after a timeout has happened. don't retry + finishAsFailed(t); + return; + } + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + threadPool.executor(executor).execute(AsyncPrimaryAction.this); + } + + @Override + public void onClusterServiceClose() { + finishAsFailed(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + // Try one more time... 
+ threadPool.executor(executor).execute(AsyncPrimaryAction.this); + } + }); + } else { + finishAsFailed(t); + } } @Override protected void doRun() throws Exception { final ShardRouting primary = request.internalShardRouting; if (clusterService.localNode().id().equals(primary.currentNodeId()) == false) { - throw new RetryOnPrimaryException(primary.shardId(), "shard [{}] not assigned to this node [{}], but node [{}]", primary.shardId(), clusterService.localNode().id(), primary.currentNodeId()); + throw new NoShardAvailableActionException(primary.shardId(), "shard [{}] not assigned to this node [{}], but node [{}]", primary.shardId(), clusterService.localNode().id(), primary.currentNodeId()); } final ShardIterator shardIt = shards(observer.observedState(), internalRequest); performOnPrimary(primary, shardIt); @@ -328,7 +350,9 @@ void performOnPrimary(ShardRouting primary, final ShardIterator shardsIt) { logger.debug(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e); } } - finishAsFailed(e); + Releasables.close(indexShardReference); + indexShardReference = null; + onFailure(e); return; } finishAndMoveToReplication(replicationPhase); @@ -388,6 +412,7 @@ String checkWriteConsistency(ShardRouting shard) { void finishAsFailed(Throwable failure) { if (finished.compareAndSet(false, true)) { Releasables.close(indexShardReference); + indexShardReference = null; logger.trace("operation failed", failure); listener.onFailure(failure); } else { diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index 91e7700bed64f..a1b250875706b 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -285,7 +285,6 @@ public void onResponse(IndexResponse response) { @Override public void onFailure(Throwable e) { - logger.error("Error during indexing", e); docIndexResponse.set(e); } }); From 5ba42832b47566f98601741978b63aa3cc39071e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 23 Jul 2015 10:54:11 +0200 Subject: [PATCH 03/10] In the event that a primary or replica action needs to be retried locally then use the transport service instead of the threadpool for forking a new thread. This way there is one place where new threads are being spawned. --- .../TransportReplicationAction.java | 49 +++++++++++++++++-- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 096c6f6d54a2e..e2a3f7e202ee3 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -296,7 +296,7 @@ public void onFailure(Throwable t) { observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { - threadPool.executor(executor).execute(AsyncPrimaryAction.this); + retryLocally(); } @Override @@ -307,7 +307,7 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { // Try one more time... 
- threadPool.executor(executor).execute(AsyncPrimaryAction.this); + retryLocally(); } }); } else { @@ -431,6 +431,33 @@ void finishAndMoveToReplication(ReplicationPhase replicationPhase) { } } + private void retryLocally() { + // TODO: maybe we should have dedicated method for local executions? (we can overload this one, which doesn't have DiscoverNode parameter) + transportService.sendRequest(clusterService.localNode(), transportPrimaryAction, request, transportOptions, new BaseTransportResponseHandler() { + @Override + public Response newInstance() { + return newResponseInstance(); + } + + @Override + public void handleResponse(Response response) { + listener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + // to prevent TransportException wrapping another TransportException: + Throwable failure = exp.getCause() != null ? exp.getCause() : exp; + listener.onFailure(failure); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } + } public static class RetryOnReplicaException extends ElasticsearchException { @@ -465,7 +492,23 @@ public void onFailure(Throwable t) { observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { - threadPool.executor(executor).execute(AsyncReplicaAction.this); + // TODO: maybe we should have dedicated method for local executions? (we can overload this one, which doesn't have DiscoverNode parameter) + transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + + @Override + public void handleResponse(TransportResponse.Empty response) { + try { + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (IOException e) { + logger.error("failed to send response", e); + } + } + + @Override + public void handleException(TransportException exp) { + responseWithFailure(exp); + } + }); } @Override From 405402c38c17dedda0a10f49e7139ef2d9d52442 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 23 Jul 2015 21:04:47 +0200 Subject: [PATCH 04/10] test: removed the custom shardId integer and instead use the ReplicationRequest#internalShardRouting --- .../replication/ShardReplicationTests.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java index bd6a7b93e260c..b119b95bb8e4c 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java @@ -163,7 +163,8 @@ void assertListenerThrows(String msg, PlainActionFuture listener, Class listener = new PlainActionFuture<>(); ClusterBlocks.Builder block = ClusterBlocks.builder() @@ -177,13 +178,13 @@ public void testBlocks() throws ExecutionException, InterruptedException { .addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); listener = new PlainActionFuture<>(); - primaryPhase = action.new PrimaryPhase(new Request().timeout("5ms"), listener); + primaryPhase = action.new PrimaryPhase(new Request(shardId).timeout("5ms"), listener); assertFalse("primary 
phase should stop execution on retryable block", primaryPhase.checkBlocks()); assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class); listener = new PlainActionFuture<>(); - primaryPhase = action.new PrimaryPhase(new Request(), listener); + primaryPhase = action.new PrimaryPhase(new Request(shardId), listener); assertFalse("primary phase should stop execution on retryable block", primaryPhase.checkBlocks()); assertFalse("primary phase should wait on retryable block", listener.isDone()); @@ -483,7 +484,7 @@ protected void runReplicateTest(IndexShardRoutingTable shardRoutingTable, int as final ShardRouting primaryShard = shardRoutingTable.primaryShard(); final ShardIterator shardIt = shardRoutingTable.shardsIt(); final ShardId shardId = shardIt.shardId(); - final Request request = new Request(); + final Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint()); @@ -627,18 +628,20 @@ public void testReplicasCounter() throws Exception { Thread t = new Thread() { public void run() { try { - replicaOperationTransportHandler.messageReceived(new Request(), createTransportChannel()); + Request r = new Request(shardId); + replicaOperationTransportHandler.messageReceived(r, createTransportChannel()); } catch (Exception e) { + logger.error("Error while handling request", e); } } }; t.start(); // shard operation should be ongoing, so the counter is at 2 // we have to wait here because increment happens in thread - awaitBusy(new Predicate() { + assertBusy(new Runnable() { @Override - public boolean apply(@Nullable Object input) { - return count.get() == 2; + public void run() { + assertThat(count.get(), equalTo(2)); } }); ((ActionWithDelay) action).countDownLatch.countDown(); @@ -694,7 +697,6 @@ public void close() { } static class Request extends ReplicationRequest { - int shardId; public AtomicBoolean processedOnPrimary = new AtomicBoolean(); public AtomicInteger processedOnReplicas = new AtomicInteger(); @@ -702,21 +704,19 @@ static class Request extends ReplicationRequest { } Request(ShardId shardId) { - this.shardId = shardId.id(); this.index(shardId.index().name()); + this.internalShardRouting = TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), "_id", true, ShardRoutingState.STARTED, 1); // keep things simple } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeVInt(shardId); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - shardId = in.readVInt(); } } @@ -752,7 +752,7 @@ protected void shardOperationOnReplica(ShardId shardId, Request request) { @Override protected ShardIterator shards(ClusterState clusterState, InternalRequest request) { - return clusterState.getRoutingTable().index(request.concreteIndex()).shard(request.request().shardId).shardsIt(); + return clusterState.getRoutingTable().index(request.concreteIndex()).shard(request.request().internalShardRouting.getId()).shardsIt(); } @Override From 07ef93f8412d88d8693811eeb8789f10b42d61ce Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 4 Aug 2015 17:05:44 +0200 Subject: [PATCH 05/10] Added test from Britta's PR #12574 that shows how an index request can get stuck in an endless redirect loop between nodes due to slow cluster state processing.
--- .../discovery/EndlessIndexingLoopIT.java | 181 ++++++++++++++++++ .../store/IndicesStoreIntegrationIT.java | 4 +- 2 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java diff --git a/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java b/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java new file mode 100644 index 0000000000000..1ffa3b76e0e71 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java @@ -0,0 +1,181 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.index.IndexAction; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.store.IndicesStoreIntegrationIT; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.disruption.BlockClusterStateProcessing; +import org.elasticsearch.test.disruption.SingleNodeDisruption; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportModule; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.lang.Thread.sleep; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +/** + */ +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) +public class EndlessIndexingLoopIT extends ESIntegTestCase { + + /** + * When a primary is relocating from node_1 to node_2, there can be a short time where the old primary is removed from the node + * already (closed, not deleted) but the new primary is still in POST_RECOVERY. + * In this case we must make sure node_1 and node_2 do not send an index command back and forth endlessly. + *

+ * Course of events: + * 0. primary ([index][0]) relocates from node_1 to node_2 + * 1. node_2 is done recovering, moves its shard to IndexShardState.POST_RECOVERY and sends a message to master that the shard is ShardRoutingState.STARTED + * Cluster state 1: + * node_1: [index][0] RELOCATING (ShardRoutingState), (STARTED from IndexShardState perspective on node_1) + * node_2: [index][0] INITIALIZING (ShardRoutingState), (at this point already POST_RECOVERY from IndexShardState perspective on node_2) + * 2. master receives shard started and updates cluster state to: + * Cluster state 2: + * node_1: [index][0] no shard + * node_2: [index][0] STARTED (ShardRoutingState), (at this point still in POST_RECOVERY from IndexShardState perspective on node_2) + * master sends this to node_1 and node_2 + * 3. node_1 receives the new cluster state and removes its shard because it is not allocated on node_1 anymore + * 4. index a document + * At this point node_1 is already on cluster state 2 and does not have the shard anymore so it forwards the request to node_2. + * But node_2 is behind with cluster state processing, is still on cluster state 1 and therefore has the shard in + * IndexShardState.POST_RECOVERY and thinks node_1 has the primary. So it will send the request back to node_1. + * This goes on until either node_2 finally catches up and processes cluster state 2 or both nodes OOM. + */ + @Test + public void testIndexOperationNotSentBackAndForthAllTheTime() throws Exception { + Settings mockTransportSetting = Settings.builder().put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()).build(); + Future masterNodeFuture = internalCluster().startMasterOnlyNodeAsync(mockTransportSetting); + Future node1Future = internalCluster().startDataOnlyNodeAsync(mockTransportSetting); + final String masterNode = masterNodeFuture.get(); + final String node_1 = node1Future.get(); + + logger.info("--> creating index [test] with one shard and zero replica"); + assertAcked(prepareCreate("test").setSettings( + Settings.builder().put(indexSettings()) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) + .addMapping("doc", jsonBuilder().startObject().startObject("doc") + .startObject("properties").startObject("text").field("type", "string").endObject().endObject() + .endObject().endObject()) + ); + ensureGreen("test"); + logger.info("--> starting one more data node"); + Future node2NameFuture = internalCluster().startDataOnlyNodeAsync(mockTransportSetting); + final String node_2 = node2NameFuture.get(); + logger.info("--> running cluster_health"); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForNodes("3") + .setWaitForRelocatingShards(0) + .get(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + + logger.info("--> move shard from node_1 to node_2, and wait for relocation to finish"); + + // register Tracer that will signal when relocations starts and ends + MockTransportService transportServiceNode2 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_2); + CountDownLatch beginRelocationLatchNode2 = new CountDownLatch(1); + CountDownLatch endRelocationLatchNode2 = new CountDownLatch(1); + transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatchNode2, endRelocationLatchNode2)); + // register a Tracer that will count the number of sent indexing requests on node_2 + final AtomicInteger 
numSentIndexRequests = new AtomicInteger(0); + transportServiceNode2.addTracer(new MockTransportService.Tracer() { + @Override + public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { + if (action.equals(IndexAction.NAME)) { + numSentIndexRequests.incrementAndGet(); + } + } + }); + + // node_2 should fall behind with cluster state processing. we start the disruption later when relocation starts + SingleNodeDisruption disruptionNode2 = new BlockClusterStateProcessing(node_2, getRandom()); + internalCluster().setDisruptionScheme(disruptionNode2); + + logger.info("--> move shard from {} to {}", node_1, node_2); + internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_2)).get(); + + logger.info("--> wait for relocation to start"); + beginRelocationLatchNode2.await(); + // start to block cluster state processing for node_2 so that it will be stuck with the cluster state 1 in above description + disruptionNode2.startDisrupting(); + + logger.info("--> wait for relocation to finish"); + endRelocationLatchNode2.await(); + // now node_2 is still on cluster state 1 but will have have the shard moved to POST_RECOVERY + final Client node1Client = internalCluster().client(node_1); + final Client node2Client = internalCluster().client(node_2); + // wait until node_1 actually has removed the shard + assertBusy(new Runnable() { + @Override + public void run() { + ClusterState clusterState = node1Client.admin().cluster().prepareState().setLocal(true).get().getState(); + // get the node id from the name. TODO: Is there a better way to do this? + String nodeId = null; + for (RoutingNode node : clusterState.getRoutingNodes()) { + if (node.node().name().equals(node_1)) { + nodeId = node.nodeId(); + } + } + assertNotNull(nodeId); + // check that node_1 actually has removed the shard + assertFalse(clusterState.routingNodes().routingNodeIter(nodeId).hasNext()); + } + }); + + logger.info("--> cluster state on {} {}", node_1, node1Client.admin().cluster().prepareState().setLocal(true).get().getState().prettyPrint()); + logger.info("--> cluster state on {} {}", node_2, node2Client.admin().cluster().prepareState().setLocal(true).get().getState().prettyPrint()); + logger.info("--> index doc"); + Future indexResponseFuture = client().prepareIndex("test", "doc").setSource("{\"text\":\"a\"}").execute(); + // wait a little and then see how often the indexing request was sent back and forth + sleep(1000); + // stop disrupting so that node_2 can finally apply cluster state 2 + logger.info("--> stop disrupting"); + disruptionNode2.stopDisrupting(); + clusterHealth = client().admin().cluster().prepareHealth() + .setWaitForRelocatingShards(0) + .get(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + indexResponseFuture.get(); + refresh(); + assertThat(client().prepareCount().get().getCount(), equalTo(1l)); + // check that only one indexing request was sent at most + assertThat(numSentIndexRequests.get(), lessThanOrEqualTo(1)); + } +} \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index 898415152ca78..35d54ca8e0296 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -426,12 +426,12 @@ public boolean 
apply(Object o) { * state processing when a recover starts and only unblocking it shortly after the node receives * the ShardActiveRequest. */ - static class ReclocationStartEndTracer extends MockTransportService.Tracer { + public static class ReclocationStartEndTracer extends MockTransportService.Tracer { private final ESLogger logger; private final CountDownLatch beginRelocationLatch; private final CountDownLatch receivedShardExistsRequestLatch; - ReclocationStartEndTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) { + public ReclocationStartEndTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) { this.logger = logger; this.beginRelocationLatch = beginRelocationLatch; this.receivedShardExistsRequestLatch = receivedShardExistsRequestLatch; From 51bfdb80ef671c4d3dd0f9f68b454ea1c8656ea1 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 4 Aug 2015 17:07:01 +0200 Subject: [PATCH 06/10] Execute the retry logic only on the coordinating node. --- .../TransportReplicationAction.java | 54 +------------------ 1 file changed, 1 insertion(+), 53 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index e2a3f7e202ee3..4cbc93f7e0030 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -287,32 +287,7 @@ public AsyncPrimaryAction(Request request, ActionListener listener) { @Override public void onFailure(Throwable t) { - if (t instanceof RetryOnPrimaryException) { - if (observer.isTimedOut()) { - // we running as a last attempt after a timeout has happened. don't retry - finishAsFailed(t); - return; - } - observer.waitForNextChange(new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - retryLocally(); - } - - @Override - public void onClusterServiceClose() { - finishAsFailed(new NodeClosedException(clusterService.localNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - // Try one more time... - retryLocally(); - } - }); - } else { - finishAsFailed(t); - } + finishAsFailed(t); } @Override @@ -431,33 +406,6 @@ void finishAndMoveToReplication(ReplicationPhase replicationPhase) { } } - private void retryLocally() { - // TODO: maybe we should have dedicated method for local executions? (we can overload this one, which doesn't have DiscoverNode parameter) - transportService.sendRequest(clusterService.localNode(), transportPrimaryAction, request, transportOptions, new BaseTransportResponseHandler() { - @Override - public Response newInstance() { - return newResponseInstance(); - } - - @Override - public void handleResponse(Response response) { - listener.onResponse(response); - } - - @Override - public void handleException(TransportException exp) { - // to prevent TransportException wrapping another TransportException: - Throwable failure = exp.getCause() != null ? 
exp.getCause() : exp; - listener.onFailure(failure); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - } - } public static class RetryOnReplicaException extends ElasticsearchException { From b35f12c8d54fb7d6dfb496b2fad6e571482916d7 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 10 Aug 2015 18:21:37 +0200 Subject: [PATCH 07/10] applied feedback --- .../support/replication/TransportReplicationAction.java | 2 +- .../action/support/replication/ShardReplicationTests.java | 4 ++++ .../org/elasticsearch/discovery/EndlessIndexingLoopIT.java | 4 ++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 4cbc93f7e0030..267660ed8bbeb 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -260,7 +260,7 @@ public void onFailure(Throwable e) { try { channel.sendResponse(e); } catch (Throwable t) { - logger.warn("failed to send response for get", t); + logger.warn("failed to send response for a write request", t); } } }; diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java index b119b95bb8e4c..511a3308b009f 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java @@ -352,6 +352,10 @@ public void testRoutingToPrimary() { assertIndexShardCounter(1); } } else { + // The coordinating node says primary shard is on the local node, but the local node doesn' have it, + // We need to retry, something has changed in time between the coordination node received the request + // and the node holding the primary shard processing the write request. + // So we fail and retry (wait on a new cluster update or the timeout to expire) again from the coordinating node. 
logger.info("--> primary is assigned to [{}], checking request is going to be retried at some point", primaryNodeId); assertThat(clusterService.getListeners().size(), equalTo(0)); primaryPhase.moveToPrimaryAction(shardRoutingTable.primaryShard()); diff --git a/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java b/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java index 1ffa3b76e0e71..29d1a0c67996c 100644 --- a/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java @@ -155,14 +155,14 @@ public void run() { } assertNotNull(nodeId); // check that node_1 actually has removed the shard - assertFalse(clusterState.routingNodes().routingNodeIter(nodeId).hasNext()); + assertFalse(clusterState.getRoutingNodes().routingNodeIter(nodeId).hasNext()); } }); logger.info("--> cluster state on {} {}", node_1, node1Client.admin().cluster().prepareState().setLocal(true).get().getState().prettyPrint()); logger.info("--> cluster state on {} {}", node_2, node2Client.admin().cluster().prepareState().setLocal(true).get().getState().prettyPrint()); logger.info("--> index doc"); - Future indexResponseFuture = client().prepareIndex("test", "doc").setSource("{\"text\":\"a\"}").execute(); + Future indexResponseFuture = client(node_2).prepareIndex("test", "doc").setSource("{\"text\":\"a\"}").execute(); // wait a little and then see how often the indexing request was sent back and forth sleep(1000); // stop disrupting so that node_2 can finally apply cluster state 2 From 2abe7c4d6ad9cf700f80855bec90386b4dc44989 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 19 Aug 2015 09:50:09 +0200 Subject: [PATCH 08/10] use mock transport via plugin instead of setting it directly via a setting --- .../org/elasticsearch/discovery/EndlessIndexingLoopIT.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java b/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java index 29d1a0c67996c..a449ec18be339 100644 --- a/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java @@ -35,7 +35,6 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing; import org.elasticsearch.test.disruption.SingleNodeDisruption; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.junit.Test; @@ -80,7 +79,9 @@ public class EndlessIndexingLoopIT extends ESIntegTestCase { */ @Test public void testIndexOperationNotSentBackAndForthAllTheTime() throws Exception { - Settings mockTransportSetting = Settings.builder().put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()).build(); + Settings mockTransportSetting = Settings.builder() + .put("plugin.types", MockTransportService.TestPlugin.class.getName()) + .build(); Future masterNodeFuture = internalCluster().startMasterOnlyNodeAsync(mockTransportSetting); Future node1Future = internalCluster().startDataOnlyNodeAsync(mockTransportSetting); final String masterNode = masterNodeFuture.get(); From 3f640763256159a5349e144d3977f74ccae6c4ea Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 25 Aug 2015 13:28:17 +0200 Subject: 
[PATCH 09/10] bring back the primary shard chasing to the PrimaryPhase --- .../TransportReplicationAction.java | 116 ++++++++++++------ .../replication/ShardReplicationTests.java | 16 ++- 2 files changed, 86 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 267660ed8bbeb..9f30b7025e95b 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -293,6 +293,8 @@ public void onFailure(Throwable t) { @Override protected void doRun() throws Exception { final ShardRouting primary = request.internalShardRouting; + // Although this is executed locally, this check is more of an assertion. If the primary action is ever + // performed remotely, it is important to verify this before performing the action: if (clusterService.localNode().id().equals(primary.currentNodeId()) == false) { throw new NoShardAvailableActionException(primary.shardId(), "shard [{}] not assigned to this node [{}], but node [{}]", primary.shardId(), clusterService.localNode().id(), primary.currentNodeId()); } @@ -563,7 +565,7 @@ protected void doRun() { retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node."); return; } - moveToPrimaryAction(primary); + routeRequestOrPerformPrimaryActionLocally(primary); } /** @@ -618,52 +620,92 @@ protected ShardRouting resolvePrimary(ShardIterator shardIt) { /** * send the request to the node holding the primary or execute if local */ - protected void moveToPrimaryAction(final ShardRouting primary) { + protected void routeRequestOrPerformPrimaryActionLocally(final ShardRouting primary) { DiscoveryNode node = observer.observedState().nodes().get(primary.currentNodeId()); - Request request = internalRequest.request(); - request.internalShardRouting = primary; - transportService.sendRequest(node, transportPrimaryAction, request, transportOptions, new BaseTransportResponseHandler() { + if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) { + Request request = internalRequest.request(); + request.internalShardRouting = primary; + // this call is always local, but in the future we can send to remote nodes as well + transportService.sendRequest(node, transportPrimaryAction, request, transportOptions, new BaseTransportResponseHandler() { - @Override - public Response newInstance() { - return newResponseInstance(); - } + @Override + public Response newInstance() { + return newResponseInstance(); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + @Override + public String executor() { + return ThreadPool.Names.SAME; + } - @Override - public void handleResponse(Response response) { - finishOnRemoteSuccess(response); - } + @Override + public void handleResponse(Response response) { + finishOnRemoteSuccess(response); + } - @Override - public void handleException(TransportException exp) { - try { - Throwable cause = exp.getCause(); - // if we got disconnected from the node, or the node / shard is not in the right state (being closed) - if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException || - cause instanceof UnavailableShardsException || retryPrimaryException(cause)) { -
internalRequest.request().setCanHaveDuplicates(); - // we already marked it as started when we executed it (removed the listener) so pass false - // to re-add to the cluster listener - logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage()); - if (cause instanceof UnavailableShardsException) { - UnavailableShardsException use = (UnavailableShardsException) cause; - retryBecauseUnavailable(use.getShardId(), use.getMessage()); + @Override + public void handleException(TransportException exp) { + try { + Throwable cause = exp.getCause(); + // if we got disconnected from the node, or the node / shard is not in the right state (being closed) + if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException || + cause instanceof UnavailableShardsException || retryPrimaryException(cause)) { + internalRequest.request().setCanHaveDuplicates(); + // we already marked it as started when we executed it (removed the listener) so pass false + // to re-add to the cluster listener + logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage()); + if (cause instanceof UnavailableShardsException) { + UnavailableShardsException use = (UnavailableShardsException) cause; + retryBecauseUnavailable(use.getShardId(), use.getMessage()); + } else { + retry(exp); + } } else { + finishAsFailed(exp); + } + } catch (Throwable t) { + finishWithUnexpectedFailure(t); + } + } + }); + } else { + transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler() { + + @Override + public Response newInstance() { + return newResponseInstance(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(Response response) { + finishOnRemoteSuccess(response); + } + + @Override + public void handleException(TransportException exp) { + try { + // if we got disconnected from the node, or the node / shard is not in the right state (being closed) + if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException || + retryPrimaryException(exp)) { + internalRequest.request().setCanHaveDuplicates(); + // we already marked it as started when we executed it (removed the listener) so pass false + // to re-add to the cluster listener + logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage()); retry(exp); + } else { + finishAsFailed(exp); } - } else { - finishAsFailed(exp); + } catch (Throwable t) { + finishWithUnexpectedFailure(t); } - } catch (Throwable t) { - finishWithUnexpectedFailure(t); } - } - }); + }); + } } void retry(Throwable failure) { diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java index 511a3308b009f..9ea557630b7c5 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java @@ -66,6 +66,7 @@ import java.io.IOException; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -342,9 +343,9 @@ public void testRoutingToPrimary() { TransportReplicationAction.PrimaryPhase 
primaryPhase = action.new PrimaryPhase(request, listener); assertTrue(primaryPhase.checkBlocks()); + primaryPhase.routeRequestOrPerformPrimaryActionLocally(shardRoutingTable.primaryShard()); if (primaryNodeId.equals(clusterService.localNode().id())) { logger.info("--> primary is assigned locally, testing for execution"); - primaryPhase.moveToPrimaryAction(shardRoutingTable.primaryShard()); assertTrue("request failed to be processed on a local primary", request.processedOnPrimary.get()); if (transport.capturedRequests().length > 0) { assertIndexShardCounter(2); @@ -352,14 +353,11 @@ public void testRoutingToPrimary() { assertIndexShardCounter(1); } } else { - // The coordinating node says primary shard is on the local node, but the local node doesn' have it, - // We need to retry, something has changed in time between the coordination node received the request - // and the node holding the primary shard processing the write request. - // So we fail and retry (wait on a new cluster update or the timeout to expire) again from the coordinating node. - logger.info("--> primary is assigned to [{}], checking request is going to be retried at some point", primaryNodeId); - assertThat(clusterService.getListeners().size(), equalTo(0)); - primaryPhase.moveToPrimaryAction(shardRoutingTable.primaryShard()); - assertThat(clusterService.getListeners().size(), equalTo(1)); + logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId); + final List capturedRequests = transport.capturedRequestsByTargetNode().get(primaryNodeId); + assertThat(capturedRequests, notNullValue()); + assertThat(capturedRequests.size(), equalTo(1)); + assertThat(capturedRequests.get(0).action, equalTo("testAction")); assertIndexShardUninitialized(); } } From 356161e68e204fd66475303c7dc70079b0e7b9f2 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 25 Aug 2015 13:39:59 +0200 Subject: [PATCH 10/10] removed endless indexing test, since the chasing primary logic was brought back --- .../discovery/EndlessIndexingLoopIT.java | 182 ------------------ 1 file changed, 182 deletions(-) delete mode 100644 core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java diff --git a/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java b/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java deleted file mode 100644 index a449ec18be339..0000000000000 --- a/core/src/test/java/org/elasticsearch/discovery/EndlessIndexingLoopIT.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.discovery; - -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.index.IndexAction; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.store.IndicesStoreIntegrationIT; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.disruption.BlockClusterStateProcessing; -import org.elasticsearch.test.disruption.SingleNodeDisruption; -import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportService; -import org.junit.Test; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; - -import static java.lang.Thread.sleep; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.lessThanOrEqualTo; - -/** - */ -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) -public class EndlessIndexingLoopIT extends ESIntegTestCase { - - /** - * When a primary is relocating from node_1 to node_2, there can be a short time where the old primary is removed from the node - * already (closed, not deleted) but the new primary is still in POST_RECOVERY. - * In this case we must make sure node_1 and node_2 do not send an index command back and forth endlessly. - *

- * Course of events: - * 0. primary ([index][0]) relocates from node_1 to node_2 - * 1. node_2 is done recovering, moves its shard to IndexShardState.POST_RECOVERY and sends a message to master that the shard is ShardRoutingState.STARTED - * Cluster state 1: - * node_1: [index][0] RELOCATING (ShardRoutingState), (STARTED from IndexShardState perspective on node_1) - * node_2: [index][0] INITIALIZING (ShardRoutingState), (at this point already POST_RECOVERY from IndexShardState perspective on node_2) - * 2. master receives shard started and updates cluster state to: - * Cluster state 2: - * node_1: [index][0] no shard - * node_2: [index][0] STARTED (ShardRoutingState), (at this point still in POST_RECOVERY from IndexShardState perspective on node_2) - * master sends this to node_1 and node_2 - * 3. node_1 receives the new cluster state and removes its shard because it is not allocated on node_1 anymore - * 4. index a document - * At this point node_1 is already on cluster state 2 and does not have the shard anymore so it forwards the request to node_2. - * But node_2 is behind with cluster state processing, is still on cluster state 1 and therefore has the shard in - * IndexShardState.POST_RECOVERY and thinks node_1 has the primary. So it will send the request back to node_1. - * This goes on until either node_2 finally catches up and processes cluster state 2 or both nodes OOM. - */ - @Test - public void testIndexOperationNotSentBackAndForthAllTheTime() throws Exception { - Settings mockTransportSetting = Settings.builder() - .put("plugin.types", MockTransportService.TestPlugin.class.getName()) - .build(); - Future masterNodeFuture = internalCluster().startMasterOnlyNodeAsync(mockTransportSetting); - Future node1Future = internalCluster().startDataOnlyNodeAsync(mockTransportSetting); - final String masterNode = masterNodeFuture.get(); - final String node_1 = node1Future.get(); - - logger.info("--> creating index [test] with one shard and zero replica"); - assertAcked(prepareCreate("test").setSettings( - Settings.builder().put(indexSettings()) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) - .addMapping("doc", jsonBuilder().startObject().startObject("doc") - .startObject("properties").startObject("text").field("type", "string").endObject().endObject() - .endObject().endObject()) - ); - ensureGreen("test"); - logger.info("--> starting one more data node"); - Future node2NameFuture = internalCluster().startDataOnlyNodeAsync(mockTransportSetting); - final String node_2 = node2NameFuture.get(); - logger.info("--> running cluster_health"); - ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth() - .setWaitForNodes("3") - .setWaitForRelocatingShards(0) - .get(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - - logger.info("--> move shard from node_1 to node_2, and wait for relocation to finish"); - - // register Tracer that will signal when relocations starts and ends - MockTransportService transportServiceNode2 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_2); - CountDownLatch beginRelocationLatchNode2 = new CountDownLatch(1); - CountDownLatch endRelocationLatchNode2 = new CountDownLatch(1); - transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatchNode2, endRelocationLatchNode2)); - // register a Tracer that will count the number of sent indexing requests on node_2 - final AtomicInteger 
numSentIndexRequests = new AtomicInteger(0); - transportServiceNode2.addTracer(new MockTransportService.Tracer() { - @Override - public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) { - if (action.equals(IndexAction.NAME)) { - numSentIndexRequests.incrementAndGet(); - } - } - }); - - // node_2 should fall behind with cluster state processing. we start the disruption later when relocation starts - SingleNodeDisruption disruptionNode2 = new BlockClusterStateProcessing(node_2, getRandom()); - internalCluster().setDisruptionScheme(disruptionNode2); - - logger.info("--> move shard from {} to {}", node_1, node_2); - internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_2)).get(); - - logger.info("--> wait for relocation to start"); - beginRelocationLatchNode2.await(); - // start to block cluster state processing for node_2 so that it will be stuck with the cluster state 1 in above description - disruptionNode2.startDisrupting(); - - logger.info("--> wait for relocation to finish"); - endRelocationLatchNode2.await(); - // now node_2 is still on cluster state 1 but will have have the shard moved to POST_RECOVERY - final Client node1Client = internalCluster().client(node_1); - final Client node2Client = internalCluster().client(node_2); - // wait until node_1 actually has removed the shard - assertBusy(new Runnable() { - @Override - public void run() { - ClusterState clusterState = node1Client.admin().cluster().prepareState().setLocal(true).get().getState(); - // get the node id from the name. TODO: Is there a better way to do this? - String nodeId = null; - for (RoutingNode node : clusterState.getRoutingNodes()) { - if (node.node().name().equals(node_1)) { - nodeId = node.nodeId(); - } - } - assertNotNull(nodeId); - // check that node_1 actually has removed the shard - assertFalse(clusterState.getRoutingNodes().routingNodeIter(nodeId).hasNext()); - } - }); - - logger.info("--> cluster state on {} {}", node_1, node1Client.admin().cluster().prepareState().setLocal(true).get().getState().prettyPrint()); - logger.info("--> cluster state on {} {}", node_2, node2Client.admin().cluster().prepareState().setLocal(true).get().getState().prettyPrint()); - logger.info("--> index doc"); - Future indexResponseFuture = client(node_2).prepareIndex("test", "doc").setSource("{\"text\":\"a\"}").execute(); - // wait a little and then see how often the indexing request was sent back and forth - sleep(1000); - // stop disrupting so that node_2 can finally apply cluster state 2 - logger.info("--> stop disrupting"); - disruptionNode2.stopDisrupting(); - clusterHealth = client().admin().cluster().prepareHealth() - .setWaitForRelocatingShards(0) - .get(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - indexResponseFuture.get(); - refresh(); - assertThat(client().prepareCount().get().getCount(), equalTo(1l)); - // check that only one indexing request was sent at most - assertThat(numSentIndexRequests.get(), lessThanOrEqualTo(1)); - } -} \ No newline at end of file
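The primary chasing that PATCH 09 brings back to the PrimaryPhase follows one pattern on the coordinating node: resolve the primary from the currently observed cluster state, forward the request to the node believed to hold it, and on a retryable failure wait for the next cluster state change (or the timeout) before resolving again. Below is a minimal, self-contained sketch of that retry loop; every type and method name in it (ClusterStateView, ClusterObserver, Forwarder, RetryableClusterException, PrimaryChasingSketch) is a hypothetical stand-in, not the actual TransportReplicationAction or ClusterStateObserver API.

    import java.util.Optional;
    import java.util.concurrent.TimeUnit;

    // Hypothetical stand-ins for the cluster state, the state observer and the transport layer.
    interface ClusterStateView {
        long version();
        Optional<String> primaryNodeId(); // node currently believed to hold the primary, if any
    }

    interface ClusterObserver {
        ClusterStateView observedState();
        // blocks until the cluster state changes or the timeout elapses, then returns the latest state
        ClusterStateView waitForNextChange(long timeout, TimeUnit unit) throws InterruptedException;
    }

    interface Forwarder {
        // performs the action locally or sends it to the target node; throws on retryable cluster-level failures
        void send(String nodeId, Object request) throws RetryableClusterException;
    }

    class RetryableClusterException extends Exception {
        RetryableClusterException(String message) { super(message); }
    }

    // Coordinating-node retry loop: resolve the primary, forward the request, and on a
    // retryable failure wait for a newer cluster state before trying again.
    public final class PrimaryChasingSketch {
        private final ClusterObserver observer;
        private final Forwarder forwarder;
        private final long timeoutMillis;

        public PrimaryChasingSketch(ClusterObserver observer, Forwarder forwarder, long timeoutMillis) {
            this.observer = observer;
            this.forwarder = forwarder;
            this.timeoutMillis = timeoutMillis;
        }

        public void execute(Object request) throws Exception {
            final long deadline = System.currentTimeMillis() + timeoutMillis;
            while (true) {
                ClusterStateView state = observer.observedState();
                Optional<String> primary = state.primaryNodeId();
                if (primary.isPresent()) {
                    try {
                        forwarder.send(primary.get(), request); // local or remote, the transport decides
                        return; // the request reached a primary, we are done
                    } catch (RetryableClusterException e) {
                        // the primary moved or is not ready yet: fall through and wait for a newer cluster state
                    }
                }
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    throw new IllegalStateException("timed out waiting for an assigned primary");
                }
                observer.waitForNextChange(remaining, TimeUnit.MILLISECONDS);
            }
        }
    }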