Skip to content

Commit 07056f5

Browse files
authored
Introduce desired-balance allocator (#91343)
Today when updating the routing table (i.e. within `AllocationService#reroute()`) Elasticsearch computes the desired balance of shards and then identifies some shard movements that work towards that goal. At the end of the computation it discards the computed desired allocation and recomputes it the next time round. It's kind of inefficient to recompute the desired allocation each time, and it makes it hard to predict how long it will take until the goal is reached. The computation also happens on the critical path for cluster state updates. With this commit we introduce a new allocator which keeps hold of the desired balance between iterations. It also computes the desired balance asynchronously, allowing other cluster state updates to happen while the computation is ongoing. Relates #88647, #83777, and many more.
1 parent f66f10f commit 07056f5

File tree

133 files changed

+7078
-655
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

133 files changed

+7078
-655
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.benchmark.routing.allocation;
99

1010
import org.elasticsearch.Version;
11+
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.cluster.ClusterName;
1213
import org.elasticsearch.cluster.ClusterState;
1314
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -150,6 +151,9 @@ private int toInt(String v) {
150151
return Integer.valueOf(v.trim());
151152
}
152153

154+
/**
155+
* Once we use DesiredBalanceShardsAllocator this only measures reconciliation, not the balance calculation
156+
*/
153157
@Benchmark
154158
public ClusterState measureAllocation() {
155159
ClusterState clusterState = initialClusterState;
@@ -162,7 +166,7 @@ public ClusterState measureAllocation() {
162166
.filter(ShardRouting::initializing)
163167
.collect(Collectors.toList())
164168
);
165-
clusterState = strategy.reroute(clusterState, "reroute");
169+
clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop());
166170
}
167171
return clusterState;
168172
}

docs/changelog/91343.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 91343
2+
summary: Introduce desired-balance allocator
3+
area: Allocation
4+
type: enhancement
5+
issues: []

modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.datastreams;
1010

1111
import org.elasticsearch.Version;
12+
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.DocWriteRequest;
1314
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
1415
import org.elasticsearch.action.admin.indices.rollover.Condition;
@@ -245,7 +246,7 @@ public void setup() throws Exception {
245246
Environment env = mock(Environment.class);
246247
when(env.sharedDataFile()).thenReturn(null);
247248
AllocationService allocationService = mock(AllocationService.class);
248-
when(allocationService.reroute(any(ClusterState.class), any(String.class))).then(i -> i.getArguments()[0]);
249+
when(allocationService.reroute(any(ClusterState.class), any(String.class), any())).then(i -> i.getArguments()[0]);
249250
ShardLimitValidator shardLimitValidator = new ShardLimitValidator(Settings.EMPTY, clusterService);
250251
createIndexService = new MetadataCreateIndexService(
251252
Settings.EMPTY,
@@ -277,7 +278,7 @@ public void setup() throws Exception {
277278
);
278279
}
279280

280-
createDataStreamService = new MetadataCreateDataStreamService(clusterService, createIndexService);
281+
createDataStreamService = new MetadataCreateDataStreamService(testThreadPool, clusterService, createIndexService);
281282
}
282283

283284
@After
@@ -306,7 +307,7 @@ private ClusterState createDataStream(ClusterState state, String name, Instant t
306307
TimeValue.ZERO,
307308
false
308309
);
309-
return createDataStreamService.createDataStream(request, state);
310+
return createDataStreamService.createDataStream(request, state, ActionListener.noop());
310311
}
311312

312313
private MetadataRolloverService.RolloverResult rolloverOver(ClusterState state, String name, Instant time) throws Exception {

server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.ElasticsearchParseException;
1212
import org.elasticsearch.Version;
1313
import org.elasticsearch.action.ActionFuture;
14+
import org.elasticsearch.action.ActionListener;
1415
import org.elasticsearch.action.ActionRequest;
1516
import org.elasticsearch.action.ActionRequestBuilder;
1617
import org.elasticsearch.action.ActionResponse;
@@ -105,7 +106,7 @@ public ClusterState execute(ClusterState currentState) {
105106
routingTable.addAsRecovery(updatedState.metadata().index(index));
106107
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
107108

108-
return allocationService.reroute(updatedState, "reroute");
109+
return allocationService.reroute(updatedState, "reroute", ActionListener.noop());
109110
}
110111

111112
@Override

server/src/internalClusterTest/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
import org.elasticsearch.cluster.node.DiscoveryNode;
1414
import org.elasticsearch.cluster.routing.RoutingNodesHelper;
1515
import org.elasticsearch.cluster.routing.UnassignedInfo;
16+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
17+
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
18+
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
1619
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
1720
import org.elasticsearch.cluster.service.ClusterService;
1821
import org.elasticsearch.common.Priority;
@@ -425,7 +428,18 @@ public void testDoNotCancelRecoveryForBrokenNode() throws Exception {
425428
);
426429
internalCluster().startDataOnlyNode();
427430
newNodeStarted.countDown();
428-
ensureGreen(indexName);
431+
432+
var allocator = internalCluster().getInstance(ShardsAllocator.class);
433+
if (allocator instanceof BalancedShardsAllocator) {
434+
// BalancedShardsAllocator will try other node once retries are exhausted
435+
ensureGreen(indexName);
436+
} else if (allocator instanceof DesiredBalanceShardsAllocator) {
437+
// DesiredBalanceShardsAllocator will keep shard in the error state if it could not be allocated on the desired node
438+
ensureYellow(indexName);
439+
} else {
440+
fail("Unknown allocator used");
441+
}
442+
429443
transportService.clearAllRules();
430444
}
431445

server/src/internalClusterTest/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void beforeIndexCreated(Index index, Settings indexSettings) {
114114
* Tests that if an *index* structure creation fails on relocation to a new node, the shard
115115
* is not stuck but properly failed.
116116
*/
117-
public void testIndexShardFailedOnRelocation() throws Throwable {
117+
public void testIndexShardFailedOnRelocation() {
118118
String node1 = internalCluster().startNode();
119119
client().admin()
120120
.indices()

server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@
5252
import org.elasticsearch.cluster.routing.ShardRouting;
5353
import org.elasticsearch.cluster.routing.ShardRoutingState;
5454
import org.elasticsearch.cluster.routing.UnassignedInfo;
55+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
56+
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
57+
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
5558
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
5659
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
5760
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -937,7 +940,18 @@ public void testDoNotInfinitelyWaitForMapping() {
937940
});
938941
}
939942
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.number_of_replicas", 1)).get();
940-
ensureGreen("test");
943+
944+
var allocator = internalCluster().getInstance(ShardsAllocator.class);
945+
if (allocator instanceof BalancedShardsAllocator) {
946+
// BalancedShardsAllocator will try other node once retries are exhausted
947+
ensureGreen("test");
948+
} else if (allocator instanceof DesiredBalanceShardsAllocator) {
949+
// DesiredBalanceShardsAllocator will keep shard in the error state if it could not be allocated on the desired node
950+
ensureYellow("test");
951+
} else {
952+
fail("Unknown allocator used");
953+
}
954+
941955
client().admin().indices().prepareRefresh("test").get();
942956
assertHitCount(client().prepareSearch().get(), numDocs);
943957
}

server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,26 @@
1818
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
1919
import org.elasticsearch.action.support.ActionFilters;
2020
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
21-
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
2221
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.ClusterStateAckListener;
2323
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2424
import org.elasticsearch.cluster.block.ClusterBlockException;
2525
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2626
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2727
import org.elasticsearch.cluster.node.DiscoveryNode;
2828
import org.elasticsearch.cluster.routing.allocation.AllocationService;
2929
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
30+
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionListener;
3031
import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
3132
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
3233
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
3334
import org.elasticsearch.cluster.service.ClusterService;
3435
import org.elasticsearch.common.Priority;
3536
import org.elasticsearch.common.Strings;
3637
import org.elasticsearch.common.inject.Inject;
38+
import org.elasticsearch.common.util.concurrent.ThreadContext;
3739
import org.elasticsearch.core.SuppressForbidden;
40+
import org.elasticsearch.core.TimeValue;
3841
import org.elasticsearch.tasks.Task;
3942
import org.elasticsearch.threadpool.ThreadPool;
4043
import org.elasticsearch.transport.TransportService;
@@ -161,12 +164,18 @@ private void verifyThenSubmitUpdate(
161164
private void submitStateUpdate(final ClusterRerouteRequest request, final ActionListener<ClusterRerouteResponse> listener) {
162165
submitUnbatchedTask(
163166
TASK_SOURCE,
164-
new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request, listener.map(response -> {
165-
if (request.dryRun() == false) {
166-
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
167-
}
168-
return response;
169-
}))
167+
new ClusterRerouteResponseAckedClusterStateUpdateTask(
168+
logger,
169+
allocationService,
170+
threadPool.getThreadContext(),
171+
request,
172+
listener.map(response -> {
173+
if (request.dryRun() == false) {
174+
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
175+
}
176+
return response;
177+
})
178+
)
170179
);
171180
}
172181

@@ -175,10 +184,10 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String
175184
clusterService.submitUnbatchedStateUpdateTask(source, task);
176185
}
177186

178-
static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask {
187+
static class ClusterRerouteResponseAckedClusterStateUpdateTask extends ClusterStateUpdateTask implements ClusterStateAckListener {
179188

180189
private final ClusterRerouteRequest request;
181-
private final ActionListener<ClusterRerouteResponse> listener;
190+
private final AllocationActionListener<ClusterRerouteResponse> listener;
182191
private final Logger logger;
183192
private final AllocationService allocationService;
184193
private volatile ClusterState clusterStateToSend;
@@ -187,46 +196,61 @@ static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClus
187196
ClusterRerouteResponseAckedClusterStateUpdateTask(
188197
Logger logger,
189198
AllocationService allocationService,
199+
ThreadContext context,
190200
ClusterRerouteRequest request,
191201
ActionListener<ClusterRerouteResponse> listener
192202
) {
193-
super(Priority.IMMEDIATE, request, listener);
203+
super(Priority.IMMEDIATE);
194204
this.request = request;
195-
this.listener = listener;
205+
this.listener = new AllocationActionListener<>(listener, context);
196206
this.logger = logger;
197207
this.allocationService = allocationService;
198208
}
199209

200210
@Override
201-
protected ClusterRerouteResponse newResponse(boolean acknowledged) {
202-
return new ClusterRerouteResponse(acknowledged, clusterStateToSend, explanations);
211+
public boolean mustAck(DiscoveryNode discoveryNode) {
212+
return true;
213+
}
214+
215+
@Override
216+
public TimeValue ackTimeout() {
217+
return request.ackTimeout();
218+
}
219+
220+
@Override
221+
public void onAllNodesAcked() {
222+
listener.clusterStateUpdate().onResponse(new ClusterRerouteResponse(true, clusterStateToSend, explanations));
223+
}
224+
225+
@Override
226+
public void onAckFailure(Exception e) {
227+
listener.clusterStateUpdate().onResponse(new ClusterRerouteResponse(false, clusterStateToSend, explanations));
203228
}
204229

205230
@Override
206231
public void onAckTimeout() {
207-
listener.onResponse(new ClusterRerouteResponse(false, clusterStateToSend, new RoutingExplanations()));
232+
listener.clusterStateUpdate().onResponse(new ClusterRerouteResponse(false, clusterStateToSend, new RoutingExplanations()));
208233
}
209234

210235
@Override
211236
public void onFailure(Exception e) {
212237
logger.debug("failed to perform [" + TASK_SOURCE + "]", e);
213-
super.onFailure(e);
238+
listener.clusterStateUpdate().onFailure(e);
214239
}
215240

216241
@Override
217242
public ClusterState execute(ClusterState currentState) {
218-
AllocationService.CommandsResult commandsResult = allocationService.reroute(
243+
var result = allocationService.reroute(
219244
currentState,
220245
request.getCommands(),
221246
request.explain(),
222-
request.isRetryFailed()
247+
request.isRetryFailed(),
248+
request.dryRun(),
249+
listener.reroute()
223250
);
224-
clusterStateToSend = commandsResult.clusterState();
225-
explanations = commandsResult.explanations();
226-
if (request.dryRun()) {
227-
return currentState;
228-
}
229-
return commandsResult.clusterState();
251+
clusterStateToSend = result.clusterState();
252+
explanations = result.explanations();
253+
return request.dryRun() ? currentState : result.clusterState();
230254
}
231255
}
232256
}

0 commit comments

Comments
 (0)