Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.benchmark.routing.allocation;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -150,6 +151,9 @@ private int toInt(String v) {
return Integer.valueOf(v.trim());
}

/**
* Once we use DesiredBalanceShardsAllocator, this only measures reconciliation, not the balance calculation.
*/
@Benchmark
public ClusterState measureAllocation() {
ClusterState clusterState = initialClusterState;
Expand All @@ -162,7 +166,7 @@ public ClusterState measureAllocation() {
.filter(ShardRouting::initializing)
.collect(Collectors.toList())
);
clusterState = strategy.reroute(clusterState, "reroute");
clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop());
}
return clusterState;
}
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/91343.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 91343
summary: Introduce desired-balance allocator
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.datastreams;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.rollover.Condition;
Expand Down Expand Up @@ -245,7 +246,7 @@ public void setup() throws Exception {
Environment env = mock(Environment.class);
when(env.sharedDataFile()).thenReturn(null);
AllocationService allocationService = mock(AllocationService.class);
when(allocationService.reroute(any(ClusterState.class), any(String.class))).then(i -> i.getArguments()[0]);
when(allocationService.reroute(any(ClusterState.class), any(String.class), any())).then(i -> i.getArguments()[0]);
ShardLimitValidator shardLimitValidator = new ShardLimitValidator(Settings.EMPTY, clusterService);
createIndexService = new MetadataCreateIndexService(
Settings.EMPTY,
Expand Down Expand Up @@ -277,7 +278,7 @@ public void setup() throws Exception {
);
}

createDataStreamService = new MetadataCreateDataStreamService(clusterService, createIndexService);
createDataStreamService = new MetadataCreateDataStreamService(testThreadPool, clusterService, createIndexService);
}

@After
Expand Down Expand Up @@ -306,7 +307,7 @@ private ClusterState createDataStream(ClusterState state, String name, Instant t
TimeValue.ZERO,
false
);
return createDataStreamService.createDataStream(request, state);
return createDataStreamService.createDataStream(request, state, ActionListener.noop());
}

private MetadataRolloverService.RolloverResult rolloverOver(ClusterState state, String name, Instant time) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
Expand Down Expand Up @@ -105,7 +106,7 @@ public ClusterState execute(ClusterState currentState) {
routingTable.addAsRecovery(updatedState.metadata().index(index));
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();

return allocationService.reroute(updatedState, "reroute");
return allocationService.reroute(updatedState, "reroute", ActionListener.noop());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNodesHelper;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
Expand Down Expand Up @@ -425,7 +428,18 @@ public void testDoNotCancelRecoveryForBrokenNode() throws Exception {
);
internalCluster().startDataOnlyNode();
newNodeStarted.countDown();
ensureGreen(indexName);

var allocator = internalCluster().getInstance(ShardsAllocator.class);
if (allocator instanceof BalancedShardsAllocator) {
// BalancedShardsAllocator will try another node once retries are exhausted
ensureGreen(indexName);
} else if (allocator instanceof DesiredBalanceShardsAllocator) {
// DesiredBalanceShardsAllocator will keep the shard in the error state if it cannot be allocated on the desired node
ensureYellow(indexName);
} else {
fail("Unknown allocator used");
}

transportService.clearAllRules();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void beforeIndexCreated(Index index, Settings indexSettings) {
* Tests that if an *index* structure creation fails on relocation to a new node, the shard
* is not stuck but properly failed.
*/
public void testIndexShardFailedOnRelocation() throws Throwable {
public void testIndexShardFailedOnRelocation() {
String node1 = internalCluster().startNode();
client().admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
Expand Down Expand Up @@ -937,7 +940,18 @@ public void testDoNotInfinitelyWaitForMapping() {
});
}
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.number_of_replicas", 1)).get();
ensureGreen("test");

var allocator = internalCluster().getInstance(ShardsAllocator.class);
if (allocator instanceof BalancedShardsAllocator) {
// BalancedShardsAllocator will try another node once retries are exhausted
ensureGreen("test");
} else if (allocator instanceof DesiredBalanceShardsAllocator) {
// DesiredBalanceShardsAllocator will keep the shard in the error state if it cannot be allocated on the desired node
ensureYellow("test");
} else {
fail("Unknown allocator used");
}

client().admin().indices().prepareRefresh("test").get();
assertHitCount(client().prepareSearch().get(), numDocs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateAckListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionListener;
import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -161,12 +164,18 @@ private void verifyThenSubmitUpdate(
private void submitStateUpdate(final ClusterRerouteRequest request, final ActionListener<ClusterRerouteResponse> listener) {
submitUnbatchedTask(
TASK_SOURCE,
new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request, listener.map(response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
}
return response;
}))
new ClusterRerouteResponseAckedClusterStateUpdateTask(
logger,
allocationService,
threadPool.getThreadContext(),
request,
listener.map(response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
}
return response;
})
)
);
}

Expand All @@ -175,10 +184,10 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String
clusterService.submitUnbatchedStateUpdateTask(source, task);
}

static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask {
static class ClusterRerouteResponseAckedClusterStateUpdateTask extends ClusterStateUpdateTask implements ClusterStateAckListener {

private final ClusterRerouteRequest request;
private final ActionListener<ClusterRerouteResponse> listener;
private final AllocationActionListener<ClusterRerouteResponse> listener;
private final Logger logger;
private final AllocationService allocationService;
private volatile ClusterState clusterStateToSend;
Expand All @@ -187,46 +196,61 @@ static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClus
ClusterRerouteResponseAckedClusterStateUpdateTask(
Logger logger,
AllocationService allocationService,
ThreadContext context,
ClusterRerouteRequest request,
ActionListener<ClusterRerouteResponse> listener
) {
super(Priority.IMMEDIATE, request, listener);
super(Priority.IMMEDIATE);
this.request = request;
this.listener = listener;
this.listener = new AllocationActionListener<>(listener, context);
this.logger = logger;
this.allocationService = allocationService;
}

@Override
protected ClusterRerouteResponse newResponse(boolean acknowledged) {
return new ClusterRerouteResponse(acknowledged, clusterStateToSend, explanations);
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}

/**
* Returns the ack timeout for this task, taken directly from the reroute request.
*
* @return the value of {@code request.ackTimeout()}
*/
@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}

/**
* Completes the cluster-state-update side of the listener with an acknowledged response.
* The response is built with {@code acknowledged = true} and carries the current values of
* {@code clusterStateToSend} and {@code explanations}.
*/
@Override
public void onAllNodesAcked() {
listener.clusterStateUpdate().onResponse(new ClusterRerouteResponse(true, clusterStateToSend, explanations));
}

/**
* Reports an ack failure as a regular, non-acknowledged response rather than as a failure.
* The response is built with {@code acknowledged = false} and carries the current values of
* {@code clusterStateToSend} and {@code explanations}.
*
* @param e the ack failure; it is not used by this method
*/
@Override
public void onAckFailure(Exception e) {
// NOTE(review): e is neither logged nor forwarded to the listener here - confirm this is intended.
listener.clusterStateUpdate().onResponse(new ClusterRerouteResponse(false, clusterStateToSend, explanations));
}

@Override
public void onAckTimeout() {
listener.onResponse(new ClusterRerouteResponse(false, clusterStateToSend, new RoutingExplanations()));
listener.clusterStateUpdate().onResponse(new ClusterRerouteResponse(false, clusterStateToSend, new RoutingExplanations()));
}

@Override
public void onFailure(Exception e) {
logger.debug("failed to perform [" + TASK_SOURCE + "]", e);
super.onFailure(e);
listener.clusterStateUpdate().onFailure(e);
}

@Override
public ClusterState execute(ClusterState currentState) {
AllocationService.CommandsResult commandsResult = allocationService.reroute(
var result = allocationService.reroute(
currentState,
request.getCommands(),
request.explain(),
request.isRetryFailed()
request.isRetryFailed(),
request.dryRun(),
listener.reroute()
);
clusterStateToSend = commandsResult.clusterState();
explanations = commandsResult.explanations();
if (request.dryRun()) {
return currentState;
}
return commandsResult.clusterState();
clusterStateToSend = result.clusterState();
explanations = result.explanations();
return request.dryRun() ? currentState : result.clusterState();
}
}
}
Loading