From aa283a389499b140ab9e9a46832b3162bf2d8527 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 19 Jun 2020 11:24:44 -0400 Subject: [PATCH] GH-1501: StreamsBuilderFB.Listener Improvement Resolves https://github.com/spring-projects/spring-kafka/issues/1501 - add support for multiple listeners - consistent with consumer/producer factories --- .../config/StreamsBuilderFactoryBean.java | 52 +++++++++++++------ .../config/KafkaStreamsCustomizerTests.java | 2 +- 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java index cfa10c97fe..fd49ad1e75 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java @@ -17,6 +17,9 @@ package org.springframework.kafka.config; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Properties; import org.apache.commons.logging.LogFactory; @@ -74,6 +77,8 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean listeners = new ArrayList<>(); + private KafkaStreamsInfrastructureCustomizer infrastructureCustomizer = new KafkaStreamsInfrastructureCustomizer() { }; @@ -97,8 +102,6 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean getListeners() { + return Collections.unmodifiableList(this.listeners); + } + + /** + * Add a {@link Listener} which will be called after starting and stopping the + * streams. + * @param listener the listener. + * @since 2.5.3 + */ + public void addListener(Listener listener) { + Assert.notNull(listener, "'listener' cannot be null"); + this.listeners.add(listener); + } + + /** + * Remove a listener. + * @param listener the listener. + * @return true if removed. + * @since 2.5.3 + */ + public boolean removeListener(Listener listener) { + return this.listeners.remove(listener); + } + @Override protected synchronized StreamsBuilder createInstance() { if (this.autoStartup) { @@ -297,7 +319,7 @@ public synchronized void start() { this.kafkaStreams.cleanUp(); } this.kafkaStreams.start(); - this.listener.streamsAdded(this.beanName, this.kafkaStreams); + this.listeners.forEach(listener -> listener.streamsAdded(this.beanName, this.kafkaStreams)); this.running = true; } catch (Exception e) { @@ -315,7 +337,7 @@ public synchronized void stop() { if (this.cleanupConfig.cleanupOnStop()) { this.kafkaStreams.cleanUp(); } - this.listener.streamsRemoved(this.beanName, this.kafkaStreams); + this.listeners.forEach(listener -> listener.streamsRemoved(this.beanName, this.kafkaStreams)); this.kafkaStreams = null; } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java index 674ab20706..545c46a93e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java @@ -149,7 +149,7 @@ public void configureTopology(Topology topology) { } }); - streamsBuilderFactoryBean.setListener(new KafkaStreamsMicrometerListener(meterRegistry(), + streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry(), Collections.singletonList(new ImmutableTag("customTag", "stream")))); return streamsBuilderFactoryBean; }