Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 5 additions & 22 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,11 @@ export function createAdapter(
export class RedisAdapter extends Adapter {
public readonly uid;
public readonly requestsTimeout: number;
public readonly publishOnSpecificResponseChannel: boolean;
public readonly parser: Parser;

private readonly channel: string;
private readonly requestChannel: string;
private readonly responseChannel: string;
private readonly specificResponseChannel: string;
private requests: Map<string, Request> = new Map();
private ackRequests: Map<string, AckRequest> = new Map();
private redisListeners: Map<string, Function> = new Map();
Expand All @@ -129,16 +127,16 @@ export class RedisAdapter extends Adapter {

this.uid = uid2(6);
this.requestsTimeout = opts.requestsTimeout || 5000;
this.publishOnSpecificResponseChannel =
!!opts.publishOnSpecificResponseChannel;
this.parser = opts.parser || msgpack;

const prefix = opts.key || "socket.io";

this.channel = prefix + "#" + nsp.name + "#";
this.requestChannel = prefix + "-request#" + this.nsp.name + "#";
this.responseChannel = prefix + "-response#" + this.nsp.name + "#";
this.specificResponseChannel = this.responseChannel + this.uid + "#";
if (opts.publishOnSpecificResponseChannel) {
this.responseChannel = this.responseChannel + this.uid + "#";
}

const isRedisV4 = typeof this.pubClient.pSubscribe === "function";
if (isRedisV4) {
Expand All @@ -159,7 +157,6 @@ export class RedisAdapter extends Adapter {
[
this.requestChannel,
this.responseChannel,
this.specificResponseChannel,
],
this.redisListeners.get("sub"),
true
Expand All @@ -177,7 +174,6 @@ export class RedisAdapter extends Adapter {
this.subClient.subscribe([
this.requestChannel,
this.responseChannel,
this.specificResponseChannel,
]);
this.subClient.on(
"messageBuffer",
Expand Down Expand Up @@ -478,11 +474,8 @@ export class RedisAdapter extends Adapter {
* @private
*/
private publishResponse(request, response) {
const responseChannel = this.publishOnSpecificResponseChannel
? `${this.responseChannel}${request.uid}#`
: this.responseChannel;
debug("publishing response to channel %s", responseChannel);
this.pubClient.publish(responseChannel, response);
debug("publishing response to channel %s", this.responseChannel);
this.pubClient.publish(this.responseChannel, response);
}

/**
Expand Down Expand Up @@ -915,23 +908,13 @@ export class RedisAdapter extends Adapter {
this.redisListeners.get("sub"),
true
);
this.subClient.unsubscribe(
this.specificResponseChannel,
this.redisListeners.get("sub"),
true
);
} else {
this.subClient.punsubscribe(this.channel + "*");
this.subClient.off(
"pmessageBuffer",
this.redisListeners.get("pmessageBuffer")
);

this.subClient.unsubscribe([
this.requestChannel,
this.responseChannel,
this.specificResponseChannel,
]);
this.subClient.off(
"messageBuffer",
this.redisListeners.get("messageBuffer")
Expand Down