Skip to content

Commit 6bd58b2

Browse files
realcbbartembilan
authored andcommitted
Catch exceptions thrown from error handler
In situations like committing acks while consumer group has already rebalanced, and the container error handler is instance of `ConsumerAwareErrorHandler` or `ConsumerAwareBatchErrorHandler`, the error handler will throw an exception which would not be caught. Then the consumer will be dead, it can not receive messages any more.
1 parent 17e255d commit 6bd58b2

File tree

2 files changed

+83
-5
lines changed

2 files changed

+83
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
* @author Artem Bilan
9393
* @author Loic Talhouarne
9494
* @author Vladimir Tsanev
95+
* @author Chen Binbin
9596
* @author Yang Qiju
9697
* @author Tom van den Berge
9798
*/
@@ -719,11 +720,23 @@ public void run() {
719720
break;
720721
}
721722
catch (Exception e) {
722-
if (this.containerProperties.getGenericErrorHandler() != null) {
723-
this.containerProperties.getGenericErrorHandler().handle(e, null);
723+
try {
724+
GenericErrorHandler<?> containerErrorHandler = this.containerProperties.getGenericErrorHandler();
725+
if (containerErrorHandler != null) {
726+
if (containerErrorHandler instanceof ConsumerAwareErrorHandler
727+
|| containerErrorHandler instanceof ConsumerAwareBatchErrorHandler) {
728+
containerErrorHandler.handle(e, null, this.consumer);
729+
}
730+
else {
731+
containerErrorHandler.handle(e, null);
732+
}
733+
}
734+
else {
735+
this.logger.error("Container exception", e);
736+
}
724737
}
725-
else {
726-
this.logger.error("Container exception", e);
738+
catch (Exception ex) {
739+
this.logger.error("Container exception", ex);
727740
}
728741
}
729742
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,13 @@ public class KafkaMessageListenerContainerTests {
134134

135135
private static String topic18 = "testTopic18";
136136

137+
private static String topic19 = "testTopic19";
138+
137139

138140
@ClassRule
139141
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5,
140142
topic6, topic7, topic8, topic9, topic10, topic11, topic12, topic13, topic14, topic15, topic16, topic17,
141-
topic18);
143+
topic18, topic19);
142144

143145
@Rule
144146
public TestName testName = new TestName();
@@ -1718,6 +1720,69 @@ public void testInitialSeek() throws Exception {
17181720
container.stop();
17191721
}
17201722

1723+
@Test
1724+
public void testExceptionWhenCommitAfterRebalance() throws Exception {
1725+
final CountDownLatch rebalanceLatch = new CountDownLatch(2);
1726+
final CountDownLatch consumeLatch = new CountDownLatch(7);
1727+
1728+
Map<String, Object> props = KafkaTestUtils.consumerProps("test19", "false", embeddedKafka);
1729+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
1730+
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 15000);
1731+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
1732+
ContainerProperties containerProps = new ContainerProperties(topic19);
1733+
containerProps.setMessageListener((MessageListener<Integer, String>) messages -> {
1734+
logger.info("listener: " + messages);
1735+
consumeLatch.countDown();
1736+
try {
1737+
Thread.sleep(3000);
1738+
}
1739+
catch (InterruptedException e) {
1740+
e.printStackTrace();
1741+
}
1742+
});
1743+
containerProps.setSyncCommits(true);
1744+
containerProps.setAckMode(AckMode.BATCH);
1745+
containerProps.setPollTimeout(100);
1746+
containerProps.setAckOnError(false);
1747+
containerProps.setErrorHandler(new SeekToCurrentErrorHandler());
1748+
1749+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
1750+
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
1751+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
1752+
template.setDefaultTopic(topic19);
1753+
1754+
containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener() {
1755+
1756+
@Override
1757+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
1758+
}
1759+
1760+
@Override
1761+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
1762+
logger.info("rebalance occurred.");
1763+
rebalanceLatch.countDown();
1764+
}
1765+
});
1766+
1767+
KafkaMessageListenerContainer<Integer, String> container =
1768+
new KafkaMessageListenerContainer<>(cf, containerProps);
1769+
container.setBeanName("testContainerException");
1770+
container.start();
1771+
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
1772+
container.pause();
1773+
1774+
for (int i = 0; i < 6; i++) {
1775+
template.sendDefault(0, 0, "a");
1776+
}
1777+
template.flush();
1778+
1779+
container.resume();
1780+
// should be rebalanced and consume again
1781+
assertThat(rebalanceLatch.await(60, TimeUnit.SECONDS)).isTrue();
1782+
assertThat(consumeLatch.await(60, TimeUnit.SECONDS)).isTrue();
1783+
container.stop();
1784+
}
1785+
17211786
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
17221787
Consumer<?, ?> consumer = spy(
17231788
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));

0 commit comments

Comments
 (0)