Skip to content

Commit f1e48f4

Browse files
GH-3020: Create topology before Kafka Streams start
Fixes: #3020 * Create topology before starting Kafka Streams * Use SmartInitializingSingleton to build topology before starting Kafka Streams * Fix failed tests and fix check style * Add author
1 parent 739679e commit f1e48f4

File tree

3 files changed

+63
-11
lines changed

3 files changed

+63
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2023 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@
3333
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
3434

3535
import org.springframework.beans.factory.BeanNameAware;
36+
import org.springframework.beans.factory.SmartInitializingSingleton;
3637
import org.springframework.beans.factory.config.AbstractFactoryBean;
3738
import org.springframework.context.SmartLifecycle;
3839
import org.springframework.core.log.LogAccessor;
@@ -56,11 +57,12 @@
5657
* @author Denis Washington
5758
* @author Gary Russell
5859
* @author Julien Wittouck
60+
* @author Sanghyeok An
5961
*
6062
* @since 1.1.4
6163
*/
6264
public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilder>
63-
implements SmartLifecycle, BeanNameAware {
65+
implements SmartLifecycle, BeanNameAware, SmartInitializingSingleton {
6466

6567
/**
6668
* The default {@link Duration} of {@code 10 seconds} for close timeout.
@@ -356,11 +358,7 @@ public void start() {
356358
try {
357359
Assert.state(this.properties != null,
358360
"streams configuration properties must not be null");
359-
Topology topol = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
360-
this.infrastructureCustomizer.configureTopology(topol);
361-
this.topology = topol;
362-
LOGGER.debug(() -> topol.describe().toString());
363-
this.kafkaStreams = new KafkaStreams(topol, this.properties, this.clientSupplier);
361+
this.kafkaStreams = new KafkaStreams(this.topology, this.properties, this.clientSupplier);
364362
this.kafkaStreams.setStateListener(this.stateListener);
365363
this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
366364
if (this.streamsUncaughtExceptionHandler != null) {
@@ -432,6 +430,18 @@ public boolean isRunning() {
432430
}
433431
}
434432

433+
@Override
434+
public void afterSingletonsInstantiated() {
435+
try {
436+
this.topology = getObject().build(this.properties);
437+
this.infrastructureCustomizer.configureTopology(this.topology);
438+
LOGGER.debug(() -> this.topology.describe().toString());
439+
}
440+
catch (Exception e) {
441+
throw new RuntimeException(e);
442+
}
443+
}
444+
435445
/**
436446
* Called whenever a {@link KafkaStreams} is added or removed.
437447
*

spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanTests.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,6 +56,7 @@
5656
* @author Gary Russell
5757
* @author Denis Washington
5858
* @author Soby Chacko
59+
* @author Sanghyeok An
5960
*/
6061
@SpringJUnitConfig
6162
@DirtiesContext
@@ -102,12 +103,34 @@ protected StreamsBuilder createInstance() {
102103
streamsBuilderFactoryBean.afterPropertiesSet();
103104
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
104105
builder.stream(Pattern.compile("foo"));
106+
streamsBuilderFactoryBean.afterSingletonsInstantiated();
105107
streamsBuilderFactoryBean.start();
106108
StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
107109
verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties());
108110
assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull();
109111
}
110112

113+
@Test
114+
public void testGetTopologyBeforeKafkaStreamsStart() throws Exception {
115+
// Given
116+
streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) {
117+
@Override
118+
protected StreamsBuilder createInstance() {
119+
return spy(super.createInstance());
120+
}
121+
};
122+
streamsBuilderFactoryBean.afterPropertiesSet();
123+
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
124+
builder.stream(Pattern.compile("test-topic"));
125+
126+
// When
127+
streamsBuilderFactoryBean.afterSingletonsInstantiated();
128+
129+
// Then
130+
assertThat(streamsBuilderFactoryBean.getTopology()).isNotNull();
131+
assertThat(streamsBuilderFactoryBean.isRunning()).isFalse();
132+
}
133+
111134
@Configuration
112135
@EnableKafkaStreams
113136
public static class KafkaStreamsConfig {

spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryLateConfigTests.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,8 +21,8 @@
2121
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2222

2323
import java.util.Properties;
24-
import java.util.regex.Pattern;
2524

25+
import org.apache.kafka.streams.StreamsBuilder;
2626
import org.apache.kafka.streams.StreamsConfig;
2727
import org.junit.jupiter.api.Test;
2828

@@ -43,6 +43,7 @@
4343
* @author Soby Chacko
4444
* @author Artem Bilan
4545
* @author Gary Russell
46+
* @author Sanghyeok An
4647
*/
4748
@SpringJUnitConfig
4849
@DirtiesContext
@@ -72,11 +73,12 @@ public void testStreamBuilderFactoryCannotBeInstantiatedWhenAutoStart() {
7273

7374
@Test
7475
public void testStreamsBuilderFactoryWithConfigProvidedLater() throws Exception {
76+
boolean isAutoStartUp = true;
7577
Properties props = new Properties();
7678
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
7779
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
7880
streamsBuilderFactoryBean.setStreamsConfiguration(props);
79-
streamsBuilderFactoryBean.getObject().stream(Pattern.compile("foo"));
81+
streamsBuilderFactoryBean.setAutoStartup(isAutoStartUp);
8082

8183
assertThat(streamsBuilderFactoryBean.isRunning()).isFalse();
8284
streamsBuilderFactoryBean.start();
@@ -95,6 +97,23 @@ public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder() {
9597
return streamsBuilderFactoryBean;
9698
}
9799

100+
@Bean
101+
public KafkaStreamsService kafkaStreamsService(StreamsBuilder streamsBuilder) {
102+
return new KafkaStreamsService(streamsBuilder);
103+
}
104+
98105
}
99106

107+
static class KafkaStreamsService {
108+
private final StreamsBuilder streamsBuilder;
109+
110+
KafkaStreamsService(StreamsBuilder streamsBuilder) {
111+
this.streamsBuilder = streamsBuilder;
112+
buildPipeline();
113+
}
114+
115+
void buildPipeline() {
116+
this.streamsBuilder.stream("test-topic");
117+
}
118+
}
100119
}

0 commit comments

Comments
 (0)