Skip to content

Commit c588425

Browse files
GH-3176: Take property default.dsl.store into account
1 parent 739679e commit c588425

File tree

2 files changed

+111
-2
lines changed

2 files changed

+111
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2023 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,7 +27,9 @@
2727
import org.apache.kafka.streams.KafkaClientSupplier;
2828
import org.apache.kafka.streams.KafkaStreams;
2929
import org.apache.kafka.streams.StreamsBuilder;
30+
import org.apache.kafka.streams.StreamsConfig;
3031
import org.apache.kafka.streams.Topology;
32+
import org.apache.kafka.streams.TopologyConfig;
3133
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
3234
import org.apache.kafka.streams.processor.StateRestoreListener;
3335
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
@@ -56,6 +58,7 @@
5658
* @author Denis Washington
5759
* @author Gary Russell
5860
* @author Julien Wittouck
61+
* @author Cédric Schaller
5962
*
6063
* @since 1.1.4
6164
*/
@@ -326,7 +329,7 @@ protected StreamsBuilder createInstance() {
326329
Assert.state(this.properties != null,
327330
"streams configuration properties must not be null");
328331
}
329-
StreamsBuilder builder = new StreamsBuilder();
332+
StreamsBuilder builder = createStreamBuilder();
330333
this.infrastructureCustomizer.configureBuilder(builder);
331334
return builder;
332335
}
@@ -432,6 +435,17 @@ public boolean isRunning() {
432435
}
433436
}
434437

438+
private StreamsBuilder createStreamBuilder() {
439+
if (this.properties == null) {
440+
return new StreamsBuilder();
441+
}
442+
else {
443+
StreamsConfig streamsConfig = new StreamsConfig(this.properties);
444+
TopologyConfig topologyConfig = new TopologyConfig(streamsConfig);
445+
return new StreamsBuilder(topologyConfig);
446+
}
447+
}
448+
435449
/**
436450
* Called whenever a {@link KafkaStreams} is added or removed.
437451
*
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.config;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.io.IOException;
22+
import java.nio.file.Files;
23+
import java.nio.file.Path;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
27+
import org.apache.kafka.common.serialization.Serdes;
28+
import org.apache.kafka.streams.StreamsBuilder;
29+
import org.apache.kafka.streams.StreamsConfig;
30+
import org.apache.kafka.streams.kstream.KStream;
31+
import org.apache.kafka.streams.kstream.KTable;
32+
import org.apache.kafka.streams.kstream.Materialized;
33+
import org.junit.jupiter.api.BeforeAll;
34+
import org.junit.jupiter.api.Test;
35+
36+
import org.springframework.beans.factory.annotation.Value;
37+
import org.springframework.context.annotation.Bean;
38+
import org.springframework.context.annotation.Configuration;
39+
import org.springframework.kafka.annotation.EnableKafkaStreams;
40+
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
41+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
42+
import org.springframework.kafka.test.context.EmbeddedKafka;
43+
import org.springframework.test.annotation.DirtiesContext;
44+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
45+
46+
/**
47+
* @author Cédric Schaller
48+
*/
49+
@SpringJUnitConfig
50+
@DirtiesContext
51+
@EmbeddedKafka
52+
public class StreamsBuilderFactoryBeanInMemoryStateStoreTests {
53+
54+
private static Path stateStoreDir;
55+
56+
@BeforeAll
57+
static void beforeAll() throws IOException {
58+
stateStoreDir = Files.createTempDirectory(StreamsBuilderFactoryBeanInMemoryStateStoreTests.class.getSimpleName());
59+
}
60+
61+
@Test
62+
void testStateStoreIsInMemory() {
63+
assertThat(stateStoreDir).isEmptyDirectory();
64+
}
65+
66+
@Configuration
67+
@EnableKafkaStreams
68+
static class KafkaStreamsConfig {
69+
70+
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
71+
private String brokerAddresses;
72+
73+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
74+
public KafkaStreamsConfiguration kStreamsConfigWithInMemoryStateStores() {
75+
Map<String, Object> props = new HashMap<>();
76+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "should-be-stored-in-memory");
77+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
78+
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
79+
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
80+
props.put(StreamsConfig.STATE_DIR_CONFIG, stateStoreDir.toString());
81+
82+
// Property introduced with KIP-591 (Kafka 3.2) and deprecated (but still supported) with Kafka 3.7
83+
props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, "in_memory");
84+
return new KafkaStreamsConfiguration(props);
85+
}
86+
87+
@Bean
88+
public KTable<?, ?> table(StreamsBuilder builder) {
89+
KStream<Object, Object> stream = builder.stream("source-topic");
90+
return stream.groupByKey()
91+
.count(Materialized.as("store"));
92+
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)