Skip to content

Commit 31567ce

Browse files
authored
[RCI] Check blocks while having index shard permit in TransportReplicationAction (#35332)
Today, the TransportReplicationAction checks the global level blocks and the index level blocks before routing the operation to the primary, in the ReroutePhase, and it happens at the very beginning of the transport replication action execution. For the upcoming rework of the Close Index API and in order to deal with primary relocation, we'll need to also check for blocks before executing the operation on the primary (while holding a permit) but before routing to the new primary. This pull request change the AsyncPrimaryAction so that it checks for replication action's blocks before executing the operation locally or before routing the primary action to the newly primary shard. The check is done while holding a PrimaryShardReference. Related to #33888
1 parent 5c84708 commit 31567ce

File tree

2 files changed

+234
-136
lines changed

2 files changed

+234
-136
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 75 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -235,9 +235,39 @@ protected TransportRequestOptions transportOptions(Settings settings) {
235235
return TransportRequestOptions.EMPTY;
236236
}
237237

238+
private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
239+
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
240+
}
241+
242+
private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
243+
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
244+
if (globalBlockLevel != null) {
245+
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel);
246+
if (blockException != null) {
247+
return blockException;
248+
}
249+
}
250+
ClusterBlockLevel indexBlockLevel = indexBlockLevel();
251+
if (indexBlockLevel != null) {
252+
ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, indexName);
253+
if (blockException != null) {
254+
return blockException;
255+
}
256+
}
257+
return null;
258+
}
259+
238260
protected boolean retryPrimaryException(final Throwable e) {
239261
return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
240-
|| TransportActions.isShardNotAvailableException(e);
262+
|| TransportActions.isShardNotAvailableException(e)
263+
|| isRetryableClusterBlockException(e);
264+
}
265+
266+
boolean isRetryableClusterBlockException(final Throwable e) {
267+
if (e instanceof ClusterBlockException) {
268+
return ((ClusterBlockException) e).retryable();
269+
}
270+
return false;
241271
}
242272

243273
protected class OperationTransportHandler implements TransportRequestHandler<Request> {
@@ -310,6 +340,15 @@ protected void doRun() throws Exception {
310340
@Override
311341
public void onResponse(PrimaryShardReference primaryShardReference) {
312342
try {
343+
final ClusterState clusterState = clusterService.state();
344+
final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index());
345+
346+
final ClusterBlockException blockException = blockExceptions(clusterState, indexMetaData.getIndex().getName());
347+
if (blockException != null) {
348+
logger.trace("cluster is blocked, action failed on primary", blockException);
349+
throw blockException;
350+
}
351+
313352
if (primaryShardReference.isRelocated()) {
314353
primaryShardReference.close(); // release shard operation lock as soon as possible
315354
setPhase(replicationTask, "primary_delegation");
@@ -323,7 +362,7 @@ public void onResponse(PrimaryShardReference primaryShardReference) {
323362
response.readFrom(in);
324363
return response;
325364
};
326-
DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
365+
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
327366
transportService.sendRequest(relocatingNode, transportPrimaryAction,
328367
new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
329368
transportOptions,
@@ -696,35 +735,42 @@ public void onFailure(Exception e) {
696735
protected void doRun() {
697736
setPhase(task, "routing");
698737
final ClusterState state = observer.setAndGetObservedState();
699-
if (handleBlockExceptions(state)) {
700-
return;
701-
}
702-
703-
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
704-
final String concreteIndex = concreteIndex(state);
705-
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
706-
if (indexMetaData == null) {
707-
retry(new IndexNotFoundException(concreteIndex));
708-
return;
709-
}
710-
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
711-
throw new IndexClosedException(indexMetaData.getIndex());
712-
}
738+
final String concreteIndex = concreteIndex(state, request);
739+
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
740+
if (blockException != null) {
741+
if (blockException.retryable()) {
742+
logger.trace("cluster is blocked, scheduling a retry", blockException);
743+
retry(blockException);
744+
} else {
745+
finishAsFailed(blockException);
746+
}
747+
} else {
748+
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
749+
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
750+
if (indexMetaData == null) {
751+
retry(new IndexNotFoundException(concreteIndex));
752+
return;
753+
}
754+
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
755+
throw new IndexClosedException(indexMetaData.getIndex());
756+
}
713757

714-
// resolve all derived request fields, so we can route and apply it
715-
resolveRequest(indexMetaData, request);
716-
assert request.shardId() != null : "request shardId must be set in resolveRequest";
717-
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
758+
// resolve all derived request fields, so we can route and apply it
759+
resolveRequest(indexMetaData, request);
760+
assert request.shardId() != null : "request shardId must be set in resolveRequest";
761+
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
762+
"request waitForActiveShards must be set in resolveRequest";
718763

719-
final ShardRouting primary = primary(state);
720-
if (retryIfUnavailable(state, primary)) {
721-
return;
722-
}
723-
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
724-
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
725-
performLocalAction(state, primary, node, indexMetaData);
726-
} else {
727-
performRemoteAction(state, primary, node);
764+
final ShardRouting primary = primary(state);
765+
if (retryIfUnavailable(state, primary)) {
766+
return;
767+
}
768+
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
769+
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
770+
performLocalAction(state, primary, node, indexMetaData);
771+
} else {
772+
performRemoteAction(state, primary, node);
773+
}
728774
}
729775
}
730776

@@ -776,44 +822,11 @@ private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
776822
return false;
777823
}
778824

779-
private String concreteIndex(ClusterState state) {
780-
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
781-
}
782-
783825
private ShardRouting primary(ClusterState state) {
784826
IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
785827
return indexShard.primaryShard();
786828
}
787829

788-
private boolean handleBlockExceptions(ClusterState state) {
789-
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
790-
if (globalBlockLevel != null) {
791-
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel);
792-
if (blockException != null) {
793-
handleBlockException(blockException);
794-
return true;
795-
}
796-
}
797-
ClusterBlockLevel indexBlockLevel = indexBlockLevel();
798-
if (indexBlockLevel != null) {
799-
ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, concreteIndex(state));
800-
if (blockException != null) {
801-
handleBlockException(blockException);
802-
return true;
803-
}
804-
}
805-
return false;
806-
}
807-
808-
private void handleBlockException(ClusterBlockException blockException) {
809-
if (blockException.retryable()) {
810-
logger.trace("cluster is blocked, scheduling a retry", blockException);
811-
retry(blockException);
812-
} else {
813-
finishAsFailed(blockException);
814-
}
815-
}
816-
817830
private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
818831
final TransportRequest requestToPerform) {
819832
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {

0 commit comments

Comments
 (0)