From f0a0aaa126ed9aded711a70315e2a6678c6bf005 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Schaller?= Date: Thu, 4 Apr 2024 07:11:38 +0200 Subject: [PATCH] GH-3176: Take property default.dsl.store into account --- .../config/StreamsBuilderFactoryBean.java | 16 ++- ...derFactoryBeanInMemoryStateStoreTests.java | 97 +++++++++++++++++++ 2 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanInMemoryStateStoreTests.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java index b1238cc99a..dc391ddb91 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java @@ -27,7 +27,9 @@ import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; @@ -58,6 +60,7 @@ * @author Gary Russell * @author Julien Wittouck * @author Sanghyeok An + * @author Cédric Schaller * * @since 1.1.4 */ @@ -328,7 +331,7 @@ protected StreamsBuilder createInstance() { Assert.state(this.properties != null, "streams configuration properties must not be null"); } - StreamsBuilder builder = new StreamsBuilder(); + StreamsBuilder builder = createStreamBuilder(); this.infrastructureCustomizer.configureBuilder(builder); return builder; } @@ -442,6 +445,17 @@ public void afterSingletonsInstantiated() { } } + private StreamsBuilder createStreamBuilder() { + if (this.properties == null) { + return new StreamsBuilder(); + } + else { + StreamsConfig streamsConfig = new StreamsConfig(this.properties); + TopologyConfig topologyConfig = new TopologyConfig(streamsConfig); + return new StreamsBuilder(topologyConfig); + } + } + /** * Called whenever a {@link KafkaStreams} is added or removed. * diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanInMemoryStateStoreTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanInMemoryStateStoreTests.java new file mode 100644 index 0000000000..4d9bd19200 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanInMemoryStateStoreTests.java @@ -0,0 +1,97 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafkaStreams; +import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Cédric Schaller + */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka +public class StreamsBuilderFactoryBeanInMemoryStateStoreTests { + + private static Path stateStoreDir; + + @BeforeAll + static void beforeAll() throws IOException { + stateStoreDir = Files.createTempDirectory(StreamsBuilderFactoryBeanInMemoryStateStoreTests.class.getSimpleName()); + } + + @Test + void testStateStoreIsInMemory() { + // Testing that an in-memory state store is used requires accessing the internal state of StreamsBuilder via reflection + // Therefore, we check the non-existence of RocksDB files instead + assertThat(stateStoreDir).isEmptyDirectory(); + } + + @Configuration + @EnableKafkaStreams + static class KafkaStreamsConfig { + + @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}") + private String brokerAddresses; + + @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) + public KafkaStreamsConfiguration kStreamsConfigWithInMemoryStateStores() { + Map props = new HashMap<>(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "should-be-stored-in-memory"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + props.put(StreamsConfig.STATE_DIR_CONFIG, stateStoreDir.toString()); + + // Property introduced with KIP-591 (Kafka 3.2) and deprecated (but still supported) with Kafka 3.7 + props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, "in_memory"); + return new KafkaStreamsConfiguration(props); + } + + @Bean + public KTable table(StreamsBuilder builder) { + KStream stream = builder.stream("source-topic"); + return stream.groupByKey() + .count(Materialized.as("store")); + + } + } +}