Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,8 @@ Starting with version 2.7, the recoverer checks that the partition selected by t
If the partition is not present, the partition in the `ProducerRecord` is set to `null`, allowing the `KafkaProducer` to select the partition.
You can disable this check by setting the `verifyPartition` property to `false`.

Starting with version 3.1, setting the `logRecoveryRecord` property to `true` causes the recovery record and the exception that triggered recovery to be logged (at `INFO` level).

[[dlpr-headers]]
== Managing Dead Letter Record Headers

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ default boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record
}

/**
* Handle the exception for a record listener when {@link #remainingRecords()} returns
* Handle the exception for a record listener when {@link #seeksAfterHandling()} returns
* true. The failed record and all the remaining records from the poll are passed in.
* Usually used when the error handler performs seeks so that the remaining records
* will be redelivered on the next poll.
* @param thrownException the exception.
* @param records the remaining records including the one that failed.
* @param consumer the consumer.
* @param container the container.
* @see #remainingRecords()
* @see #seeksAfterHandling()
*/
default void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement

private boolean skipSameTopicFatalExceptions = true;

private boolean logRecoveryRecord = false;

private ExceptionHeadersCreator exceptionHeadersCreator = this::addExceptionInfoHeaders;

private Supplier<HeaderNames> headerNamesSupplier = () -> HeaderNames.Builder
Expand Down Expand Up @@ -400,6 +402,15 @@ public void setSkipSameTopicFatalExceptions(boolean skipSameTopicFatalExceptions
this.skipSameTopicFatalExceptions = skipSameTopicFatalExceptions;
}

/**
 * Set to true to log the recovery record and the exception that caused recovery,
 * at INFO level, when the record is published to the dead letter topic.
 * Default false.
 * @param logRecoveryRecord true to log the record and exception.
 * @since 3.1
 */
public void setLogRecoveryRecord(boolean logRecoveryRecord) {
this.logRecoveryRecord = logRecoveryRecord;
}

/**
* Set a {@link ExceptionHeadersCreator} implementation to completely take over
* setting the exception headers in the output record. Disables all headers that are
Expand Down Expand Up @@ -503,6 +514,9 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
+ " and the destination resolver routed back to the same topic");
return;
}
if (this.logRecoveryRecord) {
this.logger.info(exception, () -> "Recovery record " + KafkaUtils.format(record));
}
if (consumer != null && this.verifyPartition) {
tp = checkPartition(tp, consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public DeadLetterPublishingRecoverer create(String mainListenerId) {
recoverer.setAppendOriginalHeaders(false);
recoverer.setThrowIfNoDestinationReturned(false);
recoverer.setSkipSameTopicFatalExceptions(false);
recoverer.setLogRecoveryRecord(false);
this.recovererCustomizer.accept(recoverer);
this.fatalExceptions.forEach(recoverer::addNotRetryableExceptions);
this.nonFatalExceptions.forEach(recoverer::removeClassification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ public static DeserializationException getExceptionFromHeader(final ConsumerReco
return null;
}
if (header != null) {
byte[] value = header.value();
DeserializationException exception = byteArrayToDeserializationException(logger, header);
if (exception != null) {
Headers headers = new RecordHeaders(record.headers().toArray());
Expand Down