
Conversation

@idegtiarenko (Contributor) commented Apr 21, 2023

When reconciling a balance with a lot of shards on undesired nodes, there is a possibility of causing node hot spots due to the use of nodeInterleavedShardIterator. This iterator orders shards based on the nodes they are located on, and orders those nodes by hash map iteration order. As a result, reconciliation tends to repeatedly pick the shards returned first by the iterator.

Depends on #96025

Related to #91386

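To illustrate the intended fix (an OrderedShardsIterator that applies a custom shard order based on allocation recency, as described in the merged commit message), here is a minimal, hypothetical sketch, not the actual Elasticsearch class: shards are drained node by node, with the least recently allocated-to node first. All names below are illustrative.

import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical sketch only, not the Elasticsearch implementation: drain shards node by
// node, visiting the least recently allocated-to node first, instead of interleaving
// nodes in hash-map order.
class NodeOrderedShardIterator<S> implements Iterator<S> {

    private final ArrayDeque<Iterator<S>> perNodeShards = new ArrayDeque<>();
    private Iterator<S> current;

    NodeOrderedShardIterator(Map<String, List<S>> shardsByNode, Map<String, Long> lastAllocationTime) {
        shardsByNode.entrySet().stream()
            // nodes that were allocated to least recently come first
            .sorted(Comparator.comparingLong(
                (Map.Entry<String, List<S>> e) -> lastAllocationTime.getOrDefault(e.getKey(), Long.MIN_VALUE)))
            .forEach(e -> perNodeShards.add(e.getValue().iterator()));
    }

    @Override
    public boolean hasNext() {
        while ((current == null || current.hasNext() == false) && perNodeShards.isEmpty() == false) {
            current = perNodeShards.poll(); // move on to the next node's shards
        }
        return current != null && current.hasNext();
    }

    @Override
    public S next() {
        if (hasNext() == false) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}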
@idegtiarenko added the >bug, :Distributed Coordination/Allocation, Team:Distributed (Obsolete), and v8.8.0 labels Apr 21, 2023
@DaveCTurner (Contributor) left a comment


I wonder if we should stop with the interleaving altogether and just try all the shards from the least-recently-touched node until we find one to move, then move on to the next-least-recently-touched node and try all its shards, and so on.

case YES -> {
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, desiredNodeId);
if (logger.isDebugEnabled()) {
Contributor:

nit: no real need for these checks, they're the first thing that logger.debug() does anyway
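For context on this nit, a small hedged sketch of the suggested simplification (using the Log4j API that Elasticsearch's logging builds on; the surrounding class is hypothetical): the level check already happens inside logger.trace()/logger.debug(), so the explicit guard only helps when computing the arguments is expensive.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class TraceLoggingSketch {
    private static final Logger logger = LogManager.getLogger(TraceLoggingSketch.class);

    void logAssignment(Object shard, String desiredNodeId) {
        // Before: explicit guard around the call.
        if (logger.isTraceEnabled()) {
            logger.trace("Assigned shard [{}] to [{}]", shard, desiredNodeId);
        }

        // After: logger.trace() performs the same level check internally, and the
        // parameterized message avoids building the string eagerly, so the guard is redundant.
        logger.trace("Assigned shard [{}] to [{}]", shard, desiredNodeId);
    }
}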

@idegtiarenko (Contributor, author):

I wonder if we should stop with the interleaving altogether and just try all the shards from the least-recently-touched node until we find one to move, then move on to the next-least-recently-touched node and try all its shards, and so on.

We need to handle the case where it is allowed to move more shards than there are nodes in the cluster (or somehow return to the first node again)

@DaveCTurner (Contributor):

We need to handle the case where it is allowed to move more shards than there are nodes in the cluster (or somehow return to the first node again)

Yes, we'd keep iterating until we've considered moving every shard on every node.

@gmarouli added v8.9.0 and removed v8.8.0 labels Apr 26, 2023
@idegtiarenko marked this pull request as ready for review May 10, 2023 15:01
@elasticsearchmachine (Collaborator):

Pinging @elastic/es-distributed (Team:Distributed)

@elasticsearchmachine (Collaborator):

Hi @idegtiarenko, I've created a changelog YAML for you.

return new OrderedNodesShardsIterator();
}

private class OrderedNodesShardsIterator implements Iterator<ShardRouting> {
Contributor (author):

I am planning to move this to a top-level class and add some unit tests for this

return nextShard;
}

public void dePrioritizeNode(String nodeId) {
Contributor:

nit: IMO we should update moveOrdering here rather than relying on the caller calling both dePrioritizeNode and recordAllocation.
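A hedged sketch of the shape this nit asks for, using stand-in types rather than the real NodeAllocationOrdering/OrderedShardsIterator: dePrioritizeNode records the allocation itself, so callers only need one call.

import java.util.ArrayDeque;
import java.util.LinkedHashSet;

// Hypothetical stand-in, not the real classes: dePrioritizeNode updates the move
// ordering itself instead of relying on the caller to also invoke recordAllocation.
class MoveOrderingSketch {

    // recency order: the most recently recorded node sits at the end
    private final LinkedHashSet<String> moveOrdering = new LinkedHashSet<>();
    // per-node iteration queue: least recently touched node at the front
    private final ArrayDeque<String> nodeQueue = new ArrayDeque<>();

    void recordAllocation(String nodeId) {
        moveOrdering.remove(nodeId);
        moveOrdering.add(nodeId); // nodeId is now the most recently touched node
    }

    void dePrioritizeNode(String nodeId) {
        // push the node's remaining shards to the back of the iteration order ...
        if (nodeQueue.remove(nodeId)) {
            nodeQueue.addLast(nodeId);
        }
        // ... and update the ordering here, so a single call is enough for callers
        recordAllocation(nodeId);
    }
}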

@idegtiarenko requested a review from DaveCTurner May 11, 2023 14:42
@henningandersen (Contributor) left a comment

This looks good to me. I left a number of minor comments.

}

private void moveShards() {
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
Contributor:

I think this comment needs updating.

return;
}

// Iterate over the started shards interleaving between nodes, and try to move any which are on undesired nodes. In the presence of
Contributor:

Also update comment here?

import java.util.NoSuchElementException;
import java.util.Objects;

public class OrderedShardsIterator implements Iterator<ShardRouting> {
Contributor:

Can we add javadoc here explaining the intended order?
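One possible wording for such a javadoc, sketched from this discussion and not the text that was actually merged:

/**
 * Iterates the started shards of the routing nodes one node at a time: all shards of the
 * least recently allocated-to node are returned first, then all shards of the next node in
 * the ordering, and so on. Unlike the interleaved iterator this replaces, it does not
 * alternate between nodes. A node may be de-prioritized mid-iteration (for example when its
 * moves are throttled), which pushes its remaining shards to the back of the order.
 */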

}
allocationOrdering.retainNodes(getNodeIds(allocation.routingNodes()));
recordTime(cumulativeReconciliationTime, new DesiredBalanceReconciler(desiredBalance, allocation, allocationOrdering)::run);
allocationOrdering.retainNodes(allocation.routingNodes().getAllNodeIds());
Contributor:

I think we should also do retainNodes on moveOrdering?

}

var summary = totalOutgoingMoves.values().stream().mapToInt(AtomicInteger::get).summaryStatistics();
assertThat(
Contributor:

The text says "similar", whereas the assertion is more precise (max 1 difference). I wonder if you can add a comment explaining how we are sure of a max difference of 1 here when it works?

My intuition says that if the initially picked random currentNodeId is unfortunate enough that only one node needs any shard movements, we could get a distance of two here?

Contributor:

I agree - I think we should remove the fully-reconciled nodes at the top of the loop, rather than doing it after the first assertion.

Contributor (author):

one node needs any shard movements, we could get a distance of two here

Fixed by adding only nodes that require reconciliation to totalOutgoingMoves

var ordering = new NodeAllocationOrdering();
ordering.recordAllocation("node-1");

var iterator = OrderedShardsIterator.create(RoutingNodes.mutable(routing, nodes), ordering);
Contributor:

nit: let us use immutable instead, it seems more in line with the read-only nature of the iterator:

Suggested change:
- var iterator = OrderedShardsIterator.create(RoutingNodes.mutable(routing, nodes), ordering);
+ var iterator = OrderedShardsIterator.create(RoutingNodes.immutable(routing, nodes), ordering);

Contributor (author):

While the iterator does not perform any changes, I wanted to test it with mutable RoutingNodes as this is what we supply to it in the real code

Contributor:

By testing it with immutable, we also validate that the iterator does not mutate the routing nodes. We can compromise on randomizing it maybe ;-).

var ordering = new NodeAllocationOrdering();
ordering.recordAllocation("node-1");

var iterator = OrderedShardsIterator.create(RoutingNodes.mutable(routing, nodes), ordering);
Contributor:

nit: let us use immutable instead, it seems more in line with the read-only nature of the iterator:

Suggested change:
- var iterator = OrderedShardsIterator.create(RoutingNodes.mutable(routing, nodes), ordering);
+ var iterator = OrderedShardsIterator.create(RoutingNodes.immutable(routing, nodes), ordering);

ordering.recordAllocation("node-3");
ordering.recordAllocation("node-2");

var iterator = OrderedShardsIterator.create(RoutingNodes.mutable(routing, nodes), ordering);
Contributor:

Suggested change:
- var iterator = OrderedShardsIterator.create(RoutingNodes.mutable(routing, nodes), ordering);
+ var iterator = OrderedShardsIterator.create(RoutingNodes.immutable(routing, nodes), ordering);

var routing = RoutingTable.builder()
    .add(index("index-1a", "node-1"))
    .add(index("index-1b", "node-1"))
    .add(index("index-2", "node-2"))
Contributor:

Can we add an extra index for node-2 to see that that extra index comes out before the shards on node-1? I.e., that it does not return one shard from each node like the interleaved one.
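A rough sketch of that extra case, reusing the helpers visible in the excerpts above; the nodes fixture, the index(...) helper, and the assumption that a node with no recorded allocation is ordered before a recorded one are all taken from the surrounding test and not shown or verified here.

// Hypothetical sketch: with two indices on node-2 and node-1 recorded as the most recently
// allocated-to node, all node-2 shards should come out before any node-1 shard,
// i.e. no interleaving between nodes.
var routing = RoutingTable.builder()
    .add(index("index-1a", "node-1"))
    .add(index("index-1b", "node-1"))
    .add(index("index-2a", "node-2"))
    .add(index("index-2b", "node-2"))
    .build();

var ordering = new NodeAllocationOrdering();
ordering.recordAllocation("node-1");

var iterator = OrderedShardsIterator.create(RoutingNodes.mutable(routing, nodes), ordering);

assertThat(iterator.next().currentNodeId(), equalTo("node-2"));
assertThat(iterator.next().currentNodeId(), equalTo("node-2"));
assertThat(iterator.next().currentNodeId(), equalTo("node-1"));
assertThat(iterator.next().currentNodeId(), equalTo("node-1"));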

@henningandersen (Contributor) left a comment

LGTM.

// ensure that we do not cause hotspots by round-robin unreconciled source nodes when picking next rebalance
// (already reconciled nodes are excluded as they are no longer causing new moves)
assertThat(
"Every node expect to have similar amount of outgoing rebalances: " + totalOutgoingMoves,
Contributor:

Suggested change:
- "Every node expect to have similar amount of outgoing rebalances: " + totalOutgoingMoves,
+ "Reconciling nodes should all have same amount (max 1 delta) of moves, since we allow only 2 outgoing recoveries by default: " + totalOutgoingMoves,

Contributor:

No action needed, it's only an assertion message, but I don't think this is right:

since we allow only 2 outgoing recoveries by default

The max 1 delta is because we round-robin through the nodes.

Contributor (author):

Sounds good, I updated the message to "Reconciling nodes should all have same amount (max 1 delta) of moves"

Contributor:

I think that if we allowed 3 outgoing recoveries, we risked one node having just one shard left to move and another having 3 shards left to move. In that case, we'd have a delta of 2 even when we round-robin. Did I misunderstand something, or does the check here not rely on doing at most 2 outgoing recoveries at a time?

Contributor:

In that case we'd move one shard from each node and then remove the completed node from consideration, so from then on the delta would always be zero.

Contributor:

If we allowed 3 outgoing recoveries, I think we would risk one node having one outgoing relocation and the other node having 3 outgoing relocations, if these were the very last pending relocations needed. This would happen in one DesiredBalanceReconciler.run round, i.e., we would not remove from totalOutgoingMoves until after incrementing the counts and validating - and I think that would make the test fail?

I should probably try this out....

public void testRebalanceDoesNotCauseHotSpots() {

    int numberOfNodes = randomIntBetween(5, 9);
    int shardsPerNode = randomIntBetween(4, 15);
Contributor:

Is there any reason not to go to 1 here? I.e.:

Suggested change:
- int shardsPerNode = randomIntBetween(4, 15);
+ int shardsPerNode = randomIntBetween(1, 15);

@DaveCTurner (Contributor) left a comment

LGTM2

Did we also agree to backport this to 8.8? I think we should.

@idegtiarenko (Contributor, author) commented May 16, 2023

Did we also agree to backport this to 8.8? I think we should.

I think it is worth backporting, as we might look for other allocator improvements.
I will open a backport PR, but will not merge it automatically until we make a final decision.

@idegtiarenko added the auto-backport and v8.8.0 labels May 16, 2023
@idegtiarenko (Contributor, author):

@elasticsearchmachine please run elasticsearch-ci/bwc

@idegtiarenko merged commit 6ecd74d into elastic:main May 16, 2023
@idegtiarenko deleted the balance_priorities_during_reconciliation branch May 16, 2023 10:36
@elasticsearchmachine (Collaborator):

💔 Backport failed

Branch 8.8: commit could not be cherry-picked due to conflicts

You can use sqren/backport to manually backport by running backport --upstream elastic/elasticsearch --pr 95454

idegtiarenko added a commit to idegtiarenko/elasticsearch that referenced this pull request May 16, 2023
When reconciling a balance with a lot of shards on undesired nodes, there is a possibility of causing node hot spots due to the use of nodeInterleavedShardIterator. This iterator orders shards based on the nodes they are located on, and orders those nodes by hash map iteration order. This means it tends to pick the shards returned first by the iterator. This change uses OrderedShardsIterator, which applies a custom shard order based on allocation recency.

(cherry picked from commit 6ecd74d)
idegtiarenko added a commit that referenced this pull request May 16, 2023
DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request May 31, 2023
elasticsearchmachine pushed a commit that referenced this pull request May 31, 2023
DaveCTurner added a commit that referenced this pull request May 31, 2023
DaveCTurner added a commit that referenced this pull request May 31, 2023
DaveCTurner added a commit that referenced this pull request May 31, 2023