Skip to content

Commit 13652e7

Browse files
committed
update RecordFilterStrategy to use stream.filter
1 parent 665a345 commit 13652e7

File tree

2 files changed

+4
-3
lines changed

2 files changed

+4
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@
173173
* @author Sanghyeok An
174174
* @author Christian Fredriksson
175175
* @author Timofey Barabanov
176+
* @author Janek Lasocki-Biczysko
176177
*/
177178
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
178179
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@
3232
*
3333
* @author Gary Russell
3434
* @author Sanghyeok An
35+
* @author Janek Lasocki-Biczysko
3536
*/
3637
public interface RecordFilterStrategy<K, V> {
3738

@@ -50,8 +51,7 @@ public interface RecordFilterStrategy<K, V> {
5051
* @since 2.8
5152
*/
5253
default List<ConsumerRecord<K, V>> filterBatch(List<ConsumerRecord<K, V>> records) {
53-
records.removeIf(this::filter);
54-
return records;
54+
return records.stream().filter(record -> !this.filter(record)).toList();
5555
}
5656

5757
/**

0 commit comments

Comments
 (0)