Skip to content

Commit f652d1e

Browse files
committed
feat(client): Add socket helpers and pause mechanism
- Introduced `#paused` flag with corresponding `_pause` and `_unpause` methods to temporarily halt writing commands to the socket during maintenance windows. - Updated `#write` method to respect the `#paused` flag, preventing new commands from being written during maintenance. - Added `_ejectSocket` method to safely detach from and return the current socket - Added `_insertSocket` method to receive and start using a new socket
1 parent 8cf7c93 commit f652d1e

File tree

1 file changed

+46
-2
lines changed

1 file changed

+46
-2
lines changed

packages/client/lib/client/index.ts

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ export default class RedisClient<
429429
}
430430

431431
readonly #options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>;
432-
readonly #socket: RedisSocket;
432+
#socket: RedisSocket;
433433
readonly #queue: RedisCommandsQueue;
434434
#selectedDB = 0;
435435
#monitorCallback?: MonitorCallback<TYPE_MAPPING>;
@@ -442,11 +442,16 @@ export default class RedisClient<
442442
#watchEpoch?: number;
443443
#clientSideCache?: ClientSideCacheProvider;
444444
#credentialsSubscription: Disposable | null = null;
445+
// Flag used to pause writing to the socket during maintenance windows.
446+
// When true, prevents new commands from being written while waiting for:
447+
// 1. New socket to be ready after maintenance redirect
448+
// 2. In-flight commands on the old socket to complete
449+
#paused = false;
450+
445451
get clientSideCache() {
446452
return this._self.#clientSideCache;
447453
}
448454

449-
450455
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
451456
return this._self.#options;
452457
}
@@ -912,6 +917,42 @@ export default class RedisClient<
912917
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
913918
}
914919

920+
/**
921+
* @internal
922+
*/
923+
_ejectSocket(): RedisSocket {
924+
const socket = this._self.#socket;
925+
// @ts-ignore
926+
this.#socket = null;
927+
socket.removeAllListeners();
928+
return socket;
929+
}
930+
931+
/**
932+
* @intenal
933+
*/
934+
_insertSocket(socket: RedisSocket) {
935+
if(this._self.#socket) {
936+
this._self._ejectSocket().destroy();
937+
}
938+
this._self.#socket = socket;
939+
this._self.#attachListeners(this._self.#socket);
940+
}
941+
942+
/**
943+
* @internal
944+
*/
945+
_pause() {
946+
this._self.#paused = true;
947+
}
948+
949+
/**
950+
* @internal
951+
*/
952+
_unpause() {
953+
this._self.#paused = false;
954+
}
955+
915956
/**
916957
* @internal
917958
*/
@@ -1141,6 +1182,9 @@ export default class RedisClient<
11411182
}
11421183

11431184
#write() {
1185+
if(this.#paused) {
1186+
return
1187+
}
11441188
this.#socket.write(this.#queue.commandsToWrite());
11451189
}
11461190

0 commit comments

Comments
 (0)