|
13 | 13 | |
14 | 14 | package com.rabbitmq.stream.impl; |
15 | 15 |
|
| 16 | +import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok; |
16 | 17 | import static com.rabbitmq.stream.impl.TestUtils.b; |
| 18 | +import static com.rabbitmq.stream.impl.TestUtils.latchAssert; |
17 | 19 | import static com.rabbitmq.stream.impl.TestUtils.streamName; |
| 20 | +import static java.util.Collections.singletonList; |
18 | 21 | import static java.util.concurrent.TimeUnit.SECONDS; |
19 | 22 | import static org.assertj.core.api.Assertions.assertThat; |
20 | 23 |
|
|
27 | 30 | import com.rabbitmq.stream.amqp.UnsignedByte; |
28 | 31 | import com.rabbitmq.stream.codec.QpidProtonCodec; |
29 | 32 | import com.rabbitmq.stream.codec.SwiftMqCodec; |
| 33 | +import com.rabbitmq.stream.impl.Client.ClientParameters; |
| 34 | +import com.rabbitmq.stream.impl.Client.Response; |
| 35 | +import com.rabbitmq.stream.impl.TestUtils.BrokerVersion; |
| 36 | +import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; |
30 | 37 | import java.nio.charset.Charset; |
31 | 38 | import java.nio.charset.StandardCharsets; |
32 | 39 | import java.util.*; |
33 | 40 | import java.util.concurrent.ConcurrentHashMap; |
34 | 41 | import java.util.concurrent.CountDownLatch; |
| 42 | +import java.util.concurrent.atomic.AtomicReference; |
35 | 43 | import java.util.function.Consumer; |
36 | 44 | import java.util.function.Supplier; |
37 | 45 | import java.util.stream.IntStream; |
@@ -469,7 +477,7 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) { |
469 | 477 | messageOperation -> |
470 | 478 | messageOperation.messageBuilderConsumer.accept(messageBuilder)); |
471 | 479 |
|
472 | | - client.publish(b(1), Collections.singletonList(messageBuilder.build())); |
| 480 | + client.publish(b(1), singletonList(messageBuilder.build())); |
473 | 481 | }); |
474 | 482 |
|
475 | 483 | try (Connection c = connectionFactory.newConnection()) { |
@@ -517,6 +525,37 @@ void publishToStreamConsumeFromStreamQueue(Codec codec, TestInfo info) { |
517 | 525 | }); |
518 | 526 | } |
519 | 527 |
|
| 528 | + @ParameterizedTest |
| 529 | + @MethodSource("codecs") |
| 530 | + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_7) |
| 531 | + void messageWithEmptyBodyAndPropertiesShouldBeConvertedInAmqp(Codec codec) throws Exception { |
| 532 | + Client client = cf.get(new ClientParameters().codec(codec)); |
| 533 | + Response response = client.declarePublisher(b(1), null, stream); |
| 534 | + assertThat(response).is(ok()); |
| 535 | + Message message = codec.messageBuilder().properties().messageId(1L).messageBuilder().build(); |
| 536 | + client.publish(b(1), singletonList(message)); |
| 537 | + |
| 538 | + CountDownLatch consumeLatch = new CountDownLatch(1); |
| 539 | + AtomicReference<Delivery> delivery = new AtomicReference<>(); |
| 540 | + ConnectionFactory cf = new ConnectionFactory(); |
| 541 | + try (Connection c = cf.newConnection()) { |
| 542 | + Channel ch = c.createChannel(); |
| 543 | + ch.basicQos(10); |
| 544 | + ch.basicConsume( |
| 545 | + stream, |
| 546 | + false, |
| 547 | + Collections.singletonMap("x-stream-offset", "first"), |
| 548 | + (consumerTag, message1) -> { |
| 549 | + ch.basicAck(message1.getEnvelope().getDeliveryTag(), false); |
| 550 | + delivery.set(message1); |
| 551 | + consumeLatch.countDown(); |
| 552 | + }, |
| 553 | + consumerTag -> {}); |
| 554 | + |
| 555 | + assertThat(latchAssert(consumeLatch)).completes(); |
| 556 | + } |
| 557 | + } |
| 558 | + |
520 | 559 | private static class PropertiesTestConfiguration { |
521 | 560 | final Consumer<AMQP.BasicProperties.Builder> builder; |
522 | 561 | final Consumer<Message> assertion; |
|
0 commit comments