diff --git a/core/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java b/core/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java
index 00562af99c5ce..c4b3e2ad738b9 100644
--- a/core/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java
+++ b/core/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java
@@ -35,12 +35,12 @@ public NoShardAvailableActionException(ShardId shardId) {
         this(shardId, null);
     }
 
-    public NoShardAvailableActionException(ShardId shardId, String msg) {
-        this(shardId, msg, null);
+    public NoShardAvailableActionException(ShardId shardId, String msg, Object... args) {
+        this(shardId, msg, null, args);
     }
 
-    public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause) {
-        super(msg, cause);
+    public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause, Object... args) {
+        super(msg, cause, args);
         setShard(shardId);
     }
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index 2ca2dfe142ac2..25428eda262c8 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -471,7 +471,7 @@ protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request
             }
             Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
             if (update != null) {
-                throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
+                throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: [{}]", update);
             }
             operation.execute(indexShard);
             location = locationToSync(location, operation.getTranslogLocation());
diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
index 97620829f3a48..0f684e66d63eb 100644
--- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
+++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
@@ -195,7 +195,7 @@ protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) {
             }
             Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
             if (update != null) {
-                throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
+                throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: [{}]", update);
             }
             operation.execute(indexShard);
             processAfter(request, indexShard, operation.getTranslogLocation());
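The exception constructors above stop pre-concatenating message strings and instead accept a template whose "{}" placeholders are filled from the trailing varargs (the LoggerMessageFormat convention). A minimal, self-contained sketch of that substitution, for illustration only; it is not the actual ElasticsearchException/LoggerMessageFormat implementation:

public class PlaceholderDemo {
    // replaces each "{}" in order with the matching argument
    static String format(String template, Object... args) {
        StringBuilder sb = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int slot = template.indexOf("{}", from);
            if (slot == -1) {
                break; // more args than placeholders; extras are ignored here
            }
            sb.append(template, from, slot).append(arg);
            from = slot + 2;
        }
        sb.append(template.substring(from));
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints: Mappings are not available on the replica yet, triggered update: [my-update]
        System.out.println(format("Mappings are not available on the replica yet, triggered update: [{}]", "my-update"));
    }
}

Deferring the formatting to the exception also means the arguments are only rendered when the message is actually built, instead of on every throw site.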
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
index 39a3dae75698b..c74597afcdd2f 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
@@ -24,10 +24,10 @@
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.shard.ShardId;
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
@@ -41,12 +41,11 @@ public abstract class ReplicationRequest<T extends ReplicationRequest> extends ActionRequest<T> implements IndicesRequest {
 
     public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
 
-    ShardId internalShardId;
+    ShardRouting internalShardRouting;
 
     protected TimeValue timeout = DEFAULT_TIMEOUT;
     protected String index;
 
-    private boolean threadedOperation = true;
     private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
     private volatile boolean canHaveDuplicates = false;
 
@@ -76,7 +75,6 @@ protected ReplicationRequest(T request, ActionRequest originalRequest) {
         super(originalRequest);
         this.timeout = request.timeout();
         this.index = request.index();
-        this.threadedOperation = request.operationThreaded();
         this.consistencyLevel = request.consistencyLevel();
     }
 
@@ -91,23 +89,6 @@ public boolean canHaveDuplicates() {
         return canHaveDuplicates;
     }
 
-    /**
-     * Controls if the operation will be executed on a separate thread when executed locally.
-     */
-    public final boolean operationThreaded() {
-        return threadedOperation;
-    }
-
-    /**
-     * Controls if the operation will be executed on a separate thread when executed locally. Defaults
-     * to <tt>true</tt> when running in embedded mode.
-     */
-    @SuppressWarnings("unchecked")
-    public final T operationThreaded(boolean threadedOperation) {
-        this.threadedOperation = threadedOperation;
-        return (T) this;
-    }
-
     /**
      * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m.
      */
@@ -174,19 +155,18 @@ public ActionRequestValidationException validate() {
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
         if (in.readBoolean()) {
-            internalShardId = ShardId.readShardId(in);
+            internalShardRouting = ShardRouting.readShardRoutingEntry(in);
         }
         consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
         timeout = TimeValue.readTimeValue(in);
         index = in.readString();
         canHaveDuplicates = in.readBoolean();
-        // no need to serialize threaded* parameters, since they only matter locally
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
-        out.writeOptionalStreamable(internalShardId);
+        out.writeOptionalStreamable(internalShardRouting);
         out.writeByte(consistencyLevel.id());
         timeout.writeTo(out);
         out.writeString(index);
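The wire-format change above swaps the ShardId for a full ShardRouting but keeps the same optional-object framing: writeOptionalStreamable emits a presence boolean followed by the object itself, which is why readFrom guards the read with in.readBoolean(). A hedged sketch of both sides of that framing, assuming the imports of the surrounding file (the only ShardRouting reader named here is readShardRoutingEntry, because the diff itself calls it):

// write side: mirrors out.writeOptionalStreamable(internalShardRouting)
void writeOptionalRouting(StreamOutput out, ShardRouting routing) throws IOException {
    if (routing == null) {
        out.writeBoolean(false);   // absent
    } else {
        out.writeBoolean(true);    // present
        routing.writeTo(out);      // then the object itself
    }
}

// read side: mirrors the guarded read in ReplicationRequest.readFrom above
ShardRouting readOptionalRouting(StreamInput in) throws IOException {
    return in.readBoolean() ? ShardRouting.readShardRoutingEntry(in) : null;
}

Because the framing is unchanged, only the payload type on the wire grows; consumers that never set the field still pay just the one presence byte.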
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java
index 7762eaee7dc0e..bafb33be6a73e 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequestBuilder.java
@@ -35,16 +35,6 @@ protected ReplicationRequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action, Request request) {
 
-    /**
-     * Controls if the operation will be executed on a separate thread when executed locally. Defaults
-     * to <tt>true</tt> when running in embedded mode.
-     */
-    @SuppressWarnings("unchecked")
-    public final RequestBuilder setOperationThreaded(boolean threadedOperation) {
-        request.operationThreaded(threadedOperation);
-        return (RequestBuilder) this;
-    }
-
     /**
      * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m.
      */
diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index bc4094acc54f4..9f30b7025e95b 100644
--- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -21,10 +21,7 @@
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionWriteResponse;
-import org.elasticsearch.action.UnavailableShardsException;
-import org.elasticsearch.action.WriteConsistencyLevel;
+import org.elasticsearch.action.*;
 import org.elasticsearch.action.bulk.BulkShardRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequest.OpType;
@@ -48,6 +45,7 @@
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.logging.support.LoggerMessageFormat;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -85,6 +83,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {
 
     class OperationTransportHandler implements TransportRequestHandler<Request> {
         @Override
         public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
-            // if we have a local operation, execute it on a thread since we don't spawn
-            request.operationThreaded(true);
             execute(request, new ActionListener<Response>() {
                 @Override
                 public void onResponse(Response result) {
@@ -242,10 +241,179 @@ public void messageReceived(final ReplicaRequest request, final TransportChannel channel) throws Exception {
         }
     }
 
+    class PrimaryOperationTransportHandler implements TransportRequestHandler<Request> {
+
+        @Override
+        public void messageReceived(Request request, final TransportChannel channel) throws Exception {
+            ActionListener<Response> actionListener = new ActionListener<Response>() {
+                @Override
+                public void onResponse(Response response) {
+                    try {
+                        channel.sendResponse(response);
+                    } catch (Throwable t) {
+                        onFailure(t);
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable e) {
+                    try {
+                        channel.sendResponse(e);
+                    } catch (Throwable t) {
+                        logger.warn("failed to send response for a write request", t);
+                    }
+                }
+            };
+            new AsyncPrimaryAction(request, actionListener).run();
+        }
+    }
+
+    class AsyncPrimaryAction extends AbstractRunnable {
+
+        private final ClusterStateObserver observer;
+        private final ActionListener<Response> listener;
+        private final InternalRequest internalRequest;
+        private final Request request;
+        private final AtomicBoolean finished = new AtomicBoolean(false);
+        private volatile Releasable indexShardReference;
+
+        public AsyncPrimaryAction(Request request, ActionListener<Response> listener) {
+            this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger);
+            this.listener = listener;
+            this.request = request;
+            this.internalRequest = new InternalRequest(request);
+            this.internalRequest.concreteIndex(request.internalShardRouting.getIndex());
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+            finishAsFailed(t);
+        }
+
+        @Override
+        protected void doRun() throws Exception {
+            final ShardRouting primary = request.internalShardRouting;
+            // Although this is executed locally, it is more of an assertion; if we ever change the primary
+            // action to be performed remotely, this check is important to run before performing the action:
+            if (clusterService.localNode().id().equals(primary.currentNodeId()) == false) {
+                throw new NoShardAvailableActionException(primary.shardId(), "shard [{}] not assigned to this node [{}], but node [{}]", primary.shardId(), clusterService.localNode().id(), primary.currentNodeId());
+            }
+            final ShardIterator shardIt = shards(observer.observedState(), internalRequest);
+            performOnPrimary(primary, shardIt);
+        }
+
+        /**
+         * perform the operation on the node holding the primary
+         */
+        void performOnPrimary(ShardRouting primary, final ShardIterator shardsIt) {
+            final String writeConsistencyFailure = checkWriteConsistency(primary);
+            if (writeConsistencyFailure != null) {
+                throw new UnavailableShardsException(primary.shardId(), writeConsistencyFailure);
+            }
+            final ReplicationPhase replicationPhase;
+            try {
+                indexShardReference = getIndexShardOperationsCounter(primary.shardId());
+                PrimaryOperationRequest por = new PrimaryOperationRequest(primary.shardId(), request);
+                Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(observer.observedState(), por);
+                logger.trace("operation completed on primary [{}]", primary);
+                replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference);
+            } catch (Throwable e) {
+                if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
+                    if (logger.isTraceEnabled()) {
+                        logger.trace(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e);
+                    }
+                } else {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e);
+                    }
+                }
+                Releasables.close(indexShardReference);
+                indexShardReference = null;
+                onFailure(e);
+                return;
+            }
+            finishAndMoveToReplication(replicationPhase);
+        }
+
+        /**
+         * checks whether we can perform a write based on the write consistency setting
+         * returns <tt>null</tt> if OK to proceed, or a string describing the reason to stop
+         */
+        String checkWriteConsistency(ShardRouting shard) {
+            if (checkWriteConsistency == false) {
+                return null;
+            }
+
+            final WriteConsistencyLevel consistencyLevel;
+            if (internalRequest.request().consistencyLevel() != WriteConsistencyLevel.DEFAULT) {
+                consistencyLevel = internalRequest.request().consistencyLevel();
+            } else {
+                consistencyLevel = defaultWriteConsistencyLevel;
+            }
+            final int sizeActive;
+            final int requiredNumber;
+            IndexRoutingTable indexRoutingTable = observer.observedState().getRoutingTable().index(shard.index());
+            if (indexRoutingTable != null) {
+                IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shard.getId());
+                if (shardRoutingTable != null) {
+                    sizeActive = shardRoutingTable.activeShards().size();
+                    if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
+                        // a quorum only makes sense for more than two shard copies; otherwise it's 1 shard with 1 replica, and the quorum is 1 (which is what requiredNumber is initialized to)
+                        requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
+                    } else if (consistencyLevel == WriteConsistencyLevel.ALL) {
+                        requiredNumber = shardRoutingTable.getSize();
+                    } else {
+                        requiredNumber = 1;
+                    }
+                } else {
+                    sizeActive = 0;
+                    requiredNumber = 1;
+                }
+            } else {
+                sizeActive = 0;
+                requiredNumber = 1;
+            }
+
+            if (sizeActive < requiredNumber) {
+                String message = LoggerMessageFormat.format(
+                        "not enough active copies to meet write consistency of [{}] (have [{}], needed [{}])",
+                        consistencyLevel, sizeActive, requiredNumber
+                );
+                logger.trace(message + ", scheduling a retry");
+                return message;
+            } else {
+                return null;
+            }
+        }
+
+        void finishAsFailed(Throwable failure) {
+            if (finished.compareAndSet(false, true)) {
+                Releasables.close(indexShardReference);
+                indexShardReference = null;
+                logger.trace("operation failed", failure);
+                listener.onFailure(failure);
+            } else {
+                assert false : "finishAsFailed called but operation is already finished";
+            }
+        }
+
+        /**
+         * upon success, finish the first phase and transfer responsibility to the {@link ReplicationPhase}
+         */
+        void finishAndMoveToReplication(ReplicationPhase replicationPhase) {
+            if (finished.compareAndSet(false, true)) {
+                replicationPhase.run();
+            } else {
+                assert false : "finishAndMoveToReplication called but operation is already finished";
+            }
+        }
+
+    }
+
     public static class RetryOnReplicaException extends ElasticsearchException {
 
-        public RetryOnReplicaException(ShardId shardId, String msg) {
-            super(msg);
+        public RetryOnReplicaException(ShardId shardId, String msg, Object... args) {
+            super(msg, args);
             setShard(shardId);
         }
@@ -274,7 +442,23 @@ public void onFailure(Throwable t) {
                 observer.waitForNextChange(new ClusterStateObserver.Listener() {
                     @Override
                     public void onNewClusterState(ClusterState state) {
-                        threadPool.executor(executor).execute(AsyncReplicaAction.this);
+                        // TODO: maybe we should have a dedicated method for local executions? (we can overload this one, which doesn't have a DiscoveryNode parameter)
+                        transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
+
+                            @Override
+                            public void handleResponse(TransportResponse.Empty response) {
+                                try {
+                                    channel.sendResponse(TransportResponse.Empty.INSTANCE);
+                                } catch (IOException e) {
+                                    logger.error("failed to send response", e);
+                                }
+                            }
+
+                            @Override
+                            public void handleException(TransportException exp) {
+                                responseWithFailure(exp);
+                            }
+                        });
                     }
 
                     @Override
@@ -289,9 +473,9 @@ public void onTimeout(TimeValue timeout) {
                 });
             } else {
                 try {
-                    failReplicaIfNeeded(request.internalShardId.getIndex(), request.internalShardId.id(), t);
+                    failReplicaIfNeeded(request.internalShardRouting.getIndex(), request.internalShardRouting.id(), t);
                 } catch (Throwable unexpected) {
-                    logger.error("{} unexpected error while failing replica", request.internalShardId.id(), unexpected);
+                    logger.error("{} unexpected error while failing replica", request.internalShardRouting.id(), unexpected);
                 } finally {
                     responseWithFailure(t);
                 }
@@ -309,8 +493,8 @@ protected void responseWithFailure(Throwable t) {
 
         @Override
         protected void doRun() throws Exception {
-            try (Releasable shardReference = getIndexShardOperationsCounter(request.internalShardId)) {
-                shardOperationOnReplica(request.internalShardId, request);
+            try (Releasable shardReference = getIndexShardOperationsCounter(request.internalShardRouting.shardId())) {
+                shardOperationOnReplica(request.internalShardRouting.shardId(), request);
             }
             channel.sendResponse(TransportResponse.Empty.INSTANCE);
         }
@@ -320,15 +504,16 @@ protected class PrimaryOperationRequest {
         public final ShardId shardId;
         public final Request request;
 
-        public PrimaryOperationRequest(int shardId, String index, Request request) {
-            this.shardId = new ShardId(index, shardId);
+        public PrimaryOperationRequest(ShardId shardId, Request request) {
+            this.shardId = shardId;
             this.request = request;
         }
     }
 
     public static class RetryOnPrimaryException extends ElasticsearchException {
-        public RetryOnPrimaryException(ShardId shardId, String msg) {
-            super(msg);
+
+        public RetryOnPrimaryException(ShardId shardId, String msg, Object... args) {
+            super(msg, args);
             setShard(shardId);
         }
@@ -348,8 +533,6 @@ final class PrimaryPhase extends AbstractRunnable {
         private final InternalRequest internalRequest;
         private final ClusterStateObserver observer;
         private final AtomicBoolean finished = new AtomicBoolean(false);
-        private volatile Releasable indexShardReference;
-
         PrimaryPhase(Request request, ActionListener<Response> listener) {
             this.internalRequest = new InternalRequest(request);
@@ -382,7 +565,7 @@ protected void doRun() {
                 retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
                 return;
             }
-            routeRequestOrPerformLocally(primary, shardIt);
+            routeRequestOrPerformPrimaryActionLocally(primary);
         }
 
         /**
@@ -437,29 +620,55 @@ protected ShardRouting resolvePrimary(ShardIterator shardIt) {
         /**
          * send the request to the node holding the primary or execute if local
          */
-        protected void routeRequestOrPerformLocally(final ShardRouting primary, final ShardIterator shardsIt) {
+        protected void routeRequestOrPerformPrimaryActionLocally(final ShardRouting primary) {
+            DiscoveryNode node = observer.observedState().nodes().get(primary.currentNodeId());
             if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
-                try {
-                    if (internalRequest.request().operationThreaded()) {
-                        threadPool.executor(executor).execute(new AbstractRunnable() {
-                            @Override
-                            public void onFailure(Throwable t) {
-                                finishAsFailed(t);
-                            }
+                Request request = internalRequest.request();
+                request.internalShardRouting = primary;
+                // this call is always local, but in the future we can send to remote nodes as well
+                transportService.sendRequest(node, transportPrimaryAction, request, transportOptions, new BaseTransportResponseHandler<Response>() {
 
-                            @Override
-                            protected void doRun() throws Exception {
-                                performOnPrimary(primary, shardsIt);
+                    @Override
+                    public Response newInstance() {
+                        return newResponseInstance();
+                    }
+
+                    @Override
+                    public String executor() {
+                        return ThreadPool.Names.SAME;
+                    }
+
+                    @Override
+                    public void handleResponse(Response response) {
+                        finishOnRemoteSuccess(response);
+                    }
+
+                    @Override
+                    public void handleException(TransportException exp) {
+                        try {
+                            Throwable cause = exp.getCause();
+                            // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
+                            if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
+                                    cause instanceof UnavailableShardsException || retryPrimaryException(cause)) {
+                                internalRequest.request().setCanHaveDuplicates();
+                                // we already marked it as started when we executed it (removed the listener) so pass false
+                                // to re-add to the cluster listener
+                                logger.trace("received an error from the node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
+                                if (cause instanceof UnavailableShardsException) {
+                                    UnavailableShardsException use = (UnavailableShardsException) cause;
+                                    retryBecauseUnavailable(use.getShardId(), use.getMessage());
+                                } else {
+                                    retry(exp);
+                                }
+                            } else {
+                                finishAsFailed(exp);
                             }
-                        });
-                    } else {
-                        performOnPrimary(primary, shardsIt);
+                        } catch (Throwable t) {
+                            finishWithUnexpectedFailure(t);
+                        }
                     }
-                } catch (Throwable t) {
-                    finishAsFailed(t);
-                }
+                });
             } else {
-                DiscoveryNode node = observer.observedState().nodes().get(primary.currentNodeId());
                 transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {
 
                     @Override
@@ -506,9 +715,6 @@ void retry(Throwable failure) {
                 finishAsFailed(failure);
                 return;
             }
-            // make it threaded operation so we fork on the discovery listener thread
-            internalRequest.request().operationThreaded(true);
-
             observer.waitForNextChange(new ClusterStateObserver.Listener() {
                 @Override
                 public void onNewClusterState(ClusterState state) {
@@ -528,21 +734,8 @@ public void onTimeout(TimeValue timeout) {
             });
         }
 
-        /**
-         * upon success, finish the first phase and transfer responsibility to the {@link ReplicationPhase}
-         */
-        void finishAndMoveToReplication(ReplicationPhase replicationPhase) {
-            if (finished.compareAndSet(false, true)) {
-                replicationPhase.run();
-            } else {
-                assert false : "finishAndMoveToReplication called but operation is already finished";
-            }
-        }
-
         void finishAsFailed(Throwable failure) {
             if (finished.compareAndSet(false, true)) {
-                Releasables.close(indexShardReference);
                 logger.trace("operation failed", failure);
                 listener.onFailure(failure);
             } else {
@@ -553,7 +746,6 @@ void finishAsFailed(Throwable failure) {
         void finishWithUnexpectedFailure(Throwable failure) {
             logger.warn("unexpected error during the primary phase for action [{}]", failure, actionName);
             if (finished.compareAndSet(false, true)) {
-                Releasables.close(indexShardReference);
                 listener.onFailure(failure);
             } else {
                 assert false : "finishWithUnexpectedFailure called but operation is already finished";
@@ -569,99 +761,6 @@ void finishOnRemoteSuccess(Response response) {
             }
         }
 
-        /**
-         * perform the operation on the node holding the primary
-         */
-        void performOnPrimary(final ShardRouting primary, final ShardIterator shardsIt) {
-            final String writeConsistencyFailure = checkWriteConsistency(primary);
-            if (writeConsistencyFailure != null) {
-                retryBecauseUnavailable(primary.shardId(), writeConsistencyFailure);
-                return;
-            }
-            final ReplicationPhase replicationPhase;
-            try {
-                indexShardReference = getIndexShardOperationsCounter(primary.shardId());
-                PrimaryOperationRequest por = new PrimaryOperationRequest(primary.id(), internalRequest.concreteIndex(), internalRequest.request());
-                Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(observer.observedState(), por);
-                logger.trace("operation completed on primary [{}]", primary);
-                replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference);
-            } catch (Throwable e) {
-                internalRequest.request.setCanHaveDuplicates();
-                // shard has not been allocated yet, retry it here
-                if (retryPrimaryException(e)) {
-                    logger.trace("had an error while performing operation on primary ({}), scheduling a retry.", e.getMessage());
-                    // We have to close here because when we retry we will increment get a new reference on index shard again and we do not want to
-                    // increment twice.
-                    Releasables.close(indexShardReference);
-                    // We have to reset to null here because whe we retry it might be that we never get to the point where we assign a new reference
-                    // (for example, in case the operation was rejected because queue is full). In this case we would release again once one of the finish methods is called.
-                    indexShardReference = null;
-                    retry(e);
-                    return;
-                }
-                if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
-                    if (logger.isTraceEnabled()) {
-                        logger.trace(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e);
-                    }
-                } else {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e);
-                    }
-                }
-                finishAsFailed(e);
-                return;
-            }
-            finishAndMoveToReplication(replicationPhase);
-        }
-
-        /**
-         * checks whether we can perform a write based on the write consistency setting
-         * returns **null* if OK to proceed, or a string describing the reason to stop
-         */
-        String checkWriteConsistency(ShardRouting shard) {
-            if (checkWriteConsistency == false) {
-                return null;
-            }
-
-            final WriteConsistencyLevel consistencyLevel;
-            if (internalRequest.request().consistencyLevel() != WriteConsistencyLevel.DEFAULT) {
-                consistencyLevel = internalRequest.request().consistencyLevel();
-            } else {
-                consistencyLevel = defaultWriteConsistencyLevel;
-            }
-            final int sizeActive;
-            final int requiredNumber;
-            IndexRoutingTable indexRoutingTable = observer.observedState().getRoutingTable().index(shard.index());
-            if (indexRoutingTable != null) {
-                IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shard.getId());
-                if (shardRoutingTable != null) {
-                    sizeActive = shardRoutingTable.activeShards().size();
-                    if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
-                        // only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to)
-                        requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
-                    } else if (consistencyLevel == WriteConsistencyLevel.ALL) {
-                        requiredNumber = shardRoutingTable.getSize();
-                    } else {
-                        requiredNumber = 1;
-                    }
-                } else {
-                    sizeActive = 0;
-                    requiredNumber = 1;
-                }
-            } else {
-                sizeActive = 0;
-                requiredNumber = 1;
-            }
-
-            if (sizeActive < requiredNumber) {
-                logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.",
-                        shard.shardId(), consistencyLevel, sizeActive, requiredNumber);
-                return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed " + requiredNumber + ").";
-            } else {
-                return null;
-            }
-        }
-
         void retryBecauseUnavailable(ShardId shardId, String message) {
             retry(new UnavailableShardsException(shardId, message + " Timeout: [" + internalRequest.request().timeout() + "], request: " + internalRequest.request().toString()));
         }
@@ -881,68 +980,25 @@ void performOnReplica(final ShardRouting shard, final String nodeId) {
                 return;
             }
 
-            replicaRequest.internalShardId = shardIt.shardId();
-
-            if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
-                final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
-                transportService.sendRequest(node, transportReplicaAction, replicaRequest,
-                        transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
-                            @Override
-                            public void handleResponse(TransportResponse.Empty vResponse) {
-                                onReplicaSuccess();
-                            }
-
-                            @Override
-                            public void handleException(TransportException exp) {
-                                onReplicaFailure(nodeId, exp);
-                                logger.trace("[{}] transport failure during replica request [{}] ", exp, node, replicaRequest);
-                                if (ignoreReplicaException(exp) == false) {
-                                    logger.warn("{} failed to perform {} on node {}", exp, shardIt.shardId(), actionName, node);
-                                    shardStateAction.shardFailed(shard, indexMetaData.getIndexUUID(), "failed to perform " + actionName + " on replica on node " + node, exp);
-                                }
-                            }
-
-                        });
-            } else {
-                if (replicaRequest.operationThreaded()) {
-                    try {
-                        threadPool.executor(executor).execute(new AbstractRunnable() {
-                            @Override
-                            protected void doRun() {
-                                try {
-                                    shardOperationOnReplica(shard.shardId(), replicaRequest);
-                                    onReplicaSuccess();
-                                } catch (Throwable e) {
-                                    onReplicaFailure(nodeId, e);
-                                    failReplicaIfNeeded(shard.index(), shard.id(), e);
-                                }
-                            }
-
-                            // we must never reject on because of thread pool capacity on replicas
-                            @Override
-                            public boolean isForceExecution() {
-                                return true;
-                            }
+            replicaRequest.internalShardRouting = shard;
+            final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
+            transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
+                @Override
+                public void handleResponse(TransportResponse.Empty vResponse) {
+                    onReplicaSuccess();
+                }
 
-                            @Override
-                            public void onFailure(Throwable t) {
-                                onReplicaFailure(nodeId, t);
-                            }
-                        });
-                    } catch (Throwable e) {
-                        failReplicaIfNeeded(shard.index(), shard.id(), e);
-                        onReplicaFailure(nodeId, e);
-                    }
-                } else {
-                    try {
-                        shardOperationOnReplica(shard.shardId(), replicaRequest);
-                        onReplicaSuccess();
-                    } catch (Throwable e) {
-                        failReplicaIfNeeded(shard.index(), shard.id(), e);
-                        onReplicaFailure(nodeId, e);
+                @Override
+                public void handleException(TransportException exp) {
+                    onReplicaFailure(nodeId, exp);
+                    logger.trace("[{}] transport failure during replica request [{}] ", exp, node, replicaRequest);
+                    if (ignoreReplicaException(exp) == false) {
+                        logger.warn("{} failed to perform {} on node {}", exp, shardIt.shardId(), actionName, node);
+                        shardStateAction.shardFailed(shard, indexMetaData.getIndexUUID(), "failed to perform " + actionName + " on replica on node " + node, exp);
                     }
                 }
-            }
+
+            });
         }
diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java
index c9401ca5392ec..fba2f23852e15 100644
--- a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java
+++ b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java
@@ -131,7 +131,6 @@ protected Result prepare(UpdateRequest request, final GetResult getResult) {
                     .routing(request.routing())
                     .parent(request.parent())
                     .consistencyLevel(request.consistencyLevel());
-            indexRequest.operationThreaded(false);
             if (request.versionType() != VersionType.INTERNAL) {
                 // in all but the internal versioning mode, we want to create the new document using the given version.
                 indexRequest.version(request.version()).versionType(request.versionType());
@@ -227,13 +226,11 @@ protected Result prepare(UpdateRequest request, final GetResult getResult) {
                     .consistencyLevel(request.consistencyLevel())
                     .timestamp(timestamp).ttl(ttl)
                     .refresh(request.refresh());
-            indexRequest.operationThreaded(false);
             return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType);
         } else if ("delete".equals(operation)) {
             DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
                     .version(updateVersion).versionType(request.versionType())
                     .consistencyLevel(request.consistencyLevel());
-            deleteRequest.operationThreaded(false);
             return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
         } else if ("none".equals(operation)) {
             UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
diff --git a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java
index 69f06cef1f129..209ab686ce5b3 100644
--- a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java
+++ b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java
@@ -50,9 +50,6 @@ public RestDeleteAction(Settings settings, RestController controller, Client cli
     @Override
     public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
         DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id"));
-
-        deleteRequest.operationThreaded(true);
-
         deleteRequest.routing(request.param("routing"));
         deleteRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
         deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT));
diff --git a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java
index 3d3ecdfa880ed..d0d0fe68a13c1 100644
--- a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java
+++ b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java
@@ -70,7 +70,6 @@ public void handleRequest(RestRequest request, RestChannel channel, final Client
     @Override
     public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
         IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
-        indexRequest.operationThreaded(true);
        indexRequest.routing(request.param("routing"));
         indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
         indexRequest.timestamp(request.param("timestamp"));
diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java
index b70589ce52d9a..c5ee29de64d20 100644
--- a/core/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -77,7 +77,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) {
         }
     });
 
-    private final TransportService.Adapter adapter;
+    protected final TransportService.Adapter adapter;
 
     // tracer log
diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
index 10b344933348f..e3df5792dc4e5 100644
--- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
+++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java
@@ -194,7 +194,7 @@ public void testAnalyze() {
 
     @Test
     public void testIndex() {
-        String[] indexShardActions = new String[]{IndexAction.NAME, IndexAction.NAME + "[r]"};
+        String[] indexShardActions = new String[]{IndexAction.NAME + "[p]", IndexAction.NAME + "[r]"};
         interceptTransportActions(indexShardActions);
 
         IndexRequest indexRequest = new IndexRequest(randomIndexOrAlias(), "type", "id").source("field", "value");
@@ -206,7 +206,7 @@ public void testIndex() {
 
     @Test
     public void testDelete() {
-        String[] deleteShardActions = new String[]{DeleteAction.NAME, DeleteAction.NAME + "[r]"};
+        String[] deleteShardActions = new String[]{DeleteAction.NAME + "[p]", DeleteAction.NAME + "[r]"};
         interceptTransportActions(deleteShardActions);
 
         DeleteRequest deleteRequest = new DeleteRequest(randomIndexOrAlias(), "type", "id");
@@ -219,7 +219,7 @@ public void testDelete() {
     @Test
     public void testUpdate() {
         //update action goes to the primary, index op gets executed locally, then replicated
-        String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"};
+        String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[p]", IndexAction.NAME + "[r]"};
         interceptTransportActions(updateShardActions);
 
         String indexOrAlias = randomIndexOrAlias();
@@ -235,7 +235,7 @@ public void testUpdate() {
     @Test
     public void testUpdateUpsert() {
         //update action goes to the primary, index op gets executed locally, then replicated
-        String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"};
+        String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[p]", IndexAction.NAME + "[r]"};
         interceptTransportActions(updateShardActions);
 
         String indexOrAlias = randomIndexOrAlias();
@@ -250,7 +250,7 @@ public void testUpdateUpsert() {
     @Test
     public void testUpdateDelete() {
         //update action goes to the primary, delete op gets executed locally, then replicated
-        String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", DeleteAction.NAME + "[r]"};
+        String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", DeleteAction.NAME + "[p]", DeleteAction.NAME + "[r]"};
         interceptTransportActions(updateShardActions);
 
         String indexOrAlias = randomIndexOrAlias();
@@ -265,7 +265,7 @@ public void testUpdateDelete() {
 
     @Test
     public void testBulk() {
-        String[] bulkShardActions = new String[]{BulkAction.NAME + "[s]", BulkAction.NAME + "[s][r]"};
+        String[] bulkShardActions = new String[]{BulkAction.NAME + "[s][p]", BulkAction.NAME + "[s][r]"};
        interceptTransportActions(bulkShardActions);
 
         List<String> indices = new ArrayList<>();
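These assertions encode the naming convention introduced by this change: the primary phase is now a first-class transport action with a "[p]" suffix alongside the existing "[r]" replica action, so every replicated write shows up as an explicit (possibly node-local) transport round-trip. A sketch of what the updated tests expect, derived from the assertions above rather than from any API constant:

String base = IndexAction.NAME;
String shardBulk = BulkAction.NAME + "[s]"; // per-shard bulk action
String primary = base + "[p]";              // primary phase, even when executed locally
String replica = base + "[r]";              // replica phase, one per copy
// an update that resolves to an index op therefore touches:
// UpdateAction.NAME + "[s]", IndexAction.NAME + "[p]", IndexAction.NAME + "[r]"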
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java
index d08840b077184..9ea557630b7c5 100644
--- a/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java
+++ b/core/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationTests.java
@@ -58,10 +58,7 @@
 import org.elasticsearch.test.cluster.TestClusterService;
 import org.elasticsearch.test.transport.CapturingTransport;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportChannel;
-import org.elasticsearch.transport.TransportResponse;
-import org.elasticsearch.transport.TransportResponseOptions;
-import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.transport.*;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -103,7 +100,48 @@ public void setUp() throws Exception {
         super.setUp();
         transport = new CapturingTransport();
         clusterService = new TestClusterService(threadPool);
-        transportService = new TransportService(transport, threadPool);
+        transportService = new TransportService(transport, threadPool) {
+
+            @Override
+            @SuppressWarnings("unchecked")
+            public <T extends TransportResponse> void sendRequest(DiscoveryNode node, final String action, TransportRequest request, TransportRequestOptions options, final TransportResponseHandler<T> handler) {
+                if (action.endsWith("[p]")) {
+                    try {
+                        adapter.getRequestHandler(action).getHandler().messageReceived(request, new TransportChannel() {
+                            @Override
+                            public String action() {
+                                return action;
+                            }
+
+                            @Override
+                            public String getProfileName() {
+                                return "default";
+                            }
+
+                            @Override
+                            public void sendResponse(TransportResponse response) throws IOException {
+                                handler.handleResponse((T) response);
+                            }
+
+                            @Override
+                            public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
+                                sendResponse(response);
+                            }
+
+                            @Override
+                            public void sendResponse(Throwable error) throws IOException {
+                                handler.handleException(new TransportException("error", error));
+                            }
+                        });
+                    } catch (Exception e) {
+                        handler.handleException(new TransportException("error", e));
+                    }
+                } else {
+                    super.sendRequest(node, action, request, options, handler);
+                }
+            }
+
+        };
         transportService.start();
         action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool);
         count.set(1);
@@ -126,7 +164,8 @@ void assertListenerThrows(String msg, PlainActionFuture<?> listener, Class<?> klass)
         PlainActionFuture<Response> listener = new PlainActionFuture<>();
         ClusterBlocks.Builder block = ClusterBlocks.builder()
@@ -140,13 +179,13 @@ public void testBlocks() throws ExecutionException, InterruptedException {
                 .addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
         clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
         listener = new PlainActionFuture<>();
-        primaryPhase = action.new PrimaryPhase(new Request().timeout("5ms"), listener);
+        primaryPhase = action.new PrimaryPhase(new Request(shardId).timeout("5ms"), listener);
         assertFalse("primary phase should stop execution on retryable block", primaryPhase.checkBlocks());
         assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class);
 
         listener = new PlainActionFuture<>();
-        primaryPhase = action.new PrimaryPhase(new Request(), listener);
+        primaryPhase = action.new PrimaryPhase(new Request(shardId), listener);
         assertFalse("primary phase should stop execution on retryable block", primaryPhase.checkBlocks());
         assertFalse("primary phase should wait on retryable block", listener.isDone());
@@ -304,7 +343,7 @@ public void testRoutingToPrimary() {
 
         TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
         assertTrue(primaryPhase.checkBlocks());
-        primaryPhase.routeRequestOrPerformLocally(shardRoutingTable.primaryShard(), shardRoutingTable.shardsIt());
+        primaryPhase.routeRequestOrPerformPrimaryActionLocally(shardRoutingTable.primaryShard());
         if (primaryNodeId.equals(clusterService.localNode().id())) {
             logger.info("--> primary is assigned locally, testing for execution");
             assertTrue("request failed to be processed on a local primary", request.processedOnPrimary.get());
@@ -333,6 +372,7 @@ public void testWriteConsistency() throws ExecutionException, InterruptedException {
         final int totalShards = 1 + assignedReplicas + unassignedReplicas;
         final boolean passesWriteConsistency;
         Request request = new Request(shardId).consistencyLevel(randomFrom(WriteConsistencyLevel.values()));
+        request.internalShardRouting = ShardRouting.newUnassigned(index, 0, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
         switch (request.consistencyLevel()) {
             case ONE:
                 passesWriteConsistency = true;
@@ -368,8 +408,9 @@ public void testWriteConsistency() throws ExecutionException, InterruptedException {
 
         PlainActionFuture<Response> listener = new PlainActionFuture<>();
         TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
+        TransportReplicationAction.AsyncPrimaryAction primaryAction = action.new AsyncPrimaryAction(request, listener);
         if (passesWriteConsistency) {
-            assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), nullValue());
+            assertThat(primaryAction.checkWriteConsistency(shardRoutingTable.primaryShard()), nullValue());
             primaryPhase.run();
             assertTrue("operations should have been perform, consistency level is met", request.processedOnPrimary.get());
             if (assignedReplicas > 0) {
@@ -378,7 +419,7 @@ public void testWriteConsistency() throws ExecutionException, InterruptedException {
                 assertIndexShardCounter(1);
             }
         } else {
-            assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), notNullValue());
+            assertThat(primaryAction.checkWriteConsistency(shardRoutingTable.primaryShard()), notNullValue());
             primaryPhase.run();
             assertFalse("operations should not have been perform, consistency level is *NOT* met", request.processedOnPrimary.get());
             assertIndexShardUninitialized();
@@ -445,7 +486,7 @@ protected void runReplicateTest(IndexShardRoutingTable shardRoutingTable, int assignedReplicas, int totalShards) throws Exception {
         final ShardRouting primaryShard = shardRoutingTable.primaryShard();
         final ShardIterator shardIt = shardRoutingTable.shardsIt();
         final ShardId shardId = shardIt.shardId();
-        final Request request = new Request();
+        final Request request = new Request(shardId);
         PlainActionFuture<Response> listener = new PlainActionFuture<>();
 
         logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint());
@@ -589,18 +630,20 @@ public void testReplicasCounter() throws Exception {
         Thread t = new Thread() {
             public void run() {
                 try {
-                    replicaOperationTransportHandler.messageReceived(new Request(), createTransportChannel());
+                    Request r = new Request(shardId);
+                    replicaOperationTransportHandler.messageReceived(r, createTransportChannel());
                 } catch (Exception e) {
+                    logger.error("error while handling request", e);
                 }
             }
         };
         t.start();
         // shard operation should be ongoing, so the counter is at 2
         // we have to wait here because increment happens in thread
-        awaitBusy(new Predicate<Object>() {
+        assertBusy(new Runnable() {
             @Override
-            public boolean apply(@Nullable Object input) {
-                return count.get() == 2;
+            public void run() {
+                assertThat(count.get(), equalTo(2));
             }
         });
         ((ActionWithDelay) action).countDownLatch.countDown();
@@ -656,31 +699,26 @@ public void close() {
         }
     }
 
     static class Request extends ReplicationRequest<Request> {
-        int shardId;
         public AtomicBoolean processedOnPrimary = new AtomicBoolean();
         public AtomicInteger processedOnReplicas = new AtomicInteger();
 
         Request() {
-            this.operationThreaded(randomBoolean());
         }
 
         Request(ShardId shardId) {
-            this();
-            this.shardId = shardId.id();
             this.index(shardId.index().name());
+            this.internalShardRouting = TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), "_id", true, ShardRoutingState.STARTED, 1); // keep things simple
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
-            out.writeVInt(shardId);
         }
 
         @Override
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
-            shardId = in.readVInt();
         }
     }
@@ -716,7 +754,7 @@ protected void shardOperationOnReplica(ShardId shardId, Request request) {
 
         @Override
         protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
-            return clusterState.getRoutingTable().index(request.concreteIndex()).shard(request.request().shardId).shardsIt();
+            return clusterState.getRoutingTable().index(request.concreteIndex()).shard(request.request().internalShardRouting.getId()).shardsIt();
         }
 
         @Override
@@ -784,7 +822,7 @@ private Tuple<Response, Request> throwException(ShardId shardId) {
 
         @Override
         protected void shardOperationOnReplica(ShardId shardId, Request shardRequest) {
-            throwException(shardRequest.internalShardId);
+            throwException(shardRequest.internalShardRouting.shardId());
         }
     }
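The testReplicasCounter change swaps awaitBusy (poll a Predicate and hand back a boolean) for assertBusy (retry a Runnable full of assertions until it passes or the timeout elapses, then rethrow the last failure), so a losing race now fails the test with a real AssertionError instead of silently returning false. A minimal sketch of the retry loop assertBusy implies; this uses a fixed sleep, whereas the real test helper's backoff strategy may differ:

static void assertBusy(Runnable assertion, long timeoutMillis) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (true) {
        try {
            assertion.run();   // throws AssertionError while the condition does not hold yet
            return;            // assertion passed
        } catch (AssertionError e) {
            if (System.currentTimeMillis() >= deadline) {
                throw e;       // give up: surface the last failure instead of returning false
            }
            Thread.sleep(50);  // wait briefly, then retry
        }
    }
}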
diff --git a/core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java b/core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java
index 50637cb801aa1..bb2fd17b0de36 100644
--- a/core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java
+++ b/core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java
@@ -57,7 +57,7 @@ public void testWriteConsistencyLevelReplication2() throws Exception {
             fail("can't index, does not match consistency");
         } catch (UnavailableShardsException e) {
             assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
-            assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}"));
+            assertThat(e.getMessage(), equalTo("[test][0] not enough active copies to meet write consistency of [QUORUM] (have [1], needed [2]) Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}"));
             // but really, all is well
         }
@@ -80,7 +80,7 @@ public void testWriteConsistencyLevelReplication2() throws Exception {
             fail("can't index, does not match consistency");
         } catch (UnavailableShardsException e) {
             assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
-            assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [ALL] (have 2, needed 3). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}"));
+            assertThat(e.getMessage(), equalTo("[test][0] not enough active copies to meet write consistency of [ALL] (have [2], needed [3]) Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}"));
             // but really, all is well
         }
diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java
index 898415152ca78..35d54ca8e0296 100644
--- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java
+++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java
@@ -426,12 +426,12 @@ public boolean apply(Object o) {
      * state processing when a recover starts and only unblocking it shortly after the node receives
      * the ShardActiveRequest.
      */
-    static class ReclocationStartEndTracer extends MockTransportService.Tracer {
+    public static class ReclocationStartEndTracer extends MockTransportService.Tracer {
         private final ESLogger logger;
         private final CountDownLatch beginRelocationLatch;
         private final CountDownLatch receivedShardExistsRequestLatch;
 
-        ReclocationStartEndTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) {
+        public ReclocationStartEndTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) {
             this.logger = logger;
             this.beginRelocationLatch = beginRelocationLatch;
             this.receivedShardExistsRequestLatch = receivedShardExistsRequestLatch;
diff --git a/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java b/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java
index 6a55fbd2577b3..aa652dca1e228 100644
--- a/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java
+++ b/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java
@@ -31,7 +31,6 @@
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.common.logging.ESLogger;
-import org.elasticsearch.common.logging.ESLoggerFactory;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.DummyTransportAddress;
@@ -255,6 +254,10 @@ public void close() throws ElasticsearchException {
         throw new UnsupportedOperationException();
     }
 
+    public Collection<ClusterStateListener> getListeners() {
+        return listeners;
+    }
+
     class NotifyTimeout implements Runnable {
         final TimeoutClusterStateListener listener;
         final TimeValue timeout;
diff --git a/core/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java b/core/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java
index 476b89aa1a903..3ae2b5f38fda5 100644
--- a/core/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java
+++ b/core/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java
@@ -18,7 +18,6 @@
  */
 package org.elasticsearch.test.transport;
 
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.component.LifecycleListener;
@@ -33,7 +32,8 @@
 
 /** A transport class that doesn't send anything but rather captures all requests for inspection from tests */
 public class CapturingTransport implements Transport {
-    private TransportServiceAdapter adapter;
+
+    protected TransportServiceAdapter adapter;
 
     static public class CapturedRequest {
         final public DiscoveryNode node;