Skip to content

Commit cfc9568

Browse files
committed
GH-10431: Propagate observation to error handler
Fixes: #10431 The `SourcePollingChannelAdapter` is able to start an observation for a just received message from a source. This observation is completed when the polling task is done (including transaction). However, a thrown from downstream error goes to the `errorChannel` already after finishing the mentioned observation (even if that includes error). According to end-user experience, the error handling must be a part of the same trace started for the received message. * Fix `AbstractPollingEndpoint` to perform `donePollingTask()` only for a successful message reception * Fix `SourcePollingChannelAdapter` to propagate its `ObservationRegistry` to the `ErrorHandlingTaskExecutor` * Introduce an `Observation` handling in the `ErrorHandlingTaskExecutor`. Essentially, stop currently on scope `Observation` after error handling for th running task failure * Prove the feature with a new `SourcePollingChannelAdapterErrorObservationTests` **Cherry-pick to `6.5.x`**
1 parent a4ca5c0 commit cfc9568

File tree

4 files changed

+195
-30
lines changed

4 files changed

+195
-30
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ public void setAdviceChain(@Nullable List<Advice> adviceChain) {
156156
* A negative number means retrieve unlimited messages until the {@code MessageSource} returns {@code null}.
157157
* Zero means do not poll for any records -
158158
* it can be considered as pausing if 'maxMessagesPerPoll' is later changed to a non-zero value.
159-
* The polling cycle may exit earlier if the source returns null for the current receive call.
160-
* @param maxMessagesPerPoll the number of message to poll per schedule.
159+
* The polling cycle may exit earlier if the source returns null for the current {@code receive} call.
160+
* @param maxMessagesPerPoll the number of messages to poll per schedule.
161161
*/
162162
@ManagedAttribute
163163
public void setMaxMessagesPerPoll(long maxMessagesPerPoll) {
@@ -184,14 +184,14 @@ public void setTransactionSynchronizationFactory(
184184
}
185185

186186
/**
187-
* Return the default error channel if the error handler is explicitly provided and
188-
* it is a {@link MessagePublishingErrorHandler}.
187+
* Return the default error channel if the error handler is explicitly provided,
188+
* and it is a {@link MessagePublishingErrorHandler}.
189189
* @return the channel or null.
190190
* @since 4.3
191191
*/
192192
public @Nullable MessageChannel getDefaultErrorChannel() {
193-
if (!this.errorHandlerIsDefault && this.errorHandler
194-
instanceof MessagePublishingErrorHandler messagePublishingErrorHandler) {
193+
if (!this.errorHandlerIsDefault &&
194+
this.errorHandler instanceof MessagePublishingErrorHandler messagePublishingErrorHandler) {
195195

196196
return messagePublishingErrorHandler.getDefaultErrorChannel();
197197
}
@@ -333,11 +333,9 @@ protected void doStart() {
333333
List<Advice> advices = this.adviceChain;
334334
if (!CollectionUtils.isEmpty(advices)) {
335335
ProxyFactory proxyFactory = new ProxyFactory(task);
336-
if (!CollectionUtils.isEmpty(advices)) {
337-
advices.stream()
338-
.filter(advice -> !isReceiveOnlyAdvice(advice))
339-
.forEach(proxyFactory::addAdvice);
340-
}
336+
advices.stream()
337+
.filter(advice -> !isReceiveOnlyAdvice(advice))
338+
.forEach(proxyFactory::addAdvice);
341339
task = (Callable<@Nullable Message<?>>) proxyFactory.getProxy(this.beanClassLoader);
342340
}
343341
if (!CollectionUtils.isEmpty(receiveOnlyAdviceChain)) {
@@ -419,12 +417,12 @@ private Flux<Message<?>> createFluxGenerator() {
419417
}
420418

421419
private @Nullable Message<?> pollForMessage() {
422-
Exception pollingTaskError = null;
423420
try {
424-
return this.pollingTask.call();
421+
Message<?> message = this.pollingTask.call();
422+
donePollingTask(message);
423+
return message;
425424
}
426425
catch (Exception ex) {
427-
pollingTaskError = ex;
428426
if (ex instanceof MessagingException messagingException) {
429427
throw messagingException;
430428
}
@@ -451,7 +449,6 @@ private Flux<Message<?>> createFluxGenerator() {
451449
TransactionSynchronizationManager.unbindResource(resource);
452450
}
453451
}
454-
donePollingTask(pollingTaskError);
455452
}
456453
}
457454

@@ -501,7 +498,12 @@ protected void messageReceived(@Nullable IntegrationResourceHolder holder, Messa
501498
}
502499
}
503500

504-
protected void donePollingTask(@Nullable Exception pollingTaskError) {
501+
/**
502+
* The callback of a received message (if any) after the polling task is done.
503+
* If a transaction is enabled, it is committed at this point.
504+
* @param message the message result from the polling task.
505+
*/
506+
protected void donePollingTask(@Nullable Message<?> message) {
505507

506508
}
507509

spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.springframework.integration.support.management.observation.MessageReceiverContext;
4545
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
4646
import org.springframework.integration.transaction.IntegrationResourceHolder;
47+
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
4748
import org.springframework.messaging.Message;
4849
import org.springframework.messaging.MessageChannel;
4950
import org.springframework.messaging.MessagingException;
@@ -141,6 +142,12 @@ public void setShouldTrack(boolean shouldTrack) {
141142
@Override
142143
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
143144
this.observationRegistry = observationRegistry;
145+
if (isObserved()) {
146+
ErrorHandlingTaskExecutor taskExecutor = (ErrorHandlingTaskExecutor) getTaskExecutor();
147+
if (taskExecutor.getObservationRegistry() == null) {
148+
taskExecutor.setObservationRegistry(observationRegistry);
149+
}
150+
}
144151
}
145152

146153
/**
@@ -263,31 +270,30 @@ protected void handleMessage(Message<?> messageArg) {
263270
@Override
264271
protected void messageReceived(@Nullable IntegrationResourceHolder holder, Message<?> message) {
265272
if (isObserved()) {
266-
Observation observation =
267-
IntegrationObservation.HANDLER.observation(this.observationConvention,
273+
// Cannot use 'Observation.observe()' API
274+
// since transaction needs to be aware of the message
275+
// and error handling happens in the caller of 'doPoll()' - 'ErrorHandler'
276+
IntegrationObservation.HANDLER.observation(this.observationConvention,
268277
DefaultMessageReceiverObservationConvention.INSTANCE,
269278
() -> new MessageReceiverContext(message, getComponentName(), "message-source"),
270-
this.observationRegistry);
271-
272-
observation.start().openScope();
279+
this.observationRegistry)
280+
.start()
281+
.openScope();
273282
}
283+
274284
super.messageReceived(holder, message);
275285
}
276286

277287
/**
278288
* Stop an observation (and close its scope) previously started
279289
* from the {@link #messageReceived(IntegrationResourceHolder, Message)}.
280-
* @param pollingTaskError an optional error as a result of the polling task.
281290
*/
282291
@Override
283-
protected void donePollingTask(@Nullable Exception pollingTaskError) {
292+
protected void donePollingTask(@Nullable Message<?> message) {
284293
Observation.Scope currentObservationScope = this.observationRegistry.getCurrentObservationScope();
285294
if (currentObservationScope != null) {
286295
currentObservationScope.close();
287296
Observation currentObservation = currentObservationScope.getCurrentObservation();
288-
if (pollingTaskError != null) {
289-
currentObservation.error(pollingTaskError);
290-
}
291297
currentObservation.stop();
292298
}
293299
}
@@ -302,8 +308,7 @@ protected String getResourceKey() {
302308
return IntegrationResourceHolder.MESSAGE_SOURCE;
303309
}
304310

305-
@Nullable
306-
private static Object extractProxyTarget(@Nullable Object target) {
311+
private static @Nullable Object extractProxyTarget(@Nullable Object target) {
307312
if (!(target instanceof Advised advised)) {
308313
return target;
309314
}

spring-integration-core/src/main/java/org/springframework/integration/util/ErrorHandlingTaskExecutor.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818

1919
import java.util.concurrent.Executor;
2020

21+
import io.micrometer.observation.Observation;
22+
import io.micrometer.observation.ObservationRegistry;
23+
import org.jspecify.annotations.Nullable;
24+
2125
import org.springframework.core.task.SyncTaskExecutor;
2226
import org.springframework.core.task.TaskExecutor;
2327
import org.springframework.util.Assert;
@@ -27,24 +31,43 @@
2731
* A {@link TaskExecutor} implementation that wraps an existing Executor
2832
* instance in order to catch any exceptions. If an exception is thrown, it
2933
* will be handled by the provided {@link ErrorHandler}.
34+
* <p>
35+
* If an {@link ObservationRegistry} is provided, the current observation in scope
36+
* will be closed after error handling.
3037
*
3138
* @author Jonas Partner
3239
* @author Mark Fisher
3340
* @author Gary Russell
41+
* @author Artem Bilan
3442
*/
3543
public class ErrorHandlingTaskExecutor implements TaskExecutor {
3644

3745
private final Executor executor;
3846

3947
private final ErrorHandler errorHandler;
4048

49+
private @Nullable ObservationRegistry observationRegistry;
50+
4151
public ErrorHandlingTaskExecutor(Executor executor, ErrorHandler errorHandler) {
4252
Assert.notNull(executor, "executor must not be null");
4353
Assert.notNull(errorHandler, "errorHandler must not be null");
4454
this.executor = executor;
4555
this.errorHandler = errorHandler;
4656
}
4757

58+
/**
59+
* Set an {@link ObservationRegistry} to close current observation in scope after error handling.
60+
* @param observationRegistry the {@link ObservationRegistry} to use.
61+
* @since 6.5.3
62+
*/
63+
public void setObservationRegistry(ObservationRegistry observationRegistry) {
64+
this.observationRegistry = observationRegistry;
65+
}
66+
67+
public @Nullable ObservationRegistry getObservationRegistry() {
68+
return this.observationRegistry;
69+
}
70+
4871
public boolean isSyncExecutor() {
4972
return this.executor instanceof SyncTaskExecutor;
5073
}
@@ -55,8 +78,21 @@ public void execute(final Runnable task) {
5578
try {
5679
task.run();
5780
}
58-
catch (Throwable t) { //NOSONAR
59-
ErrorHandlingTaskExecutor.this.errorHandler.handleError(t);
81+
catch (Throwable throwable) {
82+
try {
83+
ErrorHandlingTaskExecutor.this.errorHandler.handleError(throwable);
84+
}
85+
finally {
86+
if (this.observationRegistry != null) {
87+
var currentObservationScope = this.observationRegistry.getCurrentObservationScope();
88+
if (currentObservationScope != null) {
89+
currentObservationScope.close();
90+
Observation currentObservation = currentObservationScope.getCurrentObservation();
91+
currentObservation.error(throwable);
92+
currentObservation.stop();
93+
}
94+
}
95+
}
6096
}
6197
});
6298
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.management.observation;
18+
19+
import io.micrometer.observation.ObservationRegistry;
20+
import io.micrometer.tracing.Span;
21+
import io.micrometer.tracing.test.SampleTestRunner;
22+
import io.micrometer.tracing.test.simple.SpansAssert;
23+
24+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
25+
import org.springframework.context.annotation.Bean;
26+
import org.springframework.context.annotation.Configuration;
27+
import org.springframework.integration.annotation.BridgeTo;
28+
import org.springframework.integration.annotation.EndpointId;
29+
import org.springframework.integration.annotation.InboundChannelAdapter;
30+
import org.springframework.integration.annotation.Poller;
31+
import org.springframework.integration.annotation.ServiceActivator;
32+
import org.springframework.integration.channel.DirectChannel;
33+
import org.springframework.integration.config.EnableIntegration;
34+
import org.springframework.integration.config.EnableIntegrationManagement;
35+
import org.springframework.integration.scheduling.PollerMetadata;
36+
import org.springframework.integration.test.util.OnlyOnceTrigger;
37+
38+
import static org.assertj.core.api.Assertions.assertThat;
39+
import static org.awaitility.Awaitility.await;
40+
41+
/**
42+
* @author Artem Bilan
43+
*
44+
* @since 6.5.3
45+
*/
46+
public class SourcePollingChannelAdapterErrorObservationTests extends SampleTestRunner {
47+
48+
@Override
49+
public TracingSetup[] getTracingSetup() {
50+
return new TracingSetup[] {TracingSetup.IN_MEMORY_BRAVE};
51+
}
52+
53+
@Override
54+
public SampleTestRunnerConsumer yourCode() {
55+
return (bb, meterRegistry) -> {
56+
ObservationRegistry observationRegistry = getObservationRegistry();
57+
58+
try (AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext()) {
59+
applicationContext.registerBean(ObservationRegistry.class, () -> observationRegistry);
60+
applicationContext.register(ObservationIntegrationTestConfiguration.class);
61+
applicationContext.refresh();
62+
}
63+
64+
await().untilAsserted(() -> assertThat(bb.getFinishedSpans()).hasSize(5));
65+
66+
SpansAssert.assertThat(bb.getFinishedSpans())
67+
.haveSameTraceId()
68+
.hasASpanWithName("dataMessageSource receive", spanAssert -> spanAssert
69+
.hasTag(IntegrationObservation.GatewayTags.COMPONENT_TYPE.asString(), "message-source")
70+
.hasKindEqualTo(Span.Kind.CONSUMER))
71+
.hasASpanWithName("inputChannel send", spanAssert -> spanAssert
72+
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_NAME.asString(), "inputChannel")
73+
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "producer")
74+
.hasKindEqualTo(Span.Kind.PRODUCER))
75+
.hasASpanWithName("dataHandler receive", spanAssert -> spanAssert
76+
.hasTag(IntegrationObservation.HandlerTags.COMPONENT_NAME.asString(), "dataHandler")
77+
.hasTag(IntegrationObservation.HandlerTags.COMPONENT_TYPE.asString(), "handler")
78+
.hasKindEqualTo(Span.Kind.CONSUMER))
79+
.hasASpanWithName("errorChannel send", spanAssert -> spanAssert
80+
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_NAME.asString(), "errorChannel")
81+
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "producer")
82+
.hasKindEqualTo(Span.Kind.PRODUCER))
83+
.hasASpanWithName("errorChannel.bridgeTo receive", spanAssert -> spanAssert
84+
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_NAME.asString(), "errorChannel.bridgeTo")
85+
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "handler")
86+
.hasKindEqualTo(Span.Kind.CONSUMER));
87+
};
88+
}
89+
90+
@Configuration
91+
@EnableIntegration
92+
@EnableIntegrationManagement(observationPatterns = "*")
93+
static class ObservationIntegrationTestConfiguration {
94+
95+
@Bean
96+
PollerMetadata pollerMetadata() {
97+
PollerMetadata pollerMetadata = new PollerMetadata();
98+
pollerMetadata.setTrigger(new OnlyOnceTrigger());
99+
return pollerMetadata;
100+
}
101+
102+
@EndpointId("dataMessageSource")
103+
@InboundChannelAdapter(channel = "inputChannel", poller = @Poller("pollerMetadata"))
104+
String errorData() {
105+
return "some data";
106+
}
107+
108+
@EndpointId("dataHandler")
109+
@ServiceActivator(inputChannel = "inputChannel")
110+
void processData(String data) {
111+
throw new RuntimeException("intentional");
112+
}
113+
114+
@Bean
115+
@BridgeTo("nullChannel")
116+
DirectChannel errorChannel() {
117+
return new DirectChannel();
118+
}
119+
120+
}
121+
122+
}

0 commit comments

Comments
 (0)