diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 2162e44c5f..f6377ed961 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -829,6 +829,7 @@ private ProducerRecord interceptorProducerRecord(ProducerRecord prod return producerRecord; } + @SuppressWarnings("try") private Callback buildCallback(final ProducerRecord producerRecord, final Producer producer, final CompletableFuture> future, @Nullable Object sample, Observation observation) { @@ -841,10 +842,9 @@ private Callback buildCallback(final ProducerRecord producerRecord, final catch (Exception e) { this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback"); } - try { + try (Observation.Scope ignored = observation.openScope()) { if (exception == null) { successTimer(sample, producerRecord); - observation.stop(); future.complete(new SendResult<>(producerRecord, metadata)); if (this.producerListener != null) { this.producerListener.onSuccess(producerRecord, metadata); @@ -855,7 +855,6 @@ private Callback buildCallback(final ProducerRecord producerRecord, final else { failureTimer(sample, exception, producerRecord); observation.error(exception); - observation.stop(); future.completeExceptionally( new KafkaProducerException(producerRecord, "Failed to send", exception)); if (this.producerListener != null) { @@ -865,6 +864,7 @@ private Callback buildCallback(final ProducerRecord producerRecord, final } } finally { + observation.stop(); closeProducer(producer, this.transactional); } }; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index b5caf4e51e..45fa015ac7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -108,7 +109,14 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate spanFromCallback = new AtomicReference<>(); + + template.send("observation.testT1", "test") + .thenAccept((sendResult) -> spanFromCallback.set(tracer.currentSpan())) + .get(10, TimeUnit.SECONDS); + + assertThat(spanFromCallback.get()).isNotNull(); + assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(listener.record).isNotNull(); Headers headers = listener.record.headers();