Skip to content

Commit 14e385e

Browse files
committed
turns the simplest approach works best
1 parent 2e3e2a9 commit 14e385e

File tree

2 files changed

+6
-16
lines changed

2 files changed

+6
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.kafka.listener;
1818

19-
import java.lang.reflect.Array;
2019
import java.nio.ByteBuffer;
2120
import java.time.Duration;
2221
import java.util.AbstractMap.SimpleEntry;
@@ -2239,15 +2238,9 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
22392238
}
22402239

22412240
private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V> records) {
2242-
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
2243-
@SuppressWarnings("unchecked") ConsumerRecord<K, V>[] recordsArray =
2244-
(ConsumerRecord<K, V>[]) Array.newInstance(ConsumerRecord.class, records.count());
2245-
int index = 0;
2246-
while (iterator.hasNext()) {
2247-
recordsArray[index] = iterator.next();
2248-
index += 1;
2249-
}
2250-
return Arrays.asList(recordsArray);
2241+
List<ConsumerRecord<K, V>> recordList = new ArrayList<>(records.count());
2242+
records.forEach(recordList::add);
2243+
return recordList;
22512244
}
22522245

22532246
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordFilterStrategy.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2025 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19-
import java.util.Arrays;
2019
import java.util.List;
2120

2221
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -33,7 +32,6 @@
3332
*
3433
* @author Gary Russell
3534
* @author Sanghyeok An
36-
* @author Janek Lasocki-Biczysko
3735
*/
3836
public interface RecordFilterStrategy<K, V> {
3937

@@ -51,10 +49,9 @@ public interface RecordFilterStrategy<K, V> {
5149
* @return the filtered records.
5250
* @since 2.8
5351
*/
54-
@SuppressWarnings("unchecked")
5552
default List<ConsumerRecord<K, V>> filterBatch(List<ConsumerRecord<K, V>> records) {
56-
var recordsArray = records.stream().filter(record -> !this.filter(record)).toArray(ConsumerRecord[]::new);
57-
return Arrays.asList(recordsArray);
53+
records.removeIf(this::filter);
54+
return records;
5855
}
5956

6057
/**

0 commit comments

Comments
 (0)