diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicDeserializer.java index dc4c15d507..22e84f5d7b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicDeserializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicDeserializer.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Deserializer; +import org.jspecify.annotations.Nullable; /** * A {@link Deserializer} that delegates to other deserializers based on the topic name. @@ -74,12 +75,12 @@ public Object deserialize(String topic, byte[] data) { } @Override - public Object deserialize(String topic, Headers headers, byte[] data) { + public @Nullable Object deserialize(String topic, Headers headers, byte[] data) { return findDelegate(topic).deserialize(topic, headers, data); } @Override - public Object deserialize(String topic, Headers headers, ByteBuffer data) { + public @Nullable Object deserialize(String topic, Headers headers, ByteBuffer data) { return findDelegate(topic).deserialize(topic, headers, data); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingDeserializer.java index 73629f9882..30ec3998b1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingDeserializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingDeserializer.java @@ -167,13 +167,13 @@ public Object deserialize(String topic, byte[] data) { } @Override - public Object deserialize(String topic, Headers headers, byte[] data) { + public @Nullable Object deserialize(String topic, Headers headers, byte[] data) { Deserializer deserializer = getDeserializerByHeaders(headers); return deserializer == null ? data : deserializer.deserialize(topic, headers, data); } @Override - public Object deserialize(String topic, Headers headers, ByteBuffer data) { + public @Nullable Object deserialize(String topic, Headers headers, ByteBuffer data) { Deserializer deserializer = getDeserializerByHeaders(headers); return deserializer == null ? data : deserializer.deserialize(topic, headers, data); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java index 430c580575..9a2d480ab8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java @@ -68,17 +68,17 @@ public void configure(Map configs, boolean isKey) { } @Override - public T deserialize(String topic, byte[] data) { + public @Nullable T deserialize(String topic, byte[] data) { return this.retryOperations.execute(context -> this.delegate.deserialize(topic, data), this.recoveryCallback); } @Override - public T deserialize(String topic, Headers headers, byte[] data) { + public @Nullable T deserialize(String topic, Headers headers, byte[] data) { return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback); } @Override - public T deserialize(String topic, Headers headers, ByteBuffer data) { + public @Nullable T deserialize(String topic, Headers headers, ByteBuffer data) { return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback); }