From f7dea58362775ead6f3c978137832544959f24be Mon Sep 17 00:00:00 2001 From: magne Date: Fri, 24 Oct 2025 17:09:04 +0200 Subject: [PATCH 1/4] feat: add new super stream consumer handle --- src/super_stream_consumer.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/super_stream_consumer.ts b/src/super_stream_consumer.ts index 87fac430..6941e115 100644 --- a/src/super_stream_consumer.ts +++ b/src/super_stream_consumer.ts @@ -1,8 +1,11 @@ import { Client } from "./client" -import { Consumer, ConsumerFunc } from "./consumer" +import { Consumer } from "./consumer" import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_policy" +import { Message } from "./publisher" import { Offset } from "./requests/subscribe_request" +type SuperStreamConsumerFunc = (msg: Message, consumer?: Consumer) => Promise | void + export class SuperStreamConsumer { private consumers: Map = new Map() public consumerRef: string @@ -13,7 +16,7 @@ export class SuperStreamConsumer { private creditPolicy: ConsumerCreditPolicy private constructor( - readonly handle: ConsumerFunc, + readonly handle: SuperStreamConsumerFunc, params: { superStream: string locator: Client @@ -42,7 +45,7 @@ export class SuperStreamConsumer { singleActive: true, creditPolicy: this.creditPolicy, }, - this.handle, + (msg) => this.handle(msg, this.consumers.get(p)), this ) this.consumers.set(p, partitionConsumer) @@ -52,7 +55,7 @@ export class SuperStreamConsumer { } static async create( - handle: ConsumerFunc, + handle: SuperStreamConsumerFunc, params: { superStream: string locator: Client From ee11759472d24652c64da63b5ff31fa01d6a761f Mon Sep 17 00:00:00 2001 From: magne Date: Thu, 30 Oct 2025 15:58:35 +0100 Subject: [PATCH 2/4] fix: export on super stream consumer handle --- example/package-lock.json | 6 +++--- src/client.ts | 4 ++-- src/super_stream_consumer.ts | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/example/package-lock.json b/example/package-lock.json index b7777d66..9162920b 100644 --- a/example/package-lock.json +++ b/example/package-lock.json @@ -23,7 +23,7 @@ "extraneous": true }, "..": { - "version": "0.6.1", + "version": "0.6.2", "license": "ISC", "dependencies": { "semver": "^7.5.4" @@ -51,7 +51,7 @@ "eslint-plugin-prettier": "^5.1.3", "got": "^11.8.5", "mocha": "^10.2.0", - "ts-node": "^10.9.1", + "tsx": "^4.20.3", "typescript": "^5.3.3", "winston": "^3.8.2" } @@ -268,7 +268,7 @@ "got": "^11.8.5", "mocha": "^10.2.0", "semver": "^7.5.4", - "ts-node": "^10.9.1", + "tsx": "^4.20.3", "typescript": "^5.3.3", "winston": "^3.8.2" } diff --git a/src/client.ts b/src/client.ts index 2690b145..1ae9377a 100644 --- a/src/client.ts +++ b/src/client.ts @@ -39,7 +39,7 @@ import { RouteResponse } from "./responses/route_response" import { StreamStatsResponse } from "./responses/stream_stats_response" import { SubscribeResponse } from "./responses/subscribe_response" import { UnsubscribeResponse } from "./responses/unsubscribe_response" -import { SuperStreamConsumer } from "./super_stream_consumer" +import { SuperStreamConsumer, SuperStreamConsumerFunc } from "./super_stream_consumer" import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher" import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, ResponseCode, sample, wait } from "./util" import { ConsumerCreditPolicy, CreditRequestWrapper, defaultCreditPolicy } from "./consumer_credit_policy" @@ -249,7 +249,7 @@ export class Client { public async declareSuperStreamConsumer( { superStream, offset, consumerRef, creditPolicy }: DeclareSuperStreamConsumerParams, - handle: ConsumerFunc + handle: SuperStreamConsumerFunc ): Promise { const partitions = await this.queryPartitions({ superStream }) return SuperStreamConsumer.create(handle, { diff --git a/src/super_stream_consumer.ts b/src/super_stream_consumer.ts index 6941e115..7b9cb7a0 100644 --- a/src/super_stream_consumer.ts +++ b/src/super_stream_consumer.ts @@ -4,7 +4,7 @@ import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_pol import { Message } from "./publisher" import { Offset } from "./requests/subscribe_request" -type SuperStreamConsumerFunc = (msg: Message, consumer?: Consumer) => Promise | void +export type SuperStreamConsumerFunc = (msg: Message, consumer?: Consumer) => Promise | void export class SuperStreamConsumer { private consumers: Map = new Map() From 2a2f3958ef993038db34316bec4c4110a1e3a63d Mon Sep 17 00:00:00 2001 From: magne Date: Wed, 5 Nov 2025 16:13:11 +0100 Subject: [PATCH 3/4] fix: super stream consumer handle parameter optionality --- src/super_stream_consumer.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/super_stream_consumer.ts b/src/super_stream_consumer.ts index 7b9cb7a0..6521fcfc 100644 --- a/src/super_stream_consumer.ts +++ b/src/super_stream_consumer.ts @@ -4,7 +4,7 @@ import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_pol import { Message } from "./publisher" import { Offset } from "./requests/subscribe_request" -export type SuperStreamConsumerFunc = (msg: Message, consumer?: Consumer) => Promise | void +export type SuperStreamConsumerFunc = (msg: Message, consumer: Consumer) => Promise | void export class SuperStreamConsumer { private consumers: Map = new Map() @@ -45,7 +45,12 @@ export class SuperStreamConsumer { singleActive: true, creditPolicy: this.creditPolicy, }, - (msg) => this.handle(msg, this.consumers.get(p)), + (msg) => { + const consumer = this.consumers.get(p) + if (consumer) { + return this.handle(msg, consumer) + } + }, this ) this.consumers.set(p, partitionConsumer) From cbfb16c425b236f5c04a2d12b9101a50e65ced7e Mon Sep 17 00:00:00 2001 From: magne Date: Wed, 12 Nov 2025 15:04:17 +0100 Subject: [PATCH 4/4] fix: bug in writing after ending socket --- .tool-versions | 2 +- cluster/docker-compose.yml | 7 +++---- example/docker-compose.yaml | 2 -- example/package.json | 2 +- src/connection.ts | 17 ++++++++++------- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/.tool-versions b/.tool-versions index d478f392..e1de8056 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ nodejs 20.15.0 -python 3.8.12 \ No newline at end of file +python 3.13.5 \ No newline at end of file diff --git a/cluster/docker-compose.yml b/cluster/docker-compose.yml index 9fff48b7..efb2ce4a 100755 --- a/cluster/docker-compose.yml +++ b/cluster/docker-compose.yml @@ -1,4 +1,3 @@ -version: "2" services: rabbit_node0: environment: @@ -16,8 +15,8 @@ services: - "5562:5552" tty: true volumes: - - ./conf/:/etc/rabbitmq/ - - "./tls-gen/basic/result/:/certs" + - ./conf/:/etc/rabbitmq/ + - "./tls-gen/basic/result/:/certs" rabbit_node1: environment: - RABBITMQ_ERLANG_COOKIE='secret_cookie' @@ -65,4 +64,4 @@ services: networks: - back networks: - back: \ No newline at end of file + back: diff --git a/example/docker-compose.yaml b/example/docker-compose.yaml index 9025cb77..314f6044 100644 --- a/example/docker-compose.yaml +++ b/example/docker-compose.yaml @@ -1,5 +1,3 @@ -version: "2" - services: rabbitmq-stream: image: rabbitmq:4.0.5-management diff --git a/example/package.json b/example/package.json index c52c4a22..f1e1b7a9 100644 --- a/example/package.json +++ b/example/package.json @@ -6,7 +6,7 @@ "scripts": { "test": "echo \"Error: no test specified\" && exit 1", "start": "node index.js", - "cluster-example": "node cluster_example.js", + "cluster-example": "node src/cluster_example.js", "rebuild-source": "cd .. && npm run build && cd - && npm install --force" }, "author": "", diff --git a/src/connection.ts b/src/connection.ts index a8d83a36..5f8b3eef 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -431,13 +431,16 @@ export class Connection { body.byteLength }` ) - this.socket.write(body, (err) => { - this.logger.debug(`Write COMPLETED for cmd key: ${cmd.key.toString(16)} - no correlationId - err: ${err}`) - if (err) { - return rej(err) - } - return res() - }) + if (this.socket.readyState === "open") { + /* Check because there is still a delivery after the consumer is closed */ + this.socket.write(body, (err) => { + this.logger.debug(`Write COMPLETED for cmd key: ${cmd.key.toString(16)} - no correlationId - err: ${err}`) + if (err) { + return rej(err) + } + return res() + }) + } }) }