@@ -11,10 +11,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.MessageSupplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStatePublicationEvent;
@@ -273,6 +275,7 @@ public void onTimeout(TimeValue timeout) {
}, changePredicate);
}

// TODO: Make this a TransportMasterNodeAction and remove duplication of master failover retrying from upstream code
private static class ShardFailedTransportHandler implements TransportRequestHandler<FailedShardEntry> {
private final ClusterService clusterService;
private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor;
@@ -298,10 +301,16 @@ public void messageReceived(FailedShardEntry request, TransportChannel channel,
new ClusterStateTaskListener() {
@Override
public void onFailure(Exception e) {
logger.error(
() -> new ParameterizedMessage("{} unexpected failure while failing shard [{}]", request.shardId, request),
e
final MessageSupplier msg = () -> new ParameterizedMessage(
"{} unexpected failure while failing shard [{}]",
request.shardId,
request
);
if (e instanceof FailedToCommitClusterStateException) {
logger.debug(msg, e);
} else {
logger.error(msg, e);
}
try {
channel.sendResponse(e);
} catch (Exception channelException) {
@@ -320,7 +329,7 @@ public void onFailure(Exception e) {

@Override
public void onNoLongerMaster() {
logger.error("{} no longer master while failing shard [{}]", request.shardId, request);
logger.debug("{} no longer master while failing shard [{}]", request.shardId, request);
try {
channel.sendResponse(new NotMasterException(TASK_SOURCE));
} catch (Exception channelException) {
@@ -594,6 +603,7 @@ public void shardStarted(
);
}

// TODO: Make this a TransportMasterNodeAction and remove duplication of master failover retrying from upstream code

Contributor:
Mostly 👍, except that I believe TransportMasterNodeAction requires a timeout today, but these things should not time out. Relates to #82185 too, I think.

Contributor (Author):
Yea, but we can just do what we did for the snapshot shard state update and set it to the max value. We could even do better here and create an override of the master-node request that doesn't physically write the always-max value redundantly.
I actually mostly implemented this already this morning, but then figured I could get this one merged more quickly, and it actually helps my benchmarks :)
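
A minimal sketch of that idea, not part of this PR: a shard-state request (hypothetical name ShardStateUpdateRequest) could extend MasterNodeRequest and pin the master-node timeout to TimeValue.MAX_VALUE in its constructor, mirroring the snapshot shard status update; skipping the redundant serialization of the constant would be a separate writeTo override and is omitted here.

// Hypothetical sketch only, not code from this PR.
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.core.TimeValue; // package varies across ES versions

public class ShardStateUpdateRequest extends MasterNodeRequest<ShardStateUpdateRequest> {

    public ShardStateUpdateRequest() {
        // Always-max timeout so the request effectively never times out; a follow-up
        // could override serialization so this constant is not written to the wire.
        masterNodeTimeout(TimeValue.MAX_VALUE);
    }

    @Override
    public ActionRequestValidationException validate() {
        return null; // nothing to validate in this sketch
    }
}

With such a request in place, handlers like the ones in this diff could reply through the same ChannelActionListener pattern while master-failover retries move into the transport-master-node infrastructure instead of being duplicated in upstream callers, which is what the TODO comments point at.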

private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> {
private final ClusterService clusterService;
private final ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor;
@@ -609,20 +619,44 @@ private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> {
@Override
public void messageReceived(StartedShardEntry request, TransportChannel channel, Task task) throws Exception {
logger.debug("{} received shard started for [{}]", request.shardId, request);
final ChannelActionListener<TransportResponse.Empty, StartedShardEntry> listener = new ChannelActionListener<>(
channel,
SHARD_STARTED_ACTION_NAME,
request
);
clusterService.submitStateUpdateTask(
"shard-started " + request,
request,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateTaskExecutor,
e -> {
if (e instanceof FailedToCommitClusterStateException || e instanceof NotMasterException) {
logger.debug(new ParameterizedMessage("failure during [shard-started {}]", request), e);
} else {
logger.error(new ParameterizedMessage("unexpected failure during [shard-started {}]", request), e);
new ClusterStateTaskListener() {
@Override
public void onFailure(Exception e) {
final MessageSupplier msg = () -> new ParameterizedMessage(
"{} unexpected failure while starting shard [{}]",
request.shardId,
request
);
if (e instanceof FailedToCommitClusterStateException) {
logger.debug(msg, e);
} else {
logger.error(msg, e);
}
listener.onFailure(e);
}

@Override
public void onNoLongerMaster() {
logger.debug("{} no longer master while starting shard [{}]", request.shardId, request);
listener.onFailure(new NotMasterException("shard-started"));
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
}
}
);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
