Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.springframework.kafka.config;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -74,6 +77,8 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde

private final CleanupConfig cleanupConfig;

private final List<Listener> listeners = new ArrayList<>();

private KafkaStreamsInfrastructureCustomizer infrastructureCustomizer = new KafkaStreamsInfrastructureCustomizer() {
};

Expand All @@ -97,8 +102,6 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde

private Topology topology;

private Listener listener = new Listener() { };

private String beanName;

/**
Expand Down Expand Up @@ -203,17 +206,6 @@ public void setCloseTimeout(int closeTimeout) {
this.closeTimeout = Duration.ofSeconds(closeTimeout); // NOSONAR (sync)
}

/**
* Set a {@link Listener} which will be called after starting and stopping the
* streams.
* @param listener the listener.
* @since 2.5.3
*/
public void setListener(Listener listener) {
Assert.notNull(listener, "'listener' cannot be null");
this.listener = listener;
}

/**
* Providing access to the associated {@link Topology} of this
* {@link StreamsBuilderFactoryBean}.
Expand Down Expand Up @@ -252,6 +244,36 @@ public KafkaStreams getKafkaStreams() {
return this.kafkaStreams;
}

/**
* Get the current list of listeners.
* @return the listeners.
* @since 2.5.3
*/
public List<Listener> getListeners() {
return Collections.unmodifiableList(this.listeners);
}

/**
* Add a {@link Listener} which will be called after starting and stopping the
* streams.
* @param listener the listener.
* @since 2.5.3
*/
public void addListener(Listener listener) {
Assert.notNull(listener, "'listener' cannot be null");
this.listeners.add(listener);
}

/**
* Remove a listener.
* @param listener the listener.
* @return true if removed.
* @since 2.5.3
*/
public boolean removeListener(Listener listener) {
return this.listeners.remove(listener);
}

@Override
protected synchronized StreamsBuilder createInstance() {
if (this.autoStartup) {
Expand Down Expand Up @@ -297,7 +319,7 @@ public synchronized void start() {
this.kafkaStreams.cleanUp();
}
this.kafkaStreams.start();
this.listener.streamsAdded(this.beanName, this.kafkaStreams);
this.listeners.forEach(listener -> listener.streamsAdded(this.beanName, this.kafkaStreams));
this.running = true;
}
catch (Exception e) {
Expand All @@ -315,7 +337,7 @@ public synchronized void stop() {
if (this.cleanupConfig.cleanupOnStop()) {
this.kafkaStreams.cleanUp();
}
this.listener.streamsRemoved(this.beanName, this.kafkaStreams);
this.listeners.forEach(listener -> listener.streamsRemoved(this.beanName, this.kafkaStreams));
this.kafkaStreams = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void configureTopology(Topology topology) {
}

});
streamsBuilderFactoryBean.setListener(new KafkaStreamsMicrometerListener(meterRegistry(),
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "stream"))));
return streamsBuilderFactoryBean;
}
Expand Down