Skip to content

Commit f4799c7

Browse files
Adrien Bennadjisnicoll
authored andcommitted
Add configuration options for Kafka Stream's CleanupConfig
See gh-23636
1 parent a5b2778 commit f4799c7

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.boot.context.properties.PropertyMapper;
3838
import org.springframework.boot.convert.DurationUnit;
3939
import org.springframework.core.io.Resource;
40+
import org.springframework.kafka.core.CleanupConfig;
4041
import org.springframework.kafka.listener.ContainerProperties.AckMode;
4142
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
4243
import org.springframework.util.CollectionUtils;
@@ -722,6 +723,11 @@ public static class Streams {
722723
*/
723724
private String stateDir;
724725

726+
/**
727+
* Cleanup configuration for the state stores.
728+
*/
729+
private Cleanup cleanup;
730+
725731
/**
726732
* Additional Kafka properties used to configure the streams.
727733
*/
@@ -791,6 +797,14 @@ public void setStateDir(String stateDir) {
791797
this.stateDir = stateDir;
792798
}
793799

800+
public Cleanup getCleanup() {
801+
return cleanup;
802+
}
803+
804+
public void setCleanup(Cleanup cleanup) {
805+
this.cleanup = cleanup;
806+
}
807+
794808
public Map<String, String> getProperties() {
795809
return this.properties;
796810
}
@@ -1259,6 +1273,32 @@ public byte id() {
12591273

12601274
}
12611275

1276+
public static class Cleanup {
1277+
1278+
/**
1279+
* Cleanup the application's state on start.
1280+
*/
1281+
private boolean onStart = false;
1282+
1283+
/**
1284+
* Cleanup the application's state on stop.
1285+
*/
1286+
private boolean onStop = true;
1287+
1288+
public CleanupConfig buildCleanupConfig() {
1289+
return new CleanupConfig(this.onStart, this.onStop);
1290+
}
1291+
1292+
public boolean isOnStart() {
1293+
return onStart;
1294+
}
1295+
1296+
public boolean isOnStop() {
1297+
return onStop;
1298+
}
1299+
1300+
}
1301+
12621302
@SuppressWarnings("serial")
12631303
private static class Properties extends HashMap<String, Object> {
12641304

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
9191
@Override
9292
public void afterPropertiesSet() {
9393
this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
94+
95+
KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup();
96+
if (cleanup != null) {
97+
this.factoryBean.setCleanupConfig(cleanup.buildCleanupConfig());
98+
}
9499
}
95100

96101
}

0 commit comments

Comments
 (0)