7 changes: 7 additions & 0 deletions docs/reference/cluster/stats.asciidoc
@@ -227,6 +227,12 @@ The API returns the following response:
},
...
],
"ingest": {
"number_of_pipelines" : 1,
"processor_stats": {
...
}
},
"network_types": {
...
},
@@ -244,6 +250,7 @@ The API returns the following response:
// TESTRESPONSE[s/"plugins": \[[^\]]*\]/"plugins": $body.$_path/]
// TESTRESPONSE[s/"network_types": \{[^\}]*\}/"network_types": $body.$_path/]
// TESTRESPONSE[s/"discovery_types": \{[^\}]*\}/"discovery_types": $body.$_path/]
// TESTRESPONSE[s/"processor_stats": \{[^\}]*\}/"processor_stats": $body.$_path/]
// TESTRESPONSE[s/"count": \{[^\}]*\}/"count": $body.$_path/]
// TESTRESPONSE[s/"packaging_types": \[[^\]]*\]/"packaging_types": $body.$_path/]
// TESTRESPONSE[s/: true|false/: $body.$_path/]
server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java
@@ -49,7 +49,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ClusterStatsNodes implements ToXContentFragment {
@@ -64,6 +66,7 @@ public class ClusterStatsNodes implements ToXContentFragment {
private final NetworkTypes networkTypes;
private final DiscoveryTypes discoveryTypes;
private final PackagingTypes packagingTypes;
private final IngestStats ingestStats;

ClusterStatsNodes(List<ClusterStatsNodeResponse> nodeResponses) {
this.versions = new HashSet<>();
@@ -97,6 +100,7 @@ public class ClusterStatsNodes implements ToXContentFragment {
this.networkTypes = new NetworkTypes(nodeInfos);
this.discoveryTypes = new DiscoveryTypes(nodeInfos);
this.packagingTypes = new PackagingTypes(nodeInfos);
this.ingestStats = new IngestStats(nodeStats);
}

public Counts getCounts() {
@@ -178,6 +182,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
discoveryTypes.toXContent(builder, params);

packagingTypes.toXContent(builder, params);

ingestStats.toXContent(builder, params);

return builder;
}

@@ -690,4 +697,68 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa

}

static class IngestStats implements ToXContentFragment {

final int pipelineCount;
final SortedMap<String, long[]> stats;

IngestStats(final List<NodeStats> nodeStats) {
Set<String> pipelineIds = new HashSet<>();
SortedMap<String, long[]> stats = new TreeMap<>();
for (NodeStats nodeStat : nodeStats) {
if (nodeStat.getIngestStats() != null) {
for (Map.Entry<String,
List<org.elasticsearch.ingest.IngestStats.ProcessorStat>> processorStats : nodeStat.getIngestStats()
.getProcessorStats().entrySet()) {
pipelineIds.add(processorStats.getKey());
for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) {
stats.compute(stat.getType(), (k, v) -> {
org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.getStats();
if (v == null) {
return new long[] {
nodeIngestStats.getIngestCount(),
nodeIngestStats.getIngestFailedCount(),
nodeIngestStats.getIngestCurrent(),
nodeIngestStats.getIngestTimeInMillis()
};
} else {
v[0] += nodeIngestStats.getIngestCount();
v[1] += nodeIngestStats.getIngestFailedCount();
v[2] += nodeIngestStats.getIngestCurrent();
v[3] += nodeIngestStats.getIngestTimeInMillis();
return v;
}
});
}
}
}
}
this.pipelineCount = pipelineIds.size();
this.stats = Collections.unmodifiableSortedMap(stats);
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject("ingest");
{
builder.field("number_of_pipelines", pipelineCount);
builder.startObject("processor_stats");
for (Map.Entry<String, long[]> stat : stats.entrySet()) {
long[] statValues = stat.getValue();
builder.startObject(stat.getKey());
builder.field("count", statValues[0]);
builder.field("failed", statValues[1]);
builder.field("current", statValues[2]);
builder.humanReadableField("time_in_millis", "time",
new TimeValue(statValues[3], TimeUnit.MILLISECONDS));
builder.endObject();
}
builder.endObject();
}
builder.endObject();
return builder;
}

}

}
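
For orientation, the sketch below illustrates the aggregation performed by the new IngestStats nested class above: processor stats reported by each node are folded into a sorted map keyed by processor type, accumulating count, failed, current, and time_in_millis in a long[]. It is a minimal, self-contained approximation; the ProcessorStat class here is a simplified stand-in for illustration, not Elasticsearch's org.elasticsearch.ingest.IngestStats.ProcessorStat.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class ProcessorStatMergeSketch {

    /** Simplified stand-in for org.elasticsearch.ingest.IngestStats.ProcessorStat (illustration only). */
    static final class ProcessorStat {
        final String type;
        final long count, failed, current, timeInMillis;

        ProcessorStat(String type, long count, long failed, long current, long timeInMillis) {
            this.type = type;
            this.count = count;
            this.failed = failed;
            this.current = current;
            this.timeInMillis = timeInMillis;
        }
    }

    /** Folds per-node processor stats into per-type totals, mirroring the compute() merge above. */
    static SortedMap<String, long[]> mergeByType(List<List<ProcessorStat>> statsPerNode) {
        SortedMap<String, long[]> totals = new TreeMap<>();
        for (List<ProcessorStat> nodeStats : statsPerNode) {
            for (ProcessorStat stat : nodeStats) {
                totals.compute(stat.type, (type, acc) -> {
                    if (acc == null) {
                        return new long[] { stat.count, stat.failed, stat.current, stat.timeInMillis };
                    }
                    acc[0] += stat.count;
                    acc[1] += stat.failed;
                    acc[2] += stat.current;
                    acc[3] += stat.timeInMillis;
                    return acc;
                });
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<List<ProcessorStat>> perNode = Arrays.asList(
            Arrays.asList(new ProcessorStat("set", 10, 1, 0, 5), new ProcessorStat("grok", 3, 0, 0, 2)),
            Arrays.asList(new ProcessorStat("set", 7, 0, 1, 4)));
        for (Map.Entry<String, long[]> entry : mergeByType(perNode).entrySet()) {
            long[] v = entry.getValue();
            System.out.println(entry.getKey() + " -> count=" + v[0] + " failed=" + v[1]
                + " current=" + v[2] + " time_in_millis=" + v[3]);
        }
    }
}
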
server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
@@ -94,7 +94,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
true, true, true, false, true, false, false, false, false, false, false, false);
true, true, true, false, true, false, false, false, false, false, true, false);
List<ShardStats> shardsStats = new ArrayList<>();
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -413,7 +413,7 @@ public IngestStats stats() {
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric);
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processor.getType(), processorMetric);
});
});
return statsBuilder.build();
24 changes: 20 additions & 4 deletions server/src/main/java/org/elasticsearch/ingest/IngestStats.java
@@ -69,8 +69,12 @@ public IngestStats(StreamInput in) throws IOException {
List<ProcessorStat> processorStatsPerPipeline = new ArrayList<>(processorsSize);
for (int j = 0; j < processorsSize; j++) {
String processorName = in.readString();
String processorType = "_NOT_AVAILABLE";
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
processorType = in.readString();
}
Stats processorStat = new Stats(in);
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat));
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
}
this.processorStats.put(pipelineId, processorStatsPerPipeline);
}
@@ -92,6 +96,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(processorStatsForPipeline.size());
for (ProcessorStat processorStat : processorStatsForPipeline) {
out.writeString(processorStat.getName());
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeString(processorStat.getType());
}
processorStat.getStats().writeTo(out);
}
}
@@ -115,9 +122,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
for (ProcessorStat processorStat : processorStatsForPipeline) {
builder.startObject();
builder.startObject(processorStat.getName());
builder.field("type", processorStat.getType());
builder.startObject("stats");
processorStat.getStats().toXContent(builder, params);
builder.endObject();
builder.endObject();
builder.endObject();
}
}
builder.endArray();
@@ -229,9 +239,9 @@ Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) {
return this;
}

Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) {
Builder addProcessorMetrics(String pipelineId, String processorName, String processorType, IngestMetric metric) {
this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>())
.add(new ProcessorStat(processorName, metric.createStats()));
.add(new ProcessorStat(processorName, processorType, metric.createStats()));
return this;
}

@@ -267,17 +277,23 @@ public Stats getStats() {
*/
public static class ProcessorStat {
private final String name;
private final String type;
private final Stats stats;

public ProcessorStat(String name, Stats stats) {
public ProcessorStat(String name, String type, Stats stats) {
this.name = name;
this.type = type;
this.stats = stats;
}

public String getName() {
return name;
}

public String getType() {
return type;
}

public Stats getStats() {
return stats;
}
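
For context, the sketch below shows the version-gating idea applied in the serialization changes above: the processor type is written and read only when the stream version is 7.6.0 or later, and readers of older streams fall back to the "_NOT_AVAILABLE" sentinel. Plain java.io streams and an int version number stand in for Elasticsearch's StreamInput/StreamOutput and Version classes; this is an illustration of the pattern, not the project's actual wire code.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionGatedFieldSketch {

    /** Stand-in for Version.V_7_6_0; Elasticsearch's real Version class is not used here. */
    static final int V_7_6_0 = 7_06_00;

    /** Writes the processor type only for streams that understand it (mirrors the writeTo() change above). */
    static void write(DataOutputStream out, int streamVersion, String name, String type) throws IOException {
        out.writeUTF(name);
        if (streamVersion >= V_7_6_0) {
            out.writeUTF(type);
        }
    }

    /** Older streams never carried the type, so the reader falls back to the same sentinel used above. */
    static String[] read(DataInputStream in, int streamVersion) throws IOException {
        String name = in.readUTF();
        String type = streamVersion >= V_7_6_0 ? in.readUTF() : "_NOT_AVAILABLE";
        return new String[] { name, type };
    }

    public static void main(String[] args) throws IOException {
        for (int version : new int[] { 7_05_02, 7_06_00 }) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            write(new DataOutputStream(bytes), version, "my-set-processor", "set");
            String[] nameAndType = read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())), version);
            System.out.println(version + " -> name=" + nameAndType[0] + ", type=" + nameAndType[1]);
        }
    }
}
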
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java
@@ -315,7 +315,7 @@ public void testSerialization() throws IOException {
}
}

private static NodeStats createNodeStats() {
public static NodeStats createNodeStats() {
DiscoveryNode node = new DiscoveryNode("test_node", buildNewFakeTransportAddress(),
emptyMap(), emptySet(), VersionUtils.randomVersion(random()));
OsStats osStats = null;
@@ -456,7 +456,8 @@ private static NodeStats createNodeStats() {
for (int j =0; j < numProcessors;j++) {
IngestStats.Stats processorStats = new IngestStats.Stats
(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats));
processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10),
randomAlphaOfLengthBetween(3, 10), processorStats));
}
ingestProcessorStats.put(pipelineId,processorPerPipeline);
}
server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java
@@ -20,18 +20,26 @@
package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStatsTests;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.hamcrest.Matchers.equalTo;

public class ClusterStatsNodesTests extends ESTestCase {

@@ -59,6 +67,41 @@ public void testNetworkTypesToXContent() throws Exception {
+ "}", toXContent(stats, XContentType.JSON, randomBoolean()).utf8ToString());
}

public void testIngestStats() throws Exception {
NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, NodeStatsTests::createNodeStats);

SortedMap<String, long[]> processorStats = new TreeMap<>();
nodeStats.getIngestStats().getProcessorStats().values().forEach(l -> l.forEach(s -> processorStats.put(s.getType(),
new long[] { s.getStats().getIngestCount(), s.getStats().getIngestFailedCount(),
s.getStats().getIngestCurrent(), s.getStats().getIngestTimeInMillis()})));
ClusterStatsNodes.IngestStats stats = new ClusterStatsNodes.IngestStats(Collections.singletonList(nodeStats));
assertThat(stats.pipelineCount, equalTo(nodeStats.getIngestStats().getProcessorStats().size()));
String processorStatsString = "{";
Iterator<Map.Entry<String, long[]>> iter = processorStats.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, long[]> entry = iter.next();
long[] statValues = entry.getValue();
long count = statValues[0];
long failedCount = statValues[1];
long current = statValues[2];
long timeInMillis = statValues[3];
processorStatsString += "\"" + entry.getKey() + "\":{\"count\":" + count
+ ",\"failed\":" + failedCount
+ ",\"current\":" + current
+ ",\"time_in_millis\":" + timeInMillis
+ "}";
if (iter.hasNext()) {
processorStatsString += ",";
}
}
processorStatsString += "}";
assertThat(toXContent(stats, XContentType.JSON, false).utf8ToString(), equalTo(
"{\"ingest\":{"
+ "\"number_of_pipelines\":" + stats.pipelineCount + ","
+ "\"processor_stats\":" + processorStatsString
+ "}}"));
}

private static NodeInfo createNodeInfo(String nodeId, String transportType, String httpType) {
Settings.Builder settings = Settings.builder();
if (transportType != null) {
server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java
@@ -42,7 +42,7 @@ public void testSerialization() throws IOException {
Map<String, List<IngestStats.ProcessorStat>> processorStats = createProcessorStats(pipelineStats);
IngestStats ingestStats = new IngestStats(totalStats, pipelineStats, processorStats);
IngestStats serializedStats = serialize(ingestStats);
assertIngestStats(ingestStats, serializedStats, true);
assertIngestStats(ingestStats, serializedStats, true, true);
}

public void testReadLegacyStream() throws IOException {
@@ -63,7 +63,24 @@
in.setVersion(VersionUtils.getPreviousVersion(Version.V_6_5_0));
IngestStats serializedStats = new IngestStats(in);
IngestStats expectedStats = new IngestStats(totalStats, pipelineStats, Collections.emptyMap());
assertIngestStats(expectedStats, serializedStats, false);
assertIngestStats(expectedStats, serializedStats, false, true);
}

public void testBWCIngestProcessorTypeStats() throws IOException {
IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300);
List<IngestStats.PipelineStat> pipelineStats = createPipelineStats();
Map<String, List<IngestStats.ProcessorStat>> processorStats = createProcessorStats(pipelineStats);
IngestStats expectedIngestStats = new IngestStats(totalStats, pipelineStats, processorStats);

//legacy output logic
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(VersionUtils.getPreviousVersion(Version.V_7_6_0));
expectedIngestStats.writeTo(out);

StreamInput in = out.bytes().streamInput();
in.setVersion(VersionUtils.getPreviousVersion(Version.V_7_6_0));
IngestStats serializedStats = new IngestStats(in);
assertIngestStats(expectedIngestStats, serializedStats, true, false);
}

private List<IngestStats.PipelineStat> createPipelineStats() {
@@ -75,9 +92,10 @@

private Map<String, List<IngestStats.ProcessorStat>> createProcessorStats(List<IngestStats.PipelineStat> pipelineStats){
assert(pipelineStats.size() >= 2);
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1));
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297));
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", "type", new IngestStats.Stats(1, 1, 1, 1));
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", "type", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", "type",
new IngestStats.Stats(47, 97, 197, 297));
//pipeline1 -> processor1,processor2; pipeline2 -> processor3
return MapBuilder.<String, List<IngestStats.ProcessorStat>>newMapBuilder()
.put(pipelineStats.get(0).getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList()))
Expand All @@ -92,7 +110,8 @@ private IngestStats serialize(IngestStats stats) throws IOException {
return new IngestStats(in);
}

private void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats, boolean expectProcessors){
private void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats, boolean expectProcessors,
boolean expectProcessorTypes){
assertNotSame(ingestStats, serializedStats);
assertNotSame(ingestStats.getTotalStats(), serializedStats.getTotalStats());
assertNotSame(ingestStats.getPipelineStats(), serializedStats.getPipelineStats());
@@ -114,6 +133,11 @@ private void assertIngestStats(IngestStats ingestStats, IngestStats serializedSt
for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) {
IngestStats.ProcessorStat ps = it.next();
assertEquals(ps.getName(), serializedProcessorStat.getName());
if (expectProcessorTypes) {
assertEquals(ps.getType(), serializedProcessorStat.getType());
} else {
assertEquals("_NOT_AVAILABLE", serializedProcessorStat.getType());
}
assertStats(ps.getStats(), serializedProcessorStat.getStats());
}
assertFalse(it.hasNext());
@@ -535,7 +535,11 @@ public void testToXContent() throws IOException {
+ "\"type\":\"docker\","
+ "\"count\":1"
+ "}"
+ "]"
+ "],"
+ "\"ingest\":{"
+ "\"number_of_pipelines\":0,"
+ "\"processor_stats\":{}"
+ "}"
+ "}"
+ "},"
+ "\"cluster_state\":{"