Skip to content

Commit 0577703

Browse files
committed
Revert "ingest: processor stats (#34202)"
This reverts commit 6567729.
1 parent bf5f0af commit 0577703

File tree

12 files changed

+146
-652
lines changed

12 files changed

+146
-652
lines changed

server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,12 @@
2020
package org.elasticsearch.ingest;
2121

2222
import org.elasticsearch.ElasticsearchException;
23-
import org.elasticsearch.common.collect.Tuple;
2423

2524
import java.util.ArrayList;
2625
import java.util.Arrays;
2726
import java.util.Collections;
2827
import java.util.List;
2928
import java.util.Map;
30-
import java.util.concurrent.TimeUnit;
31-
import java.util.function.LongSupplier;
3229
import java.util.stream.Collectors;
3330

3431
/**
@@ -43,33 +40,16 @@ public class CompoundProcessor implements Processor {
4340
private final boolean ignoreFailure;
4441
private final List<Processor> processors;
4542
private final List<Processor> onFailureProcessors;
46-
private final List<Tuple<Processor, IngestMetric>> processorsWithMetrics;
47-
private final LongSupplier relativeTimeProvider;
48-
49-
CompoundProcessor(LongSupplier relativeTimeProvider, Processor... processor) {
50-
this(false, Arrays.asList(processor), Collections.emptyList(), relativeTimeProvider);
51-
}
5243

5344
public CompoundProcessor(Processor... processor) {
5445
this(false, Arrays.asList(processor), Collections.emptyList());
5546
}
5647

5748
public CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors) {
58-
this(ignoreFailure, processors, onFailureProcessors, System::nanoTime);
59-
}
60-
CompoundProcessor(boolean ignoreFailure, List<Processor> processors, List<Processor> onFailureProcessors,
61-
LongSupplier relativeTimeProvider) {
6249
super();
6350
this.ignoreFailure = ignoreFailure;
6451
this.processors = processors;
6552
this.onFailureProcessors = onFailureProcessors;
66-
this.relativeTimeProvider = relativeTimeProvider;
67-
this.processorsWithMetrics = new ArrayList<>(processors.size());
68-
processors.forEach(p -> processorsWithMetrics.add(new Tuple<>(p, new IngestMetric())));
69-
}
70-
71-
List<Tuple<Processor, IngestMetric>> getProcessorsWithMetrics() {
72-
return processorsWithMetrics;
7353
}
7454

7555
public boolean isIgnoreFailure() {
@@ -114,17 +94,12 @@ public String getTag() {
11494

11595
@Override
11696
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
117-
for (Tuple<Processor, IngestMetric> processorWithMetric : processorsWithMetrics) {
118-
Processor processor = processorWithMetric.v1();
119-
IngestMetric metric = processorWithMetric.v2();
120-
long startTimeInNanos = relativeTimeProvider.getAsLong();
97+
for (Processor processor : processors) {
12198
try {
122-
metric.preIngest();
12399
if (processor.execute(ingestDocument) == null) {
124100
return null;
125101
}
126102
} catch (Exception e) {
127-
metric.ingestFailed();
128103
if (ignoreFailure) {
129104
continue;
130105
}
@@ -137,15 +112,11 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
137112
executeOnFailure(ingestDocument, compoundProcessorException);
138113
break;
139114
}
140-
} finally {
141-
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
142-
metric.postIngest(ingestTimeInMillis);
143115
}
144116
}
145117
return ingestDocument;
146118
}
147119

148-
149120
void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
150121
try {
151122
putFailureMetadata(ingestDocument, exception);

server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import java.util.ListIterator;
2929
import java.util.Map;
3030
import java.util.Set;
31-
import java.util.concurrent.TimeUnit;
32-
import java.util.function.LongSupplier;
3331
import java.util.stream.Collectors;
3432
import org.elasticsearch.script.IngestConditionalScript;
3533
import org.elasticsearch.script.Script;
@@ -44,51 +42,24 @@ public class ConditionalProcessor extends AbstractProcessor {
4442
private final ScriptService scriptService;
4543

4644
private final Processor processor;
47-
private final IngestMetric metric;
48-
private final LongSupplier relativeTimeProvider;
4945

5046
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor) {
51-
this(tag, script, scriptService, processor, System::nanoTime);
52-
}
53-
54-
ConditionalProcessor(String tag, Script script, ScriptService scriptService, Processor processor, LongSupplier relativeTimeProvider) {
5547
super(tag);
5648
this.condition = script;
5749
this.scriptService = scriptService;
5850
this.processor = processor;
59-
this.metric = new IngestMetric();
60-
this.relativeTimeProvider = relativeTimeProvider;
6151
}
6252

6353
@Override
6454
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
6555
IngestConditionalScript script =
6656
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
6757
if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) {
68-
// Only record metric if the script evaluates to true
69-
long startTimeInNanos = relativeTimeProvider.getAsLong();
70-
try {
71-
metric.preIngest();
72-
return processor.execute(ingestDocument);
73-
} catch (Exception e) {
74-
metric.ingestFailed();
75-
throw e;
76-
} finally {
77-
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
78-
metric.postIngest(ingestTimeInMillis);
79-
}
58+
return processor.execute(ingestDocument);
8059
}
8160
return ingestDocument;
8261
}
8362

84-
Processor getProcessor() {
85-
return processor;
86-
}
87-
88-
IngestMetric getMetric() {
89-
return metric;
90-
}
91-
9263
@Override
9364
public String getType() {
9465
return TYPE;

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 16 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,19 @@
1919

2020
package org.elasticsearch.ingest;
2121

22+
import java.util.ArrayList;
23+
import java.util.Collections;
24+
import java.util.HashMap;
25+
import java.util.HashSet;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Objects;
29+
import java.util.Set;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.function.BiConsumer;
32+
import java.util.function.Consumer;
33+
import java.util.stream.Collectors;
34+
2235
import org.elasticsearch.ElasticsearchParseException;
2336
import org.elasticsearch.ExceptionsHelper;
2437
import org.elasticsearch.ResourceNotFoundException;
@@ -36,7 +49,6 @@
3649
import org.elasticsearch.cluster.metadata.MetaData;
3750
import org.elasticsearch.cluster.node.DiscoveryNode;
3851
import org.elasticsearch.cluster.service.ClusterService;
39-
import org.elasticsearch.common.collect.Tuple;
4052
import org.elasticsearch.common.regex.Regex;
4153
import org.elasticsearch.common.unit.TimeValue;
4254
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -49,19 +61,6 @@
4961
import org.elasticsearch.script.ScriptService;
5062
import org.elasticsearch.threadpool.ThreadPool;
5163

52-
import java.util.ArrayList;
53-
import java.util.Collections;
54-
import java.util.HashMap;
55-
import java.util.HashSet;
56-
import java.util.Iterator;
57-
import java.util.List;
58-
import java.util.Map;
59-
import java.util.Objects;
60-
import java.util.Set;
61-
import java.util.concurrent.TimeUnit;
62-
import java.util.function.BiConsumer;
63-
import java.util.function.Consumer;
64-
6564
/**
6665
* Holder class for several ingest related services.
6766
*/
@@ -263,59 +262,11 @@ public void applyClusterState(final ClusterChangedEvent event) {
263262
Pipeline originalPipeline = originalPipelines.get(id);
264263
if (originalPipeline != null) {
265264
pipeline.getMetrics().add(originalPipeline.getMetrics());
266-
List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
267-
List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
268-
getProcessorMetrics(originalPipeline.getCompoundProcessor(), oldPerProcessMetrics);
269-
getProcessorMetrics(pipeline.getCompoundProcessor(), newPerProcessMetrics);
270-
//Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
271-
//the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
272-
//consistent id's per processor and/or semantic equals for each processor will be needed.
273-
if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) {
274-
Iterator<Tuple<Processor, IngestMetric>> oldMetricsIterator = oldPerProcessMetrics.iterator();
275-
for (Tuple<Processor, IngestMetric> compositeMetric : newPerProcessMetrics) {
276-
String type = compositeMetric.v1().getType();
277-
IngestMetric metric = compositeMetric.v2();
278-
if (oldMetricsIterator.hasNext()) {
279-
Tuple<Processor, IngestMetric> oldCompositeMetric = oldMetricsIterator.next();
280-
String oldType = oldCompositeMetric.v1().getType();
281-
IngestMetric oldMetric = oldCompositeMetric.v2();
282-
if (type.equals(oldType)) {
283-
metric.add(oldMetric);
284-
}
285-
}
286-
}
287-
}
288265
}
289266
});
290267
}
291268
}
292269

293-
/**
294-
* Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as
295-
* wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric.
296-
* @param compoundProcessor The compound processor to start walking the non-failure processors
297-
* @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples.
298-
* @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor
299-
*/
300-
private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(CompoundProcessor compoundProcessor,
301-
List<Tuple<Processor, IngestMetric>> processorMetrics) {
302-
//only surface the top level non-failure processors, on-failure processor times will be included in the top level non-failure
303-
for (Tuple<Processor, IngestMetric> processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) {
304-
Processor processor = processorWithMetric.v1();
305-
IngestMetric metric = processorWithMetric.v2();
306-
if (processor instanceof CompoundProcessor) {
307-
getProcessorMetrics((CompoundProcessor) processor, processorMetrics);
308-
} else {
309-
//Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true.
310-
if (processor instanceof ConditionalProcessor) {
311-
metric = ((ConditionalProcessor) processor).getMetric();
312-
}
313-
processorMetrics.add(new Tuple<>(processor, metric));
314-
}
315-
}
316-
return processorMetrics;
317-
}
318-
319270
private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
320271
String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
321272
String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
@@ -420,42 +371,11 @@ protected void doRun() {
420371
}
421372

422373
public IngestStats stats() {
423-
IngestStats.Builder statsBuilder = new IngestStats.Builder();
424-
statsBuilder.addTotalMetrics(totalMetrics);
425-
pipelines.forEach((id, pipeline) -> {
426-
CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
427-
statsBuilder.addPipelineMetrics(id, pipeline.getMetrics());
428-
List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
429-
getProcessorMetrics(rootProcessor, processorMetrics);
430-
processorMetrics.forEach(t -> {
431-
Processor processor = t.v1();
432-
IngestMetric processorMetric = t.v2();
433-
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric);
434-
});
435-
});
436-
return statsBuilder.build();
437-
}
438374

439-
//package private for testing
440-
static String getProcessorName(Processor processor){
441-
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
442-
if(processor instanceof ConditionalProcessor){
443-
processor = ((ConditionalProcessor) processor).getProcessor();
444-
}
445-
StringBuilder sb = new StringBuilder(5);
446-
sb.append(processor.getType());
375+
Map<String, IngestStats.Stats> statsPerPipeline =
376+
pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats()));
447377

448-
if(processor instanceof PipelineProcessor){
449-
String pipelineName = ((PipelineProcessor) processor).getPipelineName();
450-
sb.append(":");
451-
sb.append(pipelineName);
452-
}
453-
String tag = processor.getTag();
454-
if(tag != null && !tag.isEmpty()){
455-
sb.append(":");
456-
sb.append(tag);
457-
}
458-
return sb.toString();
378+
return new IngestStats(totalMetrics.createStats(), statsPerPipeline);
459379
}
460380

461381
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {

0 commit comments

Comments
 (0)