From 1bb4727f2e095d3eeeda68e6a0c46783647119ba Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Fri, 30 Aug 2019 14:03:08 -0700 Subject: [PATCH 1/7] add ingest info to cluster stats --- .../cluster/stats/ClusterStatsNodes.java | 60 +++++++++++++++++++ .../stats/TransportClusterStatsAction.java | 2 +- .../elasticsearch/ingest/IngestService.java | 2 +- .../org/elasticsearch/ingest/IngestStats.java | 25 ++++++-- .../cluster/node/stats/NodeStatsTests.java | 5 +- .../cluster/stats/ClusterStatsNodesTests.java | 32 ++++++++++ .../ingest/IngestStatsTests.java | 7 ++- 7 files changed, 122 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java index bcadbc8e3292a..710f7c0b76e59 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -36,6 +36,8 @@ import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.discovery.DiscoveryModule; +import org.elasticsearch.ingest.IngestInfo; +import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.plugins.PluginInfo; @@ -49,7 +51,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; public class ClusterStatsNodes implements ToXContentFragment { @@ -64,6 +68,7 @@ public class ClusterStatsNodes implements ToXContentFragment { private final NetworkTypes networkTypes; private final DiscoveryTypes discoveryTypes; private final PackagingTypes packagingTypes; + private IngestStats ingestStats; ClusterStatsNodes(List nodeResponses) { this.versions = new HashSet<>(); @@ -97,6 +102,7 @@ public class ClusterStatsNodes implements ToXContentFragment { this.networkTypes = new NetworkTypes(nodeInfos); this.discoveryTypes = new DiscoveryTypes(nodeInfos); this.packagingTypes = new PackagingTypes(nodeInfos); + this.ingestStats = new IngestStats(nodeStats); } public Counts getCounts() { @@ -178,6 +184,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws discoveryTypes.toXContent(builder, params); packagingTypes.toXContent(builder, params); + + ingestStats.toXContent(builder, params); + return builder; } @@ -690,4 +699,55 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa } + static class IngestStats implements ToXContentFragment { + + int pipelineCount; + Map stats = new HashMap<>(); + + IngestStats(final List nodeStats) { + Set pipelineIds = new HashSet<>(); + Map stats = new HashMap<>(); + for (NodeStats nodeStat : nodeStats) { + if (nodeStat.getIngestStats() != null) { + for (Map.Entry> processorStats : nodeStat.getIngestStats() + .getProcessorStats().entrySet()) { + pipelineIds.add(processorStats.getKey()); + for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) { + stats.compute(stat.getType(), (k, v) -> { + if (v == null) { + return new long[] { stat.getStats().getIngestCount(), stat.getStats().getIngestFailedCount() }; + } else { + v[0] += stat.getStats().getIngestCount(); + v[1] += stat.getStats().getIngestFailedCount(); + return v; + } + }); + } + } + } + } + this.pipelineCount = pipelineIds.size(); + this.stats = Collections.unmodifiableMap(stats); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject("ingest"); + { + builder.field("number_of_pipelines", pipelineCount); + builder.startObject("processor_stats"); + for (Map.Entry stat : stats.entrySet()) { + builder.startObject(stat.getKey()); + builder.field("count", stat.getValue()[0]); + builder.field("fail_count", stat.getValue()[1]); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + return builder; + } + + } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 5ecf569397d4f..4f23874a54fbd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -95,7 +95,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest, Task task) { NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false); NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, - true, true, true, false, true, false, false, false, false, false, false, false); + true, true, true, false, true, false, false, false, false, false, true, false); List shardsStats = new ArrayList<>(); for (IndexService indexService : indicesService) { for (IndexShard indexShard : indexService) { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index dc5285d0c48c1..b17b530aca9f0 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -413,7 +413,7 @@ public IngestStats stats() { processorMetrics.forEach(t -> { Processor processor = t.v1(); IngestMetric processorMetric = t.v2(); - statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric); + statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processor.getType(), processorMetric); }); }); return statsBuilder.build(); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java index f140c5f155563..b7696ec61268f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -67,8 +68,12 @@ public IngestStats(StreamInput in) throws IOException { List processorStatsPerPipeline = new ArrayList<>(processorsSize); for (int j = 0; j < processorsSize; j++) { String processorName = in.readString(); + String processorType = null; + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + processorType = in.readString(); + } Stats processorStat = new Stats(in); - processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat)); + processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat)); } this.processorStats.put(pipelineId, processorStatsPerPipeline); } @@ -88,6 +93,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(processorStatsForPipeline.size()); for (ProcessorStat processorStat : processorStatsForPipeline) { out.writeString(processorStat.getName()); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeString(processorStat.getType()); + } processorStat.getStats().writeTo(out); } } @@ -110,9 +118,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws for (ProcessorStat processorStat : processorStatsForPipeline) { builder.startObject(); builder.startObject(processorStat.getName()); + builder.field("type", processorStat.getType()); + builder.startObject("stats"); processorStat.getStats().toXContent(builder, params); builder.endObject(); builder.endObject(); + builder.endObject(); } } builder.endArray(); @@ -224,9 +235,9 @@ Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) { return this; } - Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) { + Builder addProcessorMetrics(String pipelineId, String processorName, String processorType, IngestMetric metric) { this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>()) - .add(new ProcessorStat(processorName, metric.createStats())); + .add(new ProcessorStat(processorName, processorType, metric.createStats())); return this; } @@ -262,10 +273,12 @@ public Stats getStats() { */ public static class ProcessorStat { private final String name; + private final String type; private final Stats stats; - public ProcessorStat(String name, Stats stats) { + public ProcessorStat(String name, String type, Stats stats) { this.name = name; + this.type = type; this.stats = stats; } @@ -273,6 +286,10 @@ public String getName() { return name; } + public String getType() { + return type; + } + public Stats getStats() { return stats; } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 57395859c503f..fdd71ba98b495 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -315,7 +315,7 @@ public void testSerialization() throws IOException { } } - private static NodeStats createNodeStats() { + public static NodeStats createNodeStats() { DiscoveryNode node = new DiscoveryNode("test_node", buildNewFakeTransportAddress(), emptyMap(), emptySet(), VersionUtils.randomVersion(random())); OsStats osStats = null; @@ -456,7 +456,8 @@ private static NodeStats createNodeStats() { for (int j =0; j < numProcessors;j++) { IngestStats.Stats processorStats = new IngestStats.Stats (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); - processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats)); + processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), + randomAlphaOfLengthBetween(3, 10), processorStats)); } ingestProcessorStats.put(pipelineId,processorPerPipeline); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index 04edc775a2d53..91c5ac65daebd 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.cluster.stats; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; @@ -27,11 +28,18 @@ import org.elasticsearch.test.ESTestCase; import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static org.elasticsearch.action.admin.cluster.node.stats.NodeStatsTests.createNodeStats; import static org.elasticsearch.common.xcontent.XContentHelper.toXContent; +import static org.hamcrest.Matchers.equalTo; public class ClusterStatsNodesTests extends ESTestCase { @@ -59,6 +67,30 @@ public void testNetworkTypesToXContent() throws Exception { + "}", toXContent(stats, XContentType.JSON, randomBoolean()).utf8ToString()); } + public void testIngestStats() throws Exception { + NodeStats nodeStats = createNodeStats(); + + SortedSet processorTypes = new TreeSet<>(); + nodeStats.getIngestStats().getProcessorStats().values().forEach(l -> l.forEach(s -> processorTypes.add(s.getType()))); + ClusterStatsNodes.IngestStats stats = new ClusterStatsNodes.IngestStats(Collections.singletonList(nodeStats)); + assertThat(stats.pipelineCount, equalTo(nodeStats.getIngestStats().getProcessorStats().size())); + assertThat(stats.processorTypes, equalTo(processorTypes)); + Object[] processorTypesArray = processorTypes.toArray(); + String processorTypesString = "["; + for (int i = 0; i < processorTypesArray.length; i++) { + processorTypesString += "\"" + processorTypesArray[i] + "\""; + if (i < processorTypesArray.length - 1) { + processorTypesString += ","; + } + } + processorTypesString += "]"; + assertThat(toXContent(stats, XContentType.JSON, randomBoolean()).utf8ToString(), equalTo( + "{\"ingest\":{" + + "\"number_of_pipelines\":" + stats.pipelineCount + "," + + "\"processor_types\":" + processorTypesString + + "}}")); + } + private static NodeInfo createNodeInfo(String nodeId, String transportType, String httpType) { Settings.Builder settings = Settings.builder(); if (transportType != null) { diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java index bb132f73840f3..d50df691ac948 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java @@ -52,9 +52,10 @@ private List createPipelineStats() { private Map> createProcessorStats(List pipelineStats){ assert(pipelineStats.size() >= 2); - IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1)); - IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2)); - IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297)); + IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", "type", new IngestStats.Stats(1, 1, 1, 1)); + IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", "type", new IngestStats.Stats(2, 2, 2, 2)); + IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", "type", + new IngestStats.Stats(47, 97, 197, 297)); //pipeline1 -> processor1,processor2; pipeline2 -> processor3 return MapBuilder.>newMapBuilder() .put(pipelineStats.get(0).getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList())) From 44eff3ef62c66c1e0e53003d4ab4b80a330aae09 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 24 Oct 2019 12:43:25 -0700 Subject: [PATCH 2/7] fixes --- .../cluster/stats/ClusterStatsNodes.java | 7 ++-- .../cluster/stats/ClusterStatsNodesTests.java | 33 ++++++++++--------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java index 710f7c0b76e59..6e6c79fe36f7a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; @@ -701,12 +702,12 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa static class IngestStats implements ToXContentFragment { - int pipelineCount; - Map stats = new HashMap<>(); + final int pipelineCount; + final Map stats; IngestStats(final List nodeStats) { Set pipelineIds = new HashSet<>(); - Map stats = new HashMap<>(); + SortedMap stats = new TreeMap<>(); for (NodeStats nodeStat : nodeStats) { if (nodeStat.getIngestStats() != null) { for (Map.Entry> processorStats : nodeStat.getIngestStats() diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index 91c5ac65daebd..05222820ba5b6 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -29,11 +29,11 @@ import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; +import java.util.Iterator; import java.util.List; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -70,24 +70,27 @@ public void testNetworkTypesToXContent() throws Exception { public void testIngestStats() throws Exception { NodeStats nodeStats = createNodeStats(); - SortedSet processorTypes = new TreeSet<>(); - nodeStats.getIngestStats().getProcessorStats().values().forEach(l -> l.forEach(s -> processorTypes.add(s.getType()))); + SortedMap processorStats = new TreeMap<>(); + nodeStats.getIngestStats().getProcessorStats().values().forEach(l -> l.forEach(s -> processorStats.put(s.getType(), + new long[] { s.getStats().getIngestCount(), s.getStats().getIngestFailedCount() }))); ClusterStatsNodes.IngestStats stats = new ClusterStatsNodes.IngestStats(Collections.singletonList(nodeStats)); assertThat(stats.pipelineCount, equalTo(nodeStats.getIngestStats().getProcessorStats().size())); - assertThat(stats.processorTypes, equalTo(processorTypes)); - Object[] processorTypesArray = processorTypes.toArray(); - String processorTypesString = "["; - for (int i = 0; i < processorTypesArray.length; i++) { - processorTypesString += "\"" + processorTypesArray[i] + "\""; - if (i < processorTypesArray.length - 1) { - processorTypesString += ","; + String processorStatsString = "{"; + Iterator> iter = processorStats.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + long count = entry.getValue()[0]; + long failedCount = entry.getValue()[1]; + processorStatsString += "\"" + entry.getKey() + "\":{\"count\":" + count + ",\"fail_count\":" + failedCount + "}"; + if (iter.hasNext()) { + processorStatsString += ","; } } - processorTypesString += "]"; + processorStatsString += "}"; assertThat(toXContent(stats, XContentType.JSON, randomBoolean()).utf8ToString(), equalTo( "{\"ingest\":{" + "\"number_of_pipelines\":" + stats.pipelineCount + "," - + "\"processor_types\":" + processorTypesString + + "\"processor_stats\":" + processorStatsString + "}}")); } From 0ae069701f709ba0f2ad6de531f0b735eda7fdab Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 24 Oct 2019 15:26:11 -0700 Subject: [PATCH 3/7] fix checkstyle --- .../action/admin/cluster/stats/ClusterStatsNodes.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java index 6e6c79fe36f7a..a891acf40104a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -36,8 +36,6 @@ import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.discovery.DiscoveryModule; -import org.elasticsearch.ingest.IngestInfo; -import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.plugins.PluginInfo; @@ -52,9 +50,7 @@ import java.util.Map; import java.util.Set; import java.util.SortedMap; -import java.util.SortedSet; import java.util.TreeMap; -import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; public class ClusterStatsNodes implements ToXContentFragment { @@ -710,8 +706,9 @@ static class IngestStats implements ToXContentFragment { SortedMap stats = new TreeMap<>(); for (NodeStats nodeStat : nodeStats) { if (nodeStat.getIngestStats() != null) { - for (Map.Entry> processorStats : nodeStat.getIngestStats() - .getProcessorStats().entrySet()) { + for (Map.Entry> processorStats : nodeStat.getIngestStats() + .getProcessorStats().entrySet()) { pipelineIds.add(processorStats.getKey()); for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) { stats.compute(stat.getType(), (k, v) -> { From dc82b30bb15894abb744e6bce5d5cef14946b816 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Fri, 25 Oct 2019 08:33:30 -0700 Subject: [PATCH 4/7] Fix docs tests for cluster stats --- docs/reference/cluster/stats.asciidoc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/reference/cluster/stats.asciidoc b/docs/reference/cluster/stats.asciidoc index dbb4ea25d365f..60cee385eabbc 100644 --- a/docs/reference/cluster/stats.asciidoc +++ b/docs/reference/cluster/stats.asciidoc @@ -227,6 +227,12 @@ The API returns the following response: }, ... ], + "ingest": { + "number_of_pipelines" : 1, + "processor_stats": { + ... + } + }, "network_types": { ... }, @@ -244,6 +250,7 @@ The API returns the following response: // TESTRESPONSE[s/"plugins": \[[^\]]*\]/"plugins": $body.$_path/] // TESTRESPONSE[s/"network_types": \{[^\}]*\}/"network_types": $body.$_path/] // TESTRESPONSE[s/"discovery_types": \{[^\}]*\}/"discovery_types": $body.$_path/] +// TESTRESPONSE[s/"processor_stats": \{[^\}]*\}/"processor_stats": $body.$_path/] // TESTRESPONSE[s/"count": \{[^\}]*\}/"count": $body.$_path/] // TESTRESPONSE[s/"packaging_types": \[[^\]]*\]/"packaging_types": $body.$_path/] // TESTRESPONSE[s/: true|false/: $body.$_path/] From 40286c818d2be071e433649faea313dd2315d79c Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Fri, 25 Oct 2019 10:35:05 -0700 Subject: [PATCH 5/7] fix clusterstatsmonitoringdoctests --- .../collector/cluster/ClusterStatsMonitoringDocTests.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java index a4e76d60f041c..f7da3118c5e29 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java @@ -536,7 +536,11 @@ public void testToXContent() throws IOException { + "\"type\":\"docker\"," + "\"count\":1" + "}" - + "]" + + "]," + + "\"ingest\":{" + + "\"number_of_pipelines\":0," + + "\"processor_stats\":{}" + + "}" + "}" + "}," + "\"cluster_state\":{" From aa84babbcd77240b6d66708f8e7cdae1f44e93e4 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 28 Oct 2019 06:47:19 -0700 Subject: [PATCH 6/7] respond to review code nits --- .../action/admin/cluster/stats/ClusterStatsNodes.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java index a891acf40104a..3bf970e8af121 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -65,7 +65,7 @@ public class ClusterStatsNodes implements ToXContentFragment { private final NetworkTypes networkTypes; private final DiscoveryTypes discoveryTypes; private final PackagingTypes packagingTypes; - private IngestStats ingestStats; + private final IngestStats ingestStats; ClusterStatsNodes(List nodeResponses) { this.versions = new HashSet<>(); @@ -713,7 +713,10 @@ static class IngestStats implements ToXContentFragment { for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) { stats.compute(stat.getType(), (k, v) -> { if (v == null) { - return new long[] { stat.getStats().getIngestCount(), stat.getStats().getIngestFailedCount() }; + return new long[] { + stat.getStats().getIngestCount(), + stat.getStats().getIngestFailedCount(), + }; } else { v[0] += stat.getStats().getIngestCount(); v[1] += stat.getStats().getIngestFailedCount(); @@ -725,7 +728,7 @@ static class IngestStats implements ToXContentFragment { } } this.pipelineCount = pipelineIds.size(); - this.stats = Collections.unmodifiableMap(stats); + this.stats = Map.copyOf(stats); } @Override From e3c7cd74f0e80e0c192176585170cf2358d11790 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 28 Oct 2019 07:31:48 -0700 Subject: [PATCH 7/7] add all ingest node stats to cluster stats --- .../cluster/stats/ClusterStatsNodes.java | 27 +++++++++++++------ .../cluster/stats/ClusterStatsNodesTests.java | 18 +++++++++---- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java index 3bf970e8af121..485f215e594b3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.discovery.DiscoveryModule; +import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.plugins.PluginInfo; @@ -51,6 +52,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ClusterStatsNodes implements ToXContentFragment { @@ -699,7 +701,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa static class IngestStats implements ToXContentFragment { final int pipelineCount; - final Map stats; + final SortedMap stats; IngestStats(final List nodeStats) { Set pipelineIds = new HashSet<>(); @@ -712,14 +714,19 @@ static class IngestStats implements ToXContentFragment { pipelineIds.add(processorStats.getKey()); for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) { stats.compute(stat.getType(), (k, v) -> { + org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.getStats(); if (v == null) { return new long[] { - stat.getStats().getIngestCount(), - stat.getStats().getIngestFailedCount(), + nodeIngestStats.getIngestCount(), + nodeIngestStats.getIngestFailedCount(), + nodeIngestStats.getIngestCurrent(), + nodeIngestStats.getIngestTimeInMillis() }; } else { - v[0] += stat.getStats().getIngestCount(); - v[1] += stat.getStats().getIngestFailedCount(); + v[0] += nodeIngestStats.getIngestCount(); + v[1] += nodeIngestStats.getIngestFailedCount(); + v[2] += nodeIngestStats.getIngestCurrent(); + v[3] += nodeIngestStats.getIngestTimeInMillis(); return v; } }); @@ -728,7 +735,7 @@ static class IngestStats implements ToXContentFragment { } } this.pipelineCount = pipelineIds.size(); - this.stats = Map.copyOf(stats); + this.stats = Collections.unmodifiableSortedMap(stats); } @Override @@ -738,9 +745,13 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field("number_of_pipelines", pipelineCount); builder.startObject("processor_stats"); for (Map.Entry stat : stats.entrySet()) { + long[] statValues = stat.getValue(); builder.startObject(stat.getKey()); - builder.field("count", stat.getValue()[0]); - builder.field("fail_count", stat.getValue()[1]); + builder.field("count", statValues[0]); + builder.field("failed", statValues[1]); + builder.field("current", statValues[2]); + builder.humanReadableField("time_in_millis", "time", + new TimeValue(statValues[3], TimeUnit.MILLISECONDS)); builder.endObject(); } builder.endObject(); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index 05222820ba5b6..c6ba462eb9e33 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -72,22 +72,30 @@ public void testIngestStats() throws Exception { SortedMap processorStats = new TreeMap<>(); nodeStats.getIngestStats().getProcessorStats().values().forEach(l -> l.forEach(s -> processorStats.put(s.getType(), - new long[] { s.getStats().getIngestCount(), s.getStats().getIngestFailedCount() }))); + new long[] { s.getStats().getIngestCount(), s.getStats().getIngestFailedCount(), + s.getStats().getIngestCurrent(), s.getStats().getIngestTimeInMillis()}))); ClusterStatsNodes.IngestStats stats = new ClusterStatsNodes.IngestStats(Collections.singletonList(nodeStats)); assertThat(stats.pipelineCount, equalTo(nodeStats.getIngestStats().getProcessorStats().size())); String processorStatsString = "{"; Iterator> iter = processorStats.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = iter.next(); - long count = entry.getValue()[0]; - long failedCount = entry.getValue()[1]; - processorStatsString += "\"" + entry.getKey() + "\":{\"count\":" + count + ",\"fail_count\":" + failedCount + "}"; + long[] statValues = entry.getValue(); + long count = statValues[0]; + long failedCount = statValues[1]; + long current = statValues[2]; + long timeInMillis = statValues[3]; + processorStatsString += "\"" + entry.getKey() + "\":{\"count\":" + count + + ",\"failed\":" + failedCount + + ",\"current\":" + current + + ",\"time_in_millis\":" + timeInMillis + + "}"; if (iter.hasNext()) { processorStatsString += ","; } } processorStatsString += "}"; - assertThat(toXContent(stats, XContentType.JSON, randomBoolean()).utf8ToString(), equalTo( + assertThat(toXContent(stats, XContentType.JSON, false).utf8ToString(), equalTo( "{\"ingest\":{" + "\"number_of_pipelines\":" + stats.pipelineCount + "," + "\"processor_stats\":" + processorStatsString