@DaveCTurner (Contributor) commented Sep 20, 2021

Reuse local node in async shard fetch responses

We read various objects from the wire that already exist in the cluster
state. The most notable is `DiscoveryNode` which can consume ~2kB in
heap for each fresh object, but rarely changes, so it's pretty wasteful
to use fresh objects here. There could be thousands (millions?) of
`DiscoveryNode` objects in flight from various `TransportNodesAction`
responses.

This branch adds a `DiscoveryNode` parameter to the response
deserialisation method and makes sure that the worst offenders re-use
the local object rather than creating a fresh one:

- `TransportNodesListShardStoreMetadata`
- `TransportNodesListGatewayStartedShards`

Relates #77266
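The reuse idea can be sketched roughly as follows. This is a minimal standalone illustration, not the actual Elasticsearch code: the `Node`, `WireNode`, and `readNode` names are invented for the example. The key point is that the deserialiser receives the already-materialised local node as a parameter and returns that existing instance when the wire value refers to it, instead of allocating a fresh copy per response.

```java
import java.util.Objects;

// Minimal sketch of the idea behind this PR, with invented types:
// when a response is deserialised on a node that already holds an
// equivalent DiscoveryNode (the local node), return that existing
// instance instead of materialising a fresh ~2kB object per response.
public class NodeReuseSketch {

    // Stand-in for org.elasticsearch.cluster.node.DiscoveryNode.
    record Node(String id, String address) {}

    // Stand-in for the wire representation of a node.
    record WireNode(String id, String address) {
        Node materialise() {
            return new Node(id, address); // fresh allocation
        }
    }

    // Hypothetical deserialiser taking the local node as a parameter,
    // mirroring the DiscoveryNode parameter this PR threads through.
    static Node readNode(WireNode wire, Node localNode) {
        if (localNode != null && Objects.equals(localNode.id(), wire.id())) {
            return localNode; // reuse the existing object
        }
        return wire.materialise();
    }

    public static void main(String[] args) {
        Node local = new Node("node-1", "10.0.0.1:9300");
        // Response from the local node: the existing object is reused.
        Node reused = readNode(new WireNode("node-1", "10.0.0.1:9300"), local);
        // Response about some other node: a fresh object is created.
        Node fresh = readNode(new WireNode("node-2", "10.0.0.2:9300"), local);

        if (reused != local) throw new AssertionError("expected reuse of local node");
        if (fresh == local) throw new AssertionError("expected fresh node");
        System.out.println("ok");
    }
}
```

With thousands of node-level responses in flight, each reuse avoids one ~2kB allocation on the coordinating node's heap.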

@DaveCTurner added the `:Distributed Coordination/Allocation` label (all issues relating to the decision making around placing a shard, both master logic & on the nodes) Sep 20, 2021
@DaveCTurner force-pushed the 2021-09-20-reuse-local-disco-node-in-async-shard-fetch branch from 205073d to c35942b on September 20, 2021 09:36
@DaveCTurner marked this pull request as ready for review September 20, 2021 10:31
@elasticmachine added the `Team:Distributed (Obsolete)` label (meta label for the distributed team; replaced by Distributed Indexing/Coordination) Sep 20, 2021
@elasticmachine (Collaborator)

Pinging @elastic/es-distributed (Team:Distributed)

@henningandersen (Contributor) left a comment


LGTM.

@DaveCTurner (Contributor, Author)

Thanks Henning :)

@DaveCTurner deleted the 2021-09-20-reuse-local-disco-node-in-async-shard-fetch branch September 20, 2021 11:09
@elasticsearchmachine (Collaborator)

💔 Backport failed

| Branch | Result |
|--------|--------|
| 7.x    | Commit could not be cherry-picked due to conflicts |

You can use sqren/backport to backport manually by running `backport --upstream elastic/elasticsearch --pr 77991`.

DaveCTurner added a commit that referenced this pull request Sep 20, 2021
@howardhuanghua (Contributor)

@DaveCTurner In our production environment we double-checked this optimization's effect on fetch-response memory consumption.
Before the optimization, we could see each `DiscoveryNode` costing ~1.7 kB of heap:
[screenshot: heap dump showing a DiscoveryNode retaining ~1.7 kB]

After this PR's optimization it takes only 128 bytes, containing the `TransportAddress`:
[screenshot: heap dump showing the slimmed-down response at 128 bytes]

However, the master's heap still blows up, due to the huge number of in-flight fetch-shard requests. In our case we have 75 data nodes and 3 dedicated master nodes; each master node has a 4 GB heap, and there are 1.5w shards. After fully restarting the cluster, the master node's memory is used up within several seconds. We dumped the heap and found that Netty's in-flight outbound requests used a lot of it:
[screenshot: heap dump showing Netty WriteOperation queue usage]

Each `WriteOperation` should be a single-shard request to a specific node (16 kB buffer each):
[screenshot: heap dump showing individual 16 kB WriteOperation buffers]

From the `Netty4MessageChannelHandler` class we can see a `queuedWrites` queue; messages are flushed asynchronously:

private final Queue<WriteOperation> queuedWrites = new ArrayDeque<>();

So besides shrinking the fetch-shard responses, we also need to handle the massive number of shard requests being sent.
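One common way to bound this kind of fan-out (a sketch only, not what Elasticsearch actually does; the "send" here is a stand-in for a real transport call) is to cap the number of concurrent in-flight requests with a semaphore, so at most N write buffers can be queued at once rather than one per shard:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: bound concurrent in-flight shard-fetch sends so an
// unbounded per-shard fan-out (e.g. thousands of shards x 16 kB buffers)
// cannot accumulate in the outbound queue faster than it is flushed.
public class BoundedFanOut {
    public static void main(String[] args) throws InterruptedException {
        final int maxInFlight = 8;                   // hypothetical tuning knob
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(32);

        for (int shard = 0; shard < 1000; shard++) {
            permits.acquire();                       // blocks once saturated
            pool.submit(() -> {
                try {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(1);                 // stand-in for one shard-level send
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                    permits.release();               // free a slot for the next shard
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        if (peak.get() > maxInFlight) throw new AssertionError("cap violated");
        System.out.println("peak in-flight = " + peak.get());
    }
}
```

The trade-off is slower fan-out after a full cluster restart in exchange for a bounded amount of heap pinned by queued writes.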

@DaveCTurner (Contributor, Author)

That sounds like a separate problem @howardhuanghua, although it's related. Would you open a new issue about it?

1.5w shards

I think that's a typo, but this is important - how many shards are there in this cluster?

@howardhuanghua (Contributor)

@howardhuanghua commented Nov 14, 2021

@DaveCTurner Sorry about the typo. 15000 shards total in cluster. I will open another issue.

@howardhuanghua (Contributor)

Opened a new issue #80694.

@DaveCTurner (Contributor, Author)

I think that's a typo

TIL that you used "w" to abbreviate "wan", i.e. 万, meaning 10,000. I didn't know that was a thing, but I do now 😄

@howardhuanghua (Contributor)

@howardhuanghua commented Nov 17, 2021

😄 Yes, you are right. It's a Chinese style.


Labels

`:Distributed Coordination/Allocation` · `>enhancement` · `Team:Distributed (Obsolete)` · `v7.16.0` · `v8.0.0-beta1`


7 participants