From 78ce193c5a42d32bfb3214b36c6b622702b13072 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 19 Jan 2022 10:59:48 +0100 Subject: [PATCH 1/4] Make Shard Started Response Handling only Return after the CS Update Completes Somewhat lazy solution by copying the approach from the failed handler 1:1 for now. Added a todo to clean up this thing. closes #81628 --- .../action/shard/ShardStateAction.java | 62 +++++++++++++++++-- 1 file changed, 56 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 23f8f7976cfc3..030348220c9a5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -273,6 +273,7 @@ public void onTimeout(TimeValue timeout) { }, changePredicate); } + // TODO: Make this a TransportMasterNodeAction and remove duplication of master failover retrying from upstream code private static class ShardFailedTransportHandler implements TransportRequestHandler { private final ClusterService clusterService; private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor; @@ -594,6 +595,7 @@ public void shardStarted( ); } + // TODO: Make this a TransportMasterNodeAction and remove duplication of master failover retrying from upstream code private static class ShardStartedTransportHandler implements TransportRequestHandler { private final ClusterService clusterService; private final ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor; @@ -614,15 +616,63 @@ public void messageReceived(StartedShardEntry request, TransportChannel channel, request, ClusterStateTaskConfig.build(Priority.URGENT), shardStartedClusterStateTaskExecutor, - e -> { - if (e instanceof FailedToCommitClusterStateException || e instanceof NotMasterException) { - logger.debug(new ParameterizedMessage("failure during [shard-started {}]", request), e); - } else { - logger.error(new ParameterizedMessage("unexpected failure during [shard-started {}]", request), e); + new ClusterStateTaskListener() { + @Override + public void onFailure(Exception e) { + logger.error( + () -> new ParameterizedMessage("{} unexpected failure while starting shard [{}]", request.shardId, request), + e + ); + try { + channel.sendResponse(e); + } catch (Exception channelException) { + channelException.addSuppressed(e); + logger.warn( + () -> new ParameterizedMessage( + "{} failed to send failure [{}] while starting shard [{}]", + request.shardId, + e, + request + ), + channelException + ); + } + } + + @Override + public void onNoLongerMaster() { + logger.error("{} no longer master while starting shard [{}]", request.shardId, request); + try { + channel.sendResponse(new NotMasterException("shard-started")); + } catch (Exception channelException) { + logger.warn( + () -> new ParameterizedMessage( + "{} failed to send no longer master while starting shard [{}]", + request.shardId, + request + ), + channelException + ); + } + } + + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + try { + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (Exception channelException) { + logger.warn( + () -> new ParameterizedMessage( + "{} failed to send response while starting shard [{}]", + request.shardId, + request + ), + channelException + ); + } } } ); - channel.sendResponse(TransportResponse.Empty.INSTANCE); } } From 58ce4f190c8f132d866d042b99aa6e9f3a17d7e7 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 19 Jan 2022 12:00:56 +0100 Subject: [PATCH 2/4] nicer --- .../action/shard/ShardStateAction.java | 51 ++++--------------- 1 file changed, 11 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 030348220c9a5..eebf47ccc6ba7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ResultDeduplicator; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStatePublicationEvent; @@ -321,7 +322,7 @@ public void onFailure(Exception e) { @Override public void onNoLongerMaster() { - logger.error("{} no longer master while failing shard [{}]", request.shardId, request); + logger.debug("{} no longer master while failing shard [{}]", request.shardId, request); try { channel.sendResponse(new NotMasterException(TASK_SOURCE)); } catch (Exception channelException) { @@ -611,6 +612,11 @@ private static class ShardStartedTransportHandler implements TransportRequestHan @Override public void messageReceived(StartedShardEntry request, TransportChannel channel, Task task) throws Exception { logger.debug("{} received shard started for [{}]", request.shardId, request); + final ChannelActionListener listener = new ChannelActionListener<>( + channel, + SHARD_STARTED_ACTION_NAME, + request + ); clusterService.submitStateUpdateTask( "shard-started " + request, request, @@ -623,53 +629,18 @@ public void onFailure(Exception e) { () -> new ParameterizedMessage("{} unexpected failure while starting shard [{}]", request.shardId, request), e ); - try { - channel.sendResponse(e); - } catch (Exception channelException) { - channelException.addSuppressed(e); - logger.warn( - () -> new ParameterizedMessage( - "{} failed to send failure [{}] while starting shard [{}]", - request.shardId, - e, - request - ), - channelException - ); - } + listener.onFailure(e); } @Override public void onNoLongerMaster() { - logger.error("{} no longer master while starting shard [{}]", request.shardId, request); - try { - channel.sendResponse(new NotMasterException("shard-started")); - } catch (Exception channelException) { - logger.warn( - () -> new ParameterizedMessage( - "{} failed to send no longer master while starting shard [{}]", - request.shardId, - request - ), - channelException - ); - } + logger.debug("{} no longer master while starting shard [{}]", request.shardId, request); + listener.onFailure(new NotMasterException("shard-started")); } @Override public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - try { - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } catch (Exception channelException) { - logger.warn( - () -> new ParameterizedMessage( - "{} failed to send response while starting shard [{}]", - request.shardId, - request - ), - channelException - ); - } + listener.onResponse(TransportResponse.Empty.INSTANCE); } } ); From e5da5d1e06af2e3c29114ba3cf9a839cf7d9e795 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 19 Jan 2022 12:04:19 +0100 Subject: [PATCH 3/4] debug --- .../cluster/action/shard/ShardStateAction.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index eebf47ccc6ba7..347afc6ecb31a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -68,6 +68,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.Predicate; +import java.util.function.Supplier; public class ShardStateAction { @@ -300,10 +301,16 @@ public void messageReceived(FailedShardEntry request, TransportChannel channel, new ClusterStateTaskListener() { @Override public void onFailure(Exception e) { - logger.error( - () -> new ParameterizedMessage("{} unexpected failure while failing shard [{}]", request.shardId, request), - e + final Supplier msg = () -> new ParameterizedMessage( + "{} unexpected failure while failing shard [{}]", + request.shardId, + request ); + if (e instanceof FailedToCommitClusterStateException) { + logger.debug(msg, e); + } else { + logger.error(msg, e); + } try { channel.sendResponse(e); } catch (Exception channelException) { From c67d9f258cbabc2e9166e3ec2fd96d979daf7164 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 19 Jan 2022 12:11:49 +0100 Subject: [PATCH 4/4] debug --- .../cluster/action/shard/ShardStateAction.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 347afc6ecb31a..ac1df9cfff0ec 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.MessageSupplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -68,7 +69,6 @@ import java.util.Objects; import java.util.Set; import java.util.function.Predicate; -import java.util.function.Supplier; public class ShardStateAction { @@ -301,7 +301,7 @@ public void messageReceived(FailedShardEntry request, TransportChannel channel, new ClusterStateTaskListener() { @Override public void onFailure(Exception e) { - final Supplier msg = () -> new ParameterizedMessage( + final MessageSupplier msg = () -> new ParameterizedMessage( "{} unexpected failure while failing shard [{}]", request.shardId, request @@ -632,10 +632,16 @@ public void messageReceived(StartedShardEntry request, TransportChannel channel, new ClusterStateTaskListener() { @Override public void onFailure(Exception e) { - logger.error( - () -> new ParameterizedMessage("{} unexpected failure while starting shard [{}]", request.shardId, request), - e + final MessageSupplier msg = () -> new ParameterizedMessage( + "{} unexpected failure while starting shard [{}]", + request.shardId, + request ); + if (e instanceof FailedToCommitClusterStateException) { + logger.debug(msg, e); + } else { + logger.error(msg, e); + } listener.onFailure(e); }