Skip to content

Commit c6ac811

Browse files
committed
Increase timeout in the ReactiveStreamsTests
* Use `ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 12` for the `MessageDrivenAdapterTests.testInboundBatch()` to ensure that both published records are received in the batch
1 parent 443eb93 commit c6ac811

File tree

2 files changed

+6
-51
lines changed

2 files changed

+6
-51
lines changed

spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ void testPollableReactiveFlow() throws Exception {
154154
.take(7)
155155
.map(Message::getPayload)
156156
.collectList()
157-
.block(Duration.ofSeconds(20))
157+
.block(Duration.ofSeconds(30))
158158
);
159159

160160
this.inputChannel.send(new GenericMessage<>("6,7,8,9,10"));

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Lines changed: 5 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.kafka.common.TopicPartition;
3939
import org.apache.kafka.common.header.Headers;
4040
import org.apache.kafka.common.header.internals.RecordHeaders;
41+
import org.assertj.core.api.InstanceOfAssertFactories;
4142
import org.junit.jupiter.api.Test;
4243

4344
import org.springframework.core.retry.RetryListener;
@@ -396,6 +397,8 @@ protected boolean doSend(Message<?> message, long timeout) {
396397
void testInboundBatch(EmbeddedKafkaBroker embeddedKafka) throws Exception {
397398
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "test2", true);
398399
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
400+
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 12);
401+
399402
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
400403
ContainerProperties containerProps = new ContainerProperties(topic2);
401404
containerProps.setIdleEventInterval(100L);
@@ -411,7 +414,6 @@ void testInboundBatch(EmbeddedKafkaBroker embeddedKafka) throws Exception {
411414
adapter.setOnPartitionsAssignedSeekCallback((map, consumer) -> onPartitionsAssignedCalledLatch.countDown());
412415
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
413416
adapter.afterPropertiesSet();
414-
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
415417
adapter.setBatchMessageConverter(new BatchMessagingMessageConverter() {
416418

417419
@Override
@@ -436,9 +438,7 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
436438
Message<?> received = out.receive(10000);
437439
assertThat(received).isNotNull();
438440
Object payload = received.getPayload();
439-
assertThat(payload).isInstanceOf(List.class);
440-
List<?> list = (List<?>) payload;
441-
assertThat(list.size()).isGreaterThan(0);
441+
assertThat(payload).asInstanceOf(InstanceOfAssertFactories.LIST).hasSize(2);
442442

443443
MessageHeaders headers = received.getHeaders();
444444
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(Arrays.asList(1, 1));
@@ -632,52 +632,7 @@ void testPauseResume() throws Exception {
632632
adapter.stop();
633633
}
634634

635-
static class Foo {
636-
637-
private String bar;
638-
639-
Foo() {
640-
}
641-
642-
Foo(String bar) {
643-
this.bar = bar;
644-
}
645-
646-
protected String getBar() {
647-
return this.bar;
648-
}
649-
650-
protected void setBar(String bar) {
651-
this.bar = bar;
652-
}
653-
654-
@Override
655-
public int hashCode() {
656-
final int prime = 31;
657-
int result = 1;
658-
result = prime * result + ((this.bar == null) ? 0 : this.bar.hashCode());
659-
return result;
660-
}
661-
662-
@Override
663-
public boolean equals(Object obj) {
664-
if (this == obj) {
665-
return true;
666-
}
667-
if (obj == null) {
668-
return false;
669-
}
670-
if (getClass() != obj.getClass()) {
671-
return false;
672-
}
673-
Foo other = (Foo) obj;
674-
if (this.bar == null) {
675-
return other.bar == null;
676-
}
677-
else {
678-
return this.bar.equals(other.bar);
679-
}
680-
}
635+
record Foo(String bar) {
681636

682637
}
683638

0 commit comments

Comments
 (0)