diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java
index be757a02c3982..88d0da8ab8f7f 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java
@@ -86,14 +86,18 @@ protected void acquireReplicaOperationPermit(final IndexShard replica,
     }
 
     @Override
-    protected PrimaryResult<ShardRequest, ReplicationResponse> shardOperationOnPrimary(final ShardRequest shardRequest,
-                                                                                       final IndexShard primary) throws Exception {
-        executeShardOperation(shardRequest, primary);
-        return new PrimaryResult<>(shardRequest, new ReplicationResponse());
+    protected void shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary,
+                                           ActionListener<PrimaryResult<ShardRequest, ReplicationResponse>> listener) {
+        try {
+            executeShardOperation(shardRequest, primary);
+            listener.onResponse(new PrimaryResult<>(shardRequest, new ReplicationResponse()));
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
     }
 
     @Override
-    protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception {
+    protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) {
         executeShardOperation(shardRequest, replica);
         return new ReplicaResult();
     }
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java
index 344a817fa8b83..921a5003c36b6 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.admin.indices.flush;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
@@ -51,11 +52,15 @@ protected ReplicationResponse newResponseInstance() {
     }
 
     @Override
-    protected PrimaryResult<ShardFlushRequest, ReplicationResponse> shardOperationOnPrimary(ShardFlushRequest shardRequest,
-                                                                                            IndexShard primary) {
-        primary.flush(shardRequest.getRequest());
-        logger.trace("{} flush request executed on primary", primary.shardId());
-        return new PrimaryResult(shardRequest, new ReplicationResponse());
+    protected void shardOperationOnPrimary(ShardFlushRequest shardRequest, IndexShard primary,
+                                           ActionListener<PrimaryResult<ShardFlushRequest, ReplicationResponse>> listener) {
+        try {
+            primary.flush(shardRequest.getRequest());
+            logger.trace("{} flush request executed on primary", primary.shardId());
+            listener.onResponse(new PrimaryResult<>(shardRequest, new ReplicationResponse()));
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java
index 0b5975cf025af..6acd7f2fdf367 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.admin.indices.refresh;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.replication.BasicReplicationRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -53,10 +54,15 @@ protected ReplicationResponse newResponseInstance() {
     }
 
     @Override
-    protected PrimaryResult<BasicReplicationRequest, ReplicationResponse> shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary) {
-        primary.refresh("api");
-        logger.trace("{} refresh request executed on primary", primary.shardId());
-        return new PrimaryResult(shardRequest, new ReplicationResponse());
+    protected void shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary,
+                                           ActionListener<PrimaryResult<BasicReplicationRequest, ReplicationResponse>> listener) {
+        try {
+            primary.refresh("api");
+            logger.trace("{} refresh request executed on primary", primary.shardId());
+            listener.onResponse(new PrimaryResult<>(shardRequest, new ReplicationResponse()));
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
     }
 
     @Override
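The three hunks above are the same mechanical rewrite: a synchronous shardOperationOnPrimary that returns a PrimaryResult (or throws) becomes a void method that reports completion through an ActionListener. A minimal sketch of that pattern, using a hypothetical Listener type and performSync method rather than the real Elasticsearch classes:

    // Hypothetical stand-ins for ActionListener and the shard operation, for illustration only.
    interface Listener<T> {
        void onResponse(T result);
        void onFailure(Exception e);
    }

    final class SyncToAsync {
        // Before: synchronous result-or-throw.
        static String performSync() throws Exception {
            return "result";
        }

        // After: the same work, but success and failure both travel through the
        // listener, so the caller never blocks and needs no try/catch of its own.
        static void performAsync(Listener<String> listener) {
            try {
                listener.onResponse(performSync());
            } catch (Exception e) {
                listener.onFailure(e);
            }
        }
    }

These leaf actions stay synchronous internally; only the shape of their completion signal changes.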
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java b/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java
index 1f228b0f355e0..5a38f0f43e070 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.bulk;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.shard.ShardId;
 
@@ -27,6 +28,6 @@ public interface MappingUpdatePerformer {
     /**
      * Update the mappings on the master.
      */
-    void updateMappings(Mapping update, ShardId shardId, String type);
+    void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener);
 }
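MappingUpdatePerformer keeps exactly one abstract method, so it remains a functional interface and call sites can keep passing lambdas; only the arity grows by the listener argument. A sketch of the two obvious implementations, mirroring the NoopMappingUpdatePerformer and ThrowingMappingUpdatePerformer used by the tests further down:

    // Acknowledge every mapping update immediately, e.g. for tests.
    MappingUpdatePerformer noop = (update, shardId, type, listener) -> listener.onResponse(null);

    // Report a failure through the listener instead of throwing it at the caller.
    MappingUpdatePerformer failing =
        (update, shardId, type, listener) -> listener.onFailure(new RuntimeException("mapping update rejected"));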
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index f182c2985815d..52b94816b4dd5 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -23,6 +23,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -30,7 +31,6 @@
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
 import org.elasticsearch.action.update.UpdateHelper;
@@ -44,7 +44,6 @@
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
@@ -57,7 +56,6 @@
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.get.GetResult;
 import org.elasticsearch.index.mapper.MapperException;
-import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
@@ -71,6 +69,7 @@
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
@@ -114,157 +113,162 @@ protected boolean resolveIndex() {
     }
 
     @Override
-    protected WritePrimaryResult<BulkShardRequest, BulkShardResponse> shardOperationOnPrimary(BulkShardRequest request, IndexShard primary)
-        throws Exception {
+    protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard primary,
+                                           ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener) {
         ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
-        CheckedRunnable<Exception> waitForMappingUpdate = () -> {
-            PlainActionFuture<Void> waitingFuture = new PlainActionFuture<>();
-            observer.waitForNextChange(new ClusterStateObserver.Listener() {
+        final Executor exec = threadPool.executor(ThreadPool.Names.WRITE);
+        performOnPrimary(request, primary, updateHelper, threadPool::relativeTimeInMillis,
+            (update, shardId, type, mappingListener) -> {
+                assert update != null;
+                assert shardId != null;
+                // can throw timeout exception when updating mappings or ISE for attempting to
+                // update default mappings which are bubbled up
+                mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener);
+            },
+            mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() {
                 @Override
                 public void onNewClusterState(ClusterState state) {
-                    waitingFuture.onResponse(null);
+                    mappingUpdateListener.onResponse(null);
                 }
 
                 @Override
                 public void onClusterServiceClose() {
-                    waitingFuture.onFailure(new NodeClosedException(clusterService.localNode()));
+                    mappingUpdateListener.onFailure(new NodeClosedException(clusterService.localNode()));
                 }
 
                 @Override
                 public void onTimeout(TimeValue timeout) {
-                    waitingFuture.onFailure(
+                    mappingUpdateListener.onFailure(
                         new MapperException("timed out while waiting for a dynamic mapping update"));
                 }
-            });
-            waitingFuture.get();
-        };
-        return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis,
-            new ConcreteMappingUpdatePerformer(), waitForMappingUpdate);
+            }), listener, exec::execute
+        );
     }
 
-    public static WritePrimaryResult<BulkShardRequest, BulkShardResponse> performOnPrimary(
+    public static void performOnPrimary(
         BulkShardRequest request, IndexShard primary, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
         MappingUpdatePerformer mappingUpdater,
-        CheckedRunnable<Exception> waitForMappingUpdate) throws Exception {
+        Consumer<ActionListener<Void>> waitForMappingUpdate,
+        ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener,
+        Consumer<Runnable> executor) {
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);
-        return performOnPrimary(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate);
-    }
-
-    private static WritePrimaryResult<BulkShardRequest, BulkShardResponse> performOnPrimary(
-        BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
-        MappingUpdatePerformer mappingUpdater, CheckedRunnable<Exception> waitForMappingUpdate) throws Exception {
-
-        while (context.hasMoreOperationsToExecute()) {
-            executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate);
-            assert context.isInitial(); // either completed and moved to next or reset
-        }
-        return new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(),
-            null, context.getPrimary(), logger);
+        new AsyncBulkItemLoop(
+            context, executor, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate, listener).onResponse(false);
     }
 
     /** Executes bulk item requests and handles request execution exceptions */
-    static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
-                                       MappingUpdatePerformer mappingUpdater, CheckedRunnable<Exception> waitForMappingUpdate)
-        throws Exception {
-        final DocWriteRequest.OpType opType = context.getCurrent().opType();
-
-        final UpdateHelper.Result updateResult;
-        if (opType == DocWriteRequest.OpType.UPDATE) {
-            final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
-            try {
-                updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
-            } catch (Exception failure) {
-                // we may fail translating a update to index or delete operation
-                // we use index result to communicate failure while translating update request
-                final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO);
-                context.setRequestToExecute(updateRequest);
-                context.markOperationAsExecuted(result);
-                context.markAsCompleted(context.getExecutionResult());
-                return;
-            }
-            // execute translated update request
-            switch (updateResult.getResponseResult()) {
-                case CREATED:
-                case UPDATED:
-                    IndexRequest indexRequest = updateResult.action();
-                    IndexMetaData metaData = context.getPrimary().indexSettings().getIndexMetaData();
-                    MappingMetaData mappingMd = metaData.mappingOrDefault();
-                    indexRequest.process(metaData.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
-                    context.setRequestToExecute(indexRequest);
-                    break;
-                case DELETED:
-                    context.setRequestToExecute(updateResult.action());
-                    break;
-                case NOOP:
-                    context.markOperationAsNoOp(updateResult.action());
-                    context.markAsCompleted(context.getExecutionResult());
-                    return;
-                default:
-                    throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
-            }
-        } else {
-            context.setRequestToExecute(context.getCurrent());
-            updateResult = null;
-        }
-
-        assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state
-
-        if (context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE) {
-            executeDeleteRequestOnPrimary(context, mappingUpdater);
-        } else {
-            executeIndexRequestOnPrimary(context, mappingUpdater);
-        }
-
-        if (context.requiresWaitingForMappingUpdate()) {
-            try {
-                waitForMappingUpdate.run();
-                context.resetForExecutionForRetry();
-            } catch (Exception e) {
-                context.failOnMappingUpdate(e);
-            }
-            return;
-        }
-
-        assert context.isOperationExecuted();
-
-        if (opType == DocWriteRequest.OpType.UPDATE &&
-            context.getExecutionResult().isFailed() &&
-            isConflictException(context.getExecutionResult().getFailure().getCause())) {
-            final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
-            if (context.getRetryCounter() < updateRequest.retryOnConflict()) {
-                context.resetForExecutionForRetry();
-                return;
-            }
-        }
-
-        finalizePrimaryOperationOnCompletion(context, opType, updateResult);
+    static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
+                                          MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
+                                          ActionListener<Boolean> itemDoneListener) {
+        try {
+            final DocWriteRequest.OpType opType = context.getCurrent().opType();
+
+            final UpdateHelper.Result updateResult;
+            if (opType == DocWriteRequest.OpType.UPDATE) {
+                final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
+                try {
+                    updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
+                } catch (Exception failure) {
+                    // we may fail translating a update to index or delete operation
+                    // we use index result to communicate failure while translating update request
+                    final Engine.Result result =
+                        new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO);
+                    context.setRequestToExecute(updateRequest);
+                    context.markOperationAsExecuted(result);
+                    context.markAsCompleted(context.getExecutionResult());
+                    return true;
+                }
+                // execute translated update request
+                switch (updateResult.getResponseResult()) {
+                    case CREATED:
+                    case UPDATED:
+                        IndexRequest indexRequest = updateResult.action();
+                        IndexMetaData metaData = context.getPrimary().indexSettings().getIndexMetaData();
+                        MappingMetaData mappingMd = metaData.mappingOrDefault();
+                        indexRequest.process(metaData.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
+                        context.setRequestToExecute(indexRequest);
+                        break;
+                    case DELETED:
+                        context.setRequestToExecute(updateResult.action());
+                        break;
+                    case NOOP:
+                        context.markOperationAsNoOp(updateResult.action());
+                        context.markAsCompleted(context.getExecutionResult());
+                        return true;
+                    default:
+                        throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
+                }
+            } else {
+                context.setRequestToExecute(context.getCurrent());
+                updateResult = null;
+            }
+
+            assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state
+
+            final ActionListener<Engine.Result> mappingUpdatedListener = ActionListener.wrap(
+                result -> {
+                    if (result == null) {
+                        assert context.requiresWaitingForMappingUpdate();
+                        waitForMappingUpdate.accept(
+                            new ActionListener<Void>() {
+                                @Override
+                                public void onResponse(Void aVoid) {
+                                    context.resetForExecutionForRetry();
+                                    itemDoneListener.onResponse(true);
+                                }
+
+                                @Override
+                                public void onFailure(Exception e) {
+                                    context.failOnMappingUpdate(e);
+                                    itemDoneListener.onResponse(true);
+                                }
+                            }
+                        );
+                    } else {
+                        itemDoneListener.onResponse(true);
+                    }
+                }, e -> {
+                    context.failOnMappingUpdate(e);
+                    itemDoneListener.onResponse(true);
+                }
+            );
+
+            if (context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE) {
+                return executeDeleteRequestOnPrimary(context, mappingUpdater, mappingUpdatedListener, updateResult);
+            } else {
+                return executeIndexRequestOnPrimary(context, mappingUpdater, mappingUpdatedListener, updateResult);
+            }
+        } catch (Exception e) {
+            itemDoneListener.onFailure(e);
+            return true;
+        }
     }
 
     private static void finalizePrimaryOperationOnCompletion(BulkPrimaryExecutionContext context, DocWriteRequest.OpType opType,
                                                              UpdateHelper.Result updateResult) {
         final BulkItemResponse executionResult = context.getExecutionResult();
+        final BulkItemResponse response;
         if (opType == DocWriteRequest.OpType.UPDATE) {
-            final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
-            context.markAsCompleted(
-                processUpdateResponse(updateRequest, context.getConcreteIndex(), executionResult, updateResult));
-        } else if (executionResult.isFailed()) {
-            final Exception failure = executionResult.getFailure().getCause();
-            final DocWriteRequest<?> docWriteRequest = context.getCurrent();
-            if (TransportShardBulkAction.isConflictException(failure)) {
-                logger.trace(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
-                    context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure);
-            } else {
-                logger.debug(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
-                    context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure);
-            }
-
-            context.markAsCompleted(executionResult);
+            response = processUpdateResponse(
+                (UpdateRequest) context.getCurrent(), context.getConcreteIndex(), executionResult, updateResult);
         } else {
-            context.markAsCompleted(executionResult);
+            if (executionResult.isFailed()) {
+                final Exception failure = executionResult.getFailure().getCause();
+                final DocWriteRequest<?> docWriteRequest = context.getCurrent();
+                if (TransportShardBulkAction.isConflictException(failure)) {
+                    logger.trace(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
+                        context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure);
+                } else {
+                    logger.debug(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
+                        context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure);
                }
            }
+            response = executionResult;
         }
+        context.markAsCompleted(response);
         assert context.isInitial();
     }
@@ -441,63 +445,128 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
     }
 
     /** Executes index operation on primary shard after updates mapping if dynamic mappings are found */
-    private static void executeIndexRequestOnPrimary(BulkPrimaryExecutionContext context,
-                                                     MappingUpdatePerformer mappingUpdater) throws Exception {
+    private static boolean executeIndexRequestOnPrimary(BulkPrimaryExecutionContext context,
+                                                        MappingUpdatePerformer mappingUpdater,
+                                                        ActionListener<Engine.Result> mappingUpdatedListener, UpdateHelper.Result updateResult) {
         final IndexRequest request = context.getRequestToExecute();
         final IndexShard primary = context.getPrimary();
         final SourceToParse sourceToParse =
             new SourceToParse(request.index(), request.type(), request.id(), request.source(),
                 request.getContentType(), request.routing());
-        executeOnPrimaryWhileHandlingMappingUpdates(context,
+        return executeOnPrimaryWhileHandlingMappingUpdates(
             () ->
                 primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
                     request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry()),
-            e -> primary.getFailedIndexResult(e, request.version()),
-            context::markOperationAsExecuted,
-            mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));
+            e -> primary.getFailedIndexResult(e, request.version()), r -> onComplete(r, context, updateResult),
+            mappingUpdater, primary.shardId(), request, context, mappingUpdatedListener
+        );
     }
 
-    private static void executeDeleteRequestOnPrimary(BulkPrimaryExecutionContext context,
-                                                      MappingUpdatePerformer mappingUpdater) throws Exception {
+    private static boolean executeDeleteRequestOnPrimary(BulkPrimaryExecutionContext context,
+                                                         MappingUpdatePerformer mappingUpdater,
+                                                         ActionListener<Engine.Result> mappingUpdatedListener, UpdateHelper.Result updateResult) {
         final DeleteRequest request = context.getRequestToExecute();
         final IndexShard primary = context.getPrimary();
-        executeOnPrimaryWhileHandlingMappingUpdates(context,
+        return executeOnPrimaryWhileHandlingMappingUpdates(
             () -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(),
                 request.ifSeqNo(), request.ifPrimaryTerm()),
-            e -> primary.getFailedDeleteResult(e, request.version()),
-            context::markOperationAsExecuted,
-            mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));
+            e -> primary.getFailedDeleteResult(e, request.version()), r -> onComplete(r, context, updateResult),
+            mappingUpdater, primary.shardId(), request, context, mappingUpdatedListener
+        );
     }
 
-    private static <T extends Engine.Result> void executeOnPrimaryWhileHandlingMappingUpdates(
-        BulkPrimaryExecutionContext context, CheckedSupplier<T, IOException> toExecute,
-        Function<Exception, T> exceptionToResult, Consumer<T> onComplete, Consumer<Mapping> mappingUpdater)
-        throws IOException {
-        T result = toExecute.get();
-        if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
-            // try to update the mappings and mark the context as needing to try again.
-            try {
-                mappingUpdater.accept(result.getRequiredMappingUpdate());
-                context.markAsRequiringMappingUpdate();
-            } catch (Exception e) {
-                // failure to update the mapping should translate to a failure of specific requests. Other requests
-                // still need to be executed and replicated.
-                onComplete.accept(exceptionToResult.apply(e));
-                return;
-            }
-        } else {
-            onComplete.accept(result);
-        }
-    }
+    private static <T extends DocWriteRequest<T>, U extends Engine.Result> boolean executeOnPrimaryWhileHandlingMappingUpdates(
+        CheckedSupplier<U, IOException> toExecute, Function<Exception, U> exceptionToResult, Consumer<U> onComplete,
+        MappingUpdatePerformer mappingUpdater, ShardId shardId, T request, BulkPrimaryExecutionContext context,
+        ActionListener<Engine.Result> mappingUpdatedListener) {
+        try {
+            U result = toExecute.get();
+            if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
+                mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), shardId, request.type(),
+                    new ActionListener<Void>() {
+                        @Override
+                        public void onResponse(Void aVoid) {
+                            context.markAsRequiringMappingUpdate();
+                            mappingUpdatedListener.onResponse(null);
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            final U res = exceptionToResult.apply(e);
+                            onComplete.accept(res);
+                            mappingUpdatedListener.onResponse(res);
+                        }
+                    });
+                return false;
+            } else {
+                onComplete.accept(result);
+                return true;
+            }
+        } catch (Exception e) {
+            onComplete.accept(exceptionToResult.apply(e));
+            return true;
+        }
+    }
+
+    private static <T extends Engine.Result> void onComplete(T r, BulkPrimaryExecutionContext context, UpdateHelper.Result updateResult) {
+        context.markOperationAsExecuted(r);
+        final DocWriteRequest.OpType opType = context.getCurrent().opType();
+        if (opType == DocWriteRequest.OpType.UPDATE && context.getExecutionResult().isFailed()
+            && isConflictException(context.getExecutionResult().getFailure().getCause())) {
+            final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
+            if (context.getRetryCounter() < updateRequest.retryOnConflict()) {
+                context.resetForExecutionForRetry();
+                return;
+            }
+        }
+        finalizePrimaryOperationOnCompletion(context, opType, updateResult);
+    }
 
-    class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer {
+    private static final class AsyncBulkItemLoop implements ActionListener<Boolean> {
+        private final Consumer<Runnable> executor;
+        private final ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener;
+        private final Runnable action;
+
+        AsyncBulkItemLoop(BulkPrimaryExecutionContext context, Consumer<Runnable> executor, UpdateHelper updateHelper,
+                          LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater,
+                          Consumer<ActionListener<Void>> waitForMappingUpdate,
+                          ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener) {
+            this.executor = executor;
+            this.listener = listener;
+            action = () -> {
+                try {
+                    while (context.hasMoreOperationsToExecute()) {
+                        if (executeBulkItemRequest(
+                            context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate, this) == false) {
+                            // We are waiting for a mapping update on another thread, that will invoke this action again once its done
+                            // so we just break out here.
+                            return;
+                        }
+                        assert context.isInitial(); // either completed and moved to next or reset
+                    }
+                    listener.onResponse(
+                        new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(),
+                            null, context.getPrimary(), logger));
+                } catch (Exception e) {
+                    onFailure(e);
+                }
+            };
+        }
+
+        @Override
+        public void onResponse(Boolean async) {
+            try {
+                if (async) {
+                    executor.accept(action);
+                } else {
+                    action.run();
+                }
+            } catch (Exception e) {
+                onFailure(e);
+            }
+        }
 
         @Override
-        public void updateMappings(final Mapping update, final ShardId shardId, final String type) {
-            assert update != null;
-            assert shardId != null;
-            // can throw timeout exception when updating mappings or ISE for attempting to
-            // update default mappings which are bubbled up
-            mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update);
+        public void onFailure(Exception e) {
+            listener.onFailure(e);
         }
     }
 }
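AsyncBulkItemLoop is the heart of this file's change: a loop over the bulk items that can suspend when an item needs an asynchronous mapping update and resume later, possibly on another thread. A self-contained sketch of that resumable-loop idiom, with hypothetical names (processItem returns false when it has captured the continuation and will re-invoke it from a callback):

    import java.util.Iterator;
    import java.util.concurrent.Executor;
    import java.util.function.BiPredicate;

    final class ResumableLoop<T> {
        private final Iterator<T> items;
        private final BiPredicate<T, Runnable> processItem; // false => suspended, callback resumes
        private final Executor executor;
        private final Runnable onDone;

        ResumableLoop(Iterator<T> items, BiPredicate<T, Runnable> processItem, Executor executor, Runnable onDone) {
            this.items = items;
            this.processItem = processItem;
            this.executor = executor;
            this.onDone = onDone;
        }

        void run() {
            while (items.hasNext()) {
                // Hand each item a continuation that re-enters the loop on the executor,
                // so a resumed iteration runs on the right thread pool, not the callback thread.
                if (processItem.test(items.next(), () -> executor.execute(this::run)) == false) {
                    return; // suspended; the continuation will call run() again
                }
            }
            onDone.run();
        }
    }

In the real class the Boolean handed to onResponse plays the suspend/resume role: false runs the loop inline (the initial call from performOnPrimary), while true re-dispatches it onto the WRITE thread pool after a mapping update completed on some other thread.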
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java
index 892daae4bb275..03ee359b628b2 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java
@@ -70,27 +70,30 @@ protected void doExecute(Task task, final Request request, final ActionListener<Response> listener) {
     }
 
     @Override
-    protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
-        Request request, final IndexShard primary) throws Exception {
+    protected void shardOperationOnPrimary(
+        Request request, IndexShard primary, ActionListener<PrimaryResult<Request, Response>> listener) {
         BulkItemRequest[] itemRequests = new BulkItemRequest[1];
         WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy();
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
         itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request));
         BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests);
-        WritePrimaryResult<BulkShardRequest, BulkShardResponse> bulkResult =
-            shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary);
-        assert bulkResult.finalResponseIfSuccessful.getResponses().length == 1 : "expected only one bulk shard response";
-        BulkItemResponse itemResponse = bulkResult.finalResponseIfSuccessful.getResponses()[0];
-        final Response response;
-        final Exception failure;
-        if (itemResponse.isFailed()) {
-            failure = itemResponse.getFailure().getCause();
-            response = null;
-        } else {
-            response = (Response) itemResponse.getResponse();
-            failure = null;
-        }
-        return new WritePrimaryResult<>(request, response, bulkResult.location, failure, primary, logger);
+        shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary,
+            ActionListener.map(listener, bulkResult -> {
+                assert bulkResult.finalResponseIfSuccessful.getResponses().length == 1 : "expected only one bulk shard response";
+                BulkItemResponse itemResponse = bulkResult.finalResponseIfSuccessful.getResponses()[0];
+                final Response response;
+                final Exception failure;
+                if (itemResponse.isFailed()) {
+                    failure = itemResponse.getFailure().getCause();
+                    response = null;
+                } else {
+                    response = (Response) itemResponse.getResponse();
+                    failure = null;
+                }
+                return new WritePrimaryResult<>(
+                    request, response,
+                    ((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) bulkResult).location, failure, primary, logger);
+            }));
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
index e9a6e7b48152d..280066a11b92e 100644
--- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
+++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
@@ -116,10 +116,14 @@ public ClusterBlockLevel indexBlockLevel() {
     }
 
     @Override
-    protected WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> shardOperationOnPrimary(
-        ResyncReplicationRequest request, IndexShard primary) throws Exception {
-        final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary);
-        return new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger);
+    protected void shardOperationOnPrimary(ResyncReplicationRequest request, IndexShard primary,
+                                           ActionListener<PrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse>> listener) {
+        try {
+            final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary);
+            listener.onResponse(new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger));
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
     }
 
     public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request, IndexShard primary) {
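ActionListener.map does the adapter work in the single-item hunk: it wraps the outer listener so a successful bulk result is transformed into the single-item response before delivery, while failures pass through untouched. A sketch of those semantics, reusing the hypothetical Listener interface from the first sketch above:

    import java.util.function.Function;

    final class Listeners {
        // Transform successes; if the transform itself throws, deliver that as a failure.
        static <T, R> Listener<T> map(Listener<R> delegate, Function<T, R> fn) {
            return new Listener<T>() {
                @Override
                public void onResponse(T result) {
                    R mapped;
                    try {
                        mapped = fn.apply(result);
                    } catch (Exception e) {
                        delegate.onFailure(e);
                        return;
                    }
                    delegate.onResponse(mapped);
                }

                @Override
                public void onFailure(Exception e) {
                    delegate.onFailure(e);
                }
            };
        }
    }

Whether the real ActionListener.map routes transform exceptions exactly this way is an assumption of the sketch; the shape of the adapter is the point.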
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
index f001d9a29e29d..2244d0140603f 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
@@ -99,33 +99,36 @@ public void execute() throws Exception {
         totalShards.incrementAndGet();
         pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
-        primaryResult = primary.perform(request);
-        primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
-        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
-        if (replicaRequest != null) {
-            if (logger.isTraceEnabled()) {
-                logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
-            }
+        primary.perform(request, ActionListener.wrap(primaryResult -> {
+            this.primaryResult = primaryResult;
+            primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
+            final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
+            if (replicaRequest != null) {
+                if (logger.isTraceEnabled()) {
+                    logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
+                }
 
-            // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
-            // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
-            // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
-            // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
-            // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
-            // of the sampled replication group, and advanced further than what the given replication group would allow it to.
-            // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
-            final long globalCheckpoint = primary.globalCheckpoint();
-            // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
-            // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed on.
-            final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
-            assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
-            final ReplicationGroup replicationGroup = primary.getReplicationGroup();
-            markUnavailableShardsAsStale(replicaRequest, replicationGroup);
-            performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
-        }
+                // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
+                // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
+                // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
+                // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
+                // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
+                // of the sampled replication group, and advanced further than what the given replication group would allow it to.
+                // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
+                final long globalCheckpoint = primary.globalCheckpoint();
+                // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
+                // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
+                // on.
+                final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
+                assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
+                final ReplicationGroup replicationGroup = primary.getReplicationGroup();
+                markUnavailableShardsAsStale(replicaRequest, replicationGroup);
+                performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
+            }
 
-        successfulShards.incrementAndGet(); // mark primary as successful
-        decPendingAndFinishIfNeeded();
+            successfulShards.incrementAndGet(); // mark primary as successful
+            decPendingAndFinishIfNeeded();
+        }, resultListener::onFailure));
     }
 
     private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) {
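The control flow of execute() is unchanged by the hunk above; the body simply moved into the success branch of a wrapped listener, and a primary-side failure now reaches resultListener.onFailure through the wrap instead of propagating up the call stack. A sketch of the wrap contract this relies on, again with the hypothetical Listener type (an exception thrown by the success branch is redirected to the failure branch):

    import java.util.function.Consumer;

    interface CheckedConsumer<T> {
        void accept(T value) throws Exception;
    }

    final class Wrap {
        static <T> Listener<T> wrap(CheckedConsumer<T> onSuccess, Consumer<Exception> onFailure) {
            return new Listener<T>() {
                @Override
                public void onResponse(T result) {
                    try {
                        onSuccess.accept(result);
                    } catch (Exception e) {
                        onFailure.accept(e);
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    onFailure.accept(e);
                }
            };
        }
    }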
          *
          * @param request the request to perform
-         * @return the request to send to the replicas
+         * @param listener result listener
          */
-        PrimaryResultT perform(RequestT request) throws Exception;
+        void perform(RequestT request, ActionListener<PrimaryResultT> listener);
 
         /**
          * Notifies the primary of a local checkpoint for the given allocation.
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index 326f7bacdb8f6..8e5a0606aef49 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -21,6 +21,7 @@
 
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.store.AlreadyClosedException;
+import org.elasticsearch.Assertions;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
@@ -198,8 +199,8 @@ protected void resolveRequest(final IndexMetaData indexMetaData, final Request request) {
      * @param shardRequest the request to the primary shard
      * @param primary      the primary shard to perform the operation on
      */
-    protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrimary(
-        Request shardRequest, IndexShard primary) throws Exception;
+    protected abstract void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
+                                                    ActionListener<PrimaryResult<ReplicaRequest, Response>> listener);
 
     /**
      * Synchronously execute the specified replica operation. This is done under a permit from
@@ -480,7 +481,7 @@ protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>>
 
-    public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
+    public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>, Response extends ReplicationResponse>
         implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
 
         final ReplicaRequest replicaRequest;
@@ -1030,11 +1031,15 @@ public void failShard(String reason, Exception e) {
         }
 
         @Override
-        public PrimaryResult<ReplicaRequest, Response> perform(Request request) throws Exception {
-            PrimaryResult<ReplicaRequest, Response> result = shardOperationOnPrimary(request, indexShard);
-            assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
-                + "] with a primary failure [" + result.finalFailure + "]";
-            return result;
+        public void perform(Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener) {
+            if (Assertions.ENABLED) {
+                listener = ActionListener.map(listener, result -> {
+                    assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
+                        + "] with a primary failure [" + result.finalFailure + "]";
+                    return result;
+                });
+            }
+            shardOperationOnPrimary(request, indexShard, listener);
         }
 
         @Override
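One detail worth noting in the perform() rewrite: the invariant on PrimaryResult is now checked by decorating the listener, and the decoration is only built when assertions are enabled, so production calls keep the undecorated listener and pay nothing. A sketch of that assert-only decoration, reusing the Listeners.map sketch from above (the -ea detection trick is standard Java):

    import java.util.function.Predicate;

    final class AssertingListeners {
        static <T> Listener<T> maybeAssert(Listener<T> listener, Predicate<T> invariant) {
            boolean enabled = false;
            // Standard -ea detection: the assert flips the flag only when assertions run.
            assert enabled = true;
            if (enabled == false) {
                return listener;
            }
            return Listeners.map(listener, result -> {
                assert invariant.test(result) : "invariant violated: " + result;
                return result;
            });
        }
    }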
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
index f44694f55d960..5bd7f004f598c 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
@@ -103,12 +103,12 @@ protected ReplicationOperation.Replicas<ReplicaRequest> newReplicasProxy(long primaryTerm) {
     /**
      * Called on the primary with a reference to the primary {@linkplain IndexShard} to modify.
      *
-     * @return the result of the operation on primary, including current translog location and operation response and failure
-     * async refresh is performed on the primary shard according to the Request refresh policy
+     * @param listener listener for the result of the operation on primary, including current translog location and operation response
+     *                 and failure async refresh is performed on the primary shard according to the Request refresh policy
      */
     @Override
-    protected abstract WritePrimaryResult<ReplicaRequest, Response> shardOperationOnPrimary(
-        Request request, IndexShard primary) throws Exception;
+    protected abstract void shardOperationOnPrimary(
+        Request request, IndexShard primary, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener);
 
     /**
      * Called once per replica with a reference to the replica {@linkplain IndexShard} to modify.
diff --git a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java
index 14c360168f904..c1895af0db254 100644
--- a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java
+++ b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java
@@ -19,7 +19,9 @@
 
 package org.elasticsearch.cluster.action.index;
 
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.IndicesAdminClient;
@@ -29,6 +31,7 @@
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.mapper.MapperService;
@@ -57,34 +60,50 @@ private void setDynamicMappingUpdateTimeout(TimeValue dynamicMappingUpdateTimeout) {
         this.dynamicMappingUpdateTimeout = dynamicMappingUpdateTimeout;
     }
 
-
     public void setClient(Client client) {
         this.client = client.admin().indices();
     }
 
-    private PutMappingRequestBuilder updateMappingRequest(Index index, String type, Mapping mappingUpdate, final TimeValue timeout) {
-        if (type.equals(MapperService.DEFAULT_MAPPING)) {
-            throw new IllegalArgumentException("_default_ mapping should not be updated");
-        }
-        return client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)
-            .setMasterNodeTimeout(timeout).setTimeout(TimeValue.ZERO);
-    }
-
-    /**
-     * Same as {@link #updateMappingOnMaster(Index, String, Mapping, TimeValue)}
-     * using the default timeout.
-     */
-    public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate) {
-        updateMappingOnMaster(index, type, mappingUpdate, dynamicMappingUpdateTimeout);
-    }
-
     /**
      * Update mappings on the master node, waiting for the change to be committed,
     * but not for the mapping update to be applied on all nodes. The timeout specified by
     * {@code timeout} is the master node timeout ({@link MasterNodeRequest#masterNodeTimeout()}),
     * potentially waiting for a master node to be available.
     */
-    public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue masterNodeTimeout) {
-        updateMappingRequest(index, type, mappingUpdate, masterNodeTimeout).get();
+    public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, ActionListener<Void> listener) {
+        if (type.equals(MapperService.DEFAULT_MAPPING)) {
+            throw new IllegalArgumentException("_default_ mapping should not be updated");
+        }
+        client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)
+            .setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO)
+            .execute(new ActionListener<AcknowledgedResponse>() {
+                @Override
+                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                    listener.onResponse(null);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(unwrapException(e));
+                }
+            });
+    }
+
+    private static Exception unwrapException(Exception e) {
+        final Throwable cause = e.getCause();
+        if (cause instanceof ElasticsearchException) {
+            ElasticsearchException esEx = (ElasticsearchException) cause;
+            Throwable root = esEx.unwrapCause();
+            if (root instanceof ElasticsearchException) {
+                return (ElasticsearchException) root;
+            } else if (root instanceof RuntimeException) {
+                return (RuntimeException) root;
+            }
+            return new UncategorizedExecutionException("Failed execution", root);
+        } else if (cause instanceof RuntimeException) {
+            return (RuntimeException) cause;
+        } else {
+            return e;
+        }
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java
index ea02aebb0aaad..d4d15850771fb 100644
--- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java
+++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java
@@ -76,4 +76,20 @@ public String toString() {
         }
         return sb.toString();
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        return paths.equals(((BlobPath) o).paths);
+    }
+
+    @Override
+    public int hashCode() {
+        return paths.hashCode();
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java
index 9b55cff8cff9a..a3fb68934134a 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java
@@ -117,10 +117,14 @@ protected void sendReplicaRequest(
     }
 
     @Override
-    protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
-        final Request request, final IndexShard indexShard) throws Exception {
-        maybeSyncTranslog(indexShard);
-        return new PrimaryResult<>(request, new ReplicationResponse());
+    protected void shardOperationOnPrimary(Request request, IndexShard indexShard,
+                                           ActionListener<PrimaryResult<Request, ReplicationResponse>> listener) {
+        try {
+            maybeSyncTranslog(indexShard);
+            listener.onResponse(new PrimaryResult<>(request, new ReplicationResponse()));
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
     }
 
     @Override
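The unwrapException helper exists because the listener path surfaces failures differently from the old blocking PutMappingRequestBuilder.get(): the blocking call used to unwrap execution-level wrappers itself, while a raw onFailure can deliver the wrapper. The helper peels the cause chain so callers keep seeing the same exception types (a mapping timeout, for instance) as before the rewrite. A compressed sketch of the idea, with the specific wrapper handling simplified away:

    // Peel one wrapper level and return the most specific deliverable exception.
    static Exception unwrap(Exception e) {
        Throwable cause = e.getCause();
        if (cause instanceof RuntimeException) {
            return (RuntimeException) cause; // deliver the root cause directly
        }
        return e; // nothing more specific to report; keep the original
    }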
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
index d454c2de75b28..4da562779bbf3 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java
@@ -121,14 +121,18 @@ public void backgroundSync(
     }
 
     @Override
-    protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
+    protected void shardOperationOnPrimary(
         final Request request,
-        final IndexShard primary) throws WriteStateException {
-        assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
-        Objects.requireNonNull(request);
-        Objects.requireNonNull(primary);
-        primary.persistRetentionLeases();
-        return new PrimaryResult<>(request, new ReplicationResponse());
+        final IndexShard primary, ActionListener<PrimaryResult<Request, ReplicationResponse>> listener) {
+        try {
+            assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
+            Objects.requireNonNull(request);
+            Objects.requireNonNull(primary);
+            primary.persistRetentionLeases();
+            listener.onResponse(new PrimaryResult<>(request, new ReplicationResponse()));
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java
index d4845d92a3a6f..3aed373fbbf50 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java
@@ -123,14 +123,17 @@ public void sync(
     }
 
     @Override
-    protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
-        final Request request,
-        final IndexShard primary) throws WriteStateException {
-        assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
-        Objects.requireNonNull(request);
-        Objects.requireNonNull(primary);
-        primary.persistRetentionLeases();
-        return new WritePrimaryResult<>(request, new Response(), null, null, primary, getLogger());
+    protected void shardOperationOnPrimary(Request request, IndexShard primary,
+                                           ActionListener<PrimaryResult<Request, Response>> listener) {
+        try {
+            assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards();
+            Objects.requireNonNull(request);
+            Objects.requireNonNull(primary);
+            primary.persistRetentionLeases();
+            listener.onResponse(new WritePrimaryResult<>(request, new Response(), null, null, primary, getLogger()));
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java
index 77bddbe215652..bef05ecda9fd8 100644
--- a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java
+++ b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java
@@ -110,7 +110,7 @@ public List<NamedXContentRegistry.Entry> getNamedXContents() {
         );
     }
 
-    private Map<String, Mapper.TypeParser> getMappers(List<MapperPlugin> mapperPlugins) {
+    public static Map<String, Mapper.TypeParser> getMappers(List<MapperPlugin> mapperPlugins) {
         Map<String, Mapper.TypeParser> mappers = new LinkedHashMap<>();
 
         // builtin mappers
@@ -168,7 +168,7 @@ private static Map<String, MetadataFieldMapper.TypeParser> initBuiltInMetadataMappers() {
         return Collections.unmodifiableMap(builtInMetadataMappers);
     }
 
-    private static Map<String, MetadataFieldMapper.TypeParser> getMetadataMappers(List<MapperPlugin> mapperPlugins) {
+    public static Map<String, MetadataFieldMapper.TypeParser> getMetadataMappers(List<MapperPlugin> mapperPlugins) {
         Map<String, MetadataFieldMapper.TypeParser> metadataMappers = new LinkedHashMap<>();
         int i = 0;
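Making getMappers and getMetadataMappers public static means a mapper registry can be assembled without instantiating IndicesModule, which is presumably what the reworked test fixtures need. A hypothetical usage sketch, assuming an empty plugin list:

    Map<String, Mapper.TypeParser> mappers =
        IndicesModule.getMappers(Collections.emptyList());
    Map<String, MetadataFieldMapper.TypeParser> metadataMappers =
        IndicesModule.getMetadataMappers(Collections.emptyList());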
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java
index 687b01680704e..050e682b0737f 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.admin.indices.close;
 
 import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.PlainActionFuture;
@@ -60,6 +61,8 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
@@ -133,18 +136,28 @@ public static void afterClass() {
         threadPool = null;
     }
 
-    private void executeOnPrimaryOrReplica() throws Exception {
+    private void executeOnPrimaryOrReplica() throws Throwable {
         final TaskId taskId = new TaskId("_node_id", randomNonNegativeLong());
         final TransportVerifyShardBeforeCloseAction.ShardRequest request =
             new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock, taskId);
-        if (randomBoolean()) {
-            assertNotNull(action.shardOperationOnPrimary(request, indexShard));
-        } else {
-            assertNotNull(action.shardOperationOnPrimary(request, indexShard));
+        final CompletableFuture<Void> res = new CompletableFuture<>();
+        action.shardOperationOnPrimary(request, indexShard, ActionListener.wrap(
+            r -> {
+                assertNotNull(r);
+                res.complete(null);
+            },
+            res::completeExceptionally
+        ));
+        try {
+            res.get();
+        } catch (InterruptedException e) {
+            throw new AssertionError(e);
+        } catch (ExecutionException e) {
+            throw e.getCause();
         }
     }
 
-    public void testShardIsFlushed() throws Exception {
+    public void testShardIsFlushed() throws Throwable {
         final ArgumentCaptor<FlushRequest> flushRequest = ArgumentCaptor.forClass(FlushRequest.class);
         when(indexShard.flush(flushRequest.capture())).thenReturn(new Engine.CommitId(new byte[0]));
 
@@ -171,7 +184,7 @@ public void testOperationFailsWithNoBlock() {
         verify(indexShard, times(0)).flush(any(FlushRequest.class));
     }
 
-    public void testVerifyShardBeforeIndexClosing() throws Exception {
+    public void testVerifyShardBeforeIndexClosing() throws Throwable {
         executeOnPrimaryOrReplica();
         verify(indexShard, times(1)).verifyShardBeforeIndexClosing();
         verify(indexShard, times(1)).flush(any(FlushRequest.class));
@@ -271,8 +284,9 @@ public ReplicationGroup getReplicationGroup() {
         }
 
         @Override
-        public PrimaryResult perform(TransportVerifyShardBeforeCloseAction.ShardRequest request) throws Exception {
-            return new PrimaryResult(request);
+        public void perform(
+            TransportVerifyShardBeforeCloseAction.ShardRequest request, ActionListener<PrimaryResult> listener) {
+            listener.onResponse(new PrimaryResult(request));
         }
 
         @Override
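The test above bridges the now-asynchronous primary operation back to a blocking call through a CompletableFuture, and rethrows the original cause rather than the ExecutionException wrapper, so callers can keep asserting on the same exception types as before. That bridge generalizes; a sketch with the hypothetical Listener type from the first sketch:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.function.Consumer;

    final class Blocking {
        static <T> T get(Consumer<Listener<T>> asyncCall) throws Throwable {
            CompletableFuture<T> future = new CompletableFuture<>();
            asyncCall.accept(new Listener<T>() {
                @Override
                public void onResponse(T result) {
                    future.complete(result);
                }

                @Override
                public void onFailure(Exception e) {
                    future.completeExceptionally(e);
                }
            });
            try {
                return future.get();
            } catch (ExecutionException e) {
                throw e.getCause(); // surface the original failure to the test
            }
        }
    }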
org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaItemExecutionMode; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.replication.TransportWriteAction.WritePrimaryResult; import org.elasticsearch.action.update.UpdateHelper; @@ -50,6 +53,8 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Assert; import java.io.IOException; import java.util.Collections; @@ -75,6 +80,9 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { + private static final ActionListener ASSERTING_DONE_LISTENER = + ActionTestUtils.assertNoFailureListener(Assert::assertTrue); + private final ShardId shardId = new ShardId("index", "_na_", 0); private final Settings idxSettings = Settings.builder() .put("index.number_of_shards", 1) @@ -146,10 +154,9 @@ public void testExecuteBulkIndexRequest() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); - UpdateHelper updateHelper = null; BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Translog should change, since there were no problems @@ -174,8 +181,9 @@ public void testExecuteBulkIndexRequest() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper, - threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {}); + TransportShardBulkAction.executeBulkItemRequest(secondContext, null, + threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), + listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); assertNull(secondContext.getLocationToSync()); @@ -224,37 +232,44 @@ public void testSkipBulkIndexRequestIfAborted() throws Exception { final ElasticsearchStatusException rejectionCause = new ElasticsearchStatusException("testing rejection", rejectionStatus); rejectItem.abort("index", rejectionCause); - UpdateHelper updateHelper = null; - WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( - bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), - () -> {}); + final CountDownLatch latch = new CountDownLatch(1); + TransportShardBulkAction.performOnPrimary( + bulkShardRequest, shard, null, 
threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), + listener -> {}, ActionListener.runAfter( + ActionTestUtils.assertNoFailureListener(result -> { + // since at least 1 item passed, the tran log location should exist, + assertThat(((WritePrimaryResult) result).location, notNullValue()); + // and the response should exist and match the item count + assertThat(result.finalResponseIfSuccessful, notNullValue()); + assertThat(result.finalResponseIfSuccessful.getResponses(), arrayWithSize(items.length)); + + // check each response matches the input item, including the rejection + for (int i = 0; i < items.length; i++) { + BulkItemResponse response = result.finalResponseIfSuccessful.getResponses()[i]; + assertThat(response.getItemId(), equalTo(i)); + assertThat(response.getIndex(), equalTo("index")); + assertThat(response.getType(), equalTo("_doc")); + assertThat(response.getId(), equalTo("id_" + i)); + assertThat(response.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + if (response.getItemId() == rejectItem.id()) { + assertTrue(response.isFailed()); + assertThat(response.getFailure().getCause(), equalTo(rejectionCause)); + assertThat(response.status(), equalTo(rejectionStatus)); + } else { + assertFalse(response.isFailed()); + } + } + + // Check that the non-rejected updates made it to the shard + try { + assertDocCount(shard, items.length - 1); + closeShards(shard); + } catch (IOException e) { + throw new AssertionError(e); + } + }), latch::countDown), r -> threadPool.executor(ThreadPool.Names.WRITE).execute(r)); - // since at least 1 item passed, the tran log location should exist, - assertThat(result.location, notNullValue()); - // and the response should exist and match the item count - assertThat(result.finalResponseIfSuccessful, notNullValue()); - assertThat(result.finalResponseIfSuccessful.getResponses(), arrayWithSize(items.length)); - - // check each response matches the input item, including the rejection - for (int i = 0; i < items.length; i++) { - BulkItemResponse response = result.finalResponseIfSuccessful.getResponses()[i]; - assertThat(response.getItemId(), equalTo(i)); - assertThat(response.getIndex(), equalTo("index")); - assertThat(response.getType(), equalTo("_doc")); - assertThat(response.getId(), equalTo("id_" + i)); - assertThat(response.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); - if (response.getItemId() == rejectItem.id()) { - assertTrue(response.isFailed()); - assertThat(response.getFailure().getCause(), equalTo(rejectionCause)); - assertThat(response.status(), equalTo(rejectionStatus)); - } else { - assertFalse(response.isFailed()); - } - } - - // Check that the non-rejected updates made it to the shard - assertDocCount(shard, items.length - 1); - closeShards(shard); + latch.await(); } public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { @@ -281,11 +296,12 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); AtomicInteger updateCalled = new AtomicInteger(); TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - (update, shardId, type) -> { + (update, shardId, type, listener) -> { // There should indeed be a mapping update assertNotNull(update); updateCalled.incrementAndGet(); - }, () -> {}); + listener.onResponse(null); + }, listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER); assertTrue(context.isInitial()); 
         assertTrue(context.hasMoreOperationsToExecute());
@@ -298,7 +314,8 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
             .thenReturn(success);
 
         TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis,
-            (update, shardId, type) -> fail("should not have had to update the mappings"), () -> {});
+            (update, shardId, type, listener) -> fail("should not have had to update the mappings"), listener -> {},
+            ASSERTING_DONE_LISTENER);
 
         // Verify that the shard "executed" the operation only once (1 for previous invocations plus
@@ -325,8 +342,6 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex
         items[0] = new BulkItemRequest(0, writeRequest);
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
-        UpdateHelper updateHelper = null;
-
         // Return an exception when trying to update the mapping, or when waiting for it to come
         RuntimeException err = new RuntimeException("some kind of exception");
 
@@ -335,9 +350,23 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex
         randomlySetIgnoredPrimaryResponse(items[0]);
 
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
-        TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
+        final CountDownLatch latch = new CountDownLatch(1);
+        TransportShardBulkAction.executeBulkItemRequest(
+            context, null, threadPool::relativeTimeInMillis,
             errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(),
-            errorOnWait ? () -> { throw err; } : () -> {});
+            errorOnWait ? listener -> listener.onFailure(err) : listener -> listener.onResponse(null),
+            new LatchedActionListener<>(new ActionListener<Boolean>() {
+                @Override
+                public void onResponse(Boolean async) {
+                    assertTrue(async);
+                }
+
+                @Override
+                public void onFailure(final Exception e) {
+                    assertEquals(err, e);
+                }
+            }, latch));
+        latch.await();
         assertFalse(context.hasMoreOperationsToExecute());
 
         // Translog shouldn't be synced, as there were conflicting mappings
@@ -371,13 +400,12 @@ public void testExecuteBulkDeleteRequest() throws Exception {
             new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
         Translog.Location location = new Translog.Location(0, 0, 0);
-        UpdateHelper updateHelper = null;
 
         randomlySetIgnoredPrimaryResponse(items[0]);
 
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
-        TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
-            new NoopMappingUpdatePerformer(), () -> {});
+        TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis,
+            new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER);
         assertFalse(context.hasMoreOperationsToExecute());
 
         // Translog changes, even though the document didn't exist
@@ -418,8 +446,8 @@ public void testExecuteBulkDeleteRequest() throws Exception {
         randomlySetIgnoredPrimaryResponse(items[0]);
 
         context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
-        TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
-            new NoopMappingUpdatePerformer(), () -> {});
+        TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis,
+            new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER);
         assertFalse(context.hasMoreOperationsToExecute());
 
         // Translog changes, because the document was deleted
@@ -475,7 +503,7 @@ public void testNoopUpdateRequest() throws Exception {
 
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
-            new NoopMappingUpdatePerformer(), () -> {});
+            new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER);
 
         assertFalse(context.hasMoreOperationsToExecute());
@@ -521,7 +549,7 @@ public void testUpdateRequestWithFailure() throws Exception {
 
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
-            new NoopMappingUpdatePerformer(), () -> {});
+            new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER);
         assertFalse(context.hasMoreOperationsToExecute());
 
         // Since this was not a conflict failure, the primary response
@@ -571,7 +599,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
 
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
-            new NoopMappingUpdatePerformer(), () -> {});
+            new NoopMappingUpdatePerformer(), listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER);
         assertFalse(context.hasMoreOperationsToExecute());
 
         assertNull(context.getLocationToSync());
@@ -618,7 +646,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
 
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
-            new NoopMappingUpdatePerformer(), () -> {});
+            new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER);
         assertFalse(context.hasMoreOperationsToExecute());
 
         // Check that the translog is successfully advanced
@@ -664,7 +692,7 @@ public void testUpdateWithDelete() throws Exception {
 
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
-            new NoopMappingUpdatePerformer(), () -> {});
+            new NoopMappingUpdatePerformer(), listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER);
         assertFalse(context.hasMoreOperationsToExecute());
 
         // Check that the translog is successfully advanced
@@ -698,7 +726,7 @@ public void testFailureDuringUpdateProcessing() throws Exception {
 
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
-            new NoopMappingUpdatePerformer(), () -> {});
+            new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER);
         assertFalse(context.hasMoreOperationsToExecute());
 
         assertNull(context.getLocationToSync());
@@ -731,7 +759,7 @@ public void testTranslogPositionToSync() throws Exception {
 
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         while (context.hasMoreOperationsToExecute()) {
             TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis,
-                new NoopMappingUpdatePerformer(), () -> {});
+                new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER);
         }
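
        // The drain loop above stays synchronous only because NoopMappingUpdatePerformer and the
        // no-op wait-for-mapping hook complete their listeners inline, so the done listener has
        // fired by the time executeBulkItemRequest returns. If an implementation completed the
        // listener on another thread, each iteration would have to wait for it first; a sketch of
        // that variant, reusing the LatchedActionListener pattern from the mapping-failure test:
        //
        //     while (context.hasMoreOperationsToExecute()) {
        //         CountDownLatch itemDone = new CountDownLatch(1);
        //         TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis,
        //             new NoopMappingUpdatePerformer(), listener -> {},
        //             new LatchedActionListener<>(ASSERTING_DONE_LISTENER, itemDone));
        //         itemDone.await();
        //     }
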
assertTrue(shard.isSyncNeeded()); @@ -814,18 +842,23 @@ public void testRetries() throws Exception { BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( + final CountDownLatch latch = new CountDownLatch(1); + TransportShardBulkAction.performOnPrimary( bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), - () -> {}); + listener -> listener.onResponse(null), + new LatchedActionListener<>( + ActionTestUtils.assertNoFailureListener(result -> { + assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation)); + BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse(); + assertThat(primaryResponse.getItemId(), equalTo(0)); + assertThat(primaryResponse.getId(), equalTo("id")); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); + DocWriteResponse response = primaryResponse.getResponse(); + assertThat(response.status(), equalTo(RestStatus.CREATED)); + assertThat(response.getSeqNo(), equalTo(13L)); + }), latch), r -> threadPool.executor(ThreadPool.Names.WRITE).execute(r)); - assertThat(result.location, equalTo(resultLocation)); - BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse(); - assertThat(primaryResponse.getItemId(), equalTo(0)); - assertThat(primaryResponse.getId(), equalTo("id")); - assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); - DocWriteResponse response = primaryResponse.getResponse(); - assertThat(response.status(), equalTo(RestStatus.CREATED)); - assertThat(response.getSeqNo(), equalTo(13L)); + latch.await(); } private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) { @@ -875,7 +908,8 @@ public Translog.Location getTranslogLocation() { /** Doesn't perform any mapping updates */ public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer { @Override - public void updateMappings(Mapping update, ShardId shardId, String type) { + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { + listener.onResponse(null); } } @@ -888,8 +922,8 @@ private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer { } @Override - public void updateMappings(Mapping update, ShardId shardId, String type) { - throw e; + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { + listener.onFailure(e); } } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 8fa10c4ee26d7..f3ac1a9513ab8 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -269,11 +269,12 @@ public void testAddedReplicaAfterPrimaryOperation() throws Exception { final ShardRouting primaryShard = updatedReplicationGroup.getRoutingTable().primaryShard(); final TestPrimary primary = new TestPrimary(primaryShard, replicationGroup::get) { @Override - public Result perform(Request request) throws Exception { - Result result = super.perform(request); - replicationGroup.set(updatedReplicationGroup); - logger.debug("--> state after primary operation:\n{}", 
replicationGroup.get()); - return result; + public void perform(Request request, ActionListener listener) { + super.perform(request, ActionListener.map(listener, result -> { + replicationGroup.set(updatedReplicationGroup); + logger.debug("--> state after primary operation:\n{}", replicationGroup.get()); + return result; + })); } }; @@ -467,11 +468,11 @@ public void failShard(String message, Exception exception) { } @Override - public Result perform(Request request) throws Exception { + public void perform(Request request, ActionListener listener) { if (request.processedOnPrimary.compareAndSet(false, true) == false) { fail("processed [" + request + "] twice"); } - return new Result(request); + listener.onResponse(new Result(request)); } static class Result implements ReplicationOperation.PrimaryResult { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 110ab9bcb99a2..5451269b67098 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationOperation.ReplicaResponse; @@ -684,16 +685,16 @@ public void testPrimaryReference() throws Exception { }; TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); final Request request = new Request(); - Request replicaRequest = (Request) primary.perform(request).replicaRequest; + primary.perform(request, ActionTestUtils.assertNoFailureListener(r -> { + final ElasticsearchException exception = new ElasticsearchException("testing"); + primary.failShard("test", exception); - final ElasticsearchException exception = new ElasticsearchException("testing"); - primary.failShard("test", exception); + verify(shard).failShard("test", exception); - verify(shard).failShard("test", exception); + primary.close(); - primary.close(); - - assertTrue(closed.get()); + assertTrue(closed.get()); + })); } public void testReplicaProxy() throws InterruptedException, ExecutionException { @@ -1223,10 +1224,11 @@ protected TestResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary, + ActionListener> listener) { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has already been executed on the primary"; - return new PrimaryResult(shardRequest, new TestResponse()); + listener.onResponse(new PrimaryResult(shardRequest, new TestResponse())); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 
1cb1bfde34ea8..1e6a4bf168b6f 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -377,12 +377,13 @@ public String getActionName() { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, + ActionListener> listener) { executedOnPrimary.set(true); // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here // that the permit has been acquired on the primary shard assertSame(primary, shard); - return new PrimaryResult<>(shardRequest, new Response()); + listener.onResponse(new PrimaryResult<>(shardRequest, new Response())); } @Override @@ -464,10 +465,11 @@ public ClusterBlockLevel indexBlockLevel() { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, + ActionListener> listener) { assertNoBlocks("block must not exist when executing the operation on primary shard: it should have been blocked before"); assertThat(shard.getActiveOperationsCount(), greaterThan(0)); - return super.shardOperationOnPrimary(shardRequest, shard); + super.shardOperationOnPrimary(shardRequest, shard, listener); } @Override @@ -509,9 +511,10 @@ protected void acquireReplicaOperationPermit(IndexShard shard, Request request, } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, + ActionListener> listener) { assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); - return super.shardOperationOnPrimary(shardRequest, shard); + super.shardOperationOnPrimary(shardRequest, shard, listener); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 6e1ec3c76797d..06da3a2e68c77 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteResponse; @@ -138,14 +139,15 @@ public void testPrimaryNoRefreshCall() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit TestAction testAction = new TestAction(); - TransportWriteAction.WritePrimaryResult result = - testAction.shardOperationOnPrimary(request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - result.respond(listener); - assertNotNull(listener.response); - assertNull(listener.failure); - verify(indexShard, 
never()).refresh(any()); - verify(indexShard, never()).addRefreshListener(any(), any()); + testAction.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(result -> { + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); + assertNotNull(listener.response); + assertNull(listener.failure); + verify(indexShard, never()).refresh(any()); + verify(indexShard, never()).addRefreshListener(any(), any()); + })); } public void testReplicaNoRefreshCall() throws Exception { @@ -166,15 +168,16 @@ public void testPrimaryImmediateRefresh() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.IMMEDIATE); TestAction testAction = new TestAction(); - TransportWriteAction.WritePrimaryResult result = - testAction.shardOperationOnPrimary(request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - result.respond(listener); - assertNotNull(listener.response); - assertNull(listener.failure); - assertTrue(listener.response.forcedRefresh); - verify(indexShard).refresh("refresh_flag_index"); - verify(indexShard, never()).addRefreshListener(any(), any()); + testAction.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(result -> { + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); + assertNotNull(listener.response); + assertNull(listener.failure); + assertTrue(listener.response.forcedRefresh); + verify(indexShard).refresh("refresh_flag_index"); + verify(indexShard, never()).addRefreshListener(any(), any()); + })); } public void testReplicaImmediateRefresh() throws Exception { @@ -196,23 +199,24 @@ public void testPrimaryWaitForRefresh() throws Exception { request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); TestAction testAction = new TestAction(); - TransportWriteAction.WritePrimaryResult result = - testAction.shardOperationOnPrimary(request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - result.respond(listener); - assertNull(listener.response); // Haven't reallresponded yet - - @SuppressWarnings({ "unchecked", "rawtypes" }) - ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); - verify(indexShard, never()).refresh(any()); - verify(indexShard).addRefreshListener(any(), refreshListener.capture()); - - // Now we can fire the listener manually and we'll get a response - boolean forcedRefresh = randomBoolean(); - refreshListener.getValue().accept(forcedRefresh); - assertNotNull(listener.response); - assertNull(listener.failure); - assertEquals(forcedRefresh, listener.response.forcedRefresh); + testAction.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(result -> { + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); + assertNull(listener.response); // Haven't really responded yet + + @SuppressWarnings({"unchecked", "rawtypes"}) + ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); + verify(indexShard, never()).refresh(any()); + verify(indexShard).addRefreshListener(any(), refreshListener.capture()); + + // Now we can fire the listener manually and we'll get a response + boolean forcedRefresh = randomBoolean(); + refreshListener.getValue().accept(forcedRefresh); + assertNotNull(listener.response); + assertNull(listener.failure); + assertEquals(forcedRefresh, listener.response.forcedRefresh); + })); } public 
void testReplicaWaitForRefresh() throws Exception { @@ -238,12 +242,13 @@ public void testReplicaWaitForRefresh() throws Exception { public void testDocumentFailureInShardOperationOnPrimary() throws Exception { TestRequest request = new TestRequest(); TestAction testAction = new TestAction(true, true); - TransportWriteAction.WritePrimaryResult writePrimaryResult = - testAction.shardOperationOnPrimary(request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - writePrimaryResult.respond(listener); - assertNull(listener.response); - assertNotNull(listener.failure); + testAction.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(writePrimaryResult -> { + CapturingActionListener listener = new CapturingActionListener<>(); + writePrimaryResult.respond(listener); + assertNull(listener.response); + assertNotNull(listener.failure); + })); } public void testDocumentFailureInShardOperationOnReplica() throws Exception { @@ -431,15 +436,15 @@ protected TestResponse newResponseInstance() { } @Override - protected WritePrimaryResult shardOperationOnPrimary( - TestRequest request, IndexShard primary) throws Exception { + protected void shardOperationOnPrimary( + TestRequest request, IndexShard primary, ActionListener> listener) { final WritePrimaryResult primaryResult; if (withDocumentFailureOnPrimary) { primaryResult = new WritePrimaryResult<>(request, null, null, new RuntimeException("simulated"), primary, logger); } else { primaryResult = new WritePrimaryResult<>(request, new TestResponse(), location, null, primary, logger); } - return primaryResult; + listener.onResponse(primaryResult); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index 97fc9e9defaa9..cec3c05b28438 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -18,6 +18,7 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -113,7 +114,7 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { new IndexNameExpressionResolver()); final GlobalCheckpointSyncAction.Request primaryRequest = new GlobalCheckpointSyncAction.Request(indexShard.shardId()); if (randomBoolean()) { - action.shardOperationOnPrimary(primaryRequest, indexShard); + action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {})); } else { action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index 6ad7d5039ae8b..8d527b287eaac 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -21,8 +21,9 @@ import 
org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -46,6 +47,7 @@ import org.mockito.ArgumentCaptor; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; @@ -66,6 +68,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase { private TransportService transportService; private ShardStateAction shardStateAction; + @Override public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getClass().getName()); @@ -83,6 +86,7 @@ public void setUp() throws Exception { shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool); } + @Override public void tearDown() throws Exception { try { IOUtils.close(transportService, clusterService, transport); @@ -92,7 +96,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testRetentionLeaseBackgroundSyncActionOnPrimary() throws WriteStateException { + public void testRetentionLeaseBackgroundSyncActionOnPrimary() throws InterruptedException { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -119,12 +123,19 @@ public void testRetentionLeaseBackgroundSyncActionOnPrimary() throws WriteStateE final RetentionLeaseBackgroundSyncAction.Request request = new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases); - final ReplicationOperation.PrimaryResult result = - action.shardOperationOnPrimary(request, indexShard); - // the retention leases on the shard should be persisted - verify(indexShard).persistRetentionLeases(); - // we should forward the request containing the current retention leases to the replica - assertThat(result.replicaRequest(), sameInstance(request)); + final CountDownLatch latch = new CountDownLatch(1); + action.shardOperationOnPrimary(request, indexShard, + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { + // the retention leases on the shard should be persisted + try { + verify(indexShard).persistRetentionLeases(); + } catch (WriteStateException e) { + throw new AssertionError(e); + } + // we should forward the request containing the current retention leases to the replica + assertThat(result.replicaRequest(), sameInstance(request)); + }), latch)); + latch.await(); } public void testRetentionLeaseBackgroundSyncActionOnReplica() throws WriteStateException { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 9b9ad6a0962c1..fa7e50b5fd01e 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -22,6 +22,7 @@ import 
org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -90,7 +91,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testRetentionLeaseSyncActionOnPrimary() throws WriteStateException { + public void testRetentionLeaseSyncActionOnPrimary() { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -115,15 +116,20 @@ public void testRetentionLeaseSyncActionOnPrimary() throws WriteStateException { new IndexNameExpressionResolver()); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); - - final TransportWriteAction.WritePrimaryResult result = - action.shardOperationOnPrimary(request, indexShard); - // the retention leases on the shard should be persisted - verify(indexShard).persistRetentionLeases(); - // we should forward the request containing the current retention leases to the replica - assertThat(result.replicaRequest(), sameInstance(request)); - // we should start with an empty replication response - assertNull(result.finalResponseIfSuccessful.getShardInfo()); + action.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(result -> { + // the retention leases on the shard should be persisted + try { + verify(indexShard).persistRetentionLeases(); + } catch (WriteStateException e) { + throw new AssertionError(e); + } + // we should forward the request containing the current retention leases to the replica + assertThat(result.replicaRequest(), sameInstance(request)); + // we should start with an empty replication response + assertNull(result.finalResponseIfSuccessful.getShardInfo()); + } + )); } public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { @@ -156,7 +162,7 @@ public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { action.shardOperationOnReplica(request, indexShard); // the retention leases on the shard should be updated verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); - // the retention leases on the shard should be persisteed + // the retention leases on the shard should be persisted verify(indexShard).persistRetentionLeases(); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index ece8bbd7194e9..2a6deba869570 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -34,18 +34,42 @@ import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction; +import 
org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; +import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; +import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction; import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.TransportBulkAction; +import org.elasticsearch.action.bulk.TransportShardBulkAction; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.resync.TransportResyncReplicationAction; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchExecutionStatsCollector; +import org.elasticsearch.action.search.SearchPhaseController; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchTransportService; +import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterModule; @@ -54,6 +78,7 @@ import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.coordination.ClusterBootstrapService; @@ -68,6 +93,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService; +import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -100,7 +127,9 @@ import 
org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; +import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.analysis.AnalysisModule; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService; import org.elasticsearch.indices.cluster.IndicesClusterStateService; @@ -109,6 +138,8 @@ import org.elasticsearch.indices.recovery.PeerRecoverySourceService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.RepositoriesService; @@ -116,6 +147,9 @@ import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.FetchPhase; +import org.elasticsearch.snapshots.mockstore.MockEventuallyConsistentRepository; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.disruption.DisruptableMockTransport; import org.elasticsearch.test.disruption.NetworkDisruption; @@ -138,11 +172,12 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -165,9 +200,20 @@ public class SnapshotResiliencyTests extends ESTestCase { private Path tempDir; + /** + * Whether to use eventually consistent blob store in tests. 
+ */ + private boolean eventuallyConsistent; + + private MockEventuallyConsistentRepository.Context blobStoreContext; + @Before public void createServices() { tempDir = createTempDir(); + eventuallyConsistent = randomBoolean(); + if (eventuallyConsistent) { + blobStoreContext = new MockEventuallyConsistentRepository.Context(); + } deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "shared").build(), random()); } @@ -185,10 +231,12 @@ public void testSuccessfulSnapshot() { final String index = "test"; final int shards = randomIntBetween(1, 10); - + final int documents = randomIntBetween(0, 100); TestClusterNode masterNode = testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); final AtomicBoolean createdSnapshot = new AtomicBoolean(); + final AtomicBoolean snapshotRestored = new AtomicBoolean(); + final AtomicBoolean documentCountVerified = new AtomicBoolean(); masterNode.client.admin().cluster().preparePutRepository(repoName) .setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10))) .execute( @@ -197,12 +245,56 @@ public void testSuccessfulSnapshot() { new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL) .settings(defaultIndexSettings(shards)), assertNoFailureListener( - () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) - .execute(assertNoFailureListener(() -> createdSnapshot.set(true))))))); - - deterministicTaskQueue.runAllRunnableTasks(); - + () -> { + final Runnable afterIndexing = () -> + masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true).execute(assertNoFailureListener(() -> { + createdSnapshot.set(true); + masterNode.client.admin().indices().delete( + new DeleteIndexRequest(index), + assertNoFailureListener(() -> masterNode.client.admin().cluster().restoreSnapshot( + new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), + ActionTestUtils.assertNoFailureListener(restoreSnapshotResponse -> { + snapshotRestored.set(true); + assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); + masterNode.client.search( + new SearchRequest(index).source( + new SearchSourceBuilder().size(0).trackTotalHits(true) + ), + ActionTestUtils.assertNoFailureListener(r -> { + assertEquals( + (long) documents, + Objects.requireNonNull(r.getHits().getTotalHits()).value + ); + documentCountVerified.set(true); + })); + }) + ))); + })); + final AtomicInteger countdown = new AtomicInteger(documents); + for (int i = 0; i < documents; ++i) { + masterNode.client.bulk( + new BulkRequest().add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i))) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), + ActionTestUtils.assertNoFailureListener( + bulkResponse -> { + assertFalse( + "Failures in bulkresponse: " + bulkResponse.buildFailureMessage(), + bulkResponse.hasFailures()); + if (countdown.decrementAndGet() == 0) { + afterIndexing.run(); + } + })); + } + if (documents == 0) { + afterIndexing.run(); + } + })))); + runUntil( + () -> createdSnapshot.get() && snapshotRestored.get() && documentCountVerified.get(), TimeUnit.MINUTES.toMillis(5L)); assertTrue(createdSnapshot.get()); + assertTrue(snapshotRestored.get()); + assertTrue(documentCountVerified.get()); SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); 
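
// A note on the indexing fan-in in this test: every bulk callback decrements the shared
// `countdown` counter, only the final decrement runs the snapshot/delete/restore/search
// continuation, and the explicit `documents == 0` call covers the case where no callback will
// ever fire. The pattern in isolation (names here are illustrative only):
//
//     AtomicInteger countdown = new AtomicInteger(documents);
//     Runnable afterIndexing = () -> { /* continue the chain */ };
//     // in every bulk-response callback:
//     if (countdown.decrementAndGet() == 0) {
//         afterIndexing.run();
//     }
//     // and directly, when nothing was indexed:
//     if (documents == 0) {
//         afterIndexing.run();
//     }
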
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); @@ -357,7 +449,7 @@ public void testSnapshotPrimaryRelocations() { new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL) .settings(defaultIndexSettings(shards)), assertNoFailureListener( - () -> masterAdminClient.cluster().state(new ClusterStateRequest(), assertNoFailureListener( + () -> masterAdminClient.cluster().state(new ClusterStateRequest(), ActionTestUtils.assertNoFailureListener( clusterStateResponse -> { final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0); @@ -368,34 +460,38 @@ public void testSnapshotPrimaryRelocations() { final Runnable maybeForceAllocate = new Runnable() { @Override public void run() { - masterAdminClient.cluster().state(new ClusterStateRequest(), assertNoFailureListener( - resp -> { - final ShardRouting shardRouting = resp.getState().routingTable() - .shardRoutingTable(shardToRelocate.shardId()).primaryShard(); - if (shardRouting.unassigned() - && shardRouting.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT) { - if (masterNodeCount > 1) { - scheduleNow(() -> testClusterNodes.stopNode(masterNode)); + masterAdminClient.cluster().state(new ClusterStateRequest(), + ActionTestUtils.assertNoFailureListener( + resp -> { + final ShardRouting shardRouting = resp.getState().routingTable() + .shardRoutingTable(shardToRelocate.shardId()).primaryShard(); + if (shardRouting.unassigned() && + shardRouting.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT) { + if (masterNodeCount > 1) { + scheduleNow(() -> testClusterNodes.stopNode(masterNode)); + } + testClusterNodes.randomDataNodeSafe().client.admin().cluster() + .prepareCreateSnapshot(repoName, snapshotName) + .execute(ActionListener.wrap(() -> { + testClusterNodes.randomDataNodeSafe().client.admin().cluster() + .deleteSnapshot( + new DeleteSnapshotRequest( + repoName, snapshotName), noopListener()); + createdSnapshot.set(true); + })); + scheduleNow( + () -> testClusterNodes.randomMasterNodeSafe().client.admin().cluster() + .reroute( + new ClusterRerouteRequest().add( + new AllocateEmptyPrimaryAllocationCommand( + index, shardRouting.shardId().id(), + otherNode.node.getName(), true)), + noopListener())); + } else { + scheduleSoon(this); } - testClusterNodes.randomDataNodeSafe().client.admin().cluster() - .prepareCreateSnapshot(repoName, snapshotName) - .execute(ActionListener.wrap(() -> { - testClusterNodes.randomDataNodeSafe().client.admin().cluster() - .deleteSnapshot( - new DeleteSnapshotRequest(repoName, snapshotName), noopListener()); - createdSnapshot.set(true); - })); - scheduleNow( - () -> testClusterNodes.randomMasterNodeSafe().client.admin().cluster().reroute( - new ClusterRerouteRequest().add( - new AllocateEmptyPrimaryAllocationCommand( - index, shardRouting.shardId().id(), otherNode.node.getName(), true) - ), noopListener())); - } else { - scheduleSoon(this); } - } - )); + )); } }; scheduleNow(() -> testClusterNodes.stopNode(currentPrimaryNode)); @@ -511,20 +607,6 @@ private static Settings defaultIndexSettings(int shards) { .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0).build(); } - private static ActionListener assertNoFailureListener(Consumer consumer) { - return new ActionListener() { - @Override - public void onResponse(final T t) { - consumer.accept(t); - } - - @Override - public void 
onFailure(final Exception e) { - throw new AssertionError(e); - } - }; - } - private static ActionListener assertNoFailureListener(Runnable r) { return new ActionListener() { @Override @@ -803,19 +885,7 @@ public void onFailure(final Exception e) { final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(); repositoriesService = new RepositoriesService( settings, clusterService, transportService, - Collections.singletonMap(FsRepository.TYPE, metaData -> { - final Repository repository = new FsRepository(metaData, environment, xContentRegistry()) { - @Override - protected void assertSnapshotOrGenericThread() { - // eliminate thread name check as we create repo in the test thread - } - }; - repository.start(); - return repository; - } - ), - emptyMap(), - threadPool + Collections.singletonMap(FsRepository.TYPE, getRepoFactory(environment)), emptyMap(), threadPool ); snapshotsService = new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, threadPool); @@ -826,6 +896,11 @@ protected void assertSnapshotOrGenericThread() { allocationService = ESAllocationTestCase.createAllocationService(settings); final IndexScopedSettings indexScopedSettings = new IndexScopedSettings(settings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS); + final BigArrays bigArrays = new BigArrays(new PageCacheRecycler(settings), null, "test"); + final MapperRegistry mapperRegistry = new MapperRegistry( + IndicesModule.getMappers(Collections.emptyList()), + IndicesModule.getMetadataMappers(Collections.emptyList()), MapperPlugin.NOOP_FIELD_FILTER + ); indicesService = new IndicesService( settings, mock(PluginsService.class), @@ -834,12 +909,12 @@ protected void assertSnapshotOrGenericThread() { new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap()), indexNameExpressionResolver, - new MapperRegistry(emptyMap(), emptyMap(), MapperPlugin.NOOP_FIELD_FILTER), + mapperRegistry, namedWriteableRegistry, threadPool, indexScopedSettings, new NoneCircuitBreakerService(), - new BigArrays(new PageCacheRecycler(settings), null, "test"), + bigArrays, scriptService, client, new MetaStateService(nodeEnv, namedXContentRegistry), @@ -856,6 +931,7 @@ protected void assertSnapshotOrGenericThread() { new RoutingService(clusterService, allocationService), threadPool ); + final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService); indicesClusterStateService = new IndicesClusterStateService( settings, indicesService, @@ -863,7 +939,7 @@ protected void assertSnapshotOrGenericThread() { threadPool, new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService), shardStateAction, - new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)), + new NodeMappingRefreshAction(transportService, metaDataMappingService), repositoriesService, mock(SearchService.class), new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver), @@ -908,14 +984,61 @@ protected void assertSnapshotOrGenericThread() { actionFilters, indexNameExpressionResolver)); Map actions = new HashMap<>(); + final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService, + indicesService, + allocationService, new AliasValidator(), environment, indexScopedSettings, + threadPool, namedXContentRegistry, 
false); actions.put(CreateIndexAction.INSTANCE, new TransportCreateIndexAction( transportService, clusterService, threadPool, - new MetaDataCreateIndexService(settings, clusterService, indicesService, - allocationService, new AliasValidator(), environment, indexScopedSettings, - threadPool, namedXContentRegistry, false), + metaDataCreateIndexService, actionFilters, indexNameExpressionResolver )); + final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings); + mappingUpdatedAction.setClient(client); + final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService, + clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService), + actionFilters, indexNameExpressionResolver); + actions.put(BulkAction.INSTANCE, + new TransportBulkAction(threadPool, transportService, clusterService, + new IngestService( + clusterService, threadPool, environment, scriptService, + new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), + Collections.emptyList()), + transportShardBulkAction, client, actionFilters, indexNameExpressionResolver, + new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver) + )); + final RestoreService restoreService = new RestoreService( + clusterService, repositoriesService, allocationService, + metaDataCreateIndexService, + new MetaDataIndexUpgradeService( + settings, namedXContentRegistry, + mapperRegistry, + indexScopedSettings, + Collections.emptyList() + ), + clusterSettings + ); + actions.put(PutMappingAction.INSTANCE, + new TransportPutMappingAction(transportService, clusterService, threadPool, metaDataMappingService, + actionFilters, indexNameExpressionResolver, new TransportPutMappingAction.RequestValidators(Collections.emptyList()))); + final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService); + final SearchTransportService searchTransportService = new SearchTransportService(transportService, + SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); + final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService, + bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService); + actions.put(SearchAction.INSTANCE, + new TransportSearchAction(threadPool, transportService, searchService, + searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService, + actionFilters, indexNameExpressionResolver)); + actions.put(RestoreSnapshotAction.INSTANCE, + new TransportRestoreSnapshotAction(transportService, clusterService, threadPool, restoreService, actionFilters, + indexNameExpressionResolver)); + actions.put(DeleteIndexAction.INSTANCE, + new TransportDeleteIndexAction( + transportService, clusterService, threadPool, + new MetaDataDeleteIndexService(settings, clusterService, allocationService), actionFilters, + indexNameExpressionResolver, new DestructiveOperations(settings, clusterSettings))); actions.put(PutRepositoryAction.INSTANCE, new TransportPutRepositoryAction( transportService, clusterService, repositoriesService, threadPool, @@ -946,6 +1069,29 @@ allocationService, new AliasValidator(), environment, indexScopedSettings, client.initialize(actions, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService()); } + private Repository.Factory getRepoFactory(final Environment environment) { + // Run half the 
tests with the eventually consistent repository + if (eventuallyConsistent) { + return metaData -> { + final Repository repository = new MockEventuallyConsistentRepository( + metaData, environment, xContentRegistry(), deterministicTaskQueue, blobStoreContext); + repository.start(); + return repository; + }; + } else { + return metaData -> { + final Repository repository = new FsRepository(metaData, environment, xContentRegistry()) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo in the test thread + } + }; + repository.start(); + return repository; + }; + } + } + public void restart() { testClusterNodes.disconnectNode(this); final ClusterState oldState = this.clusterService.state(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java new file mode 100644 index 0000000000000..cae43039ada44 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -0,0 +1,211 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots.mockstore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.SnapshotId; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Mock Repository that simulates the mechanics of an eventually consistent blobstore. 
+ */ +public class MockEventuallyConsistentRepository extends FsRepository { + private static final Logger logger = LogManager.getLogger(MockEventuallyConsistentRepository.class); + + private final DeterministicTaskQueue deterministicTaskQueue; + + private final Context context; + + public MockEventuallyConsistentRepository(RepositoryMetaData metadata, Environment environment, + NamedXContentRegistry namedXContentRegistry, DeterministicTaskQueue deterministicTaskQueue, Context context) { + super(overrideSettings(metadata, environment), environment, namedXContentRegistry); + this.deterministicTaskQueue = deterministicTaskQueue; + this.context = context; + } + + @Override + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetadata) { + super.initializeSnapshot(snapshotId, indices, clusterMetadata); + } + + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo in the test thread + } + + private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) { + // TODO: use another method of testing not being able to read the test file written by the master... + // this is super duper hacky + if (metadata.settings().getAsBoolean("localize_location", false)) { + Path location = PathUtils.get(metadata.settings().get("location")); + location = location.resolve(Integer.toString(environment.hashCode())); + return new RepositoryMetaData(metadata.name(), metadata.type(), + Settings.builder().put(metadata.settings()).put("location", location.toAbsolutePath()).build()); + } else { + return metadata; + } + } + + @Override + protected void doStop() { + super.doStop(); + } + + @Override + protected BlobStore createBlobStore() throws Exception { + return new MockBlobStore(super.createBlobStore()); + } + + public static final class Context { + + private final Map, Map>> state = new HashMap<>(); + + public Tuple, Map> getState(BlobPath path) { + return state.computeIfAbsent(path, p -> new Tuple<>(new HashSet<>(), new HashMap<>())); + } + + } + + private class MockBlobStore extends BlobStoreWrapper { + + MockBlobStore(BlobStore delegate) { + super(delegate); + } + + @Override + public BlobContainer blobContainer(BlobPath path) { + return new MockBlobContainer(super.blobContainer(path), context.getState(path)); + } + + private class MockBlobContainer extends BlobContainerWrapper { + + private final Set cachedMisses; + + private final Map pendingWrites; + + MockBlobContainer(BlobContainer delegate, Tuple, Map> state) { + super(delegate); + cachedMisses = state.v1(); + pendingWrites = state.v2(); + } + + @Override + public boolean blobExists(String blobName) { + ensureReadAfterWrite(blobName); + final boolean result = super.blobExists(blobName); + if (result == false) { + cachedMisses.add(blobName); + } + return result; + } + + @Override + public InputStream readBlob(String name) throws IOException { + ensureReadAfterWrite(name); + return super.readBlob(name); + } + + private void ensureReadAfterWrite(String blobName) { + if (cachedMisses.contains(blobName) == false && pendingWrites.containsKey(blobName)) { + pendingWrites.remove(blobName).run(); + } + } + + @Override + public void deleteBlob(String blobName) throws IOException { + super.deleteBlob(blobName); + } + + @Override + public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException { + super.deleteBlobIgnoringIfNotExists(blobName); + } + + @Override + public Map listBlobs() throws IOException { + return 
diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java
index cdc76292be06a..6da7b4474ddee 100644
--- a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java
+++ b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java
@@ -19,9 +19,12 @@
 package org.elasticsearch.action.support;

+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;

+import java.util.function.Consumer;
+
 import static org.elasticsearch.action.support.PlainActionFuture.newFuture;

 public class ActionTestUtils {
@@ -34,4 +37,18 @@ Response executeBlocking(TransportAction<Request, Response> action, Request requ
         action.execute(request, future);
         return future.actionGet();
     }
+
+    public static <T> ActionListener<T> assertNoFailureListener(Consumer<T> consumer) {
+        return new ActionListener<T>() {
+            @Override
+            public void onResponse(T t) {
+                consumer.accept(t);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                throw new AssertionError(e);
+            }
+        };
+    }
 }
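Reviewer note: the new helper is handy wherever a test wants a listener that turns any failure into a test error. A minimal usage sketch (the response type and assertion are illustrative):

    // any onFailure trips an AssertionError and fails the test; onResponse runs the assertion
    ActionListener<BulkShardResponse> listener =
        ActionTestUtils.assertNoFailureListener(response -> assertEquals(1, response.getResponses().length));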
diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index d20d57aed64a9..317688e03fd8c 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -36,6 +36,7 @@
 import org.elasticsearch.action.resync.ResyncReplicationRequest;
 import org.elasticsearch.action.resync.ResyncReplicationResponse;
 import org.elasticsearch.action.resync.TransportResyncReplicationAction;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
@@ -90,6 +91,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
@@ -192,14 +194,15 @@ public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) {
             sync(shardId, retentionLeases, ActionListener.wrap(
                 r -> { },
                 e -> {
-                    throw new AssertionError("failed to backgroun sync retention lease", e);
+                    throw new AssertionError("failed to background sync retention lease", e);
                 }));
         }
     };

     protected ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
         final ShardRouting primaryRouting = this.createShardRouting("s0", true);
-        primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer);
+        primary = newShard(
+            primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer);
         replicas = new CopyOnWriteArrayList<>();
         this.indexMetaData = indexMetaData;
         updateAllocationIDsOnPrimary();
@@ -255,9 +258,8 @@ public BulkItemResponse delete(DeleteRequest deleteRequest) throws Exception {
         private BulkItemResponse executeWriteRequest(
             DocWriteRequest writeRequest, WriteRequest.RefreshPolicy refreshPolicy) throws Exception {
             PlainActionFuture<BulkItemResponse> listener = new PlainActionFuture<>();
-            final ActionListener<BulkShardResponse> wrapBulkListener = ActionListener.wrap(
-                bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]),
-                listener::onFailure);
+            final ActionListener<BulkShardResponse> wrapBulkListener =
+                ActionListener.map(listener, bulkShardResponse -> bulkShardResponse.getResponses()[0]);
             BulkItemRequest[] items = new BulkItemRequest[1];
             items[0] = new BulkItemRequest(0, writeRequest);
             BulkShardRequest request = new BulkShardRequest(shardId, refreshPolicy, items);
@@ -310,8 +312,7 @@ public IndexShard addReplica() throws IOException {
         }

         public synchronized void addReplica(IndexShard replica) throws IOException {
-            assert shardRoutings().stream()
-                .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false :
+            assert shardRoutings().stream().anyMatch(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())) == false :
                 "replica with aId [" + replica.routingEntry().allocationId() + "] already exists";
             replicas.add(replica);
             if (replicationTargets != null) {
@@ -534,10 +535,8 @@ private synchronized ReplicationTargets getReplicationTargets() {
         }

         protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, ActionListener<ReplicationResponse> listener) {
-            RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(shardId, leases);
-            ActionListener<RetentionLeaseSyncAction.Response> wrappedListener = ActionListener.wrap(
-                r -> listener.onResponse(new ReplicationResponse()), listener::onFailure);
-            new SyncRetentionLeases(request, ReplicationGroup.this, wrappedListener).execute();
+            new SyncRetentionLeases(new RetentionLeaseSyncAction.Request(shardId, leases), this,
+                ActionListener.map(listener, r -> new ReplicationResponse())).execute();
         }

         public synchronized RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source,
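Reviewer note: for context on ActionListener.map, which replaces the hand-rolled wrap calls above: it returns a listener that converts the incoming response before delegating, and routes failures to the delegate's onFailure. A sketch of the equivalence, reusing the code from this hunk:

    // these two listeners behave the same on the success path
    ActionListener<BulkShardResponse> viaMap =
        ActionListener.map(listener, bulkShardResponse -> bulkShardResponse.getResponses()[0]);
    ActionListener<BulkShardResponse> viaWrap = ActionListener.wrap(
        bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]),
        listener::onFailure);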
@@ -612,17 +611,8 @@ protected ReplicationAction(Request request, ActionListener<Response> listener,
         public void execute() {
             try {
                 new ReplicationOperation<>(request, new PrimaryRef(),
-                    new ActionListener<PrimaryResult>() {
-                        @Override
-                        public void onResponse(PrimaryResult result) {
-                            result.respond(listener);
-                        }
-
-                        @Override
-                        public void onFailure(Exception e) {
-                            listener.onFailure(e);
-                        }
-                    }, new ReplicasRef(), logger, opType).execute();
+                    ActionListener.wrap(result -> result.respond(listener), listener::onFailure), new ReplicasRef(), logger, opType
+                ).execute();
             } catch (Exception e) {
                 listener.onFailure(e);
             }
@@ -632,7 +622,7 @@ IndexShard getPrimaryShard() {
             return replicationTargets.primary;
         }

-        protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception;
+        protected abstract void performOnPrimary(IndexShard primary, Request request, ActionListener<PrimaryResult> listener);

         protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws Exception;

@@ -649,8 +639,8 @@ public void failShard(String message, Exception exception) {
             }

             @Override
-            public PrimaryResult perform(Request request) throws Exception {
-                return performOnPrimary(getPrimaryShard(), request);
+            public void perform(Request request, ActionListener<PrimaryResult> listener) {
+                performOnPrimary(getPrimaryShard(), request, listener);
             }

             @Override
@@ -767,10 +757,9 @@ class WriteReplicationAction extends ReplicationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
         @Override
-        protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardRequest request) throws Exception {
-            final TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse>
-                result = executeShardBulkOnPrimary(primary, request);
-            return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful);
+        protected void performOnPrimary(IndexShard primary, BulkShardRequest request, ActionListener<PrimaryResult> listener) {
+            executeShardBulkOnPrimary(primary, request,
+                ActionListener.map(listener, result -> new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful)));
         }

         @Override
@@ -780,8 +769,8 @@ protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
         }

-        private TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> executeShardBulkOnPrimary(
-            IndexShard primary, BulkShardRequest request) throws Exception {
+        private void executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest request,
+            ActionListener<TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse>> listener) {
             for (BulkItemRequest itemRequest : request.items()) {
                 if (itemRequest.request() instanceof IndexRequest) {
                     ((IndexRequest) itemRequest.request()).process(Version.CURRENT, null, index.getName());
@@ -789,21 +778,38 @@ private TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> executeShardBulkOnPrimary(
             }

             final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
             primary.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, request);
-            final TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> result;
             try (Releasable ignored = permitAcquiredFuture.actionGet()) {
-                MappingUpdatePerformer noopMappingUpdater = (update, shardId, type) -> { };
-                result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater,
-                    null);
+                MappingUpdatePerformer noopMappingUpdater = (update, shardId, type, listener1) -> {};
+                TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater,
+                    null, ActionTestUtils.assertNoFailureListener(result -> {
+                        TransportWriteActionTestHelper.performPostWriteActions(primary, request,
+                            ((TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location, logger);
+                        listener.onResponse((TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result);
+                    }),
+                    r -> threadPool.executor(ThreadPool.Names.WRITE).execute(r));
+            } catch (Exception e) {
+                listener.onFailure(e);
             }
-            TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger);
-            return result;
         }

-        private <Request extends ReplicatedWriteRequest & DocWriteRequest>
-        BulkShardRequest executeReplicationRequestOnPrimary(IndexShard primary, Request request) throws Exception {
+        private <Request extends ReplicatedWriteRequest & DocWriteRequest> BulkShardRequest executeReplicationRequestOnPrimary(
+            IndexShard primary, Request request) throws Exception {
             final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(),
                 new BulkItemRequest[]{new BulkItemRequest(0, request)});
-            return executeShardBulkOnPrimary(primary, bulkShardRequest).replicaRequest();
+            final CompletableFuture<BulkShardRequest> res = new CompletableFuture<>();
+            executeShardBulkOnPrimary(
+                primary, bulkShardRequest,
+                new ActionListener<TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse>>() {
+                    @Override
+                    public void onResponse(TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> result) {
+                        res.complete(result.replicaRequest());
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        res.completeExceptionally(e);
+                    }
+                });
+            return res.get();
         }

         private void executeShardBulkOnReplica(BulkShardRequest request, IndexShard replica, long operationPrimaryTerm,
@@ -866,10 +872,14 @@ class GlobalCheckpointSync extends ReplicationAction<GlobalCheckpointSyncAction.Request, GlobalCheckpointSyncAction.Request, ReplicationResponse> {
         }

         @Override
-        protected PrimaryResult performOnPrimary(
-            final IndexShard primary, final GlobalCheckpointSyncAction.Request request) throws Exception {
-            primary.sync();
-            return new PrimaryResult(request, new ReplicationResponse());
+        protected void performOnPrimary(IndexShard primary, GlobalCheckpointSyncAction.Request request,
+            ActionListener<PrimaryResult> listener) {
+            try {
+                primary.sync();
+                listener.onResponse(new PrimaryResult(request, new ReplicationResponse()));
+            } catch (Exception e) {
+                listener.onFailure(e);
+            }
         }

         @Override
@@ -885,10 +895,14 @@ class ResyncAction extends ReplicationAction<ResyncReplicationRequest, ResyncReplicationRequest, ResyncReplicationResponse> {
         }

         @Override
-        protected PrimaryResult performOnPrimary(IndexShard primary, ResyncReplicationRequest request) throws Exception {
-            final TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> result =
-                executeResyncOnPrimary(primary, request);
-            return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful);
+        protected void performOnPrimary(IndexShard primary, ResyncReplicationRequest request, ActionListener<PrimaryResult> listener) {
+            try {
+                final TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> result =
+                    executeResyncOnPrimary(primary, request);
+                listener.onResponse(new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful));
+            } catch (Exception e) {
+                listener.onFailure(e);
+            }
         }

         @Override
@@ -899,7 +913,7 @@ protected void performOnReplica(ResyncReplicationRequest request, IndexShard rep
         }

         private TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> executeResyncOnPrimary(
-            IndexShard primary, ResyncReplicationRequest request) throws Exception {
+            IndexShard primary, ResyncReplicationRequest request) {
             final TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> result =
                 new TransportWriteAction.WritePrimaryResult<>(TransportResyncReplicationAction.performOnPrimary(request, primary),
                     new ResyncReplicationResponse(), null, null, primary, logger);
@@ -928,9 +942,14 @@ class SyncRetentionLeases extends ReplicationAction<RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Response> {
         }

         @Override
-        protected PrimaryResult performOnPrimary(IndexShard primary, RetentionLeaseSyncAction.Request request) throws Exception {
-            primary.persistRetentionLeases();
-            return new PrimaryResult(request, new RetentionLeaseSyncAction.Response());
+        protected void performOnPrimary(IndexShard primary, RetentionLeaseSyncAction.Request request,
+            ActionListener<PrimaryResult> listener) {
+            try {
+                primary.persistRetentionLeases();
+                listener.onResponse(new PrimaryResult(request, new RetentionLeaseSyncAction.Response()));
+            } catch (Exception e) {
+                listener.onFailure(e);
+            }
         }

         @Override
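Reviewer note: the CompletableFuture bridge in executeReplicationRequestOnPrimary is the usual way to keep a blocking test API on top of the now-async primary path. The same bridge could be written more compactly with ActionListener.wrap; a sketch under the same types, not part of this change:

    final CompletableFuture<BulkShardRequest> res = new CompletableFuture<>();
    executeShardBulkOnPrimary(primary, bulkShardRequest, ActionListener.wrap(
        result -> res.complete(result.replicaRequest()),   // success: hand over the replica request
        res::completeExceptionally));                      // failure: propagate to the waiting test
    return res.get();                                      // test thread blocks until the listener fires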
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java
index 7c3b2da32e88f..2e4ae81fff326 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java
@@ -61,13 +61,17 @@ public TransportBulkShardOperationsAction(
     }

     @Override
-    protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
-        final BulkShardOperationsRequest request, final IndexShard primary) throws Exception {
+    protected void shardOperationOnPrimary(BulkShardOperationsRequest request, IndexShard primary,
+        ActionListener<PrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse>> listener) {
         if (logger.isTraceEnabled()) {
             logger.trace("index [{}] on the following primary shard {}", request.getOperations(), primary.routingEntry());
         }
-        return shardOperationOnPrimary(request.shardId(), request.getHistoryUUID(), request.getOperations(),
-            request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
+        try {
+            listener.onResponse(shardOperationOnPrimary(request.shardId(), request.getHistoryUUID(), request.getOperations(),
+                request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger));
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
     }

     public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) {
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
index dd495ee5ca587..505077e19c165 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
@@ -641,20 +641,25 @@ class CcrAction extends ReplicationAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
         }

         @Override
-        protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardOperationsRequest request) throws Exception {
+        protected void performOnPrimary(IndexShard primary, BulkShardOperationsRequest request, ActionListener<PrimaryResult> listener) {
             final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
             primary.acquirePrimaryOperationPermit(permitFuture, ThreadPool.Names.SAME, request);
             final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> ccrResult;
-            try (Releasable ignored = permitFuture.get()) {
-                ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(),
-                    request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
-            }
-            return new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful) {
-                @Override
-                public void respond(ActionListener<BulkShardOperationsResponse> listener) {
-                    ccrResult.respond(listener);
+            try {
+                try (Releasable ignored = permitFuture.get()) {
+                    ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(),
+                        request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
                 }
-            };
+                listener.onResponse(new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful) {
+                    @Override
+                    public void respond(ActionListener<BulkShardOperationsResponse> listener) {
+                        ccrResult.respond(listener);
+                    }
+                });
+            } catch (Exception e) {
+                listener.onFailure(e);
+            }
         }

         @Override