Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
* Returns a {@link Decision} whether the given shard routing can be remain
* on the given node. The default is {@link Decision#ALWAYS}.
*/
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
}
return Decision.NO;
}
final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
if (allocation.debugDecision()) {
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canRemain(shardRouting, node, allocation);
Decision decision = allocationDecider.canRemain(indexMetadata, shardRouting, node, allocation);
// short track if a NO is returned.
if (decision.type() == Decision.Type.NO) {
maybeTraceLogNoDecision(shardRouting, node, allocationDecider);
Expand All @@ -103,7 +104,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
// tighter loop if debug information is not collected: don't collect yes decisions + break out right away on NO
Decision ret = Decision.YES;
for (AllocationDecider allocationDecider : allocations) {
switch (allocationDecider.canRemain(shardRouting, node, allocation).type()) {
switch (allocationDecider.canRemain(indexMetadata, shardRouting, node, allocation).type()) {
case NO -> {
maybeTraceLogNoDecision(shardRouting, node, allocationDecider);
return Decision.NO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private void setAwarenessAttributes(List<String> awarenessAttributes) {

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, true);
return underCapacity(allocation.metadata().getIndexSafe(shardRouting.index()), shardRouting, node, allocation, true);
}

@Override
Expand All @@ -135,8 +135,8 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, false);
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(indexMetadata, shardRouting, node, allocation, false);
}

private static final Decision YES_NOT_ENABLED = Decision.single(
Expand All @@ -155,13 +155,18 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl

private static final Decision YES_ALL_MET = Decision.single(Decision.Type.YES, NAME, "node meets all awareness attribute requirements");

private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
private Decision underCapacity(
IndexMetadata indexMetadata,
ShardRouting shardRouting,
RoutingNode node,
RoutingAllocation allocation,
boolean moveToNode
) {
if (awarenessAttributes.isEmpty()) {
return YES_NOT_ENABLED;
}

final boolean debug = allocation.debugDecision();
final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());

if (indexMetadata.getAutoExpandReplicas().expandToAllNodes()) {
return YES_AUTO_EXPAND_ALL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing
);

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (shardRouting.currentNodeId().equals(node.nodeId()) == false) {
throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
}
Expand All @@ -472,7 +472,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
return decision;
}

if (allocation.metadata().index(shardRouting.index()).ignoreDiskWatermarks()) {
if (indexMetadata.ignoreDiskWatermarks()) {
return YES_DISK_WATERMARKS_IGNORED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,19 @@ public FilterAllocationDecider(Settings settings, ClusterSettings clusterSetting

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
if (shardRouting.unassigned() && shardRouting.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
// only for unassigned - we filter allocation right after the index creation (for shard shrinking) to ensure
// that once it has been allocated post API the replicas can be allocated elsewhere without user interaction
// this is a setting that can only be set within the system!
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
DiscoveryNodeFilters initialRecoveryFilters = DiscoveryNodeFilters.trimTier(indexMetadata.getInitialRecoveryFilters());
if (initialRecoveryFilters != null && initialRecoveryFilters.match(node.node()) == false) {
String explanation =
"initial allocation of the shrunken index is only allowed on nodes [%s] that hold a copy of every shard in the index";
return allocation.decision(Decision.NO, NAME, explanation, initialRecoveryFilters);
}
}
return shouldFilter(shardRouting, node.node(), allocation);
return shouldFilter(indexMetadata, node.node(), allocation);
}

@Override
Expand All @@ -111,8 +111,8 @@ public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, Routi
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(shardRouting, node.node(), allocation);
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(indexMetadata, node.node(), allocation);
}

@Override
Expand All @@ -126,16 +126,6 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
}

private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) return decision;

decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation);
if (decision != null) return decision;

return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
}

private Decision shouldFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) return decision;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (replacementOngoing(allocation) == false) {
return NO_REPLACEMENTS;
} else if (isReplacementSource(allocation, node.nodeId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
* determine if shards can remain on their current node.
*/
@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return this.canAllocate(shardRouting, node, allocation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,28 @@ private void setClusterShardLimit(int clusterShardLimit) {

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return doDecide(shardRouting, node, allocation, (count, limit) -> count >= limit);
return doDecide(
allocation.metadata().getIndexSafe(shardRouting.index()),
shardRouting,
node,
allocation,
(count, limit) -> count >= limit
);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return doDecide(shardRouting, node, allocation, (count, limit) -> count > limit);
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return doDecide(indexMetadata, shardRouting, node, allocation, (count, limit) -> count > limit);

}

private Decision doDecide(
IndexMetadata indexMd,
ShardRouting shardRouting,
RoutingNode node,
RoutingAllocation allocation,
BiPredicate<Integer, Integer> decider
) {
IndexMetadata indexMd = allocation.metadata().getIndexSafe(shardRouting.index());
final int indexShardLimit = indexMd.getShardsPerNodeLimit();
// Capture the limit here in case it changes during this method's
// execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return getRandomDecision();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
Expand Down Expand Up @@ -54,7 +55,12 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canRemain(
IndexMetadata indexMetadata,
ShardRouting shardRouting,
RoutingNode node,
RoutingAllocation allocation
) {
return Decision.YES;
}

Expand All @@ -79,18 +85,25 @@ public Decision canRebalance(RoutingAllocation allocation) {
}
}));

ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build();
IndexMetadata idx = IndexMetadata.builder("idx").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0).build();
IndexMetadata testIdx = IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.build();
ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
.metadata(Metadata.builder().put(idx, false).put(testIdx, false).build())
.build();
final RoutingAllocation allocation = new RoutingAllocation(deciders, clusterState, null, null, 0L);

allocation.setDebugMode(mode);
final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "_message");
final ShardRouting shardRouting = ShardRouting.newUnassigned(
new ShardId("test", "testUUID", 0),
new ShardId(testIdx.getIndex(), 0),
true,
RecoverySource.ExistingStoreRecoverySource.INSTANCE,
unassignedInfo
);
IndexMetadata idx = IndexMetadata.builder("idx").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0).build();

RoutingNode routingNode = RoutingNodesHelper.routingNode("testNode", null);
verify(deciders.canAllocate(shardRouting, routingNode, allocation), matcher);
Expand Down Expand Up @@ -130,7 +143,12 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canRemain(
IndexMetadata indexMetadata,
ShardRouting shardRouting,
RoutingNode node,
RoutingAllocation allocation
) {
return decisionOne;
}

Expand Down Expand Up @@ -171,7 +189,12 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canRemain(
IndexMetadata indexMetadata,
ShardRouting shardRouting,
RoutingNode node,
RoutingAllocation allocation
) {
return decision(allocation);
}

Expand Down Expand Up @@ -208,20 +231,28 @@ private Decision decision(RoutingAllocation allocation) {
}
}));

IndexMetadata testIdx = IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.build();

// no debug should just short-circuit to no, no matter what kind of no type return the first decider returns
final ShardRouting shardRouting = ShardRouting.newUnassigned(
new ShardId("test", "testUUID", 0),
new ShardId(testIdx.getIndex(), 0),
true,
RecoverySource.ExistingStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "_message")
);
final RoutingNode routingNode = RoutingNodesHelper.routingNode("testNode", null);
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build();
final IndexMetadata indexMetadata = IndexMetadata.builder("idx")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.build();
final ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
.metadata(Metadata.builder().put(testIdx, false).put(indexMetadata, false).build())
.build();

final RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, clusterState, null, null, 0L);
assertSame(Decision.NO, allocationDeciders.canAllocate(shardRouting, routingNode, allocation));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,10 +919,15 @@ public void testCanRemainWithShardRelocatingAway() {
System.nanoTime()
);
routingAllocation.debugDecision(true);
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
Decision decision = diskThresholdDecider.canRemain(
routingAllocation.metadata().getIndexSafe(firstRouting.index()),
firstRouting,
firstRoutingNode,
routingAllocation
);
assertThat(decision.type(), equalTo(Decision.Type.NO));
assertThat(
((Decision.Single) decision).getExplanation(),
decision.getExplanation(),
containsString(
"the shard cannot remain on this node because it is above the high watermark cluster setting "
+ "[cluster.routing.allocation.disk.watermark.high=70%] and there is less than the required [30.0%] free disk on node, "
Expand Down Expand Up @@ -951,7 +956,12 @@ public void testCanRemainWithShardRelocatingAway() {
System.nanoTime()
);
routingAllocation.debugDecision(true);
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
decision = diskThresholdDecider.canRemain(
routingAllocation.metadata().getIndexSafe(firstRouting.index()),
firstRouting,
firstRoutingNode,
routingAllocation
);
assertThat(decision.type(), equalTo(Decision.Type.YES));
assertEquals(
"there is enough disk on this node for the shard to remain, free: [60b]",
Expand Down Expand Up @@ -1109,7 +1119,12 @@ public void testWatermarksEnabledForSingleDataNode() {
System.nanoTime()
);
routingAllocation.debugDecision(true);
Decision decision = diskThresholdDecider.canRemain(startedShard, clusterState.getRoutingNodes().node("data"), routingAllocation);
Decision decision = diskThresholdDecider.canRemain(
routingAllocation.metadata().getIndexSafe(startedShard.index()),
startedShard,
clusterState.getRoutingNodes().node("data"),
routingAllocation
);
assertThat(decision.type(), equalTo(Decision.Type.NO));
assertThat(
decision.getExplanation(),
Expand Down
Loading