Skip to content

Commit cf55c12

Browse files
gmaroulidavidkyle
authored andcommitted
Revert "ES-6566: Move the calculation of data tier usage stats to individual nodes (elastic#100230) (elastic#101599)" (elastic#102042)
Reverting because the new action is not properly handled in a mixed cluster.
1 parent 6426cf1 commit cf55c12

File tree

18 files changed

+1054
-1421
lines changed

18 files changed

+1054
-1421
lines changed

docs/changelog/101599.yaml

Lines changed: 0 additions & 6 deletions
This file was deleted.

x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
import org.elasticsearch.core.Nullable;
2727
import org.elasticsearch.plugins.Plugin;
2828
import org.elasticsearch.test.ESIntegTestCase;
29+
import org.elasticsearch.xpack.core.DataTiersFeatureSetUsage;
2930
import org.elasticsearch.xpack.core.action.XPackUsageRequestBuilder;
3031
import org.elasticsearch.xpack.core.action.XPackUsageResponse;
31-
import org.elasticsearch.xpack.core.datatiers.DataTiersFeatureSetUsage;
3232
import org.junit.Before;
3333

3434
import java.util.ArrayList;

x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/rest/action/DataTiersUsageRestCancellationIT.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.core.rest.action;
99

1010
import org.apache.http.client.methods.HttpGet;
11+
import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
1112
import org.elasticsearch.action.support.ActionFilters;
1213
import org.elasticsearch.action.support.ActionTestUtils;
1314
import org.elasticsearch.action.support.PlainActionFuture;
@@ -34,7 +35,6 @@
3435
import org.elasticsearch.xpack.core.action.XPackUsageAction;
3536
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
3637
import org.elasticsearch.xpack.core.action.XPackUsageResponse;
37-
import org.elasticsearch.xpack.core.datatiers.NodesDataTiersUsageTransportAction;
3838

3939
import java.nio.file.Path;
4040
import java.util.Arrays;
@@ -76,7 +76,7 @@ public void testCancellation() throws Exception {
7676
final SubscribableListener<Void> nodeStatsRequestsReleaseListener = new SubscribableListener<>();
7777
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
7878
((MockTransportService) transportService).addRequestHandlingBehavior(
79-
NodesDataTiersUsageTransportAction.TYPE.name() + "[n]",
79+
TransportNodesStatsAction.TYPE.name() + "[n]",
8080
(handler, request, channel, task) -> {
8181
tasksBlockedLatch.countDown();
8282
nodeStatsRequestsReleaseListener.addListener(
@@ -94,13 +94,14 @@ public void testCancellation() throws Exception {
9494
safeAwait(tasksBlockedLatch); // must wait for the node-level tasks to start to avoid cancelling being handled earlier
9595
cancellable.cancel();
9696

97-
assertAllCancellableTasksAreCancelled(NodesDataTiersUsageTransportAction.TYPE.name());
97+
// NB this test works by blocking node-level stats requests; when #100230 is addressed this will need to target a different action.
98+
assertAllCancellableTasksAreCancelled(TransportNodesStatsAction.TYPE.name());
9899
assertAllCancellableTasksAreCancelled(XPackUsageAction.NAME);
99100

100101
nodeStatsRequestsReleaseListener.onResponse(null);
101102
expectThrows(CancellationException.class, future::actionGet);
102103

103-
assertAllTasksHaveFinished(NodesDataTiersUsageTransportAction.TYPE.name());
104+
assertAllTasksHaveFinished(TransportNodesStatsAction.TYPE.name());
104105
assertAllTasksHaveFinished(XPackUsageAction.NAME);
105106
}
106107

x-pack/plugin/core/src/main/java/module-info.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
exports org.elasticsearch.xpack.core.common.validation;
5858
exports org.elasticsearch.xpack.core.common;
5959
exports org.elasticsearch.xpack.core.datastreams;
60-
exports org.elasticsearch.xpack.core.datatiers;
6160
exports org.elasticsearch.xpack.core.deprecation;
6261
exports org.elasticsearch.xpack.core.downsample;
6362
exports org.elasticsearch.xpack.core.enrich.action;
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* 2.0.
66
*/
77

8-
package org.elasticsearch.xpack.core.datatiers;
8+
package org.elasticsearch.xpack.core;
99

1010
import org.elasticsearch.TransportVersion;
1111
import org.elasticsearch.TransportVersions;
@@ -16,8 +16,6 @@
1616
import org.elasticsearch.common.unit.ByteSizeValue;
1717
import org.elasticsearch.xcontent.ToXContentObject;
1818
import org.elasticsearch.xcontent.XContentBuilder;
19-
import org.elasticsearch.xpack.core.XPackFeatureSet;
20-
import org.elasticsearch.xpack.core.XPackField;
2119

2220
import java.io.IOException;
2321
import java.util.Collections;
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@
55
* 2.0.
66
*/
77

8-
package org.elasticsearch.xpack.core.datatiers;
8+
package org.elasticsearch.xpack.core;
99

1010
import org.elasticsearch.action.support.ActionFilters;
1111
import org.elasticsearch.common.inject.Inject;
1212
import org.elasticsearch.transport.TransportService;
13-
import org.elasticsearch.xpack.core.XPackField;
1413
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
1514
import org.elasticsearch.xpack.core.action.XPackInfoFeatureTransportAction;
1615

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
12+
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
13+
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
14+
import org.elasticsearch.action.support.ActionFilters;
15+
import org.elasticsearch.client.internal.Client;
16+
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
17+
import org.elasticsearch.cluster.ClusterState;
18+
import org.elasticsearch.cluster.metadata.IndexMetadata;
19+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
20+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
21+
import org.elasticsearch.cluster.routing.RoutingNode;
22+
import org.elasticsearch.cluster.routing.RoutingNodes;
23+
import org.elasticsearch.cluster.routing.ShardRouting;
24+
import org.elasticsearch.cluster.routing.ShardRoutingState;
25+
import org.elasticsearch.cluster.routing.allocation.DataTier;
26+
import org.elasticsearch.cluster.service.ClusterService;
27+
import org.elasticsearch.common.Strings;
28+
import org.elasticsearch.common.inject.Inject;
29+
import org.elasticsearch.index.Index;
30+
import org.elasticsearch.index.store.StoreStats;
31+
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
32+
import org.elasticsearch.search.aggregations.metrics.TDigestState;
33+
import org.elasticsearch.tasks.Task;
34+
import org.elasticsearch.threadpool.ThreadPool;
35+
import org.elasticsearch.transport.TransportService;
36+
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
37+
import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse;
38+
import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction;
39+
40+
import java.util.HashMap;
41+
import java.util.HashSet;
42+
import java.util.List;
43+
import java.util.Map;
44+
import java.util.Set;
45+
import java.util.stream.StreamSupport;
46+
47+
public class DataTiersUsageTransportAction extends XPackUsageFeatureTransportAction {
48+
49+
private final Client client;
50+
51+
@Inject
52+
public DataTiersUsageTransportAction(
53+
TransportService transportService,
54+
ClusterService clusterService,
55+
ThreadPool threadPool,
56+
ActionFilters actionFilters,
57+
IndexNameExpressionResolver indexNameExpressionResolver,
58+
Client client
59+
) {
60+
super(
61+
XPackUsageFeatureAction.DATA_TIERS.name(),
62+
transportService,
63+
clusterService,
64+
threadPool,
65+
actionFilters,
66+
indexNameExpressionResolver
67+
);
68+
this.client = client;
69+
}
70+
71+
@Override
72+
protected void masterOperation(
73+
Task task,
74+
XPackUsageRequest request,
75+
ClusterState state,
76+
ActionListener<XPackUsageFeatureResponse> listener
77+
) {
78+
new ParentTaskAssigningClient(client, clusterService.localNode(), task).admin()
79+
.cluster()
80+
.prepareNodesStats()
81+
.all()
82+
.setIndices(CommonStatsFlags.ALL)
83+
.execute(listener.delegateFailureAndWrap((delegate, nodesStatsResponse) -> {
84+
final RoutingNodes routingNodes = state.getRoutingNodes();
85+
final Map<String, IndexMetadata> indices = state.getMetadata().getIndices();
86+
87+
// Determine which tiers each index would prefer to be within
88+
Map<String, String> indicesToTiers = tierIndices(indices);
89+
90+
// Generate tier specific stats for the nodes and indices
91+
Map<String, DataTiersFeatureSetUsage.TierSpecificStats> tierSpecificStats = calculateStats(
92+
nodesStatsResponse.getNodes(),
93+
indicesToTiers,
94+
routingNodes
95+
);
96+
97+
delegate.onResponse(new XPackUsageFeatureResponse(new DataTiersFeatureSetUsage(tierSpecificStats)));
98+
}));
99+
}
100+
101+
// Visible for testing
102+
// Takes a registry of indices and returns a mapping of index name to which tier it most prefers. Always 1 to 1, some may filter out.
103+
static Map<String, String> tierIndices(Map<String, IndexMetadata> indices) {
104+
Map<String, String> indexByTier = new HashMap<>();
105+
indices.entrySet().forEach(entry -> {
106+
String tierPref = entry.getValue().getSettings().get(DataTier.TIER_PREFERENCE);
107+
if (Strings.hasText(tierPref)) {
108+
String[] tiers = tierPref.split(",");
109+
if (tiers.length > 0) {
110+
indexByTier.put(entry.getKey(), tiers[0]);
111+
}
112+
}
113+
});
114+
return indexByTier;
115+
}
116+
117+
/**
118+
* Accumulator to hold intermediate data tier stats before final calculation.
119+
*/
120+
private static class TierStatsAccumulator {
121+
int nodeCount = 0;
122+
Set<String> indexNames = new HashSet<>();
123+
int totalShardCount = 0;
124+
long totalByteCount = 0;
125+
long docCount = 0;
126+
int primaryShardCount = 0;
127+
long primaryByteCount = 0L;
128+
final TDigestState valueSketch = TDigestState.create(1000);
129+
}
130+
131+
// Visible for testing
132+
static Map<String, DataTiersFeatureSetUsage.TierSpecificStats> calculateStats(
133+
List<NodeStats> nodesStats,
134+
Map<String, String> indexByTier,
135+
RoutingNodes routingNodes
136+
) {
137+
Map<String, TierStatsAccumulator> statsAccumulators = new HashMap<>();
138+
for (NodeStats nodeStats : nodesStats) {
139+
aggregateDataTierNodeCounts(nodeStats, statsAccumulators);
140+
aggregateDataTierIndexStats(nodeStats, routingNodes, indexByTier, statsAccumulators);
141+
}
142+
Map<String, DataTiersFeatureSetUsage.TierSpecificStats> results = new HashMap<>();
143+
for (Map.Entry<String, TierStatsAccumulator> entry : statsAccumulators.entrySet()) {
144+
results.put(entry.getKey(), calculateFinalTierStats(entry.getValue()));
145+
}
146+
return results;
147+
}
148+
149+
/**
150+
* Determine which data tiers this node belongs to (if any), and increment the node counts for those tiers.
151+
*/
152+
private static void aggregateDataTierNodeCounts(NodeStats nodeStats, Map<String, TierStatsAccumulator> tiersStats) {
153+
nodeStats.getNode()
154+
.getRoles()
155+
.stream()
156+
.map(DiscoveryNodeRole::roleName)
157+
.filter(DataTier::validTierName)
158+
.forEach(tier -> tiersStats.computeIfAbsent(tier, k -> new TierStatsAccumulator()).nodeCount++);
159+
}
160+
161+
/**
162+
* Locate which indices are hosted on the node specified by the NodeStats, then group and aggregate the available index stats by tier.
163+
*/
164+
private static void aggregateDataTierIndexStats(
165+
NodeStats nodeStats,
166+
RoutingNodes routingNodes,
167+
Map<String, String> indexByTier,
168+
Map<String, TierStatsAccumulator> accumulators
169+
) {
170+
final RoutingNode node = routingNodes.node(nodeStats.getNode().getId());
171+
if (node != null) {
172+
StreamSupport.stream(node.spliterator(), false)
173+
.map(ShardRouting::index)
174+
.distinct()
175+
.forEach(index -> classifyIndexAndCollectStats(index, nodeStats, indexByTier, node, accumulators));
176+
}
177+
}
178+
179+
/**
180+
* Determine which tier an index belongs in, then accumulate its stats into that tier's stats.
181+
*/
182+
private static void classifyIndexAndCollectStats(
183+
Index index,
184+
NodeStats nodeStats,
185+
Map<String, String> indexByTier,
186+
RoutingNode node,
187+
Map<String, TierStatsAccumulator> accumulators
188+
) {
189+
// Look up which tier this index belongs to (its most preferred)
190+
String indexTier = indexByTier.get(index.getName());
191+
if (indexTier != null) {
192+
final TierStatsAccumulator accumulator = accumulators.computeIfAbsent(indexTier, k -> new TierStatsAccumulator());
193+
accumulator.indexNames.add(index.getName());
194+
aggregateDataTierShardStats(nodeStats, index, node, accumulator);
195+
}
196+
}
197+
198+
/**
199+
* Collect shard-level data tier stats from shard stats contained in the node stats response.
200+
*/
201+
private static void aggregateDataTierShardStats(NodeStats nodeStats, Index index, RoutingNode node, TierStatsAccumulator accumulator) {
202+
// Shard based stats
203+
final List<IndexShardStats> allShardStats = nodeStats.getIndices().getShardStats(index);
204+
if (allShardStats != null) {
205+
for (IndexShardStats shardStat : allShardStats) {
206+
accumulator.totalByteCount += shardStat.getTotal().getStore().totalDataSetSizeInBytes();
207+
accumulator.docCount += shardStat.getTotal().getDocs().getCount();
208+
209+
// Accumulate stats about started shards
210+
ShardRouting shardRouting = node.getByShardId(shardStat.getShardId());
211+
if (shardRouting != null && shardRouting.state() == ShardRoutingState.STARTED) {
212+
accumulator.totalShardCount += 1;
213+
214+
// Accumulate stats about started primary shards
215+
StoreStats primaryStoreStats = shardStat.getPrimary().getStore();
216+
if (primaryStoreStats != null) {
217+
// if primaryStoreStats is null, it means there is no primary on the node in question
218+
accumulator.primaryShardCount++;
219+
long primarySize = primaryStoreStats.totalDataSetSizeInBytes();
220+
accumulator.primaryByteCount += primarySize;
221+
accumulator.valueSketch.add(primarySize);
222+
}
223+
}
224+
}
225+
}
226+
}
227+
228+
private static DataTiersFeatureSetUsage.TierSpecificStats calculateFinalTierStats(TierStatsAccumulator accumulator) {
229+
long primaryShardSizeMedian = (long) accumulator.valueSketch.quantile(0.5);
230+
long primaryShardSizeMAD = computeMedianAbsoluteDeviation(accumulator.valueSketch);
231+
return new DataTiersFeatureSetUsage.TierSpecificStats(
232+
accumulator.nodeCount,
233+
accumulator.indexNames.size(),
234+
accumulator.totalShardCount,
235+
accumulator.primaryShardCount,
236+
accumulator.docCount,
237+
accumulator.totalByteCount,
238+
accumulator.primaryByteCount,
239+
primaryShardSizeMedian,
240+
primaryShardSizeMAD
241+
);
242+
}
243+
244+
// Visible for testing
245+
static long computeMedianAbsoluteDeviation(TDigestState valuesSketch) {
246+
if (valuesSketch.size() == 0) {
247+
return 0;
248+
} else {
249+
final double approximateMedian = valuesSketch.quantile(0.5);
250+
final TDigestState approximatedDeviationsSketch = TDigestState.createUsingParamsFrom(valuesSketch);
251+
valuesSketch.centroids().forEach(centroid -> {
252+
final double deviation = Math.abs(approximateMedian - centroid.mean());
253+
approximatedDeviationsSketch.add(deviation, centroid.count());
254+
});
255+
256+
return (long) approximatedDeviationsSketch.quantile(0.5);
257+
}
258+
}
259+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
2929
import org.elasticsearch.xpack.core.datastreams.DataStreamFeatureSetUsage;
3030
import org.elasticsearch.xpack.core.datastreams.DataStreamLifecycleFeatureSetUsage;
31-
import org.elasticsearch.xpack.core.datatiers.DataTiersFeatureSetUsage;
3231
import org.elasticsearch.xpack.core.downsample.DownsampleShardStatus;
3332
import org.elasticsearch.xpack.core.enrich.EnrichFeatureSetUsage;
3433
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,6 @@
9898
import org.elasticsearch.xpack.core.action.XPackUsageResponse;
9999
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
100100
import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
101-
import org.elasticsearch.xpack.core.datatiers.DataTiersInfoTransportAction;
102-
import org.elasticsearch.xpack.core.datatiers.DataTiersUsageTransportAction;
103-
import org.elasticsearch.xpack.core.datatiers.NodesDataTiersUsageTransportAction;
104101
import org.elasticsearch.xpack.core.ml.MlMetadata;
105102
import org.elasticsearch.xpack.core.rest.action.RestXPackInfoAction;
106103
import org.elasticsearch.xpack.core.rest.action.RestXPackUsageAction;
@@ -365,7 +362,6 @@ public Collection<?> createComponents(PluginServices services) {
365362
actions.add(new ActionHandler<>(XPackUsageFeatureAction.DATA_STREAM_LIFECYCLE, DataStreamLifecycleUsageTransportAction.class));
366363
actions.add(new ActionHandler<>(XPackUsageFeatureAction.HEALTH, HealthApiUsageTransportAction.class));
367364
actions.add(new ActionHandler<>(XPackUsageFeatureAction.REMOTE_CLUSTERS, RemoteClusterUsageTransportAction.class));
368-
actions.add(new ActionHandler<>(NodesDataTiersUsageTransportAction.TYPE, NodesDataTiersUsageTransportAction.class));
369365
return actions;
370366
}
371367

0 commit comments

Comments
 (0)