From aa2fcef8d00c58694e6e2aa41ea14668073e67d9 Mon Sep 17 00:00:00 2001 From: Andrea Pietroni Date: Fri, 19 Sep 2025 15:54:45 +0200 Subject: [PATCH 1/6] update docker version --- .github/workflows/main.yml | 2 +- docker-compose.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a3879db..0d68b12 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -20,7 +20,7 @@ jobs: services: rabbitmq: - image: rabbitmq:4.1.1-management + image: rabbitmq:4.2.0-beta.3-management options: --hostname test-node --name test-node env: RABBITMQ_HOSTNAME: "test-node" diff --git a/docker-compose.yml b/docker-compose.yml index d756f70..f350554 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: rabbitmq-js-client: - image: rabbitmq:4.1.1-management + image: rabbitmq:4.2.0-beta.3-management container_name: rabbitmq-js-client restart: unless-stopped hostname: "rabbitmq" From 65fbef551d041d230b6945bacfa0cf968573d667 Mon Sep 17 00:00:00 2001 From: Andrea Pietroni Date: Fri, 19 Sep 2025 15:55:38 +0200 Subject: [PATCH 2/6] added application properties to message --- src/message.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/message.ts b/src/message.ts index ce4d38e..b944fcf 100644 --- a/src/message.ts +++ b/src/message.ts @@ -1,4 +1,4 @@ -import { generate_uuid, MessageAnnotations, Message as RheaMessage } from "rhea" +import { generate_uuid, MessageAnnotations, MessageProperties, Message as RheaMessage } from "rhea" import { AmqpEndpoints } from "./link_message_builder.js" import { inspect } from "util" import { CreateConsumerParams } from "./consumer.js" @@ -18,6 +18,7 @@ type MessageOptions = { body: string destination?: DestinationOptions annotations?: MessageAnnotations + properties?: MessageProperties } export function createAmqpMessage(options: MessageOptions): RheaMessage { @@ -28,10 +29,11 @@ export function createAmqpMessage(options: MessageOptions): RheaMessage { to: createPublisherAddressFrom(options.destination), durable: true, message_annotations: options.annotations, + application_properties: options.properties, } } - return { message_id: generate_uuid(), body: options.body, durable: true, message_annotations: options.annotations } + return { message_id: generate_uuid(), body: options.body, durable: true, message_annotations: options.annotations, application_properties: options.properties, } } export function createPublisherAddressFrom(options?: DestinationOptions): string | undefined { From e003faec53ff6a6e606fc4c466a106bd1407c758 Mon Sep 17 00:00:00 2001 From: Andrea Pietroni Date: Fri, 19 Sep 2025 15:56:06 +0200 Subject: [PATCH 3/6] including sql filter --- src/consumer.ts | 10 +++++++++- src/utils.ts | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/consumer.ts b/src/consumer.ts index c50bf9b..16a7dce 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -15,6 +15,7 @@ import { STREAM_FILTER_MATCH_UNFILTERED, STREAM_FILTER_SPEC, STREAM_OFFSET_SPEC, + STREAM_SQL_FILTER, } from "./utils.js" import { openLink } from "./rhea_wrapper.js" import { createConsumerAddressFrom } from "./message.js" @@ -27,6 +28,7 @@ export type StreamOptions = { name: string offset?: Offset filterValues?: string[] + sqlFilter?: string matchUnfiltered?: boolean } @@ -51,7 +53,10 @@ const getConsumerReceiverLinkConfigurationFrom = ( timeout: 0, dynamic: false, durable: 0, - filter, + filter: { + selector: (s: string) => filter ? filter[s] : undefined, + ...filter + }, }, }) @@ -132,6 +137,9 @@ function createConsumerFilterFrom(params: CreateConsumerParams): SourceFilter | if (params.stream.filterValues) { filters[STREAM_FILTER_SPEC] = params.stream.filterValues } + if (params.stream.sqlFilter) { + filters[STREAM_SQL_FILTER] = params.stream.sqlFilter + } if (params.stream.matchUnfiltered) { filters[STREAM_FILTER_MATCH_UNFILTERED] = params.stream.matchUnfiltered } diff --git a/src/utils.ts b/src/utils.ts index 6801e54..34dff13 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -23,6 +23,7 @@ export const EXCLUSIVE = 1 export const STREAM_FILTER_SPEC = "rabbitmq:stream-filter" export const STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" export const STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" +export const STREAM_SQL_FILTER = "amqp:sql-filter" export type SourceFilter = Dictionary From 44403d61db6b458686ab672a0f550706b79c7087 Mon Sep 17 00:00:00 2001 From: Andrea Pietroni Date: Fri, 19 Sep 2025 15:56:13 +0200 Subject: [PATCH 4/6] test --- test/e2e/consumer.test.ts | 46 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/test/e2e/consumer.test.ts b/test/e2e/consumer.test.ts index bfb2c6b..24c7a7e 100644 --- a/test/e2e/consumer.test.ts +++ b/test/e2e/consumer.test.ts @@ -202,6 +202,52 @@ describe("Consumer", () => { }) }) + + + test("consumer can handle message on stream with sql filters", async () => { + const publisher = await connection.createPublisher({ queue: { name: streamName } }) + const filteredMessage = createAmqpMessage({ + body: "my body", + properties: { + subject: "foo" + } + }) + const discardedMessage = createAmqpMessage({ + body: "discard me", + properties: { + subject: "bar" + } + }) + await publisher.publish(filteredMessage) + await publisher.publish(discardedMessage) + let received: string = "" + + const consumer = await connection.createConsumer({ + stream: { + name: streamName, + offset: Offset.first(), + matchUnfiltered: false, + sqlFilter: "properties.subject = '123'", + }, + messageHandler: (context, message) => { + console.log("message", message.application_properties) + if ( + message.application_properties + && message.application_properties.subject == "foo" + ) { + console.log("sono qui ") + received = message.body + } + context.accept() + }, + }) + consumer.start() + + await eventually(() => { + expect(received).to.be.eql("my body") + }) + }) + test("consumer can discard a message published to a queue", async () => { const publisher = await connection.createPublisher({ queue: { name: discardQueueName } }) const expectedBody = "ciao" From 4765548a5d44fd8668c7cca9b6515fe738fdb76f Mon Sep 17 00:00:00 2001 From: magne Date: Mon, 29 Sep 2025 09:03:04 +0200 Subject: [PATCH 5/6] feat: add filters --- src/consumer.ts | 12 ++++++------ src/message.ts | 17 +++++++++++++---- test/e2e/consumer.test.ts | 23 +++++++++-------------- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/src/consumer.ts b/src/consumer.ts index 16a7dce..63b6522 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -8,6 +8,7 @@ import { EventContext, Message, Dictionary, + filter, } from "rhea" import { Offset, @@ -53,10 +54,7 @@ const getConsumerReceiverLinkConfigurationFrom = ( timeout: 0, dynamic: false, durable: 0, - filter: { - selector: (s: string) => filter ? filter[s] : undefined, - ...filter - }, + filter, }, }) @@ -130,7 +128,7 @@ function createConsumerFilterFrom(params: CreateConsumerParams): SourceFilter | throw new Error("At least one between offset and filterValues must be set when using filtering") } - const filters: Dictionary = {} + const filters: Dictionary = {} if (params.stream.offset) { filters[STREAM_OFFSET_SPEC] = params.stream.offset.toValue() } @@ -138,7 +136,9 @@ function createConsumerFilterFrom(params: CreateConsumerParams): SourceFilter | filters[STREAM_FILTER_SPEC] = params.stream.filterValues } if (params.stream.sqlFilter) { - filters[STREAM_SQL_FILTER] = params.stream.sqlFilter + console.log(filter.selector(`Described(${String(Symbol("amqp:sql-filter"))}, ${params.stream.sqlFilter})`)) + const { descriptor, value } = filter.selector(params.stream.sqlFilter)["jms-selector"] + filters["sql-filter"] = { descriptor: descriptor.value, value } } if (params.stream.matchUnfiltered) { filters[STREAM_FILTER_MATCH_UNFILTERED] = params.stream.matchUnfiltered diff --git a/src/message.ts b/src/message.ts index b944fcf..b9181fc 100644 --- a/src/message.ts +++ b/src/message.ts @@ -1,4 +1,4 @@ -import { generate_uuid, MessageAnnotations, MessageProperties, Message as RheaMessage } from "rhea" +import { Dictionary, generate_uuid, MessageAnnotations, MessageProperties, Message as RheaMessage } from "rhea" import { AmqpEndpoints } from "./link_message_builder.js" import { inspect } from "util" import { CreateConsumerParams } from "./consumer.js" @@ -18,7 +18,8 @@ type MessageOptions = { body: string destination?: DestinationOptions annotations?: MessageAnnotations - properties?: MessageProperties + message_properties?: MessageProperties + application_properties?: Dictionary } export function createAmqpMessage(options: MessageOptions): RheaMessage { @@ -29,11 +30,19 @@ export function createAmqpMessage(options: MessageOptions): RheaMessage { to: createPublisherAddressFrom(options.destination), durable: true, message_annotations: options.annotations, - application_properties: options.properties, + application_properties: options.application_properties, + ...(options.message_properties ? options.message_properties : {}), } } - return { message_id: generate_uuid(), body: options.body, durable: true, message_annotations: options.annotations, application_properties: options.properties, } + return { + message_id: generate_uuid(), + body: options.body, + durable: true, + message_annotations: options.annotations, + application_properties: options.application_properties, + ...(options.message_properties ? options.message_properties : {}), + } } export function createPublisherAddressFrom(options?: DestinationOptions): string | undefined { diff --git a/test/e2e/consumer.test.ts b/test/e2e/consumer.test.ts index 24c7a7e..3e2cace 100644 --- a/test/e2e/consumer.test.ts +++ b/test/e2e/consumer.test.ts @@ -202,21 +202,19 @@ describe("Consumer", () => { }) }) - - test("consumer can handle message on stream with sql filters", async () => { const publisher = await connection.createPublisher({ queue: { name: streamName } }) const filteredMessage = createAmqpMessage({ body: "my body", - properties: { - subject: "foo" - } + message_properties: { + subject: "foo", + }, }) const discardedMessage = createAmqpMessage({ body: "discard me", - properties: { - subject: "bar" - } + message_properties: { + subject: "bar", + }, }) await publisher.publish(filteredMessage) await publisher.publish(discardedMessage) @@ -230,12 +228,9 @@ describe("Consumer", () => { sqlFilter: "properties.subject = '123'", }, messageHandler: (context, message) => { - console.log("message", message.application_properties) - if ( - message.application_properties - && message.application_properties.subject == "foo" - ) { - console.log("sono qui ") + console.log("message", message.subject) + if (message.subject && message.subject == "foo") { + console.log("hello ") received = message.body } context.accept() From 86f7d794b63268e4bd9872ee42a9d41bea39f3c3 Mon Sep 17 00:00:00 2001 From: magne Date: Wed, 1 Oct 2025 17:20:25 +0200 Subject: [PATCH 6/6] feat: sql filters, properties filter, application properties filter --- src/consumer.ts | 24 ++++++-- src/utils.ts | 10 ++-- test/e2e/consumer.test.ts | 119 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 139 insertions(+), 14 deletions(-) diff --git a/src/consumer.ts b/src/consumer.ts index 63b6522..208a952 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -8,7 +8,9 @@ import { EventContext, Message, Dictionary, - filter, + types, + Typed, + MessageProperties, } from "rhea" import { Offset, @@ -16,7 +18,9 @@ import { STREAM_FILTER_MATCH_UNFILTERED, STREAM_FILTER_SPEC, STREAM_OFFSET_SPEC, - STREAM_SQL_FILTER, + STREAM_FILTER_SQL, + STREAM_FILTER_MESSAGE_PROPERTIES, + STREAM_FILTER_APPLICATION_PROPERTIES, } from "./utils.js" import { openLink } from "./rhea_wrapper.js" import { createConsumerAddressFrom } from "./message.js" @@ -29,6 +33,8 @@ export type StreamOptions = { name: string offset?: Offset filterValues?: string[] + messagePropertiesFilter?: MessageProperties + applicationPropertiesFilter?: Dictionary sqlFilter?: string matchUnfiltered?: boolean } @@ -128,7 +134,7 @@ function createConsumerFilterFrom(params: CreateConsumerParams): SourceFilter | throw new Error("At least one between offset and filterValues must be set when using filtering") } - const filters: Dictionary = {} + const filters: Dictionary = {} if (params.stream.offset) { filters[STREAM_OFFSET_SPEC] = params.stream.offset.toValue() } @@ -136,13 +142,19 @@ function createConsumerFilterFrom(params: CreateConsumerParams): SourceFilter | filters[STREAM_FILTER_SPEC] = params.stream.filterValues } if (params.stream.sqlFilter) { - console.log(filter.selector(`Described(${String(Symbol("amqp:sql-filter"))}, ${params.stream.sqlFilter})`)) - const { descriptor, value } = filter.selector(params.stream.sqlFilter)["jms-selector"] - filters["sql-filter"] = { descriptor: descriptor.value, value } + filters[STREAM_FILTER_SQL] = types.wrap_described(params.stream.sqlFilter, 0x120) } if (params.stream.matchUnfiltered) { filters[STREAM_FILTER_MATCH_UNFILTERED] = params.stream.matchUnfiltered } + if (params.stream.messagePropertiesFilter) { + const symbolicMap = types.wrap_symbolic_map(params.stream.messagePropertiesFilter) + filters[STREAM_FILTER_MESSAGE_PROPERTIES] = types.wrap_described(symbolicMap, 0x173) + } + if (params.stream.applicationPropertiesFilter) { + const map = types.wrap_map(params.stream.applicationPropertiesFilter) + filters[STREAM_FILTER_APPLICATION_PROPERTIES] = types.wrap_described(map, 0x174) + } return filters } diff --git a/src/utils.ts b/src/utils.ts index 34dff13..f7308cf 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,4 +1,4 @@ -import { Dictionary, Message } from "rhea" +import { Dictionary, Message, Typed } from "rhea" import { QueueType } from "./queue.js" export enum AmqpResponseCodes { @@ -20,12 +20,14 @@ export const DURABLE = 1 export const AUTO_DELETE = 1 export const EXCLUSIVE = 1 -export const STREAM_FILTER_SPEC = "rabbitmq:stream-filter" export const STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" +export const STREAM_FILTER_SPEC = "rabbitmq:stream-filter" export const STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" -export const STREAM_SQL_FILTER = "amqp:sql-filter" +export const STREAM_FILTER_MESSAGE_PROPERTIES = "properties-filter" +export const STREAM_FILTER_APPLICATION_PROPERTIES = "application-properties-filter" +export const STREAM_FILTER_SQL = "sql-filter" -export type SourceFilter = Dictionary +export type SourceFilter = Dictionary export type Result = OkResult | ErrorResult diff --git a/test/e2e/consumer.test.ts b/test/e2e/consumer.test.ts index 3e2cace..4d56f9b 100644 --- a/test/e2e/consumer.test.ts +++ b/test/e2e/consumer.test.ts @@ -202,7 +202,81 @@ describe("Consumer", () => { }) }) - test("consumer can handle message on stream with sql filters", async () => { + test("consumer can handle message on stream with filter on message properties", async () => { + const publisher = await connection.createPublisher({ queue: { name: streamName } }) + const filteredMessage = createAmqpMessage({ + body: "my body", + message_properties: { + subject: "foo", + }, + }) + const discardedMessage = createAmqpMessage({ + body: "discard me", + message_properties: { + subject: "bar", + }, + }) + await publisher.publish(filteredMessage) + await publisher.publish(discardedMessage) + let received: number = 0 + + const consumer = await connection.createConsumer({ + stream: { + name: streamName, + offset: Offset.first(), + matchUnfiltered: false, + messagePropertiesFilter: { subject: "foo" }, + }, + messageHandler: (context) => { + received++ + context.accept() + }, + }) + consumer.start() + + await eventually(() => { + expect(received).to.be.eql(1) + }) + }) + + test("consumer can handle message on stream with filter on application properties", async () => { + const publisher = await connection.createPublisher({ queue: { name: streamName } }) + const filteredMessage = createAmqpMessage({ + body: "my body", + application_properties: { + test: "foo", + }, + }) + const discardedMessage = createAmqpMessage({ + body: "discard me", + application_properties: { + test: "bar", + }, + }) + await publisher.publish(filteredMessage) + await publisher.publish(discardedMessage) + let received: number = 0 + + const consumer = await connection.createConsumer({ + stream: { + name: streamName, + offset: Offset.first(), + matchUnfiltered: false, + applicationPropertiesFilter: { test: "foo" }, + }, + messageHandler: (context) => { + received++ + context.accept() + }, + }) + consumer.start() + + await eventually(() => { + expect(received).to.be.eql(1) + }) + }) + + test("consumer can handle message on stream with SQL filters on message properties", async () => { const publisher = await connection.createPublisher({ queue: { name: streamName } }) const filteredMessage = createAmqpMessage({ body: "my body", @@ -225,12 +299,49 @@ describe("Consumer", () => { name: streamName, offset: Offset.first(), matchUnfiltered: false, - sqlFilter: "properties.subject = '123'", + sqlFilter: "properties.subject = 'foo'", }, messageHandler: (context, message) => { - console.log("message", message.subject) if (message.subject && message.subject == "foo") { - console.log("hello ") + received = message.body + } + context.accept() + }, + }) + consumer.start() + + await eventually(() => { + expect(received).to.be.eql("my body") + }) + }) + + test("consumer can handle message on stream with SQL filters on message application properties", async () => { + const publisher = await connection.createPublisher({ queue: { name: streamName } }) + const filteredMessage = createAmqpMessage({ + body: "my body", + application_properties: { + test: "foo", + }, + }) + const discardedMessage = createAmqpMessage({ + body: "discard me", + application_properties: { + test: "bar", + }, + }) + await publisher.publish(filteredMessage) + await publisher.publish(discardedMessage) + let received: string = "" + + const consumer = await connection.createConsumer({ + stream: { + name: streamName, + offset: Offset.first(), + matchUnfiltered: false, + sqlFilter: "application_properties.test = 'foo'", + }, + messageHandler: (context, message) => { + if (message.application_properties && message.application_properties.test == "foo") { received = message.body } context.accept()