diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a3879db..0d68b12 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -20,7 +20,7 @@ jobs: services: rabbitmq: - image: rabbitmq:4.1.1-management + image: rabbitmq:4.2.0-beta.3-management options: --hostname test-node --name test-node env: RABBITMQ_HOSTNAME: "test-node" diff --git a/docker-compose.yml b/docker-compose.yml index d756f70..f350554 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: rabbitmq-js-client: - image: rabbitmq:4.1.1-management + image: rabbitmq:4.2.0-beta.3-management container_name: rabbitmq-js-client restart: unless-stopped hostname: "rabbitmq" diff --git a/src/consumer.ts b/src/consumer.ts index c50bf9b..208a952 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -8,6 +8,9 @@ import { EventContext, Message, Dictionary, + types, + Typed, + MessageProperties, } from "rhea" import { Offset, @@ -15,6 +18,9 @@ import { STREAM_FILTER_MATCH_UNFILTERED, STREAM_FILTER_SPEC, STREAM_OFFSET_SPEC, + STREAM_FILTER_SQL, + STREAM_FILTER_MESSAGE_PROPERTIES, + STREAM_FILTER_APPLICATION_PROPERTIES, } from "./utils.js" import { openLink } from "./rhea_wrapper.js" import { createConsumerAddressFrom } from "./message.js" @@ -27,6 +33,9 @@ export type StreamOptions = { name: string offset?: Offset filterValues?: string[] + messagePropertiesFilter?: MessageProperties + applicationPropertiesFilter?: Dictionary + sqlFilter?: string matchUnfiltered?: boolean } @@ -125,16 +134,27 @@ function createConsumerFilterFrom(params: CreateConsumerParams): SourceFilter | throw new Error("At least one between offset and filterValues must be set when using filtering") } - const filters: Dictionary = {} + const filters: Dictionary = {} if (params.stream.offset) { filters[STREAM_OFFSET_SPEC] = params.stream.offset.toValue() } if (params.stream.filterValues) { filters[STREAM_FILTER_SPEC] = params.stream.filterValues } + if (params.stream.sqlFilter) { + filters[STREAM_FILTER_SQL] = types.wrap_described(params.stream.sqlFilter, 0x120) + } if (params.stream.matchUnfiltered) { filters[STREAM_FILTER_MATCH_UNFILTERED] = params.stream.matchUnfiltered } + if (params.stream.messagePropertiesFilter) { + const symbolicMap = types.wrap_symbolic_map(params.stream.messagePropertiesFilter) + filters[STREAM_FILTER_MESSAGE_PROPERTIES] = types.wrap_described(symbolicMap, 0x173) + } + if (params.stream.applicationPropertiesFilter) { + const map = types.wrap_map(params.stream.applicationPropertiesFilter) + filters[STREAM_FILTER_APPLICATION_PROPERTIES] = types.wrap_described(map, 0x174) + } return filters } diff --git a/src/message.ts b/src/message.ts index ce4d38e..b9181fc 100644 --- a/src/message.ts +++ b/src/message.ts @@ -1,4 +1,4 @@ -import { generate_uuid, MessageAnnotations, Message as RheaMessage } from "rhea" +import { Dictionary, generate_uuid, MessageAnnotations, MessageProperties, Message as RheaMessage } from "rhea" import { AmqpEndpoints } from "./link_message_builder.js" import { inspect } from "util" import { CreateConsumerParams } from "./consumer.js" @@ -18,6 +18,8 @@ type MessageOptions = { body: string destination?: DestinationOptions annotations?: MessageAnnotations + message_properties?: MessageProperties + application_properties?: Dictionary } export function createAmqpMessage(options: MessageOptions): RheaMessage { @@ -28,10 +30,19 @@ export function createAmqpMessage(options: MessageOptions): RheaMessage { to: createPublisherAddressFrom(options.destination), durable: true, message_annotations: options.annotations, + application_properties: options.application_properties, + ...(options.message_properties ? options.message_properties : {}), } } - return { message_id: generate_uuid(), body: options.body, durable: true, message_annotations: options.annotations } + return { + message_id: generate_uuid(), + body: options.body, + durable: true, + message_annotations: options.annotations, + application_properties: options.application_properties, + ...(options.message_properties ? options.message_properties : {}), + } } export function createPublisherAddressFrom(options?: DestinationOptions): string | undefined { diff --git a/src/utils.ts b/src/utils.ts index 6801e54..f7308cf 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,4 +1,4 @@ -import { Dictionary, Message } from "rhea" +import { Dictionary, Message, Typed } from "rhea" import { QueueType } from "./queue.js" export enum AmqpResponseCodes { @@ -20,11 +20,14 @@ export const DURABLE = 1 export const AUTO_DELETE = 1 export const EXCLUSIVE = 1 -export const STREAM_FILTER_SPEC = "rabbitmq:stream-filter" export const STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" +export const STREAM_FILTER_SPEC = "rabbitmq:stream-filter" export const STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" +export const STREAM_FILTER_MESSAGE_PROPERTIES = "properties-filter" +export const STREAM_FILTER_APPLICATION_PROPERTIES = "application-properties-filter" +export const STREAM_FILTER_SQL = "sql-filter" -export type SourceFilter = Dictionary +export type SourceFilter = Dictionary export type Result = OkResult | ErrorResult diff --git a/test/e2e/consumer.test.ts b/test/e2e/consumer.test.ts index bfb2c6b..4d56f9b 100644 --- a/test/e2e/consumer.test.ts +++ b/test/e2e/consumer.test.ts @@ -202,6 +202,158 @@ describe("Consumer", () => { }) }) + test("consumer can handle message on stream with filter on message properties", async () => { + const publisher = await connection.createPublisher({ queue: { name: streamName } }) + const filteredMessage = createAmqpMessage({ + body: "my body", + message_properties: { + subject: "foo", + }, + }) + const discardedMessage = createAmqpMessage({ + body: "discard me", + message_properties: { + subject: "bar", + }, + }) + await publisher.publish(filteredMessage) + await publisher.publish(discardedMessage) + let received: number = 0 + + const consumer = await connection.createConsumer({ + stream: { + name: streamName, + offset: Offset.first(), + matchUnfiltered: false, + messagePropertiesFilter: { subject: "foo" }, + }, + messageHandler: (context) => { + received++ + context.accept() + }, + }) + consumer.start() + + await eventually(() => { + expect(received).to.be.eql(1) + }) + }) + + test("consumer can handle message on stream with filter on application properties", async () => { + const publisher = await connection.createPublisher({ queue: { name: streamName } }) + const filteredMessage = createAmqpMessage({ + body: "my body", + application_properties: { + test: "foo", + }, + }) + const discardedMessage = createAmqpMessage({ + body: "discard me", + application_properties: { + test: "bar", + }, + }) + await publisher.publish(filteredMessage) + await publisher.publish(discardedMessage) + let received: number = 0 + + const consumer = await connection.createConsumer({ + stream: { + name: streamName, + offset: Offset.first(), + matchUnfiltered: false, + applicationPropertiesFilter: { test: "foo" }, + }, + messageHandler: (context) => { + received++ + context.accept() + }, + }) + consumer.start() + + await eventually(() => { + expect(received).to.be.eql(1) + }) + }) + + test("consumer can handle message on stream with SQL filters on message properties", async () => { + const publisher = await connection.createPublisher({ queue: { name: streamName } }) + const filteredMessage = createAmqpMessage({ + body: "my body", + message_properties: { + subject: "foo", + }, + }) + const discardedMessage = createAmqpMessage({ + body: "discard me", + message_properties: { + subject: "bar", + }, + }) + await publisher.publish(filteredMessage) + await publisher.publish(discardedMessage) + let received: string = "" + + const consumer = await connection.createConsumer({ + stream: { + name: streamName, + offset: Offset.first(), + matchUnfiltered: false, + sqlFilter: "properties.subject = 'foo'", + }, + messageHandler: (context, message) => { + if (message.subject && message.subject == "foo") { + received = message.body + } + context.accept() + }, + }) + consumer.start() + + await eventually(() => { + expect(received).to.be.eql("my body") + }) + }) + + test("consumer can handle message on stream with SQL filters on message application properties", async () => { + const publisher = await connection.createPublisher({ queue: { name: streamName } }) + const filteredMessage = createAmqpMessage({ + body: "my body", + application_properties: { + test: "foo", + }, + }) + const discardedMessage = createAmqpMessage({ + body: "discard me", + application_properties: { + test: "bar", + }, + }) + await publisher.publish(filteredMessage) + await publisher.publish(discardedMessage) + let received: string = "" + + const consumer = await connection.createConsumer({ + stream: { + name: streamName, + offset: Offset.first(), + matchUnfiltered: false, + sqlFilter: "application_properties.test = 'foo'", + }, + messageHandler: (context, message) => { + if (message.application_properties && message.application_properties.test == "foo") { + received = message.body + } + context.accept() + }, + }) + consumer.start() + + await eventually(() => { + expect(received).to.be.eql("my body") + }) + }) + test("consumer can discard a message published to a queue", async () => { const publisher = await connection.createPublisher({ queue: { name: discardQueueName } }) const expectedBody = "ciao"