Skip to content

Commit 37c4314

Browse files
committed
Merge pull request #23636 from adrien-ben
* pr/23636: Polish "Add configuration options for Kafka Stream's CleanupConfig" Add configuration options for Kafka Stream's CleanupConfig Closes gh-23636
2 parents a5b2778 + b960fa3 commit 37c4314

File tree

4 files changed

+72
-0
lines changed

4 files changed

+72
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,8 @@ public static class Streams {
685685

686686
private final Security security = new Security();
687687

688+
private final Cleanup cleanup = new Cleanup();
689+
688690
/**
689691
* Kafka streams application.id property; default spring.application.name.
690692
*/
@@ -735,6 +737,10 @@ public Security getSecurity() {
735737
return this.security;
736738
}
737739

740+
public Cleanup getCleanup() {
741+
return this.cleanup;
742+
}
743+
738744
public String getApplicationId() {
739745
return this.applicationId;
740746
}
@@ -1234,6 +1240,36 @@ public Map<String, Object> buildProperties() {
12341240

12351241
}
12361242

1243+
public static class Cleanup {
1244+
1245+
/**
1246+
* Cleanup the application’s local state directory on startup.
1247+
*/
1248+
private boolean onStartup = false;
1249+
1250+
/**
1251+
* Cleanup the application’s local state directory on shutdown.
1252+
*/
1253+
private boolean onShutdown = true;
1254+
1255+
public boolean isOnStartup() {
1256+
return this.onStartup;
1257+
}
1258+
1259+
public void setOnStartup(boolean onStartup) {
1260+
this.onStartup = onStartup;
1261+
}
1262+
1263+
public boolean isOnShutdown() {
1264+
return this.onShutdown;
1265+
}
1266+
1267+
public void setOnShutdown(boolean onShutdown) {
1268+
this.onShutdown = onShutdown;
1269+
}
1270+
1271+
}
1272+
12371273
public enum IsolationLevel {
12381274

12391275
/**

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
3535
import org.springframework.kafka.config.KafkaStreamsConfiguration;
3636
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
37+
import org.springframework.kafka.core.CleanupConfig;
3738

3839
/**
3940
* Configuration for Kafka Streams annotation-driven support.
@@ -91,6 +92,9 @@ static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
9192
@Override
9293
public void afterPropertiesSet() {
9394
this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
95+
KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup();
96+
CleanupConfig cleanupConfig = new CleanupConfig(cleanup.isOnStartup(), cleanup.isOnShutdown());
97+
this.factoryBean.setCleanupConfig(cleanupConfig);
9498
}
9599

96100
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.kafka.common.serialization.LongSerializer;
3737
import org.apache.kafka.streams.StreamsBuilder;
3838
import org.apache.kafka.streams.StreamsConfig;
39+
import org.assertj.core.api.InstanceOfAssertFactories;
3940
import org.junit.jupiter.api.Test;
4041

4142
import org.springframework.boot.autoconfigure.AutoConfigurations;
@@ -50,6 +51,7 @@
5051
import org.springframework.kafka.config.KafkaListenerContainerFactory;
5152
import org.springframework.kafka.config.KafkaStreamsConfiguration;
5253
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
54+
import org.springframework.kafka.core.CleanupConfig;
5355
import org.springframework.kafka.core.ConsumerFactory;
5456
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5557
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -340,6 +342,26 @@ void streamsWithSeveralStreamsBuilderFactoryBeans() {
340342
});
341343
}
342344

345+
@Test
346+
void streamsWithCleanupConfig() {
347+
this.contextRunner
348+
.withUserConfiguration(EnableKafkaStreamsConfiguration.class, TestKafkaStreamsConfiguration.class)
349+
.withPropertyValues("spring.application.name=my-test-app",
350+
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
351+
"spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cleanup.on-startup=true",
352+
"spring.kafka.streams.cleanup.on-shutdown=false")
353+
.run((context) -> {
354+
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
355+
.getBean(StreamsBuilderFactoryBean.class);
356+
assertThat(streamsBuilderFactoryBean)
357+
.extracting("cleanupConfig", InstanceOfAssertFactories.type(CleanupConfig.class))
358+
.satisfies((cleanupConfig) -> {
359+
assertThat(cleanupConfig.cleanupOnStart()).isTrue();
360+
assertThat(cleanupConfig.cleanupOnStop()).isFalse();
361+
});
362+
});
363+
}
364+
343365
@Test
344366
void streamsApplicationIdIsMandatory() {
345367
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class).run((context) -> {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
import org.junit.jupiter.api.Test;
2020

21+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Cleanup;
2122
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel;
2223
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
24+
import org.springframework.kafka.core.CleanupConfig;
2325
import org.springframework.kafka.listener.ContainerProperties;
2426

2527
import static org.assertj.core.api.Assertions.assertThat;
@@ -48,4 +50,12 @@ void listenerDefaultValuesAreConsistent() {
4850
assertThat(listenerProperties.isMissingTopicsFatal()).isEqualTo(container.isMissingTopicsFatal());
4951
}
5052

53+
@Test
54+
void cleanupConfigDefaultValuesAreConsistent() {
55+
CleanupConfig cleanupConfig = new CleanupConfig();
56+
Cleanup cleanup = new KafkaProperties().getStreams().getCleanup();
57+
assertThat(cleanup.isOnStartup()).isEqualTo(cleanupConfig.cleanupOnStart());
58+
assertThat(cleanup.isOnShutdown()).isEqualTo(cleanupConfig.cleanupOnStop());
59+
}
60+
5161
}

0 commit comments

Comments
 (0)