Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
Expand All @@ -52,7 +53,8 @@
*
* @since 1.1.4
*/
public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilder> implements SmartLifecycle {
public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilder>
implements SmartLifecycle, BeanNameAware {

/**
* The default {@link Duration} of {@code 10 seconds} for close timeout.
Expand Down Expand Up @@ -95,6 +97,10 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde

private Topology topology;

private Listener listener = new Listener() { };

private String beanName;

/**
* Default constructor that creates the factory without configuration
* {@link Properties}. It is the factory user's responsibility to properly set
Expand Down Expand Up @@ -129,6 +135,11 @@ public StreamsBuilderFactoryBean(KafkaStreamsConfiguration streamsConfig) {
this(streamsConfig, new CleanupConfig());
}

@Override
public void setBeanName(String name) {
this.beanName = name;
}

/**
* Set the streams configuration {@link Properties} on this factory.
* @param streamsConfig the streams configuration.
Expand Down Expand Up @@ -183,8 +194,8 @@ public void setStateRestoreListener(StateRestoreListener stateRestoreListener) {
}

/**
* Specify the timeout in seconds for the {@link KafkaStreams#close(Duration)} operation.
* Defaults to {@link #DEFAULT_CLOSE_TIMEOUT} seconds.
* Specify the timeout in seconds for the {@link KafkaStreams#close(Duration)}
* operation. Defaults to {@link #DEFAULT_CLOSE_TIMEOUT} seconds.
* @param closeTimeout the timeout for close in seconds.
* @see KafkaStreams#close(Duration)
*/
Expand All @@ -193,7 +204,19 @@ public void setCloseTimeout(int closeTimeout) {
}

/**
* Providing access to the associated {@link Topology} of this {@link StreamsBuilderFactoryBean}.
* Set a {@link Listener} which will be called after starting and stopping the
* streams.
* @param listener the listener.
* @since 2.5.3
*/
public void setListener(Listener listener) {
Assert.notNull(listener, "'listener' cannot be null");
this.listener = listener;
}

/**
* Providing access to the associated {@link Topology} of this
* {@link StreamsBuilderFactoryBean}.
* @return {@link Topology} object
* @since 2.4.4
*/
Expand Down Expand Up @@ -274,6 +297,7 @@ public synchronized void start() {
this.kafkaStreams.cleanUp();
}
this.kafkaStreams.start();
this.listener.streamsAdded(this.beanName, this.kafkaStreams);
this.running = true;
}
catch (Exception e) {
Expand All @@ -291,6 +315,7 @@ public synchronized void stop() {
if (this.cleanupConfig.cleanupOnStop()) {
this.kafkaStreams.cleanUp();
}
this.listener.streamsRemoved(this.beanName, this.kafkaStreams);
this.kafkaStreams = null;
}
}
Expand All @@ -308,4 +333,30 @@ public synchronized boolean isRunning() {
return this.running;
}

/**
* Called whenever a {@link KafkaStreams} is added or removed.
*
* @since 2.5.3
*
*/
public interface Listener {

/**
* A new {@link KafkaStreams} was created.
* @param id the streams id (factory bean name).
* @param streams the streams;
*/
default void streamsAdded(String id, KafkaStreams streams) {
}

/**
* An existing {@link KafkaStreams} was removed.
* @param id the streams id (factory bean name).
* @param streams the streams;
*/
default void streamsRemoved(String id, KafkaStreams streams) {
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.streams;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.streams.KafkaStreams;

import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics;

/**
* Creates a {@link KafkaStreamsMetrics} for the {@link KafkaStreams}.
*
* @author Gary Russell
* @since 2.5.3
*
*/
public class KafkaStreamsMicrometerListener implements StreamsBuilderFactoryBean.Listener {

private final MeterRegistry meterRegistry;

private final List<Tag> tags;

private final Map<String, KafkaStreamsMetrics> metrics = new HashMap<>();

/**
* Construct an instance with the provided registry.
* @param meterRegistry the registry.
*/
public KafkaStreamsMicrometerListener(MeterRegistry meterRegistry) {
this(meterRegistry, Collections.emptyList());
}

/**
* Construct an instance with the provided registry and tags.
* @param meterRegistry the registry.
* @param tags the tags.
*/
public KafkaStreamsMicrometerListener(MeterRegistry meterRegistry, List<Tag> tags) {
this.meterRegistry = meterRegistry;
this.tags = tags;
}


@Override
public synchronized void streamsAdded(String id, KafkaStreams kafkaStreams) {
if (!this.metrics.containsKey(id)) {
List<Tag> streamsTags = new ArrayList<>(this.tags);
streamsTags.add(new ImmutableTag("spring.id", id));
this.metrics.put(id, new KafkaStreamsMetrics(kafkaStreams, streamsTags));
this.metrics.get(id).bindTo(this.meterRegistry);
}
}

@Override
public synchronized void streamsRemoved(String id, KafkaStreams streams) {
KafkaStreamsMetrics removed = this.metrics.remove(id);
if (removed != null) {
removed.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,16 @@
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.streams.KafkaStreamsMicrometerListener;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

/**
* @author Nurettin Yilmaz
* @author Artem Bilan
Expand All @@ -71,6 +76,9 @@ public class KafkaStreamsCustomizerTests {
@Autowired
private KafkaStreamsConfig config;

@Autowired
private MeterRegistry meterRegistry;

@Test
public void testKafkaStreamsCustomizer(@Autowired KafkaStreamsConfiguration configuration,
@Autowired KafkaStreamsConfig config) {
Expand All @@ -86,6 +94,18 @@ public void testKafkaStreamsCustomizer(@Autowired KafkaStreamsConfiguration conf
.isEqualTo(1000);
assertThat(this.config.builderConfigured.get()).isTrue();
assertThat(this.config.topologyConfigured.get()).isTrue();
assertThat(this.meterRegistry.get("kafka.consumer.coordinator.join.total")
.tag("customTag", "stream")
.tag("spring.id", KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
.functionCounter()
.count())
.isGreaterThanOrEqualTo(0);
assertThat(this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "stream")
.tag("spring.id", KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
.functionCounter()
.count())
.isGreaterThanOrEqualTo(0);
}

@Configuration
Expand All @@ -100,6 +120,12 @@ public static class KafkaStreamsConfig {
@Autowired
EmbeddedKafkaBroker broker;

@SuppressWarnings("unchecked")
@Bean
public MeterRegistry meterRegistry() {
return new SimpleMeterRegistry();
}

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kStreamsConfigs());
Expand All @@ -123,6 +149,8 @@ public void configureTopology(Topology topology) {
}

});
streamsBuilderFactoryBean.setListener(new KafkaStreamsMicrometerListener(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "stream"))));
return streamsBuilderFactoryBean;
}

Expand Down
4 changes: 3 additions & 1 deletion src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2940,14 +2940,16 @@ The consumer/producer `id` passed to the listener is added to the meter's tags w
.An example of obtaining one of the Kafka metrics
[source, java]
----
double count =this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count()
----
====

A similar listener is provided for the `StreamsBuilderFactoryBean` - see <<streams-micrometer>>.

[[transactions]]
==== Transactions

Expand Down
13 changes: 13 additions & 0 deletions src/reference/asciidoc/streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,19 @@ Default no-op implementations are provided to avoid having to implement both met

A `CompositeKafkaStreamsInfrastructureCustomizer` is provided, for when you need to apply multiple customizers.

[[streams-micrometer]]
==== KafkaStreams Micrometer Support

Introduced in version 2.5.3, you can configure a `KafkaStreamsMicrometerListener` to automatically register micrometer meters for the `KafkaStreams` object managed by the factory bean:

====
[source, java]
----
streamsBuilderFactoryBean.setListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
----
====

[[serde]]
==== Streams JSON Serialization and Deserialization

Expand Down
7 changes: 7 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ See <<factory-listeners>> for more information.
You can now change bootstrap server properties at runtime, enabling failover to another Kafka cluster.
See <<connecting>> for more information.

[[x25-streams-listeners]]
==== `StreamsBuilderFactoryBean` Changes

The factory bean can now invoke a callback whenever a `KafkaStreams` created or destroyed.
An Implementation for native Micrometer metrics is provided.
See <<streams-micrometer>> for more information.

[[x25-kafka-client]]
==== Kafka Client Version

Expand Down