Skip to content

Commit 9c43b98

Browse files
Preventing serialization errors in the nodes stats API (#90319) (#90429)
Preventing serialization errors in the nodes stats API, and adding logging to the ingest counter code so that we can find the root cause of the problem in the future.

Co-authored-by: Elastic Machine <[email protected]>
1 parent 2e74803 commit 9c43b98

File tree

7 files changed

+168
-73
lines changed

7 files changed

+168
-73
lines changed

docs/changelog/90319.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 90319
2+
summary: Preventing serialization errors in the nodes stats API
3+
area: Ingest Node
4+
type: bug
5+
issues:
6+
- 77973

server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.ElasticsearchException;
1214
import org.elasticsearch.core.Tuple;
1315

@@ -16,6 +18,7 @@
1618
import java.util.Collections;
1719
import java.util.List;
1820
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1922
import java.util.function.BiConsumer;
2023
import java.util.function.LongSupplier;
2124
import java.util.stream.Collectors;
@@ -30,6 +33,8 @@ public class CompoundProcessor implements Processor {
3033
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
3134
public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";
3235

36+
private static final Logger logger = LogManager.getLogger(CompoundProcessor.class);
37+
3338
private final boolean ignoreFailure;
3439
private final List<Processor> processors;
3540
private final List<Processor> onFailureProcessors;
@@ -191,25 +196,43 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
191196
final IngestMetric finalMetric = processorsWithMetrics.get(currentProcessor).v2();
192197
final Processor finalProcessor = processorsWithMetrics.get(currentProcessor).v1();
193198
final IngestDocument finalIngestDocument = ingestDocument;
199+
/*
200+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
201+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
202+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
203+
* is only executed once.
204+
*/
205+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
194206
finalMetric.preIngest();
207+
final AtomicBoolean postIngestHasBeenCalled = new AtomicBoolean(false);
195208
try {
196209
finalProcessor.execute(ingestDocument, (result, e) -> {
197-
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - finalStartTimeInNanos;
198-
finalMetric.postIngest(ingestTimeInNanos);
199-
200-
if (e != null) {
201-
executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
210+
if (listenerHasBeenCalled.getAndSet(true)) {
211+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
212+
assert false : "A listener was unexpectedly called more than once";
202213
} else {
203-
if (result != null) {
204-
innerExecute(nextProcessor, result, handler);
214+
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - finalStartTimeInNanos;
215+
finalMetric.postIngest(ingestTimeInNanos);
216+
postIngestHasBeenCalled.set(true);
217+
if (e != null) {
218+
executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
205219
} else {
206-
handler.accept(null, null);
220+
if (result != null) {
221+
innerExecute(nextProcessor, result, handler);
222+
} else {
223+
handler.accept(null, null);
224+
}
207225
}
208226
}
209227
});
210228
} catch (Exception e) {
211229
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
212-
finalMetric.postIngest(ingestTimeInNanos);
230+
if (postIngestHasBeenCalled.get()) {
231+
logger.warn("Preventing postIngest from being called more than once", new RuntimeException());
232+
assert false : "Attempt to call postIngest more than once";
233+
} else {
234+
finalMetric.postIngest(ingestTimeInNanos);
235+
}
213236
executeOnFailureOuter(currentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
214237
}
215238
}

server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.common.logging.DeprecationCategory;
1214
import org.elasticsearch.common.logging.DeprecationLogger;
1315
import org.elasticsearch.script.DynamicMap;
@@ -26,6 +28,7 @@
2628
import java.util.ListIterator;
2729
import java.util.Map;
2830
import java.util.Set;
31+
import java.util.concurrent.atomic.AtomicBoolean;
2932
import java.util.function.BiConsumer;
3033
import java.util.function.Function;
3134
import java.util.function.LongSupplier;
@@ -45,6 +48,8 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
4548
return value;
4649
});
4750

51+
private static final Logger logger = LogManager.getLogger(ConditionalProcessor.class);
52+
4853
static final String TYPE = "conditional";
4954

5055
private final Script condition;
@@ -120,15 +125,27 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
120125

121126
if (matches) {
122127
final long startTimeInNanos = relativeTimeProvider.getAsLong();
128+
/*
129+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
130+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
131+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
132+
* is only executed once.
133+
*/
134+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
123135
metric.preIngest();
124136
processor.execute(ingestDocument, (result, e) -> {
125-
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
126-
metric.postIngest(ingestTimeInNanos);
127-
if (e != null) {
128-
metric.ingestFailed();
129-
handler.accept(null, e);
137+
if (listenerHasBeenCalled.getAndSet(true)) {
138+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
139+
assert false : "A listener was unexpectedly called more than once";
130140
} else {
131-
handler.accept(result, null);
141+
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
142+
metric.postIngest(ingestTimeInNanos);
143+
if (e != null) {
144+
metric.ingestFailed();
145+
handler.accept(null, e);
146+
} else {
147+
handler.accept(result, null);
148+
}
132149
}
133150
});
134151
} else {

server/src/main/java/org/elasticsearch/ingest/IngestMetric.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.common.metrics.CounterMetric;
1214

1315
import java.util.concurrent.TimeUnit;
@@ -22,6 +24,8 @@
2224
*/
2325
class IngestMetric {
2426

27+
private static final Logger logger = LogManager.getLogger(IngestMetric.class);
28+
2529
/**
2630
* The time it takes to complete the measured item.
2731
*/
@@ -53,7 +57,19 @@ void preIngest() {
5357
*/
5458
void postIngest(long ingestTimeInNanos) {
5559
long current = ingestCurrent.decrementAndGet();
56-
assert current >= 0 : "ingest metric current count double-decremented";
60+
if (current < 0) {
61+
/*
62+
* This ought to never happen. However if it does, it's incredibly bad because ingestCurrent being negative causes a
63+
* serialization error that prevents the nodes stats API from working. So we're doing 3 things here:
64+
* (1) Log a stack trace at warn level so that the Elasticsearch engineering team can track down and fix the source of the
65+
* bug if it still exists
66+
* (2) Throw an AssertionError if assertions are enabled so that we are aware of the bug
67+
* (3) Increment the counter back up so that we don't hit serialization failures
68+
*/
69+
logger.warn("Current ingest counter decremented below 0", new RuntimeException());
70+
assert false : "ingest metric current count double-decremented";
71+
ingestCurrent.incrementAndGet();
72+
}
5773
this.ingestTimeInNanos.inc(ingestTimeInNanos);
5874
ingestCount.inc();
5975
}
@@ -84,6 +100,8 @@ void add(IngestMetric metrics) {
84100
IngestStats.Stats createStats() {
85101
// we track ingestTime at nanosecond resolution, but IngestStats uses millisecond resolution for reporting
86102
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(ingestTimeInNanos.count());
87-
return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, ingestCurrent.get(), ingestFailed.count());
103+
// It is possible for the current count to briefly drop below 0, causing serialization problems. See #90319
104+
long currentCount = Math.max(0, ingestCurrent.get());
105+
return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, currentCount, ingestFailed.count());
88106
}
89107
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 63 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.Objects;
7373
import java.util.Set;
7474
import java.util.concurrent.CopyOnWriteArrayList;
75+
import java.util.concurrent.atomic.AtomicBoolean;
7576
import java.util.concurrent.atomic.AtomicInteger;
7677
import java.util.function.BiConsumer;
7778
import java.util.function.Consumer;
@@ -884,60 +885,72 @@ private void innerExecute(
884885
VersionType versionType = indexRequest.versionType();
885886
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
886887
IngestDocument ingestDocument = new IngestDocument(index, id, version, routing, versionType, sourceAsMap);
888+
/*
889+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
890+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
891+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
892+
* is only executed once.
893+
*/
894+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
887895
ingestDocument.executePipeline(pipeline, (result, e) -> {
888-
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
889-
totalMetrics.postIngest(ingestTimeInNanos);
890-
if (e != null) {
891-
totalMetrics.ingestFailed();
892-
handler.accept(e);
893-
} else if (result == null) {
894-
itemDroppedHandler.accept(slot);
895-
handler.accept(null);
896+
if (listenerHasBeenCalled.getAndSet(true)) {
897+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
898+
assert false : "A listener was unexpectedly called more than once";
896899
} else {
897-
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();
898-
899-
// it's fine to set all metadata fields all the time, as ingest document holds their starting values
900-
// before ingestion, which might also get modified during ingestion.
901-
indexRequest.index(metadata.getIndex());
902-
indexRequest.id(metadata.getId());
903-
indexRequest.routing(metadata.getRouting());
904-
indexRequest.version(metadata.getVersion());
905-
if (metadata.getVersionType() != null) {
906-
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
907-
}
908-
Number number;
909-
if ((number = metadata.getIfSeqNo()) != null) {
910-
indexRequest.setIfSeqNo(number.longValue());
911-
}
912-
if ((number = metadata.getIfPrimaryTerm()) != null) {
913-
indexRequest.setIfPrimaryTerm(number.longValue());
914-
}
915-
try {
916-
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
917-
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
918-
} catch (IllegalArgumentException ex) {
919-
// An IllegalArgumentException can be thrown when an ingest
920-
// processor creates a source map that is self-referencing.
921-
// In that case, we catch and wrap the exception so we can
922-
// include which pipeline failed.
900+
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
901+
totalMetrics.postIngest(ingestTimeInNanos);
902+
if (e != null) {
923903
totalMetrics.ingestFailed();
924-
handler.accept(
925-
new IllegalArgumentException(
926-
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
927-
ex
928-
)
929-
);
930-
return;
931-
}
932-
Map<String, String> map;
933-
if ((map = metadata.getDynamicTemplates()) != null) {
934-
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
935-
mergedDynamicTemplates.putAll(map);
936-
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
937-
}
938-
postIngest(ingestDocument, indexRequest);
904+
handler.accept(e);
905+
} else if (result == null) {
906+
itemDroppedHandler.accept(slot);
907+
handler.accept(null);
908+
} else {
909+
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();
910+
911+
// it's fine to set all metadata fields all the time, as ingest document holds their starting values
912+
// before ingestion, which might also get modified during ingestion.
913+
indexRequest.index(metadata.getIndex());
914+
indexRequest.id(metadata.getId());
915+
indexRequest.routing(metadata.getRouting());
916+
indexRequest.version(metadata.getVersion());
917+
if (metadata.getVersionType() != null) {
918+
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
919+
}
920+
Number number;
921+
if ((number = metadata.getIfSeqNo()) != null) {
922+
indexRequest.setIfSeqNo(number.longValue());
923+
}
924+
if ((number = metadata.getIfPrimaryTerm()) != null) {
925+
indexRequest.setIfPrimaryTerm(number.longValue());
926+
}
927+
try {
928+
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
929+
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
930+
} catch (IllegalArgumentException ex) {
931+
// An IllegalArgumentException can be thrown when an ingest
932+
// processor creates a source map that is self-referencing.
933+
// In that case, we catch and wrap the exception so we can
934+
// include which pipeline failed.
935+
totalMetrics.ingestFailed();
936+
handler.accept(
937+
new IllegalArgumentException(
938+
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
939+
ex
940+
)
941+
);
942+
return;
943+
}
944+
Map<String, String> map;
945+
if ((map = metadata.getDynamicTemplates()) != null) {
946+
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
947+
mergedDynamicTemplates.putAll(map);
948+
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
949+
}
950+
postIngest(ingestDocument, indexRequest);
939951

940-
handler.accept(null);
952+
handler.accept(null);
953+
}
941954
}
942955
});
943956
}

server/src/main/java/org/elasticsearch/ingest/Pipeline.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.elasticsearch.ingest;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.ElasticsearchParseException;
1214
import org.elasticsearch.core.Nullable;
1315
import org.elasticsearch.script.ScriptService;
@@ -16,6 +18,7 @@
1618
import java.util.Collections;
1719
import java.util.List;
1820
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1922
import java.util.function.BiConsumer;
2023
import java.util.function.LongSupplier;
2124

@@ -30,6 +33,8 @@ public final class Pipeline {
3033
public static final String ON_FAILURE_KEY = "on_failure";
3134
public static final String META_KEY = "_meta";
3235

36+
private static final Logger logger = LogManager.getLogger(Pipeline.class);
37+
3338
private final String id;
3439
@Nullable
3540
private final String description;
@@ -113,14 +118,26 @@ public static Pipeline create(
113118
*/
114119
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
115120
final long startTimeInNanos = relativeTimeProvider.getAsLong();
121+
/*
122+
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
123+
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
124+
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
125+
* is only executed once.
126+
*/
127+
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
116128
metrics.preIngest();
117129
compoundProcessor.execute(ingestDocument, (result, e) -> {
118-
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
119-
metrics.postIngest(ingestTimeInNanos);
120-
if (e != null) {
121-
metrics.ingestFailed();
130+
if (listenerHasBeenCalled.getAndSet(true)) {
131+
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
132+
assert false : "A listener was unexpectedly called more than once";
133+
} else {
134+
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
135+
metrics.postIngest(ingestTimeInNanos);
136+
if (e != null) {
137+
metrics.ingestFailed();
138+
}
139+
handler.accept(result, e);
122140
}
123-
handler.accept(result, e);
124141
});
125142
}
126143

server/src/test/java/org/elasticsearch/ingest/IngestMetricTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public void testPostIngestDoubleDecrement() {
4242

4343
// the second postIngest triggers an assertion error
4444
expectThrows(AssertionError.class, () -> metric.postIngest(500000L));
45-
assertThat(-1L, equalTo(metric.createStats().getIngestCurrent()));
45+
// We never allow the reported ingestCurrent to be negative:
46+
assertThat(metric.createStats().getIngestCurrent(), equalTo(0L));
4647
}
4748

4849
}

0 commit comments

Comments
 (0)