From b8370cceaa72b3b6dfc15a9915de0ef08e74776b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edd=C3=BA=20Mel=C3=A9ndez?= Date: Sun, 14 Jun 2020 01:32:12 -0500 Subject: [PATCH 1/3] Add micrometer support for kafka streams See gh-21890 --- .../build.gradle | 1 + .../KafkaMetricsAutoConfiguration.java | 15 ++++++++ .../KafkaMetricsAutoConfigurationTests.java | 38 +++++++++++++++++++ ...aStreamsAnnotationDrivenConfiguration.java | 6 ++- .../StreamsBuilderFactoryBeanCustomizer.java | 36 ++++++++++++++++++ .../spring-boot-dependencies/build.gradle | 2 +- 6 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/StreamsBuilderFactoryBeanCustomizer.java diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/build.gradle b/spring-boot-project/spring-boot-actuator-autoconfigure/build.gradle index eb50381e06d7..f49bc9fe0209 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/build.gradle +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/build.gradle @@ -67,6 +67,7 @@ dependencies { optional("org.apache.activemq:activemq-broker") optional("org.apache.commons:commons-dbcp2") optional("org.apache.kafka:kafka-clients") + optional("org.apache.kafka:kafka-streams") optional("org.apache.tomcat.embed:tomcat-embed-core") optional("org.apache.tomcat.embed:tomcat-embed-el") optional("org.apache.tomcat:tomcat-jdbc") diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java index fbde6c33ac92..aea06f9fc1c5 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java @@ -18,6 +18,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; +import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; @@ -26,13 +27,16 @@ import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer; import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.autoconfigure.kafka.StreamsBuilderFactoryBeanCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.MicrometerConsumerListener; import org.springframework.kafka.core.MicrometerProducerListener; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.streams.KafkaStreamsMicrometerListener; /** * Auto-configuration for Kafka metrics. @@ -66,4 +70,15 @@ private void addListener(DefaultKafkaProducerFactory factory, Meter factory.addListener(new MicrometerProducerListener<>(meterRegistry)); } + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass({ KafkaStreamsMetrics.class, StreamsBuilderFactoryBean.class }) + static class KafkaStreamsMetricsAutoConfiguration { + + @Bean + public StreamsBuilderFactoryBeanCustomizer kafkaStreamsProducerMetrics(MeterRegistry meterRegistry) { + return (factoryBean) -> factoryBean.setListener(new KafkaStreamsMicrometerListener(meterRegistry)); + } + + } + } diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java index d15918ea9114..874dcda1b169 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java @@ -22,10 +22,15 @@ import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafkaStreams; +import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.MicrometerConsumerListener; import org.springframework.kafka.core.MicrometerProducerListener; +import org.springframework.kafka.streams.KafkaStreamsMicrometerListener; +import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; @@ -34,6 +39,7 @@ * * @author Andy Wilkinson * @author Stephane Nicoll + * @author Eddú Meléndez */ class KafkaMetricsAutoConfigurationTests { @@ -61,4 +67,36 @@ void whenThereIsNoMeterRegistryThenListenerCustomizationBacksOff() { }); } + @Test + void whenKafkaStreamsIsEnabledAndThereIsAMeterRegistryThenMetricsListenersAreAdded() { + this.contextRunner.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class)) + .withUserConfiguration(EnableKafkaStreamsConfiguration.class) + .withPropertyValues("spring.application.name=my-test-app").with(MetricsRun.simple()).run((context) -> { + StreamsBuilderFactoryBean streamsBuilderFactoryBean = context + .getBean(StreamsBuilderFactoryBean.class); + StreamsBuilderFactoryBean.Listener listener = (StreamsBuilderFactoryBean.Listener) ReflectionTestUtils + .getField(streamsBuilderFactoryBean, "listener"); + assertThat(listener).isInstanceOf(KafkaStreamsMicrometerListener.class); + }); + } + + @Test + void whenKafkaStreamsIsEnabledAndThereIsNoMeterRegistryThenListenerCustomizationBacksOff() { + this.contextRunner.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class)) + .withUserConfiguration(EnableKafkaStreamsConfiguration.class) + .withPropertyValues("spring.application.name=my-test-app").run((context) -> { + StreamsBuilderFactoryBean streamsBuilderFactoryBean = context + .getBean(StreamsBuilderFactoryBean.class); + StreamsBuilderFactoryBean.Listener listener = (StreamsBuilderFactoryBean.Listener) ReflectionTestUtils + .getField(streamsBuilderFactoryBean, "listener"); + assertThat(listener).isNotInstanceOf(KafkaStreamsMicrometerListener.class); + }); + } + + @Configuration(proxyBeanMethods = false) + @EnableKafkaStreams + static class EnableKafkaStreamsConfiguration { + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java index 06f22c7e0b62..d052adddc08e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -39,6 +40,7 @@ * * @author Gary Russell * @author Stephane Nicoll + * @author Eddú Meléndez */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(StreamsBuilder.class) @@ -68,7 +70,9 @@ KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment) { @Bean KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer( - @Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilderFactoryBean factoryBean) { + @Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilderFactoryBean factoryBean, + ObjectProvider customizers) { + customizers.orderedStream().forEach((customizer) -> customizer.customize(factoryBean)); return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/StreamsBuilderFactoryBeanCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/StreamsBuilderFactoryBeanCustomizer.java new file mode 100644 index 000000000000..789eb846d736 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/StreamsBuilderFactoryBeanCustomizer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2012-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.kafka; + +import org.springframework.kafka.config.StreamsBuilderFactoryBean; + +/** + * Callback interface for customizing {@code StreamsBuilderFactoryBean} beans. + * + * @author Eddú Meléndez + * @since 2.3.2 + */ +@FunctionalInterface +public interface StreamsBuilderFactoryBeanCustomizer { + + /** + * Customize the {@link StreamsBuilderFactoryBean}. + * @param factoryBean the factory bean to customize + */ + void customize(StreamsBuilderFactoryBean factoryBean); + +} diff --git a/spring-boot-project/spring-boot-dependencies/build.gradle b/spring-boot-project/spring-boot-dependencies/build.gradle index 1f2a4f38b388..8725412d0dd5 100644 --- a/spring-boot-project/spring-boot-dependencies/build.gradle +++ b/spring-boot-project/spring-boot-dependencies/build.gradle @@ -1707,7 +1707,7 @@ bom { ] } } - library("Spring Kafka", "2.5.2.RELEASE") { + library("Spring Kafka", "2.5.3.BUILD-SNAPSHOT") { group("org.springframework.kafka") { modules = [ "spring-kafka", From 3a85509ba88cb9b2b76fe8d83547e314fa3aea74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edd=C3=BA=20Mel=C3=A9ndez?= Date: Fri, 19 Jun 2020 12:08:37 -0500 Subject: [PATCH 2/3] Fix checkstyle --- .../autoconfigure/metrics/KafkaMetricsAutoConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java index aea06f9fc1c5..90a88ca809a9 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java @@ -75,7 +75,7 @@ private void addListener(DefaultKafkaProducerFactory factory, Meter static class KafkaStreamsMetricsAutoConfiguration { @Bean - public StreamsBuilderFactoryBeanCustomizer kafkaStreamsProducerMetrics(MeterRegistry meterRegistry) { + StreamsBuilderFactoryBeanCustomizer kafkaStreamsProducerMetrics(MeterRegistry meterRegistry) { return (factoryBean) -> factoryBean.setListener(new KafkaStreamsMicrometerListener(meterRegistry)); } From a64be978125cf5c925aa5ddad50886f1e185ecd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edd=C3=BA=20Mel=C3=A9ndez?= Date: Fri, 19 Jun 2020 12:18:42 -0500 Subject: [PATCH 3/3] Align with spring-kafka changes --- .../metrics/KafkaMetricsAutoConfiguration.java | 3 ++- .../metrics/KafkaMetricsAutoConfigurationTests.java | 10 +++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java index 90a88ca809a9..d200498fda77 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java @@ -43,6 +43,7 @@ * * @author Andy Wilkinson * @author Stephane Nicoll + * @author Eddú Meléndez * @since 2.1.0 */ @Configuration(proxyBeanMethods = false) @@ -76,7 +77,7 @@ static class KafkaStreamsMetricsAutoConfiguration { @Bean StreamsBuilderFactoryBeanCustomizer kafkaStreamsProducerMetrics(MeterRegistry meterRegistry) { - return (factoryBean) -> factoryBean.setListener(new KafkaStreamsMicrometerListener(meterRegistry)); + return (factoryBean) -> factoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry)); } } diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java index 874dcda1b169..11c14b605663 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java @@ -30,7 +30,6 @@ import org.springframework.kafka.core.MicrometerConsumerListener; import org.springframework.kafka.core.MicrometerProducerListener; import org.springframework.kafka.streams.KafkaStreamsMicrometerListener; -import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; @@ -74,9 +73,8 @@ void whenKafkaStreamsIsEnabledAndThereIsAMeterRegistryThenMetricsListenersAreAdd .withPropertyValues("spring.application.name=my-test-app").with(MetricsRun.simple()).run((context) -> { StreamsBuilderFactoryBean streamsBuilderFactoryBean = context .getBean(StreamsBuilderFactoryBean.class); - StreamsBuilderFactoryBean.Listener listener = (StreamsBuilderFactoryBean.Listener) ReflectionTestUtils - .getField(streamsBuilderFactoryBean, "listener"); - assertThat(listener).isInstanceOf(KafkaStreamsMicrometerListener.class); + assertThat(streamsBuilderFactoryBean.getListeners()).hasSize(1) + .hasOnlyElementsOfTypes(KafkaStreamsMicrometerListener.class); }); } @@ -87,9 +85,7 @@ void whenKafkaStreamsIsEnabledAndThereIsNoMeterRegistryThenListenerCustomization .withPropertyValues("spring.application.name=my-test-app").run((context) -> { StreamsBuilderFactoryBean streamsBuilderFactoryBean = context .getBean(StreamsBuilderFactoryBean.class); - StreamsBuilderFactoryBean.Listener listener = (StreamsBuilderFactoryBean.Listener) ReflectionTestUtils - .getField(streamsBuilderFactoryBean, "listener"); - assertThat(listener).isNotInstanceOf(KafkaStreamsMicrometerListener.class); + assertThat(streamsBuilderFactoryBean.getListeners()).isEmpty(); }); }