Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@

package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -37,6 +35,7 @@

import org.springframework.kafka.KafkaException;
import org.springframework.kafka.KafkaException.Level;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.backoff.BackOff;

Expand All @@ -50,6 +49,7 @@
*
* @author Gary Russell
* @author Francois Rosiere
* @author Wang Zhiyang
* @since 2.8
*
*/
Expand Down Expand Up @@ -120,7 +120,7 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange)
@Override
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {
notRetryable.forEach(ex -> handler.addNotRetryableExceptions(ex));
notRetryable.forEach(handler::addNotRetryableExceptions);
}
}

Expand Down Expand Up @@ -178,7 +178,6 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume
else {
return String.format("Record not found in batch, index %d out of bounds (0, %d); "
+ "re-seeking batch", index, data.count() - 1);

}
});
fallback(thrownException, data, consumer, container, invokeListener);
Expand All @@ -201,11 +200,9 @@ private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
return -1;
}
int i = 0;
Iterator<?> iterator = data.iterator();
while (iterator.hasNext()) {
ConsumerRecord<?, ?> candidate = (ConsumerRecord<?, ?>) iterator.next();
if (candidate.topic().equals(record.topic()) && candidate.partition() == record.partition()
&& candidate.offset() == record.offset()) {
for (ConsumerRecord<?, ?> datum : data) {
if (datum.topic().equals(record.topic()) && datum.partition() == record.partition()
&& datum.offset() == record.offset()) {
break;
}
i++;
Expand All @@ -220,29 +217,25 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
if (data == null) {
return ConsumerRecords.empty();
}
Iterator<?> iterator = data.iterator();
List<ConsumerRecord<?, ?>> toCommit = new ArrayList<>();
List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();
int index = indexArg;
while (iterator.hasNext()) {
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) iterator.next();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<?, ?> datum : data) {
if (index-- > 0) {
toCommit.add(record);
offsets.compute(new TopicPartition(datum.topic(), datum.partition()),
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, datum.offset() + 1));
}
else {
remaining.add(record);
remaining.add(datum);
}
}
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
if (offsets.size() > 0) {
commit(consumer, container, offsets);
}
if (isSeekAfterError()) {
if (remaining.size() > 0) {
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
getFailureTracker()::recovered, this.logger, getLogLevel());
getFailureTracker(), this.logger, getLogLevel());
ConsumerRecord<?, ?> recovered = remaining.get(0);
commit(consumer, container,
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
Expand All @@ -254,35 +247,43 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
return ConsumerRecords.empty();
}
else {
if (indexArg == 0) {
return (ConsumerRecords<K, V>) data; // first record just rerun the whole thing
}
else {
if (remaining.size() > 0) {
try {
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
consumer)) {
remaining.remove(0);
}
}
catch (Exception e) {
if (SeekUtils.isBackoffException(thrownException)) {
this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
+ " included in remaining due to retry back off " + thrownException);
}
else {
this.logger.error(e, KafkaUtils.format(remaining.get(0))
+ " included in remaining due to " + thrownException);
}
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
tp -> new ArrayList<ConsumerRecord<K, V>>()).add((ConsumerRecord<K, V>) rec));
return new ConsumerRecords<>(remains);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty catch - can we address this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if (remaining.isEmpty()) {
return ConsumerRecords.empty();
}
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
return new ConsumerRecords<>(remains);
}
}

private void commit(Consumer<?, ?> consumer, MessageListenerContainer container, Map<TopicPartition, OffsetAndMetadata> offsets) {
private void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
Map<TopicPartition, OffsetAndMetadata> offsets) {

boolean syncCommits = container.getContainerProperties().isSyncCommits();
Duration timeout = container.getContainerProperties().getSyncCommitTimeout();
if (syncCommits) {
consumer.commitSync(offsets, timeout);
ContainerProperties properties = container.getContainerProperties();
if (properties.isSyncCommits()) {
consumer.commitSync(offsets, properties.getSyncCommitTimeout());
}
else {
OffsetCommitCallback commitCallback = container.getContainerProperties().getCommitCallback();
OffsetCommitCallback commitCallback = properties.getCommitCallback();
if (commitCallback == null) {
commitCallback = LOGGING_COMMIT_CALLBACK;
}
Expand All @@ -304,8 +305,8 @@ private BatchListenerFailedException getBatchListenerFailedException(Throwable t
throwable = throwable.getCause();
checked.add(throwable);

if (throwable instanceof BatchListenerFailedException) {
target = (BatchListenerFailedException) throwable;
if (throwable instanceof BatchListenerFailedException batchListenerFailedException) {
target = batchListenerFailedException;
break;
}
}
Expand Down
Loading