Skip to content

Commit 8ad0006

Browse files
author
Adrien Bennadji
committed
Add KafkaStream's CleanupConfig properties
1 parent 53bf8c1 commit 8ad0006

File tree

3 files changed

+45
-1
lines changed

3 files changed

+45
-1
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.boot.context.properties.PropertyMapper;
3838
import org.springframework.boot.convert.DurationUnit;
3939
import org.springframework.core.io.Resource;
40+
import org.springframework.kafka.core.CleanupConfig;
4041
import org.springframework.kafka.listener.ContainerProperties.AckMode;
4142
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
4243
import org.springframework.util.CollectionUtils;
@@ -722,6 +723,11 @@ public static class Streams {
722723
*/
723724
private String stateDir;
724725

726+
/**
727+
* Cleanup configuration for the state stores.
728+
*/
729+
private Cleanup cleanup;
730+
725731
/**
726732
* Additional Kafka properties used to configure the streams.
727733
*/
@@ -791,6 +797,14 @@ public void setStateDir(String stateDir) {
791797
this.stateDir = stateDir;
792798
}
793799

800+
public Cleanup getCleanup() {
801+
return cleanup;
802+
}
803+
804+
public void setCleanup(Cleanup cleanup) {
805+
this.cleanup = cleanup;
806+
}
807+
794808
public Map<String, String> getProperties() {
795809
return this.properties;
796810
}
@@ -1259,6 +1273,31 @@ public byte id() {
12591273

12601274
}
12611275

1276+
public static class Cleanup {
1277+
1278+
/**
1279+
* Cleanup the application's state on start.
1280+
*/
1281+
private boolean onStart = false;
1282+
1283+
/**
1284+
* Cleanup the application's state on stop.
1285+
*/
1286+
private boolean onStop = true;
1287+
1288+
public CleanupConfig buildCleanupConfig() {
1289+
return new CleanupConfig(this.onStart, this.onStop);
1290+
}
1291+
1292+
public boolean isOnStart() {
1293+
return onStart;
1294+
}
1295+
1296+
public boolean isOnStop() {
1297+
return onStop;
1298+
}
1299+
}
1300+
12621301
@SuppressWarnings("serial")
12631302
private static class Properties extends HashMap<String, Object> {
12641303

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
9191
@Override
9292
public void afterPropertiesSet() {
9393
this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
94+
95+
KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup();
96+
if (cleanup != null) {
97+
this.factoryBean.setCleanupConfig(cleanup.buildCleanupConfig());
98+
}
9499
}
95100

96101
}

spring-boot-project/spring-boot-dependencies/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1588,7 +1588,7 @@ bom {
15881588
]
15891589
}
15901590
}
1591-
library("Spring Kafka", "2.6.1") {
1591+
library("Spring Kafka", "2.6.2-SNAPSHOT") {
15921592
group("org.springframework.kafka") {
15931593
modules = [
15941594
"spring-kafka",

0 commit comments

Comments
 (0)