From d6f1710e48f5139d4c0142c1b37b391dc697473d Mon Sep 17 00:00:00 2001 From: Adrien Bennadji Date: Mon, 12 Oct 2020 16:11:44 +0200 Subject: [PATCH] Add KafkaStream's CleanupConfig properties --- .../autoconfigure/kafka/KafkaProperties.java | 40 +++++++++++++++++++ ...aStreamsAnnotationDrivenConfiguration.java | 5 +++ .../spring-boot-dependencies/build.gradle | 2 +- 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 778c6f9bac47..60f9dfd73b8a 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -37,6 +37,7 @@ import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.convert.DurationUnit; import org.springframework.core.io.Resource; +import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.util.CollectionUtils; @@ -722,6 +723,11 @@ public static class Streams { */ private String stateDir; + /** + * Cleanup configuration for the state stores. + */ + private Cleanup cleanup; + /** * Additional Kafka properties used to configure the streams. */ @@ -791,6 +797,14 @@ public void setStateDir(String stateDir) { this.stateDir = stateDir; } + public Cleanup getCleanup() { + return cleanup; + } + + public void setCleanup(Cleanup cleanup) { + this.cleanup = cleanup; + } + public Map getProperties() { return this.properties; } @@ -1259,6 +1273,32 @@ public byte id() { } + public static class Cleanup { + + /** + * Cleanup the application's state on start. + */ + private boolean onStart = false; + + /** + * Cleanup the application's state on stop. + */ + private boolean onStop = true; + + public CleanupConfig buildCleanupConfig() { + return new CleanupConfig(this.onStart, this.onStop); + } + + public boolean isOnStart() { + return onStart; + } + + public boolean isOnStop() { + return onStop; + } + + } + @SuppressWarnings("serial") private static class Properties extends HashMap { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java index 1a5a106392c1..ef36f273534d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -91,6 +91,11 @@ static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean { @Override public void afterPropertiesSet() { this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup()); + + KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup(); + if (cleanup != null) { + this.factoryBean.setCleanupConfig(cleanup.buildCleanupConfig()); + } } } diff --git a/spring-boot-project/spring-boot-dependencies/build.gradle b/spring-boot-project/spring-boot-dependencies/build.gradle index b645ee4a8e73..bbe4e313d213 100644 --- a/spring-boot-project/spring-boot-dependencies/build.gradle +++ b/spring-boot-project/spring-boot-dependencies/build.gradle @@ -1588,7 +1588,7 @@ bom { ] } } - library("Spring Kafka", "2.6.1") { + library("Spring Kafka", "2.6.2-SNAPSHOT") { group("org.springframework.kafka") { modules = [ "spring-kafka",