-
Notifications
You must be signed in to change notification settings - Fork 4k
Closed
Description
Implement support non-data sections sent from AMQP 1.0 to stream.
How to reproduce the Issue:
Send a message from an AMQP 1.0 ex: amqpnetlite with the AMQP 1.0 enabled.
var address = new Address("amqp://guest:guest@localhost:5672");
var connection = new Connection(address);
var session = new Session(connection);
Message message = new Message("A");
// stream queue
var sender = new SenderLink(session, "mixing", "/amq/queue/my_stream_queue");
sender.Send(message);The data section contains an AMQPValue field. So a client (ex: AMQP or Stream) reads the ApplicationData Frame but it should be an AMQPValue.
for example:
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_qos(prefetch_count=100) # mandatory
q_name = "my_stream_queue"
for method, properties, body in channel.consume(q_name, inactivity_timeout=2, auto_ack=False, arguments={
'x-stream-offset': 'first'
}):
if channel.is_open:
print('{} {} {}'.format(body, properties, method))
try:
channel.basic_ack(method.delivery_tag)
except AttributeError as e:
break
requeued_messages = channel.cancel()
# Close the channel and the connection
channel.close()
connection.close()result:
$ python3 consumer.py
b'\x00Sw\xa1\x01A'
The first 5 bytes are the AMQPValue
We should improve the rabbit_msg_record.
Metadata
Metadata
Assignees
Labels
No labels