Skip to content

Make KafkaAdmin Easier to Subclass and Add a Filter Mechanism #2720

@garyrussell

Description

@garyrussell

Discussed in #2719

Originally posted by hariraogotit June 29, 2023
Team,

  1. I have two of this bean - RetryTopicConfiguration. One configured for London cluster and other for Sydney cluster.
  2. When I run the application, they are creating retry topics in both the clusters i.e london and sydney retry topics are created both in London and in Sydney
  3. Expectation is london retry topics to be created in London and sydney retry topics to be created in Sydney cluster.

Any lead is much appreciated guys please.

Please find my listener

@KafkaListener(
         topics = {
                 "AP-sydney"
         },
         id = "sydney",
         containerFactory = "sydneyCluster")
 public void consume(ConsumerRecord<String, String> record) {
------------
@KafkaListener(
         topics = {
                 "EU-london"
         },
         id = "london",
         containerFactory = "londonCluster")
 public void consumeLondon(ConsumerRecord<String, String> record) {

Configuration details are below

@Bean("sydneyCluster")
 ConcurrentKafkaListenerContainerFactory<Integer, String> sydneyListenerContainer() {
   ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
           new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(consumerSydneyFactory());

   return factory;
 }

 @Bean // Beans for the First Consumer Factory
 public ConsumerFactory<Integer, String> consumerSydneyFactory() {
   return new DefaultKafkaConsumerFactory<>(consumerSydneyConfigs());
 }

 @Bean
 public Map<String, Object> consumerSydneyConfigs() {
   Map<String, Object> props = new HashMap<>();
   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, sydneyServers);
   props.put(ConsumerConfig.GROUP_ID_CONFIG, "hr");
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
   return props;
 }


 @Bean("londonCluster")
 ConcurrentKafkaListenerContainerFactory<Integer, String> londonListenerContainer() {
   ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
           new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(consumerLondonFactory());

   return factory;
 }


 @Bean // Beans for the First Consumer Factory
 public ConsumerFactory<Integer, String> consumerLondonFactory() {
   return new DefaultKafkaConsumerFactory<>(consumerLondonConfigs());
 }



 @Bean
 public Map<String, Object> consumerLondonConfigs() {
   Map<String, Object> props = new HashMap<>();
   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, londonServers);
   props.put(ConsumerConfig.GROUP_ID_CONFIG, "hr");
   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
   return props;
 }

 @Bean(name = "sydneyKafkaTemplate")
 public KafkaTemplate<String, String> sydneyKafkaTemplate() {
   Map<String, Object> props = new HashMap<>();
   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sydneyServers);
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

   ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
   return new KafkaTemplate<>(producerFactory);
 }

 @Bean(name = "londonKafkaTemplate")
 public KafkaTemplate<String, String> londonKafkaTemplate() {
   Map<String, Object> props = new HashMap<>();
   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, londonServers);
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

   ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
   return new KafkaTemplate<>(producerFactory);
 }

 @Bean
 public RetryTopicConfiguration retrySydneyKafkaConfig(
         @Qualifier("sydneyKafkaTemplate") KafkaTemplate<String, String> sydneyKafkaTemplate) {
   return RetryTopicConfigurationBuilder
           .newInstance()
           .includeTopics(List.of("AP-sydney"))
           .exponentialBackoff(1000, 2, 20000)
           .maxAttempts(4)
           .retryTopicSuffix("-AP-Australia-retry")
           .dltSuffix("-AP-Australia-dlt")
           .useSingleTopicForSameIntervals()
           .listenerFactory("sydneyCluster")
           .create(sydneyKafkaTemplate);
 }

 @Bean
 public RetryTopicConfiguration retryLondonKafkaConfig(
         @Qualifier("londonKafkaTemplate") KafkaTemplate<String, String> londonKafkaTemplate) {
   return   RetryTopicConfigurationBuilder
           .newInstance()
           .includeTopics(List.of("EU-london"))
           .exponentialBackoff(1000, 2, 20000)
           .maxAttempts(4)
           .retryTopicSuffix("-EU-England-retry")
           .dltSuffix("-EU-England-dlt")
           .useSingleTopicForSameIntervals()
           .listenerFactory("londonCluster")
           .create(londonKafkaTemplate);
 }

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions