Closed
Description
As per the docs, we need to provide a transactionIdPrefix in order to enable transaction support. It would be nice to add a field like transactionIdPrefix to Producer and use it in DefaultKafkaProducerFactory, so that transactions can be enabled through properties-based configuration such as spring.kafka.producer.transaction-id-prefix in application.properties.
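To make the idea concrete, here is a minimal sketch of the wiring I have in mind. The classes are hypothetical stand-ins so that the snippet runs on its own; `ProducerProperties`, `StubProducerFactory` and `buildFactory` are not real Spring Boot or Spring Kafka types, and the actual change might well look different.

```kotlin
// Hypothetical stand-in for the Producer properties class; NOT the real Spring Boot type.
class ProducerProperties(var transactionIdPrefix: String? = null)

// Hypothetical stand-in for DefaultKafkaProducerFactory; NOT the real Spring Kafka type.
class StubProducerFactory(val config: Map<String, Any>) {
    var transactionIdPrefix: String? = null
}

// Sketch of what the auto-configuration could do:
// copy the prefix onto the factory only when the property is set.
fun buildFactory(props: ProducerProperties, config: Map<String, Any>): StubProducerFactory {
    val factory = StubProducerFactory(config)
    props.transactionIdPrefix?.let { factory.transactionIdPrefix = it }
    return factory
}

fun main() {
    val withTx = buildFactory(ProducerProperties("txn-ingestor"), emptyMap())
    val withoutTx = buildFactory(ProducerProperties(), emptyMap())
    println(withTx.transactionIdPrefix)
    println(withoutTx.transactionIdPrefix)
}
```

Leaving the prefix untouched when the property is absent would keep the current non-transactional behaviour as the default.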
Current Situation:
At the moment, we have to manually create a bean just to enable transaction support. For example (the code below is in Kotlin, but the same is required in Java):
import org.apache.kafka.clients.producer.ProducerConfig
import org.springframework.context.annotation.Bean
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.ProducerFactory

@Bean
fun kafkaProducerFactory(): ProducerFactory<*, *> {
    // Everything in this map could already be set through spring.kafka.* properties
    val config = mapOf<String, Any>(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
        ProducerConfig.ACKS_CONFIG to "all",
        ProducerConfig.BATCH_SIZE_CONFIG to "4000",
        ProducerConfig.COMPRESSION_TYPE_CONFIG to "snappy"
    )
    val defaultKafkaProducerFactory = DefaultKafkaProducerFactory<Any, Any>(config)
    defaultKafkaProducerFactory.setTransactionIdPrefix("txn-ingestor") // only this line is relevant
    return defaultKafkaProducerFactory
}
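For comparison, this is roughly what the same setup could look like in application.properties if the proposal were accepted. I am assuming the existing spring.kafka.* keys cover the other settings; the last key is the one proposed in this issue and does not exist yet.

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=all
spring.kafka.producer.batch-size=4000
spring.kafka.producer.compression-type=snappy
# Proposed in this issue (not an existing property at the time of writing)
spring.kafka.producer.transaction-id-prefix=txn-ingestor
```

With that in place, the manual bean above would no longer be needed.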
I am using:
Spring Kafka version: 2.0.1.RELEASE
Spring Boot version: 2.0.0.M6
If this sounds good, then I can submit a PR.