Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,12 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;

Expand All @@ -60,8 +58,6 @@ public class ShardStateAction extends AbstractComponent {
private final AllocationService allocationService;
private final RoutingService routingService;

private final BlockingQueue<ShardRoutingEntry> startedShardsQueue = ConcurrentCollections.newBlockingQueue();

@Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
AllocationService allocationService, RoutingService routingService) {
Expand Down Expand Up @@ -155,7 +151,6 @@ public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<Sh
BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder();
List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
for (ShardRoutingEntry task : tasks) {
task.processed = true;
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure));
}
ClusterState maybeUpdatedState = currentState;
Expand Down Expand Up @@ -185,60 +180,47 @@ public void onFailure(String source, Throwable t) {
}
}

private final ShardStartedClusterStateHandler shardStartedClusterStateHandler =
new ShardStartedClusterStateHandler();

private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);
// buffer shard started requests, and the state update tasks will simply drain it
// this is to optimize the number of "started" events we generate, and batch them
// possibly, we can do time based batching as well, but usually, we would want to
// process started events as fast as possible, to make shards available
startedShardsQueue.add(shardRoutingEntry);

clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
new ClusterStateUpdateTask() {
@Override
public Priority priority() {
return Priority.URGENT;
}

@Override
public ClusterState execute(ClusterState currentState) {

if (shardRoutingEntry.processed) {
return currentState;
}

List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<>();
startedShardsQueue.drainTo(shardRoutingEntries);

// nothing to process (a previous event has processed it already)
if (shardRoutingEntries.isEmpty()) {
return currentState;
}

List<ShardRouting> shardRoutingToBeApplied = new ArrayList<>(shardRoutingEntries.size());

// mark all entries as processed
for (ShardRoutingEntry entry : shardRoutingEntries) {
entry.processed = true;
shardRoutingToBeApplied.add(entry.shardRouting);
}
clusterService.submitStateUpdateTask(
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateHandler,
shardStartedClusterStateHandler);
}

if (shardRoutingToBeApplied.isEmpty()) {
return currentState;
}
class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
@Override
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
BatchResult.Builder<ShardRoutingEntry> builder = BatchResult.builder();
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
for (ShardRoutingEntry task : tasks) {
shardRoutingsToBeApplied.add(task.shardRouting);
}
ClusterState maybeUpdatedState = currentState;
try {
RoutingAllocation.Result result =
allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied, true);
if (result.changed()) {
maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build();
}
builder.successes(tasks);
} catch (Throwable t) {
builder.failures(tasks, t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This made me worry that we don't log these failures anymore. In this specific case I think it is best to just let the exception bubble up, but it does raise a more general issue: if people put exceptions in the builder, it's their responsibility to report them. We should probably add something to the internal cluster service to auto log it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably add something to the internal cluster service to auto log it.

@bleskes I will address this in a follow-up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes I opened #15428.

}

RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shardRoutingToBeApplied, true);
if (!routingResult.changed()) {
return currentState;
}
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
return builder.build(maybeUpdatedState);
}

@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
});
/**
 * Reports a failure by logging it at error level; nothing is rethrown or returned.
 *
 * @param source value substituted into the {@code [{}]} placeholder of the log message;
 *               presumably the description of the submitted state update task (confirm against the caller)
 * @param t      the failure, handed to the logger together with the message
 */
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
}

private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
Expand Down Expand Up @@ -266,8 +248,6 @@ public static class ShardRoutingEntry extends TransportRequest {
String message;
Throwable failure;

volatile boolean processed; // state field, no need to serialize

public ShardRoutingEntry() {
}

Expand Down