11package dev.vality.vortigon.config
22
3- import com.rbkmoney.kafka.common.exception.handler.SeekToCurrentWithSleepBatchErrorHandler
43import dev.vality.machinegun.eventsink.MachineEvent
54import dev.vality.mg.event.sink.service.ConsumerGroupIdService
6- import dev.vality.vortigon.config.properties.KafkaProperties
75import dev.vality.vortigon.serializer.MachineEventDeserializer
86import lombok.RequiredArgsConstructor
9- import mu.KotlinLogging
107import org.apache.kafka.clients.consumer.ConsumerConfig
118import org.apache.kafka.clients.consumer.OffsetResetStrategy
129import org.apache.kafka.common.serialization.Deserializer
1310import org.apache.kafka.common.serialization.StringDeserializer
11+ import org.springframework.beans.factory.annotation.Value
12+ import org.springframework.boot.autoconfigure.kafka.KafkaProperties
1413import org.springframework.context.annotation.Bean
1514import org.springframework.context.annotation.Configuration
1615import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
1716import org.springframework.kafka.core.DefaultKafkaConsumerFactory
1817import org.springframework.kafka.listener.ContainerProperties
19- import java.io.File
20-
21- private val log = KotlinLogging .logger {}
18+ import org.springframework.kafka.listener.DefaultErrorHandler
19+ import org.springframework.util.backoff.ExponentialBackOff
2220
2321@Configuration
2422@RequiredArgsConstructor
2523class KafkaConfig (
2624 private val kafkaProperties : KafkaProperties ,
2725 private val consumerGroupIdService : ConsumerGroupIdService ,
26+ @Value(" \$ {kafka.consumer.concurrency}" )
27+ private val concurrencyListenerCount : Int ,
28+ @Value(" \$ {kafka.max.poll.records}" )
29+ private val maxPollRecords : String ,
30+ @Value(" \$ {kafka.error-handler.backoff.initial-interval}" )
31+ private val initialInterval : Long ,
32+ @Value(" \$ {kafka.error-handler.backoff.max-interval}" )
33+ private val maxInterval : Long
2834) {
35+
2936 @Bean
3037 fun partyListenerContainerFactory (): ConcurrentKafkaListenerContainerFactory <String , MachineEvent > {
3138 val factory = ConcurrentKafkaListenerContainerFactory <String , MachineEvent >()
3239 val consumerGroup: String = consumerGroupIdService.generateGroupId(PARTY_CONSUMER_GROUP_NAME )
33- initDefaultListenerProperties< MachineEvent > (
40+ initDefaultListenerProperties(
3441 factory,
3542 consumerGroup,
3643 MachineEventDeserializer (),
37- kafkaProperties. maxPollRecords
44+ maxPollRecords
3845 )
3946 return factory
4047 }
@@ -49,8 +56,12 @@ class KafkaConfig(
4956 consumerGroup, deserializer, maxPollRecords
5057 )
5158 factory.consumerFactory = consumerFactory
52- factory.setConcurrency(kafkaProperties.consumer.concurrency.toInt())
53- factory.setBatchErrorHandler(SeekToCurrentWithSleepBatchErrorHandler ())
59+ factory.setConcurrency(concurrencyListenerCount)
60+ val exponentialBackOff = ExponentialBackOff ().apply {
61+ maxInterval = this @KafkaConfig.maxInterval
62+ initialInterval = this @KafkaConfig.initialInterval
63+ }
64+ factory.setCommonErrorHandler(DefaultErrorHandler (exponentialBackOff))
5465 factory.isBatchListener = true
5566 factory.containerProperties.ackMode = ContainerProperties .AckMode .MANUAL
5667 }
@@ -60,7 +71,7 @@ class KafkaConfig(
6071 deserializer : Deserializer <T >,
6172 maxPollRecords : String ,
6273 ): DefaultKafkaConsumerFactory <String , T > {
63- val props: MutableMap <String , Any > = createDefaultProperties (consumerGroup)
74+ val props: MutableMap <String , Any > = createDefaultConsumerProperties (consumerGroup)
6475 props[ConsumerConfig .MAX_POLL_RECORDS_CONFIG ] = maxPollRecords
6576 return DefaultKafkaConsumerFactory (
6677 props,
@@ -69,33 +80,13 @@ class KafkaConfig(
6980 )
7081 }
7182
72- private fun createDefaultProperties (value : String ): MutableMap <String , Any > {
73- return HashMap <String , Any >().apply {
74- put(ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , kafkaProperties.bootstrapServers)
75- put(ConsumerConfig .GROUP_ID_CONFIG , value)
76- put(ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , OffsetResetStrategy .EARLIEST .name.lowercase())
77- put(ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG , false )
78- put(ConsumerConfig .SESSION_TIMEOUT_MS_CONFIG , kafkaProperties.maxSessionTimeoutMs)
79- put(ConsumerConfig .MAX_POLL_INTERVAL_MS_CONFIG , kafkaProperties.maxPollIntervalMs)
80- putAll(createSslConfig())
81- }
82- }
83+ private fun createDefaultConsumerProperties (value : String ): MutableMap <String , Any > {
84+ val properties = kafkaProperties.buildConsumerProperties()
85+ properties[ConsumerConfig .GROUP_ID_CONFIG ] = value
86+ properties[ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ] = OffsetResetStrategy .EARLIEST .name.lowercase()
87+ properties[ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG ] = false
8388
84- private fun createSslConfig (): Map <String , Any > {
85- val kafkaSslProperties = kafkaProperties.ssl
86- log.info(" Kafka ssl isEnabled: {}" , kafkaSslProperties.enabled)
87- val configProps = HashMap <String , Any >()
88- if (kafkaSslProperties.enabled) {
89- configProps[" security.protocol" ] = " SSL"
90- configProps[" ssl.truststore.location" ] = File (kafkaSslProperties.trustStoreLocation).absolutePath
91- configProps[" ssl.truststore.password" ] = kafkaSslProperties.trustStorePassword
92- configProps[" ssl.keystore.type" ] = " PKCS12"
93- configProps[" ssl.truststore.type" ] = " PKCS12"
94- configProps[" ssl.keystore.location" ] = File (kafkaSslProperties.keyStoreLocation).absolutePath
95- configProps[" ssl.keystore.password" ] = kafkaSslProperties.keyStorePassword
96- configProps[" ssl.key.password" ] = kafkaSslProperties.keyPassword
97- }
98- return configProps
89+ return properties
9990 }
10091
10192 companion object {
0 commit comments