Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ public void handleException(TransportException exp) {
onReplicaFailure(nodeId, exp);
} else {
logger.warn("{} failed to perform {} on node {}", exp, shardId, transportReplicaAction, node);
shardStateAction.shardFailed(shard, indexUUID, "failed to perform " + transportReplicaAction + " on replica on node " + node, exp, shardFailedTimeout, new ReplicationFailedShardStateListener(nodeId, exp));
shardStateAction.shardFailed(clusterService.state(), shard, indexUUID, "failed to perform " + transportReplicaAction + " on replica on node " + node, exp, shardFailedTimeout, new ReplicationFailedShardStateListener(nodeId, exp));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -58,49 +59,38 @@

import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;


public class ShardStateAction extends AbstractComponent {
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";

private final TransportService transportService;
private final ClusterService clusterService;
private final AllocationService allocationService;
private final RoutingService routingService;

@Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
AllocationService allocationService, RoutingService routingService) {
super(settings);
this.clusterService = clusterService;
this.transportService = transportService;
this.allocationService = allocationService;
this.routingService = routingService;

transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler());
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler());
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
}

public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
shardFailed(clusterState, shardRouting, indexUUID, message, failure, null, listener);
}

public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
shardFailed(shardRouting, indexUUID, message, failure, null, listener);
public void resendShardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
logger.trace("{} re-sending failed shard [{}], index UUID [{}], reason [{}]", shardRouting.shardId(), failure, shardRouting, indexUUID, message);
shardFailed(clusterState, shardRouting, indexUUID, message, failure, listener);
}

public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
public void shardFailed(final ClusterState clusterState, final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, TimeValue timeout, Listener listener) {
DiscoveryNode masterNode = clusterState.nodes().masterNode();
if (masterNode == null) {
logger.warn("can't send shard failed for {}, no master known.", shardRouting);
logger.warn("{} no master known to fail shard [{}]", shardRouting.shardId(), shardRouting);
listener.onShardFailedNoMaster();
return;
}
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, timeout, listener);
}

public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, @Nullable final Throwable failure, Listener listener) {
logger.trace("{} re-sending failed shard for {}, indexUUID [{}], reason [{}]", failure, shardRouting.shardId(), shardRouting, indexUUID, message);
innerShardFailed(shardRouting, indexUUID, masterNode, message, failure, null, listener);
}

private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, final Throwable failure, TimeValue timeout, Listener listener) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
TransportRequestOptions options = TransportRequestOptions.EMPTY;
if (timeout != null) {
Expand All @@ -115,33 +105,49 @@ public void handleResponse(TransportResponse.Empty response) {

@Override
public void handleException(TransportException exp) {
logger.warn("unexpected failure while sending request to [{}] to fail shard [{}]", exp, masterNode, shardRoutingEntry);
logger.warn("{} unexpected failure while sending request to [{}] to fail shard [{}]", exp, shardRoutingEntry.shardRouting.shardId(), masterNode, shardRoutingEntry);
listener.onShardFailedFailure(masterNode, exp);
}
});
}

private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
private static class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
private final ClusterService clusterService;
private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor;
private final ESLogger logger;

public ShardFailedTransportHandler(ClusterService clusterService, ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor, ESLogger logger) {
this.clusterService = clusterService;
this.shardFailedClusterStateTaskExecutor = shardFailedClusterStateTaskExecutor;
this.logger = logger;
}

@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
handleShardFailureOnMaster(request, new ClusterStateTaskListener() {
logger.warn("{} received shard failed for {}", request.failure, request.shardRouting.shardId(), request);
clusterService.submitStateUpdateTask(
"shard-failed (" + request.shardRouting + "), message [" + request.message + "]",
request,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateTaskExecutor,
new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure while failing shard [{}]", t, request.shardRouting);
logger.error("{} unexpected failure while failing shard [{}]", t, request.shardRouting.shardId(), request.shardRouting);
try {
channel.sendResponse(t);
} catch (Throwable channelThrowable) {
logger.warn("failed to send failure [{}] while failing shard [{}]", channelThrowable, t, request.shardRouting);
logger.warn("{} failed to send failure [{}] while failing shard [{}]", channelThrowable, request.shardRouting.shardId(), t, request.shardRouting);
}
}

@Override
public void onNoLongerMaster(String source) {
logger.error("no longer master while failing shard [{}]", request.shardRouting);
logger.error("{} no longer master while failing shard [{}]", request.shardRouting.shardId(), request.shardRouting);
try {
channel.sendResponse(new NotMasterException(source));
} catch (Throwable channelThrowable) {
logger.warn("failed to send no longer master while failing shard [{}]", channelThrowable, request.shardRouting);
logger.warn("{} failed to send no longer master while failing shard [{}]", channelThrowable, request.shardRouting.shardId(), request.shardRouting);
}
}

Expand All @@ -150,15 +156,25 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Throwable channelThrowable) {
logger.warn("failed to send response while failing shard [{}]", channelThrowable, request.shardRouting);
logger.warn("{} failed to send response while failing shard [{}]", channelThrowable, request.shardRouting.shardId(), request.shardRouting);
}
}
}
);
}
}

class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry> {
private static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardRoutingEntry> {
private final AllocationService allocationService;
private final RoutingService routingService;
private final ESLogger logger;

public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, ESLogger logger) {
this.allocationService = allocationService;
this.routingService = routingService;
this.logger = logger;
}

@Override
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder();
Expand Down Expand Up @@ -192,48 +208,56 @@ public void clusterStatePublished(ClusterState newClusterState) {
}
}

private final ShardFailedClusterStateHandler shardFailedClusterStateHandler = new ShardFailedClusterStateHandler();

private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry, ClusterStateTaskListener listener) {
logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
clusterService.submitStateUpdateTask(
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateHandler,
listener);
}

public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
public void shardStarted(final ClusterState clusterState, final ShardRouting shardRouting, String indexUUID, final String reason) {
DiscoveryNode masterNode = clusterState.nodes().masterNode();
if (masterNode == null) {
logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting);
logger.warn("{} no master known to start shard [{}]", shardRouting.shardId(), shardRouting);
return;
}
shardStarted(shardRouting, indexUUID, reason, masterNode);
}

public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) {
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
logger.debug("sending start shard [{}]", shardRoutingEntry);
transportService.sendRequest(masterNode,
SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to send shard started to [{}]", exp, masterNode);
logger.warn("{} failure sending start shard [{}] to [{}]", exp, shardRouting.shardId(), masterNode, shardRouting);
}
});
}

class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
private static class ShardStartedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
private final ClusterService clusterService;
private final ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor;
private final ESLogger logger;

public ShardStartedTransportHandler(ClusterService clusterService, ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor, ESLogger logger) {
this.clusterService = clusterService;
this.shardStartedClusterStateTaskExecutor = shardStartedClusterStateTaskExecutor;
this.logger = logger;
}

@Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
handleShardStartedOnMaster(request);
logger.debug("{} received shard started for [{}]", request.shardRouting.shardId(), request);
clusterService.submitStateUpdateTask(
"shard-started (" + request.shardRouting + "), reason [" + request.message + "]",
request,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateTaskExecutor,
shardStartedClusterStateTaskExecutor);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
private static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
private final AllocationService allocationService;
private final ESLogger logger;

public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, ESLogger logger) {
this.allocationService = allocationService;
this.logger = logger;
}

@Override
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
BatchResult.Builder<ShardRoutingEntry> builder = BatchResult.builder();
Expand Down Expand Up @@ -262,19 +286,6 @@ public void onFailure(String source, Throwable t) {
}
}

private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = new ShardStartedClusterStateHandler();

private void handleShardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);

clusterService.submitStateUpdateTask(
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateHandler,
shardStartedClusterStateHandler);
}

public static class ShardRoutingEntry extends TransportRequest {
ShardRouting shardRouting;
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
Expand Down
Loading