Skip to content

Commit 374836e

Browse files
committed
Add AMQP 1.0 publishing test
References rabbitmq/rabbitmq-server#7001
1 parent 26a34b5 commit 374836e

File tree

3 files changed

+254
-2
lines changed

3 files changed

+254
-2
lines changed

ci/start-broker.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ mkdir -p rabbitmq-configuration/tls
1919
cp -R "${PWD}"/tls-gen/basic/result/* rabbitmq-configuration/tls
2020
chmod o+r rabbitmq-configuration/tls/*
2121

22-
echo "[rabbitmq_stream,rabbitmq_mqtt,rabbitmq_stomp]." >> rabbitmq-configuration/enabled_plugins
22+
echo "[rabbitmq_stream,rabbitmq_mqtt,rabbitmq_stomp,rabbitmq_amqp1_0]." >> rabbitmq-configuration/enabled_plugins
2323

2424
echo "loopback_users = none
2525
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.impl;
15+
16+
import static com.rabbitmq.stream.impl.TestUtils.ClientFactory;
17+
import static com.rabbitmq.stream.impl.TestUtils.StreamTestInfrastructureExtension;
18+
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
22+
import com.rabbitmq.stream.Codec;
23+
import com.rabbitmq.stream.Message;
24+
import com.rabbitmq.stream.OffsetSpecification;
25+
import com.rabbitmq.stream.codec.QpidProtonCodec;
26+
import com.rabbitmq.stream.codec.SwiftMqCodec;
27+
import com.rabbitmq.stream.impl.Client.ClientParameters;
28+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersion;
29+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
30+
import com.rabbitmq.stream.impl.TestUtils.DisabledIfAmqp10NotEnabled;
31+
import com.swiftmq.amqp.AMQPContext;
32+
import com.swiftmq.amqp.v100.client.Connection;
33+
import com.swiftmq.amqp.v100.client.Producer;
34+
import com.swiftmq.amqp.v100.client.QoS;
35+
import com.swiftmq.amqp.v100.client.Session;
36+
import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpSequence;
37+
import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpValue;
38+
import com.swiftmq.amqp.v100.generated.messaging.message_format.ApplicationProperties;
39+
import com.swiftmq.amqp.v100.generated.messaging.message_format.Data;
40+
import com.swiftmq.amqp.v100.generated.messaging.message_format.MessageAnnotations;
41+
import com.swiftmq.amqp.v100.generated.messaging.message_format.Properties;
42+
import com.swiftmq.amqp.v100.generated.transport.definitions.SequenceNo;
43+
import com.swiftmq.amqp.v100.messaging.AMQPMessage;
44+
import com.swiftmq.amqp.v100.types.AMQPString;
45+
import com.swiftmq.amqp.v100.types.AMQPSymbol;
46+
import com.swiftmq.amqp.v100.types.AMQPType;
47+
import java.nio.charset.StandardCharsets;
48+
import java.util.ArrayList;
49+
import java.util.Arrays;
50+
import java.util.HashMap;
51+
import java.util.List;
52+
import java.util.Map;
53+
import java.util.concurrent.CountDownLatch;
54+
import java.util.concurrent.atomic.AtomicReference;
55+
import java.util.stream.Collectors;
56+
import org.assertj.core.api.InstanceOfAssertFactories;
57+
import org.assertj.core.api.InstanceOfAssertFactory;
58+
import org.assertj.core.api.ObjectAssert;
59+
import org.junit.jupiter.api.AfterEach;
60+
import org.junit.jupiter.api.BeforeEach;
61+
import org.junit.jupiter.api.Test;
62+
import org.junit.jupiter.api.extension.ExtendWith;
63+
64+
@ExtendWith(StreamTestInfrastructureExtension.class)
65+
@DisabledIfAmqp10NotEnabled
66+
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11_9)
67+
public class Amqp10InteroperabilityTest {
68+
69+
String stream;
70+
ClientFactory cf;
71+
Connection connection;
72+
Session session;
73+
74+
private static InstanceOfAssertFactory<
75+
org.apache.qpid.proton.amqp.messaging.AmqpValue,
76+
ObjectAssert<org.apache.qpid.proton.amqp.messaging.AmqpValue>>
77+
qpidAmqpValue() {
78+
return InstanceOfAssertFactories.type(org.apache.qpid.proton.amqp.messaging.AmqpValue.class);
79+
}
80+
81+
@BeforeEach
82+
void init() throws Exception {
83+
AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
84+
connection = new Connection(ctx, "localhost", 5672, "guest", "guest");
85+
86+
connection.connect();
87+
session = connection.createSession(100, 100);
88+
}
89+
90+
@AfterEach
91+
void tearDown() {
92+
connection.close();
93+
}
94+
95+
@Test
96+
void publishToStreamQueueConsumeFromStream() throws Exception {
97+
Producer p = session.createProducer("/amq/queue/" + stream, QoS.AT_LEAST_ONCE);
98+
AMQPMessage message = new AMQPMessage();
99+
Properties properties = new Properties();
100+
properties.setContentType(new AMQPSymbol("text/plain"));
101+
properties.setGroupId(new AMQPString("my-group"));
102+
properties.setGroupSequence(new SequenceNo(42L));
103+
message.setProperties(properties);
104+
105+
Map<AMQPType, AMQPType> applicationProperties = new HashMap<>();
106+
applicationProperties.put(new AMQPString("foo"), new AMQPString("bar"));
107+
message.setApplicationProperties(new ApplicationProperties(applicationProperties));
108+
Map<AMQPType, AMQPType> messageAnnotations = new HashMap<>();
109+
messageAnnotations.put(new AMQPSymbol("x-route"), new AMQPString("dummy"));
110+
message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
111+
112+
message.addData(new Data("hello".getBytes(StandardCharsets.UTF_8)));
113+
114+
p.send(message);
115+
p.close();
116+
117+
Message msg = consumeMessage();
118+
119+
assertThat(msg.getBodyAsBinary()).isEqualTo("hello".getBytes(StandardCharsets.UTF_8));
120+
com.rabbitmq.stream.Properties props = msg.getProperties();
121+
assertThat(props.getContentType()).isEqualTo("text/plain");
122+
assertThat(props.getGroupId()).isEqualTo("my-group");
123+
assertThat(props.getGroupSequence()).isEqualTo(42L);
124+
125+
assertThat(msg.getApplicationProperties())
126+
.hasSameSizeAs(applicationProperties)
127+
.containsEntry("foo", "bar");
128+
129+
assertThat(msg.getMessageAnnotations())
130+
.hasSize(messageAnnotations.size() + 2)
131+
.containsEntry("x-route", "dummy")
132+
.containsEntry("x-exchange", "")
133+
.containsEntry("x-routing-key", stream);
134+
}
135+
136+
@Test
137+
void publishAmqpValueToStreamQueueConsumeFromStream() throws Exception {
138+
Producer p = session.createProducer("/amq/queue/" + stream, QoS.AT_LEAST_ONCE);
139+
AMQPMessage message = new AMQPMessage();
140+
message.setAmqpValue(new AmqpValue(new AMQPString("hello")));
141+
p.send(message);
142+
p.close();
143+
144+
Message msg = consumeMessage();
145+
assertThatThrownBy(() -> msg.getBodyAsBinary()).isInstanceOf(IllegalStateException.class);
146+
assertThat(msg.getBody())
147+
.asInstanceOf(qpidAmqpValue())
148+
.matches(v -> v.getValue().equals("hello"));
149+
}
150+
151+
@Test
152+
void publishDataSectionsToStreamQueueConsumeFromStream() throws Exception {
153+
Producer p = session.createProducer("/amq/queue/" + stream, QoS.AT_LEAST_ONCE);
154+
AMQPMessage message = new AMQPMessage();
155+
String body = "hello brave new world";
156+
Arrays.stream(body.split(" "))
157+
.forEach(d -> message.addData(new Data(d.getBytes(StandardCharsets.UTF_8))));
158+
p.send(message);
159+
p.close();
160+
161+
// QPid does not support body with multiple sections, so using SwiftMQ
162+
Message msg = consumeMessage(new SwiftMqCodec());
163+
@SuppressWarnings("unchecked")
164+
String receivedBody =
165+
((List<Data>) msg.getBody())
166+
.stream()
167+
.map(d -> new String(d.getValue(), StandardCharsets.UTF_8))
168+
.collect(Collectors.joining(" "));
169+
assertThat(receivedBody).isEqualTo(receivedBody);
170+
}
171+
172+
@Test
173+
void publishSequenceSectionsToStreamQueueConsumeFromStream() throws Exception {
174+
Producer p = session.createProducer("/amq/queue/" + stream, QoS.AT_LEAST_ONCE);
175+
AMQPMessage message = new AMQPMessage();
176+
List<AMQPType> sequence = new ArrayList<>();
177+
sequence.add(new AMQPString("hello"));
178+
sequence.add(new AMQPString("brave"));
179+
message.addAmqpSequence(new AmqpSequence(sequence));
180+
sequence = new ArrayList<>();
181+
sequence.add(new AMQPString("new"));
182+
sequence.add(new AMQPString("world"));
183+
message.addAmqpSequence(new AmqpSequence(sequence));
184+
p.send(message);
185+
p.close();
186+
187+
Message msg = consumeMessage();
188+
assertThat(msg.getBody())
189+
.isInstanceOf(org.apache.qpid.proton.amqp.messaging.AmqpSequence.class);
190+
org.apache.qpid.proton.amqp.messaging.AmqpSequence body =
191+
(org.apache.qpid.proton.amqp.messaging.AmqpSequence) msg.getBody();
192+
// SwiftMQ does not seem to read any AMQP sequences, and QPid reads only the first one
193+
@SuppressWarnings("unchecked")
194+
List<String> bodyValue = body.getValue();
195+
assertThat(bodyValue).containsExactly("hello", "brave");
196+
}
197+
198+
Message consumeMessage() {
199+
return consumeMessage(new QpidProtonCodec());
200+
}
201+
202+
Message consumeMessage(Codec codec) {
203+
AtomicReference<Message> messageReference = new AtomicReference<>();
204+
CountDownLatch latch = new CountDownLatch(1);
205+
Client client =
206+
cf.get(
207+
new ClientParameters()
208+
.codec(codec)
209+
.chunkListener(
210+
(client1, subscriptionId, offset, messageCount, dataSize) ->
211+
client1.credit(subscriptionId, 1))
212+
.messageListener(
213+
(subscriptionId, offset, chunkTimestamp, committedChunkId, msg) -> {
214+
messageReference.set(msg);
215+
latch.countDown();
216+
}));
217+
client.subscribe((byte) 0, stream, OffsetSpecification.first(), 10);
218+
assertThat(latchAssert(latch)).completes();
219+
return messageReference.get();
220+
}
221+
}

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,12 @@ static boolean atLeastVersion(String expectedVersion, String currentVersion) {
474474
@ExtendWith(DisabledIfStompNotEnabledCondition.class)
475475
@interface DisabledIfStompNotEnabled {}
476476

477+
@Target({ElementType.TYPE, ElementType.METHOD})
478+
@Retention(RetentionPolicy.RUNTIME)
479+
@Documented
480+
@ExtendWith(DisabledIfAmqp10NotEnabledCondition.class)
481+
@interface DisabledIfAmqp10NotEnabled {}
482+
477483
@Target({ElementType.TYPE, ElementType.METHOD})
478484
@Retention(RetentionPolicy.RUNTIME)
479485
@Documented
@@ -749,6 +755,30 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con
749755
}
750756
}
751757

758+
static class DisabledIfAmqp10NotEnabledCondition implements ExecutionCondition {
759+
760+
@Override
761+
public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
762+
if (Host.rabbitmqctlCommand() == null) {
763+
return ConditionEvaluationResult.disabled(
764+
"rabbitmqctl.bin system property not set, cannot check if STOMP plugin is enabled");
765+
} else {
766+
try {
767+
Process process = Host.rabbitmqctl("status");
768+
String output = capture(process.getInputStream());
769+
if (output.contains("rabbitmq_amqp1_0") && output.contains("AMQP 1.0")) {
770+
return ConditionEvaluationResult.enabled("STOMP plugin enabled");
771+
} else {
772+
return ConditionEvaluationResult.disabled("STOMP plugin disabled");
773+
}
774+
} catch (Exception e) {
775+
return ConditionEvaluationResult.disabled(
776+
"Error while trying to detect STOMP plugin: " + e.getMessage());
777+
}
778+
}
779+
}
780+
}
781+
752782
static class DisabledIfTlsNotEnabledCondition implements ExecutionCondition {
753783

754784
@Override
@@ -907,7 +937,8 @@ static void waitMs(long waitTime) {
907937

908938
public enum BrokerVersion {
909939
RABBITMQ_3_11("3.11.0"),
910-
RABBITMQ_3_11_7("3.11.7");
940+
RABBITMQ_3_11_7("3.11.7"),
941+
RABBITMQ_3_11_9("3.11.9");
911942

912943
final String value;
913944

0 commit comments

Comments
 (0)