Description
In what version(s) of Spring for Apache Kafka are you seeing this issue? 3.2.5
When using the @EmbeddedKafka annotation with a list of topics, topic creation occasionally times out on a slow build pipeline with the following exception:
ERROR org.springframework.boot.SpringApplication : Application run failed
org.apache.kafka.common.KafkaException: java.util.concurrent.TimeoutException
at org.springframework.kafka.test.EmbeddedKafkaZKBroker.createTopics(EmbeddedKafkaZKBroker.java:429) ~[spring-kafka-test-3.2.5.jar:3.2.5]
at org.springframework.kafka.test.EmbeddedKafkaZKBroker.lambda$createKafkaTopics$6(EmbeddedKafkaZKBroker.java:416) ~[spring-kafka-test-3.2.5.jar:3.2.5]
at org.springframework.kafka.test.EmbeddedKafkaZKBroker.doWithAdmin(EmbeddedKafkaZKBroker.java:514) ~[spring-kafka-test-3.2.5.jar:3.2.5]
at org.springframework.kafka.test.EmbeddedKafkaZKBroker.createKafkaTopics(EmbeddedKafkaZKBroker.java:415) ~[spring-kafka-test-3.2.5.jar:3.2.5]
at org.springframework.kafka.test.EmbeddedKafkaZKBroker.afterPropertiesSet(EmbeddedKafkaZKBroker.java:325) ~[spring-kafka-test-3.2.5.jar:3.2.5]
at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:130) ~[spring-kafka-test-3.2.5.jar:3.2.5]
at org.springframework.boot.test.context.SpringBootContextLoader$ContextCustomizerAdapter.initialize(SpringBootContextLoader.java:443) ~[spring-boot-test-3.3.6.jar:3.3.6]
Checking in the debugger, I see that the admin timeout is set to the default 10 seconds.
Setting the adminTimeout attribute in the EmbeddedKafka annotation has no effect.
To Reproduce
Use the EmbeddedKafka annotation with the adminTimeout attribute set to something other than the default 10s.
Set a breakpoint in EmbeddedKafkaZKBroker.createTopics and run the test under a debugger.
Inspect the adminTimeout property: it is still the default 10 seconds, not the value set on the annotation.
Expected behavior
I believe EmbeddedKafkaContextCustomizer should call
embeddedKafkaBroker.setAdminTimeout(embeddedKafka.adminTimeout())
before calling embeddedKafkaBroker.afterPropertiesSet()
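To illustrate why the ordering matters, here is a minimal, JDK-only sketch. It does not use the real spring-kafka-test classes: FakeEmbeddedKafka, FakeBroker, SlowPipelineTest, and both customize methods are hypothetical stand-ins, and the assumption that the customizer never copies the attribute before afterPropertiesSet() is taken from the behavior described above, not from the library source.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.time.Duration;

// Stand-in types only: none of these are the real spring-kafka-test classes.
class AdminTimeoutSketch {

    // Default in seconds, matching the 10 seconds observed in the debugger.
    static final int DEFAULT_ADMIN_TIMEOUT = 10;

    // Hypothetical stand-in for @EmbeddedKafka, reduced to the one attribute at issue.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface FakeEmbeddedKafka {
        int adminTimeout() default DEFAULT_ADMIN_TIMEOUT;
    }

    // Hypothetical stand-in for the broker: topic creation happens in afterPropertiesSet().
    static class FakeBroker {
        private Duration adminTimeout = Duration.ofSeconds(DEFAULT_ADMIN_TIMEOUT);
        Duration timeoutUsedForTopics; // records the timeout in effect when topics were created

        void setAdminTimeout(int seconds) {
            this.adminTimeout = Duration.ofSeconds(seconds);
        }

        void afterPropertiesSet() {
            // Whatever timeout is configured at this point is the one topic creation uses.
            this.timeoutUsedForTopics = this.adminTimeout;
        }
    }

    // A test class that asks for a longer timeout, as in the reproduction steps.
    @FakeEmbeddedKafka(adminTimeout = 30)
    static class SlowPipelineTest { }

    // Models the reported behavior: the annotation attribute is never applied.
    static FakeBroker customizeAsReported(Class<?> testClass) {
        FakeBroker broker = new FakeBroker();
        broker.afterPropertiesSet();
        return broker;
    }

    // Models the proposed fix: apply the attribute before afterPropertiesSet().
    static FakeBroker customizeAsProposed(Class<?> testClass) {
        FakeEmbeddedKafka embeddedKafka = testClass.getAnnotation(FakeEmbeddedKafka.class);
        FakeBroker broker = new FakeBroker();
        broker.setAdminTimeout(embeddedKafka.adminTimeout());
        broker.afterPropertiesSet();
        return broker;
    }

    public static void main(String[] args) {
        FakeBroker reported = customizeAsReported(SlowPipelineTest.class);
        FakeBroker proposed = customizeAsProposed(SlowPipelineTest.class);
        System.out.println("as reported: " + reported.timeoutUsedForTopics.getSeconds() + "s"); // as reported: 10s
        System.out.println("as proposed: " + proposed.timeoutUsedForTopics.getSeconds() + "s"); // as proposed: 30s
    }
}
```

In this model the only difference between the two paths is the single setAdminTimeout call placed before afterPropertiesSet(), which is the change proposed above.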