Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5c634a7
wip cluster info simulation
idegtiarenko Sep 14, 2022
342ba59
use cluster info simulator
idegtiarenko Sep 14, 2022
d0fa4c6
null check
idegtiarenko Sep 15, 2022
105fcef
Fix MockDiskUsagesIT
idegtiarenko Sep 15, 2022
7293edd
do not keep reserved info
idegtiarenko Sep 15, 2022
779016b
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Sep 19, 2022
5c919f9
multiple data path test case
idegtiarenko Sep 20, 2022
103610a
add range check
idegtiarenko Sep 20, 2022
5aaad09
testComputeConsideringShardSizes
idegtiarenko Sep 21, 2022
cb7a174
keep node below watermark
idegtiarenko Sep 21, 2022
5faf34e
move class
idegtiarenko Sep 21, 2022
a8072ca
fix move
idegtiarenko Sep 21, 2022
2985353
rework test
idegtiarenko Sep 21, 2022
07be67f
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Sep 21, 2022
480ad7a
Update server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
idegtiarenko Sep 21, 2022
ee9a19f
Update server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
idegtiarenko Sep 21, 2022
8c92948
make sure test is failing without simulated disk space
idegtiarenko Sep 21, 2022
2f1e6fb
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Sep 26, 2022
67ee7dd
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Sep 29, 2022
52fa7d3
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Sep 29, 2022
93a72dc
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Oct 3, 2022
b64afdd
Merge branch 'feature/desired-balance-allocator' into simulate_disk_u…
idegtiarenko Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster;

import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.shard.ShardId;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ClusterInfoSimulator {

private final Map<String, DiskUsage> leastAvailableSpaceUsage;
private final Map<String, DiskUsage> mostAvailableSpaceUsage;
private final Map<String, Long> shardSizes;
private final Map<ShardId, Long> shardDataSetSizes;
private final Map<ClusterInfo.NodeAndShard, String> dataPath;

public ClusterInfoSimulator(ClusterInfo clusterInfo) {
this.leastAvailableSpaceUsage = new HashMap<>(clusterInfo.getNodeLeastAvailableDiskUsages());
this.mostAvailableSpaceUsage = new HashMap<>(clusterInfo.getNodeMostAvailableDiskUsages());
this.shardSizes = new HashMap<>(clusterInfo.shardSizes);
this.shardDataSetSizes = new HashMap<>(clusterInfo.shardDataSetSizes);
this.dataPath = new HashMap<>(clusterInfo.dataPath);
}

public void simulate(ShardRouting shard) {
assert shard.initializing();

var size = getEstimatedShardSize(shard);
if (size != null && size > 0) {
if (shard.relocatingNodeId() != null) {
// relocation
modifyDiskUsage(shard.relocatingNodeId(), getShardPath(shard.relocatingNodeId(), mostAvailableSpaceUsage), size);
modifyDiskUsage(shard.currentNodeId(), getShardPath(shard.currentNodeId(), leastAvailableSpaceUsage), -size);
} else {
// new shard
modifyDiskUsage(shard.currentNodeId(), getShardPath(shard.currentNodeId(), leastAvailableSpaceUsage), -size);
shardSizes.put(ClusterInfo.shardIdentifierFromRouting(shard), size);
}
}
}

private Long getEstimatedShardSize(ShardRouting routing) {
if (routing.relocatingNodeId() != null) {
// relocation existing shard, get size of the source shard
return shardSizes.get(ClusterInfo.shardIdentifierFromRouting(routing));
} else if (routing.primary() == false) {
// initializing new replica, get size of the source primary shard
return shardSizes.get(ClusterInfo.shardIdentifierFromRouting(routing.shardId(), true));
} else {
// initializing new (empty) primary
return 0L;
}
}

private String getShardPath(String nodeId, Map<String, DiskUsage> defaultSpaceUsage) {
var diskUsage = defaultSpaceUsage.get(nodeId);
return diskUsage != null ? diskUsage.getPath() : null;
}

private void modifyDiskUsage(String nodeId, String path, long delta) {
var leastUsage = leastAvailableSpaceUsage.get(nodeId);
if (leastUsage != null && Objects.equals(leastUsage.getPath(), path)) {
// ensure new value is within bounds
leastAvailableSpaceUsage.put(nodeId, updateWithFreeBytes(leastUsage, delta));
}
var mostUsage = mostAvailableSpaceUsage.get(nodeId);
if (mostUsage != null && Objects.equals(mostUsage.getPath(), path)) {
// ensure new value is within bounds
mostAvailableSpaceUsage.put(nodeId, updateWithFreeBytes(mostUsage, delta));
}
}

private static DiskUsage updateWithFreeBytes(DiskUsage usage, long delta) {
// free bytes might go out of range in case when multiple data path are used
// we might not know exact disk used to allocate a shard and conservatively update
// most used disk on a target node and least used disk on a source node
var freeBytes = withinRange(0, usage.getTotalBytes(), usage.freeBytes() + delta);
return usage.copyWithFreeBytes(freeBytes);
}

private static long withinRange(long min, long max, long value) {
return Math.max(min, Math.min(max, value));
}

public ClusterInfo getClusterInfo() {
return new ClusterInfo(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, shardDataSetSizes, dataPath, Map.of());
}
}
4 changes: 4 additions & 0 deletions server/src/main/java/org/elasticsearch/cluster/DiskUsage.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ public String toString() {
+ "]";
}

public DiskUsage copyWithFreeBytes(long freeBytes) {
return new DiskUsage(nodeId, nodeName, path, totalBytes, freeBytes);
}

/**
* Finds the path with the least available disk space and returns its disk usage. It returns null if there is no
* file system data in the NodeStats or if the total bytes are a negative number.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class RoutingAllocation {

private final ClusterState clusterState;

private final ClusterInfo clusterInfo;
private ClusterInfo clusterInfo;

private final SnapshotShardSizeInfo shardSizeInfo;

Expand Down Expand Up @@ -407,6 +407,11 @@ public boolean isSimulating() {
return isSimulating;
}

public void setSimulatedClusterInfo(ClusterInfo clusterInfo) {
assert isSimulating : "Should be called only while simulating";
this.clusterInfo = clusterInfo;
}

public RoutingAllocation immutableClone() {
return new RoutingAllocation(deciders, clusterState, clusterInfo, shardSizeInfo, currentNanoTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterInfoSimulator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -58,7 +59,7 @@ public DesiredBalance compute(
final var changes = routingAllocation.changes();
final var ignoredShards = desiredBalanceInput.ignoredShards();
final var knownNodeIds = routingAllocation.nodes().stream().map(DiscoveryNode::getId).collect(toSet());
final var unassignedPrimaries = new HashSet<ShardId>();
final var clusterInfoSimulator = new ClusterInfoSimulator(routingAllocation.clusterInfo());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we enhance DesiredBalanceComputerTests to verify we're accounting for disk usage in all the right places?


if (routingNodes.isEmpty()) {
return new DesiredBalance(desiredBalanceInput.index(), Map.of());
Expand All @@ -68,14 +69,15 @@ public DesiredBalance compute(
for (final var routingNode : routingNodes) {
for (final var shardRouting : routingNode) {
if (shardRouting.initializing()) {
clusterInfoSimulator.simulate(shardRouting);
routingNodes.startShard(logger, shardRouting, changes, 0L);
// TODO adjust disk usage info to reflect the assumed shard movement
}
}
}

// we are not responsible for allocating unassigned primaries of existing shards, and we're only responsible for allocating
// unassigned replicas if the ReplicaShardAllocator gives up, so we must respect these ignored shards
final var unassignedPrimaries = new HashSet<ShardId>();
final var shardRoutings = new HashMap<ShardId, ShardRoutings>();
for (final var primary : new boolean[] { true, false }) {
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
Expand Down Expand Up @@ -134,12 +136,9 @@ public DesiredBalance compute(
for (final var shardRouting : shardsToRelocate) {
assert shardRouting.started();
if (targetNodesIterator.hasNext()) {
routingNodes.startShard(
logger,
routingNodes.relocateShard(shardRouting, targetNodesIterator.next(), 0L, changes).v2(),
changes,
0L
);
ShardRouting shardToRelocate = routingNodes.relocateShard(shardRouting, targetNodesIterator.next(), 0L, changes).v2();
clusterInfoSimulator.simulate(shardToRelocate);
routingNodes.startShard(logger, shardToRelocate, changes, 0L);
} else {
break;
}
Expand All @@ -163,7 +162,9 @@ public DesiredBalance compute(
final var nodeIds = unassignedShardsToInitialize.get(shardRouting);
if (nodeIds != null && nodeIds.isEmpty() == false) {
final String nodeId = nodeIds.removeFirst();
routingNodes.startShard(logger, unassignedPrimaryIterator.initialize(nodeId, null, 0L, changes), changes, 0L);
ShardRouting shardToInitialized = unassignedPrimaryIterator.initialize(nodeId, null, 0L, changes);
clusterInfoSimulator.simulate(shardToInitialized);
routingNodes.startShard(logger, shardToInitialized, changes, 0L);
}
}
}
Expand All @@ -175,7 +176,9 @@ public DesiredBalance compute(
final var nodeIds = unassignedShardsToInitialize.get(shardRouting);
if (nodeIds != null && nodeIds.isEmpty() == false) {
final String nodeId = nodeIds.removeFirst();
routingNodes.startShard(logger, unassignedReplicaIterator.initialize(nodeId, null, 0L, changes), changes, 0L);
ShardRouting shardToInitialize = unassignedReplicaIterator.initialize(nodeId, null, 0L, changes);
clusterInfoSimulator.simulate(shardToInitialize);
routingNodes.startShard(logger, shardToInitialize, changes, 0L);
}
}
}
Expand Down Expand Up @@ -215,6 +218,7 @@ public DesiredBalance compute(
// TODO test that we reset ignored shards properly
}

routingAllocation.setSimulatedClusterInfo(clusterInfoSimulator.getClusterInfo());
logger.trace("running delegate allocator");
delegateAllocator.allocate(routingAllocation);
assert routingNodes.unassigned().size() == 0; // any unassigned shards should now be ignored
Expand All @@ -224,9 +228,9 @@ public DesiredBalance compute(
for (final var shardRouting : routingNode) {
if (shardRouting.initializing()) {
hasChanges = true;
clusterInfoSimulator.simulate(shardRouting);
routingNodes.startShard(logger, shardRouting, changes, 0L);
logger.trace("starting shard {}", shardRouting);
// TODO adjust disk usage info to reflect the assumed shard movement
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoSimulator;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.test.ESTestCase;

import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.hamcrest.Matchers.equalTo;

public class ClusterInfoSimulatorTests extends ESTestCase {

public void testInitializeNewPrimary() {

var newPrimary = newShardRouting("index-1", 0, "node-0", true, INITIALIZING);

var simulator = new ClusterInfoSimulator(
new ClusterInfoTestBuilder() //
.withNode("node-0", new DiskUsageBuilder(1000, 1000))
.withNode("node-1", new DiskUsageBuilder(1000, 1000))
.withShard(newPrimary, 0)
.build()
);
simulator.simulate(newPrimary);

assertThat(
simulator.getClusterInfo(),
equalTo(
new ClusterInfoTestBuilder() //
.withNode("node-0", new DiskUsageBuilder(1000, 1000))
.withNode("node-1", new DiskUsageBuilder(1000, 1000))
.withShard(newPrimary, 0)
.build()
)
);
}

public void testInitializeNewReplica() {

var existingPrimary = newShardRouting("index-1", 0, "node-0", true, STARTED);
var newReplica = newShardRouting("index-1", 0, "node-1", false, INITIALIZING);

var simulator = new ClusterInfoSimulator(
new ClusterInfoTestBuilder() //
.withNode("node-0", new DiskUsageBuilder(1000, 900))
.withNode("node-1", new DiskUsageBuilder(1000, 1000))
.withShard(existingPrimary, 100)
.withShard(newReplica, 0)
.build()
);
simulator.simulate(newReplica);

assertThat(
simulator.getClusterInfo(),
equalTo(
new ClusterInfoTestBuilder() //
.withNode("node-0", new DiskUsageBuilder(1000, 900))
.withNode("node-1", new DiskUsageBuilder(1000, 900))
.withShard(existingPrimary, 100)
.withShard(newReplica, 100)
.build()
)
);
}

public void testRelocateShard() {

var fromNodeId = "node-0";
var toNodeId = "node-1";

var shard = newShardRouting("index-1", 0, toNodeId, fromNodeId, true, INITIALIZING);

var simulator = new ClusterInfoSimulator(
new ClusterInfoTestBuilder() //
.withNode(fromNodeId, new DiskUsageBuilder(1000, 900))
.withNode(toNodeId, new DiskUsageBuilder(1000, 1000))
.withShard(shard, 100)
.build()
);
simulator.simulate(shard);

assertThat(
simulator.getClusterInfo(),
equalTo(
new ClusterInfoTestBuilder() //
.withNode(fromNodeId, new DiskUsageBuilder(1000, 1000))
.withNode(toNodeId, new DiskUsageBuilder(1000, 900))
.withShard(shard, 100)
.build()
)
);
}

public void testRelocateShardWithMultipleDataPath1() {

var fromNodeId = "node-0";
var toNodeId = "node-1";

var shard = newShardRouting("index-1", 0, toNodeId, fromNodeId, true, INITIALIZING);

var simulator = new ClusterInfoSimulator(
new ClusterInfoTestBuilder() //
.withNode(fromNodeId, new DiskUsageBuilder("/data-1", 1000, 500), new DiskUsageBuilder("/data-2", 1000, 750))
.withNode(toNodeId, new DiskUsageBuilder("/data-1", 1000, 750), new DiskUsageBuilder("/data-2", 1000, 900))
.withShard(shard, 100)
.build()
);
simulator.simulate(shard);

assertThat(
simulator.getClusterInfo(),
equalTo(
new ClusterInfoTestBuilder() //
.withNode(fromNodeId, new DiskUsageBuilder("/data-1", 1000, 500), new DiskUsageBuilder("/data-2", 1000, 850))
.withNode(toNodeId, new DiskUsageBuilder("/data-1", 1000, 650), new DiskUsageBuilder("/data-2", 1000, 900))
.withShard(shard, 100)
.build()
)
);
}

private static class ClusterInfoTestBuilder {

private final Map<String, DiskUsage> leastAvailableSpaceUsage = new HashMap<>();
private final Map<String, DiskUsage> mostAvailableSpaceUsage = new HashMap<>();
private final Map<String, Long> shardSizes = new HashMap<>();

public ClusterInfoTestBuilder withNode(String name, DiskUsageBuilder diskUsageBuilderBuilder) {
leastAvailableSpaceUsage.put(name, diskUsageBuilderBuilder.toDiskUsage(name));
mostAvailableSpaceUsage.put(name, diskUsageBuilderBuilder.toDiskUsage(name));
return this;
}

public ClusterInfoTestBuilder withNode(String name, DiskUsageBuilder leastAvailableSpace, DiskUsageBuilder mostAvailableSpace) {
leastAvailableSpaceUsage.put(name, leastAvailableSpace.toDiskUsage(name));
mostAvailableSpaceUsage.put(name, mostAvailableSpace.toDiskUsage(name));
return this;
}

public ClusterInfoTestBuilder withShard(ShardRouting shard, long size) {
shardSizes.put(ClusterInfo.shardIdentifierFromRouting(shard), size);
return this;
}

public ClusterInfo build() {
return new ClusterInfo(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), Map.of());
}
}

private record DiskUsageBuilder(String path, long total, long free) {

private DiskUsageBuilder(long total, long free) {
this("/data", total, free);
}

public DiskUsage toDiskUsage(String name) {
return new DiskUsage(name, name, name + '/' + path, total, free);
}
}
}
Loading