From 8d3059a6d6a29e3853697d8d880b62cf40275c79 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 26 Oct 2021 19:53:42 +0200 Subject: [PATCH 1/2] Make MasterService.patchVersions not Rebuild the Full CS This makes the method really just patch the version via a cheap copy constructor. Moreover, it makes the cluster state builder smarter when it comes to updating the routing nodes so they aren't rebuilt so often as well. --- .../elasticsearch/cluster/ClusterState.java | 36 +++++-- .../cluster/metadata/Metadata.java | 98 +++++++++++++++---- .../cluster/routing/RoutingTable.java | 15 ++- .../cluster/service/MasterService.java | 7 +- 4 files changed, 120 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index 3d2ccbc33e45d..f6c1e4748cf34 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -121,12 +121,21 @@ default boolean isPrivate() { public ClusterState(long version, String stateUUID, ClusterState state) { this(state.clusterName, version, stateUUID, state.metadata(), state.routingTable(), state.nodes(), state.blocks(), - state.customs(), false); - } - - public ClusterState(ClusterName clusterName, long version, String stateUUID, Metadata metadata, RoutingTable routingTable, - DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap customs, - boolean wasReadFromDiff) { + state.customs(), false, state.routingNodes); + } + + public ClusterState( + ClusterName clusterName, + long version, + String stateUUID, + Metadata metadata, + RoutingTable routingTable, + DiscoveryNodes nodes, + ClusterBlocks blocks, + ImmutableOpenMap customs, + boolean wasReadFromDiff, + RoutingNodes routingNodes + ) { this.version = version; this.stateUUID = stateUUID; this.clusterName = clusterName; @@ -136,6 +145,7 @@ public ClusterState(ClusterName clusterName, long version, String stateUUID, Met this.blocks = blocks; this.customs = customs; this.wasReadFromDiff = wasReadFromDiff; + this.routingNodes = routingNodes; } public long term() { @@ -476,6 +486,8 @@ public static Builder builder(ClusterState state) { public static class Builder { + private ClusterState previous; + private final ClusterName clusterName; private long version = 0; private String uuid = UNKNOWN_UUID; @@ -487,6 +499,7 @@ public static class Builder { private boolean fromDiff; public Builder(ClusterState state) { + this.previous = state; this.clusterName = state.clusterName; this.version = state.version(); this.uuid = state.stateUUID(); @@ -580,7 +593,16 @@ public ClusterState build() { if (UNKNOWN_UUID.equals(uuid)) { uuid = UUIDs.randomBase64UUID(); } - return new ClusterState(clusterName, version, uuid, metadata, routingTable, nodes, blocks, customs.build(), fromDiff); + final RoutingNodes routingNodes; + if (previous != null && routingTable.indicesRouting() == previous.routingTable.indicesRouting() && nodes == previous.nodes) { + // routing table contents and nodes haven't changed so we can try to reuse the previous state's routing nodes which are + // expensive to compute + routingNodes = previous.routingNodes; + } else { + routingNodes = null; + } + return new ClusterState(clusterName, version, uuid, metadata, routingTable, nodes, blocks, customs.build(), fromDiff, + routingNodes); } public static byte[] toBytes(ClusterState state) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 589739d80dafd..9789b1c83644e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -192,34 +192,41 @@ public interface NonRestorableCustom extends Custom { private SortedMap indicesLookup; - Metadata(String clusterUUID, boolean clusterUUIDCommitted, long version, CoordinationMetadata coordinationMetadata, - Settings transientSettings, Settings persistentSettings, DiffableStringMap hashesOfConsistentSettings, - ImmutableOpenMap indices, ImmutableOpenMap templates, - ImmutableOpenMap customs, String[] allIndices, String[] visibleIndices, String[] allOpenIndices, - String[] visibleOpenIndices, String[] allClosedIndices, String[] visibleClosedIndices, - SortedMap indicesLookup) { + private Metadata( + String clusterUUID, + boolean clusterUUIDCommitted, + long version, + CoordinationMetadata coordinationMetadata, + Settings transientSettings, + Settings persistentSettings, + Settings settings, + DiffableStringMap hashesOfConsistentSettings, + int totalNumberOfShards, + int totalOpenIndexShards, + ImmutableOpenMap indices, + ImmutableOpenMap templates, + ImmutableOpenMap customs, + String[] allIndices, + String[] visibleIndices, + String[] allOpenIndices, + String[] visibleOpenIndices, + String[] allClosedIndices, + String[] visibleClosedIndices, + SortedMap indicesLookup + ) { this.clusterUUID = clusterUUID; this.clusterUUIDCommitted = clusterUUIDCommitted; this.version = version; this.coordinationMetadata = coordinationMetadata; this.transientSettings = transientSettings; this.persistentSettings = persistentSettings; - this.settings = Settings.builder().put(persistentSettings).put(transientSettings).build(); + this.settings = settings; this.hashesOfConsistentSettings = hashesOfConsistentSettings; this.indices = indices; this.customs = customs; this.templates = templates; - int totalNumberOfShards = 0; - int totalOpenIndexShards = 0; - for (IndexMetadata indexMetadata : indices.values()) { - totalNumberOfShards += indexMetadata.getTotalNumberOfShards(); - if (IndexMetadata.State.OPEN.equals(indexMetadata.getState())) { - totalOpenIndexShards += indexMetadata.getTotalNumberOfShards(); - } - } this.totalNumberOfShards = totalNumberOfShards; this.totalOpenIndexShards = totalOpenIndexShards; - this.allIndices = allIndices; this.visibleIndices = visibleIndices; this.allOpenIndices = allOpenIndices; @@ -229,6 +236,31 @@ public interface NonRestorableCustom extends Custom { this.indicesLookup = indicesLookup; } + public Metadata withIncrementedVersion() { + return new Metadata( + clusterUUID, + clusterUUIDCommitted, + version + 1, + coordinationMetadata, + transientSettings, + persistentSettings, + settings, + hashesOfConsistentSettings, + totalNumberOfShards, + totalOpenIndexShards, + indices, + templates, + customs, + allIndices, + visibleIndices, + allOpenIndices, + visibleOpenIndices, + allClosedIndices, + visibleClosedIndices, + indicesLookup + ); + } + public long version() { return this.version; } @@ -1613,9 +1645,37 @@ public Metadata build(boolean builtIndicesLookupEagerly) { String[] allClosedIndicesArray = allClosedIndices.toArray(Strings.EMPTY_ARRAY); String[] visibleClosedIndicesArray = visibleClosedIndices.toArray(Strings.EMPTY_ARRAY); - return new Metadata(clusterUUID, clusterUUIDCommitted, version, coordinationMetadata, transientSettings, persistentSettings, - hashesOfConsistentSettings, indices, templates.build(), customs.build(), allIndicesArray, visibleIndicesArray, - allOpenIndicesArray, visibleOpenIndicesArray, allClosedIndicesArray, visibleClosedIndicesArray, indicesLookup); + int totalNumberOfShards = 0; + int totalOpenIndexShards = 0; + for (IndexMetadata indexMetadata : indices.values()) { + totalNumberOfShards += indexMetadata.getTotalNumberOfShards(); + if (IndexMetadata.State.OPEN.equals(indexMetadata.getState())) { + totalOpenIndexShards += indexMetadata.getTotalNumberOfShards(); + } + } + + return new Metadata( + clusterUUID, + clusterUUIDCommitted, + version, + coordinationMetadata, + transientSettings, + persistentSettings, + Settings.builder().put(persistentSettings).put(transientSettings).build(), + hashesOfConsistentSettings, + totalNumberOfShards, + totalOpenIndexShards, + indices, + templates.build(), + customs.build(), + allIndicesArray, + visibleIndicesArray, + allOpenIndicesArray, + visibleOpenIndicesArray, + allClosedIndicesArray, + visibleClosedIndicesArray, + indicesLookup + ); } static SortedMap buildIndicesLookup(DataStreamMetadata dataStreamMetadata, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index ca9f19c26767e..6f9814e2a7fbe 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -58,6 +58,10 @@ private RoutingTable(long version, ImmutableOpenMap i this.indicesRouting = indicesRouting; } + public RoutingTable withIncrementedVersion() { + return new RoutingTable(version + 1, indicesRouting); + } + /** * Get's the {@link IndexShardRoutingTable} for the given shard id from the given {@link IndexRoutingTable} * or throws a {@link ShardNotFoundException} if no shard by the given id is found in the IndexRoutingTable. @@ -207,7 +211,7 @@ public List allShards(String index) { * @param includeEmpty if true, a shard iterator will be added for non-assigned shards as well */ public GroupShardsIterator allActiveShardsGrouped(String[] indices, boolean includeEmpty) { - return allSatisfyingPredicateShardsGrouped(indices, includeEmpty, ACTIVE_PREDICATE); + return allSatisfyingPredicateShardsGrouped(indices, includeEmpty, ShardRouting::active); } /** @@ -216,12 +220,9 @@ public GroupShardsIterator allActiveShardsGrouped(String[] indice * @param includeEmpty if true, a shard iterator will be added for non-assigned shards as well */ public GroupShardsIterator allAssignedShardsGrouped(String[] indices, boolean includeEmpty) { - return allSatisfyingPredicateShardsGrouped(indices, includeEmpty, ASSIGNED_PREDICATE); + return allSatisfyingPredicateShardsGrouped(indices, includeEmpty, ShardRouting::assignedToNode); } - private static Predicate ACTIVE_PREDICATE = ShardRouting::active; - private static Predicate ASSIGNED_PREDICATE = ShardRouting::assignedToNode; - private GroupShardsIterator allSatisfyingPredicateShardsGrouped(String[] indices, boolean includeEmpty, Predicate predicate) { // use list here since we need to maintain identity across shards @@ -361,6 +362,10 @@ private static class RoutingTableDiff implements Diff { @Override public RoutingTable apply(RoutingTable part) { + final ImmutableOpenMap updatedRouting = indicesRouting.apply(part.indicesRouting); + if (part.version == version && updatedRouting == part.indicesRouting) { + return part; + } return new RoutingTable(version, indicesRouting.apply(part.indicesRouting)); } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 19ef3b0f44d64..944b01a835d39 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -23,11 +23,9 @@ import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.coordination.ClusterStatePublisher; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.ClusterSettings; @@ -371,11 +369,10 @@ private ClusterState patchVersions(ClusterState previousClusterState, ClusterTas final var previousIndicesLookup = newClusterState.metadata().getIndicesLookup(); Builder builder = incrementVersion(newClusterState); if (previousClusterState.routingTable() != newClusterState.routingTable()) { - builder.routingTable(RoutingTable.builder(newClusterState.routingTable()) - .version(newClusterState.routingTable().version() + 1).build()); + builder.routingTable(newClusterState.routingTable().withIncrementedVersion()); } if (previousClusterState.metadata() != newClusterState.metadata()) { - builder.metadata(Metadata.builder(newClusterState.metadata()).version(newClusterState.metadata().version() + 1)); + builder.metadata(newClusterState.metadata().withIncrementedVersion()); } newClusterState = builder.build(); From 10f16c2d7acb45215d7c7f1c6ca95c3d61d04e8c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 27 Oct 2021 12:17:28 +0200 Subject: [PATCH 2/2] CR: add assertion --- .../elasticsearch/cluster/ClusterState.java | 5 +- .../cluster/routing/RoutingNode.java | 41 +++++++--- .../cluster/routing/RoutingNodes.java | 79 ++++++++++++++++++- 3 files changed, 114 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index f6c1e4748cf34..2a4a6237cd5d5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.XContentBuilder; @@ -134,7 +135,7 @@ public ClusterState( ClusterBlocks blocks, ImmutableOpenMap customs, boolean wasReadFromDiff, - RoutingNodes routingNodes + @Nullable RoutingNodes routingNodes ) { this.version = version; this.stateUUID = stateUUID; @@ -146,6 +147,8 @@ public ClusterState( this.customs = customs; this.wasReadFromDiff = wasReadFromDiff; this.routingNodes = routingNodes; + assert routingNodes == null || routingNodes.equals(new RoutingNodes(this)) : + "RoutingNodes [" + routingNodes + "] are not consistent with this cluster state [" + new RoutingNodes(this) + "]"; } public long term() { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index ba15eaf1a594f..ac21977092c7c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -22,6 +22,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -34,6 +35,7 @@ public class RoutingNode implements Iterable { private final String nodeId; + @Nullable private final DiscoveryNode node; private final LinkedHashMap shards; // LinkedHashMap to preserve order @@ -48,7 +50,7 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) { this(nodeId, node, buildShardRoutingMap(shards)); } - RoutingNode(String nodeId, DiscoveryNode node, LinkedHashMap shards) { + RoutingNode(String nodeId, @Nullable DiscoveryNode node, LinkedHashMap shards) { this.nodeId = nodeId; this.node = node; this.shards = shards; @@ -88,6 +90,7 @@ public Iterator iterator() { * * @return discoveryNode of this node */ + @Nullable public DiscoveryNode node() { return this.node; } @@ -298,13 +301,17 @@ public String prettyPrint() { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("routingNode (["); - sb.append(node.getName()); - sb.append("]["); - sb.append(node.getId()); - sb.append("]["); - sb.append(node.getHostName()); - sb.append("]["); - sb.append(node.getHostAddress()); + if (node != null) { + sb.append(node.getName()); + sb.append("]["); + sb.append(node.getId()); + sb.append("]["); + sb.append(node.getHostName()); + sb.append("]["); + sb.append(node.getHostAddress()); + } else { + sb.append("null"); + } sb.append("], ["); sb.append(shards.size()); sb.append(" assigned shards])"); @@ -320,7 +327,6 @@ public boolean isEmpty() { } private boolean invariant() { - // initializingShards must consistent with that in shards Collection shardRoutingsInitializing = shards.values().stream().filter(ShardRouting::initializing).collect(Collectors.toList()); @@ -339,4 +345,21 @@ private boolean invariant() { return true; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RoutingNode that = (RoutingNode) o; + return nodeId.equals(that.nodeId) && Objects.equals(node, that.node) && shards.equals(that.shards); + } + + @Override + public int hashCode() { + return Objects.hash(nodeId, node, shards); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 448813aac83a5..9c65b8ee9ef46 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -778,6 +778,46 @@ public int size() { return nodesToShards.size(); } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RoutingNodes that = (RoutingNodes) o; + return readOnly == that.readOnly + && inactivePrimaryCount == that.inactivePrimaryCount + && inactiveShardCount == that.inactiveShardCount + && relocatingShards == that.relocatingShards + && activeShardCount == that.activeShardCount + && totalShardCount == that.totalShardCount + && nodesToShards.equals(that.nodesToShards) + && unassignedShards.equals(that.unassignedShards) + && assignedShards.equals(that.assignedShards) + && attributeValuesByAttribute.equals(that.attributeValuesByAttribute) + && recoveriesPerNode.equals(that.recoveriesPerNode + ); + } + + @Override + public int hashCode() { + return Objects.hash( + nodesToShards, + unassignedShards, + assignedShards, + readOnly, + inactivePrimaryCount, + inactiveShardCount, + relocatingShards, + activeShardCount, + totalShardCount, + attributeValuesByAttribute, + recoveriesPerNode + ); + } + public static final class UnassignedShards implements Iterable { private final RoutingNodes nodes; @@ -990,6 +1030,26 @@ public ShardRouting[] drain() { primaries = 0; return mutableShardRoutings; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UnassignedShards that = (UnassignedShards) o; + return primaries == that.primaries + && ignoredPrimaries == that.ignoredPrimaries + && unassigned.equals(that.unassigned) + && ignored.equals(that.ignored); + } + + @Override + public int hashCode() { + return Objects.hash(unassigned, ignored, primaries, ignoredPrimaries); + } } @@ -1183,5 +1243,22 @@ public static Recoveries getOrAdd(Map map, String key) { } return recoveries; } - } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Recoveries that = (Recoveries) o; + return incoming == that.incoming && outgoing == that.outgoing; + } + + @Override + public int hashCode() { + return Objects.hash(incoming, outgoing); + } + } }