diff --git a/.tool-versions b/.tool-versions index d478f392..e1de8056 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ nodejs 20.15.0 -python 3.8.12 \ No newline at end of file +python 3.13.5 \ No newline at end of file diff --git a/cluster/docker-compose.yml b/cluster/docker-compose.yml index 9fff48b7..efb2ce4a 100755 --- a/cluster/docker-compose.yml +++ b/cluster/docker-compose.yml @@ -1,4 +1,3 @@ -version: "2" services: rabbit_node0: environment: @@ -16,8 +15,8 @@ services: - "5562:5552" tty: true volumes: - - ./conf/:/etc/rabbitmq/ - - "./tls-gen/basic/result/:/certs" + - ./conf/:/etc/rabbitmq/ + - "./tls-gen/basic/result/:/certs" rabbit_node1: environment: - RABBITMQ_ERLANG_COOKIE='secret_cookie' @@ -65,4 +64,4 @@ services: networks: - back networks: - back: \ No newline at end of file + back: diff --git a/example/docker-compose.yaml b/example/docker-compose.yaml index 9025cb77..314f6044 100644 --- a/example/docker-compose.yaml +++ b/example/docker-compose.yaml @@ -1,5 +1,3 @@ -version: "2" - services: rabbitmq-stream: image: rabbitmq:4.0.5-management diff --git a/example/package-lock.json b/example/package-lock.json index b7777d66..9162920b 100644 --- a/example/package-lock.json +++ b/example/package-lock.json @@ -23,7 +23,7 @@ "extraneous": true }, "..": { - "version": "0.6.1", + "version": "0.6.2", "license": "ISC", "dependencies": { "semver": "^7.5.4" @@ -51,7 +51,7 @@ "eslint-plugin-prettier": "^5.1.3", "got": "^11.8.5", "mocha": "^10.2.0", - "ts-node": "^10.9.1", + "tsx": "^4.20.3", "typescript": "^5.3.3", "winston": "^3.8.2" } @@ -268,7 +268,7 @@ "got": "^11.8.5", "mocha": "^10.2.0", "semver": "^7.5.4", - "ts-node": "^10.9.1", + "tsx": "^4.20.3", "typescript": "^5.3.3", "winston": "^3.8.2" } diff --git a/example/package.json b/example/package.json index c52c4a22..f1e1b7a9 100644 --- a/example/package.json +++ b/example/package.json @@ -6,7 +6,7 @@ "scripts": { "test": "echo \"Error: no test specified\" && exit 1", "start": "node index.js", - "cluster-example": "node cluster_example.js", + "cluster-example": "node src/cluster_example.js", "rebuild-source": "cd .. && npm run build && cd - && npm install --force" }, "author": "", diff --git a/src/client.ts b/src/client.ts index 2690b145..1ae9377a 100644 --- a/src/client.ts +++ b/src/client.ts @@ -39,7 +39,7 @@ import { RouteResponse } from "./responses/route_response" import { StreamStatsResponse } from "./responses/stream_stats_response" import { SubscribeResponse } from "./responses/subscribe_response" import { UnsubscribeResponse } from "./responses/unsubscribe_response" -import { SuperStreamConsumer } from "./super_stream_consumer" +import { SuperStreamConsumer, SuperStreamConsumerFunc } from "./super_stream_consumer" import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher" import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, ResponseCode, sample, wait } from "./util" import { ConsumerCreditPolicy, CreditRequestWrapper, defaultCreditPolicy } from "./consumer_credit_policy" @@ -249,7 +249,7 @@ export class Client { public async declareSuperStreamConsumer( { superStream, offset, consumerRef, creditPolicy }: DeclareSuperStreamConsumerParams, - handle: ConsumerFunc + handle: SuperStreamConsumerFunc ): Promise { const partitions = await this.queryPartitions({ superStream }) return SuperStreamConsumer.create(handle, { diff --git a/src/connection.ts b/src/connection.ts index a8d83a36..5f8b3eef 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -431,13 +431,16 @@ export class Connection { body.byteLength }` ) - this.socket.write(body, (err) => { - this.logger.debug(`Write COMPLETED for cmd key: ${cmd.key.toString(16)} - no correlationId - err: ${err}`) - if (err) { - return rej(err) - } - return res() - }) + if (this.socket.readyState === "open") { + /* Check because there is still a delivery after the consumer is closed */ + this.socket.write(body, (err) => { + this.logger.debug(`Write COMPLETED for cmd key: ${cmd.key.toString(16)} - no correlationId - err: ${err}`) + if (err) { + return rej(err) + } + return res() + }) + } }) } diff --git a/src/super_stream_consumer.ts b/src/super_stream_consumer.ts index 87fac430..6521fcfc 100644 --- a/src/super_stream_consumer.ts +++ b/src/super_stream_consumer.ts @@ -1,8 +1,11 @@ import { Client } from "./client" -import { Consumer, ConsumerFunc } from "./consumer" +import { Consumer } from "./consumer" import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_policy" +import { Message } from "./publisher" import { Offset } from "./requests/subscribe_request" +export type SuperStreamConsumerFunc = (msg: Message, consumer: Consumer) => Promise | void + export class SuperStreamConsumer { private consumers: Map = new Map() public consumerRef: string @@ -13,7 +16,7 @@ export class SuperStreamConsumer { private creditPolicy: ConsumerCreditPolicy private constructor( - readonly handle: ConsumerFunc, + readonly handle: SuperStreamConsumerFunc, params: { superStream: string locator: Client @@ -42,7 +45,12 @@ export class SuperStreamConsumer { singleActive: true, creditPolicy: this.creditPolicy, }, - this.handle, + (msg) => { + const consumer = this.consumers.get(p) + if (consumer) { + return this.handle(msg, consumer) + } + }, this ) this.consumers.set(p, partitionConsumer) @@ -52,7 +60,7 @@ export class SuperStreamConsumer { } static async create( - handle: ConsumerFunc, + handle: SuperStreamConsumerFunc, params: { superStream: string locator: Client