Skip to content

Implement adapter.close function #485

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 75 additions & 11 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ export class RedisAdapter extends Adapter {
private readonly channel: string;
private readonly requestChannel: string;
private readonly responseChannel: string;
private readonly specificResponseChannel: string;
private requests: Map<string, Request> = new Map();
private ackRequests: Map<string, AckRequest> = new Map();
private redisListeners: Map<string, Function> = new Map();

/**
* Adapter constructor.
Expand Down Expand Up @@ -133,34 +135,51 @@ export class RedisAdapter extends Adapter {
this.channel = prefix + "#" + nsp.name + "#";
this.requestChannel = prefix + "-request#" + this.nsp.name + "#";
this.responseChannel = prefix + "-response#" + this.nsp.name + "#";
const specificResponseChannel = this.responseChannel + this.uid + "#";
this.specificResponseChannel = this.responseChannel + this.uid + "#";

const isRedisV4 = typeof this.pubClient.pSubscribe === "function";
if (isRedisV4) {
this.redisListeners.set("psub", (msg, channel) => {
this.onmessage(null, channel, msg);
});

this.redisListeners.set("sub", (msg, channel) => {
this.onrequest(channel, msg);
});

this.subClient.pSubscribe(
this.channel + "*",
(msg, channel) => {
this.onmessage(null, channel, msg);
},
this.redisListeners.get("psub"),
true
);
this.subClient.subscribe(
[this.requestChannel, this.responseChannel, specificResponseChannel],
(msg, channel) => {
this.onrequest(channel, msg);
},
[
this.requestChannel,
this.responseChannel,
this.specificResponseChannel,
],
this.redisListeners.get("sub"),
true
);
} else {
this.redisListeners.set("pmessageBuffer", this.onmessage.bind(this));
this.redisListeners.set("messageBuffer", this.onrequest.bind(this));

this.subClient.psubscribe(this.channel + "*");
this.subClient.on("pmessageBuffer", this.onmessage.bind(this));
this.subClient.on(
"pmessageBuffer",
this.redisListeners.get("pmessageBuffer")
);

this.subClient.subscribe([
this.requestChannel,
this.responseChannel,
specificResponseChannel,
this.specificResponseChannel,
]);
this.subClient.on("messageBuffer", this.onrequest.bind(this));
this.subClient.on(
"messageBuffer",
this.redisListeners.get("messageBuffer")
);
}

const registerFriendlyErrorHandler = (redisClient) => {
Expand Down Expand Up @@ -917,4 +936,49 @@ export class RedisAdapter extends Adapter {
serverCount(): Promise<number> {
return this.getNumSub();
}

close(): Promise<void> | void {
const isRedisV4 = typeof this.pubClient.pSubscribe === "function";
if (isRedisV4) {
this.subClient.pUnsubscribe(
this.channel + "*",
this.redisListeners.get("psub"),
true
);

// There is a bug in redis v4 when unsubscribing multiple channels at once, so we'll unsub one at a time.
// See https://github.com/redis/node-redis/issues/2052
this.subClient.unsubscribe(
this.requestChannel,
this.redisListeners.get("sub"),
true
);
this.subClient.unsubscribe(
this.responseChannel,
this.redisListeners.get("sub"),
true
);
this.subClient.unsubscribe(
this.specificResponseChannel,
this.redisListeners.get("sub"),
true
);
} else {
this.subClient.punsubscribe(this.channel + "*");
this.subClient.off(
"pmessageBuffer",
this.redisListeners.get("pmessageBuffer")
);

this.subClient.unsubscribe([
this.requestChannel,
this.responseChannel,
this.specificResponseChannel,
]);
this.subClient.off(
"messageBuffer",
this.redisListeners.get("messageBuffer")
);
}
}
}
71 changes: 71 additions & 0 deletions test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,77 @@ describe(`socket.io-redis with ${
});
});

it("unsubscribes when close is called", async () => {
const parseInfo = (rawInfo: string) => {
const info = {};

rawInfo.split("\r\n").forEach((line) => {
if (line.length > 0 && !line.startsWith("#")) {
const fieldVal = line.split(":");
info[fieldVal[0]] = fieldVal[1];
}
});

return info;
};

const getInfo = async (): Promise<any> => {
if (process.env.REDIS_CLIENT === undefined) {
return parseInfo(
await namespace3.adapter.pubClient.sendCommand(["info"])
);
} else if (process.env.REDIS_CLIENT === "ioredis") {
return parseInfo(await namespace3.adapter.pubClient.call("info"));
} else {
return await new Promise((resolve, reject) => {
namespace3.adapter.pubClient.sendCommand(
"info",
[],
(err, result) => {
if (err) {
reject(err);
}
resolve(parseInfo(result));
}
);
});
}
};

return new Promise(async (resolve, reject) => {
// Give it a moment to subscribe to all the channels
setTimeout(async () => {
try {
const info = await getInfo();

// Depending on the version of redis this may be 3 (redis < v5) or 1 (redis > v4)
// Older versions subscribed multiple times on the same pattern. Newer versions only sub once.
expect(info.pubsub_patterns).to.be.greaterThan(0);
expect(info.pubsub_channels).to.eql(5); // 2 shared (request/response) + 3 unique for each namespace

namespace1.adapter.close();
namespace2.adapter.close();
namespace3.adapter.close();

// Give it a moment to unsubscribe
setTimeout(async () => {
try {
const info = await getInfo();

expect(info.pubsub_patterns).to.eql(0); // All patterns subscriptions should be unsubscribed
expect(info.pubsub_channels).to.eql(0); // All subscriptions should be unsubscribed
resolve();
} catch (error) {
reject(error);
}
}, 100);
} catch (error) {
reject(error);
}
}, 100);
});
});

if (process.env.REDIS_CLIENT === undefined) {
// redis@4
it("ignores messages from unknown channels", (done) => {
Expand Down