diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java index a103027a97..ae1b0f27ca 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.config.AbstractFactoryBean; import org.springframework.context.SmartLifecycle; import org.springframework.core.log.LogAccessor; @@ -52,7 +53,8 @@ * * @since 1.1.4 */ -public class StreamsBuilderFactoryBean extends AbstractFactoryBean implements SmartLifecycle { +public class StreamsBuilderFactoryBean extends AbstractFactoryBean + implements SmartLifecycle, BeanNameAware { /** * The default {@link Duration} of {@code 10 seconds} for close timeout. @@ -95,6 +97,10 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean tags; + + private final Map metrics = new HashMap<>(); + + /** + * Construct an instance with the provided registry. + * @param meterRegistry the registry. + */ + public KafkaStreamsMicrometerListener(MeterRegistry meterRegistry) { + this(meterRegistry, Collections.emptyList()); + } + + /** + * Construct an instance with the provided registry and tags. + * @param meterRegistry the registry. + * @param tags the tags. + */ + public KafkaStreamsMicrometerListener(MeterRegistry meterRegistry, List tags) { + this.meterRegistry = meterRegistry; + this.tags = tags; + } + + + @Override + public synchronized void streamsAdded(String id, KafkaStreams kafkaStreams) { + if (!this.metrics.containsKey(id)) { + List streamsTags = new ArrayList<>(this.tags); + streamsTags.add(new ImmutableTag("spring.id", id)); + this.metrics.put(id, new KafkaStreamsMetrics(kafkaStreams, streamsTags)); + this.metrics.get(id).bindTo(this.meterRegistry); + } + } + + @Override + public synchronized void streamsRemoved(String id, KafkaStreams streams) { + KafkaStreamsMetrics removed = this.metrics.remove(id); + if (removed != null) { + removed.close(); + } + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java index 5b7b94fe00..674ab20706 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java @@ -45,11 +45,16 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; +import org.springframework.kafka.streams.KafkaStreamsMicrometerListener; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import io.micrometer.core.instrument.ImmutableTag; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + /** * @author Nurettin Yilmaz * @author Artem Bilan @@ -71,6 +76,9 @@ public class KafkaStreamsCustomizerTests { @Autowired private KafkaStreamsConfig config; + @Autowired + private MeterRegistry meterRegistry; + @Test public void testKafkaStreamsCustomizer(@Autowired KafkaStreamsConfiguration configuration, @Autowired KafkaStreamsConfig config) { @@ -86,6 +94,18 @@ public void testKafkaStreamsCustomizer(@Autowired KafkaStreamsConfiguration conf .isEqualTo(1000); assertThat(this.config.builderConfigured.get()).isTrue(); assertThat(this.config.topologyConfigured.get()).isTrue(); + assertThat(this.meterRegistry.get("kafka.consumer.coordinator.join.total") + .tag("customTag", "stream") + .tag("spring.id", KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) + .functionCounter() + .count()) + .isGreaterThanOrEqualTo(0); + assertThat(this.meterRegistry.get("kafka.producer.node.incoming.byte.total") + .tag("customTag", "stream") + .tag("spring.id", KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) + .functionCounter() + .count()) + .isGreaterThanOrEqualTo(0); } @Configuration @@ -100,6 +120,12 @@ public static class KafkaStreamsConfig { @Autowired EmbeddedKafkaBroker broker; + @SuppressWarnings("unchecked") + @Bean + public MeterRegistry meterRegistry() { + return new SimpleMeterRegistry(); + } + @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() { StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kStreamsConfigs()); @@ -123,6 +149,8 @@ public void configureTopology(Topology topology) { } }); + streamsBuilderFactoryBean.setListener(new KafkaStreamsMicrometerListener(meterRegistry(), + Collections.singletonList(new ImmutableTag("customTag", "stream")))); return streamsBuilderFactoryBean; } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 53555ba3b8..dca28168fc 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -2940,7 +2940,7 @@ The consumer/producer `id` passed to the listener is added to the meter's tags w .An example of obtaining one of the Kafka metrics [source, java] ---- -double count =this.meterRegistry.get("kafka.producer.node.incoming.byte.total") +double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total") .tag("customTag", "customTagValue") .tag("spring.id", "myProducerFactory.myClientId-1") .functionCounter() @@ -2948,6 +2948,8 @@ double count =this.meterRegistry.get("kafka.producer.node.incoming.byte.total") ---- ==== +A similar listener is provided for the `StreamsBuilderFactoryBean` - see <>. + [[transactions]] ==== Transactions diff --git a/src/reference/asciidoc/streams.adoc b/src/reference/asciidoc/streams.adoc index 12172f36e9..f339bfebd5 100644 --- a/src/reference/asciidoc/streams.adoc +++ b/src/reference/asciidoc/streams.adoc @@ -141,6 +141,19 @@ Default no-op implementations are provided to avoid having to implement both met A `CompositeKafkaStreamsInfrastructureCustomizer` is provided, for when you need to apply multiple customizers. +[[streams-micrometer]] +==== KafkaStreams Micrometer Support + +Introduced in version 2.5.3, you can configure a `KafkaStreamsMicrometerListener` to automatically register micrometer meters for the `KafkaStreams` object managed by the factory bean: + +==== +[source, java] +---- +streamsBuilderFactoryBean.setListener(new KafkaStreamsMicrometerListener(meterRegistry, + Collections.singletonList(new ImmutableTag("customTag", "customTagValue")))); +---- +==== + [[serde]] ==== Streams JSON Serialization and Deserialization diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index ef239046a7..52348c69d6 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -15,6 +15,13 @@ See <> for more information. You can now change bootstrap server properties at runtime, enabling failover to another Kafka cluster. See <> for more information. +[[x25-streams-listeners]] +==== `StreamsBuilderFactoryBean` Changes + +The factory bean can now invoke a callback whenever a `KafkaStreams` created or destroyed. +An Implementation for native Micrometer metrics is provided. +See <> for more information. + [[x25-kafka-client]] ==== Kafka Client Version