diff --git a/lib/index.ts b/lib/index.ts index ee7cc7c..8cca9c8 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -102,8 +102,10 @@ export class RedisAdapter extends Adapter { private readonly channel: string; private readonly requestChannel: string; private readonly responseChannel: string; + private readonly specificResponseChannel: string; private requests: Map = new Map(); private ackRequests: Map = new Map(); + private redisListeners: Map = new Map(); /** * Adapter constructor. @@ -133,34 +135,51 @@ export class RedisAdapter extends Adapter { this.channel = prefix + "#" + nsp.name + "#"; this.requestChannel = prefix + "-request#" + this.nsp.name + "#"; this.responseChannel = prefix + "-response#" + this.nsp.name + "#"; - const specificResponseChannel = this.responseChannel + this.uid + "#"; + this.specificResponseChannel = this.responseChannel + this.uid + "#"; const isRedisV4 = typeof this.pubClient.pSubscribe === "function"; if (isRedisV4) { + this.redisListeners.set("psub", (msg, channel) => { + this.onmessage(null, channel, msg); + }); + + this.redisListeners.set("sub", (msg, channel) => { + this.onrequest(channel, msg); + }); + this.subClient.pSubscribe( this.channel + "*", - (msg, channel) => { - this.onmessage(null, channel, msg); - }, + this.redisListeners.get("psub"), true ); this.subClient.subscribe( - [this.requestChannel, this.responseChannel, specificResponseChannel], - (msg, channel) => { - this.onrequest(channel, msg); - }, + [ + this.requestChannel, + this.responseChannel, + this.specificResponseChannel, + ], + this.redisListeners.get("sub"), true ); } else { + this.redisListeners.set("pmessageBuffer", this.onmessage.bind(this)); + this.redisListeners.set("messageBuffer", this.onrequest.bind(this)); + this.subClient.psubscribe(this.channel + "*"); - this.subClient.on("pmessageBuffer", this.onmessage.bind(this)); + this.subClient.on( + "pmessageBuffer", + this.redisListeners.get("pmessageBuffer") + ); this.subClient.subscribe([ this.requestChannel, this.responseChannel, - specificResponseChannel, + this.specificResponseChannel, ]); - this.subClient.on("messageBuffer", this.onrequest.bind(this)); + this.subClient.on( + "messageBuffer", + this.redisListeners.get("messageBuffer") + ); } const registerFriendlyErrorHandler = (redisClient) => { @@ -917,4 +936,49 @@ export class RedisAdapter extends Adapter { serverCount(): Promise { return this.getNumSub(); } + + close(): Promise | void { + const isRedisV4 = typeof this.pubClient.pSubscribe === "function"; + if (isRedisV4) { + this.subClient.pUnsubscribe( + this.channel + "*", + this.redisListeners.get("psub"), + true + ); + + // There is a bug in redis v4 when unsubscribing multiple channels at once, so we'll unsub one at a time. + // See https://github.com/redis/node-redis/issues/2052 + this.subClient.unsubscribe( + this.requestChannel, + this.redisListeners.get("sub"), + true + ); + this.subClient.unsubscribe( + this.responseChannel, + this.redisListeners.get("sub"), + true + ); + this.subClient.unsubscribe( + this.specificResponseChannel, + this.redisListeners.get("sub"), + true + ); + } else { + this.subClient.punsubscribe(this.channel + "*"); + this.subClient.off( + "pmessageBuffer", + this.redisListeners.get("pmessageBuffer") + ); + + this.subClient.unsubscribe([ + this.requestChannel, + this.responseChannel, + this.specificResponseChannel, + ]); + this.subClient.off( + "messageBuffer", + this.redisListeners.get("messageBuffer") + ); + } + } } diff --git a/test/index.ts b/test/index.ts index a5df888..9784dc8 100644 --- a/test/index.ts +++ b/test/index.ts @@ -192,6 +192,77 @@ describe(`socket.io-redis with ${ }); }); + it("unsubscribes when close is called", async () => { + const parseInfo = (rawInfo: string) => { + const info = {}; + + rawInfo.split("\r\n").forEach((line) => { + if (line.length > 0 && !line.startsWith("#")) { + const fieldVal = line.split(":"); + info[fieldVal[0]] = fieldVal[1]; + } + }); + + return info; + }; + + const getInfo = async (): Promise => { + if (process.env.REDIS_CLIENT === undefined) { + return parseInfo( + await namespace3.adapter.pubClient.sendCommand(["info"]) + ); + } else if (process.env.REDIS_CLIENT === "ioredis") { + return parseInfo(await namespace3.adapter.pubClient.call("info")); + } else { + return await new Promise((resolve, reject) => { + namespace3.adapter.pubClient.sendCommand( + "info", + [], + (err, result) => { + if (err) { + reject(err); + } + resolve(parseInfo(result)); + } + ); + }); + } + }; + + return new Promise(async (resolve, reject) => { + // Give it a moment to subscribe to all the channels + setTimeout(async () => { + try { + const info = await getInfo(); + + // Depending on the version of redis this may be 3 (redis < v5) or 1 (redis > v4) + // Older versions subscribed multiple times on the same pattern. Newer versions only sub once. + expect(info.pubsub_patterns).to.be.greaterThan(0); + expect(info.pubsub_channels).to.eql(5); // 2 shared (request/response) + 3 unique for each namespace + + namespace1.adapter.close(); + namespace2.adapter.close(); + namespace3.adapter.close(); + + // Give it a moment to unsubscribe + setTimeout(async () => { + try { + const info = await getInfo(); + + expect(info.pubsub_patterns).to.eql(0); // All patterns subscriptions should be unsubscribed + expect(info.pubsub_channels).to.eql(0); // All subscriptions should be unsubscribed + resolve(); + } catch (error) { + reject(error); + } + }, 100); + } catch (error) { + reject(error); + } + }, 100); + }); + }); + if (process.env.REDIS_CLIENT === undefined) { // redis@4 it("ignores messages from unknown channels", (done) => {