From 25ce9b023ab010e7550cd018e4896270ccf6ac61 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Thu, 26 Jun 2025 23:08:25 +0300 Subject: [PATCH] feat(client): add command timeout option Co-authored-by: Florian Schunk <149071178+florian-schunk@users.noreply.github.com> --- docs/command-options.md | 13 ++ packages/client/lib/client/commands-queue.ts | 56 +++++++-- packages/client/lib/client/index.spec.ts | 118 +++++++++++++++---- packages/client/lib/client/index.ts | 12 +- packages/client/lib/cluster/index.ts | 28 +++-- packages/client/lib/sentinel/index.ts | 38 +++--- packages/client/lib/sentinel/utils.ts | 2 +- packages/test-utils/lib/index.ts | 21 ++-- 8 files changed, 212 insertions(+), 76 deletions(-) diff --git a/docs/command-options.md b/docs/command-options.md index b246445ad74..8583eae135b 100644 --- a/docs/command-options.md +++ b/docs/command-options.md @@ -37,6 +37,19 @@ try { } ``` + +## Timeout + +This option is similar to the Abort Signal one, but provides an easier way to set timeout for commands. Again, this applies to commands that haven't been written to the socket yet. + +```javascript +const client = createClient({ + commandOptions: { + timeout: 1000 + } +}) +``` + ## ASAP Commands that are executed in the "asap" mode are added to the beginning of the "to sent" queue. diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 78c0a01b203..52a07a7e3b5 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -3,7 +3,7 @@ import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; -import { AbortError, ErrorReply } from '../errors'; +import { AbortError, ErrorReply, TimeoutError } from '../errors'; import { MonitorCallback } from '.'; export interface CommandOptions { @@ -14,6 +14,10 @@ export interface CommandOptions { * Maps between RESP and JavaScript types */ typeMapping?: T; + /** + * Timeout for the command in milliseconds + */ + timeout?: number; } export interface CommandToWrite extends CommandWaitingForReply { @@ -23,6 +27,10 @@ export interface CommandToWrite extends CommandWaitingForReply { signal: AbortSignal; listener: () => unknown; } | undefined; + timeout: { + signal: AbortSignal; + listener: () => unknown; + } | undefined; } interface CommandWaitingForReply { @@ -80,7 +88,7 @@ export default class RedisCommandsQueue { #onPush(push: Array) { // TODO: type if (this.#pubSub.handleMessageReply(push)) return true; - + const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push); if (isShardedUnsubscribe && !this.#waitingForReply.length) { const channel = push[1].toString(); @@ -153,12 +161,26 @@ export default class RedisCommandsQueue { args, chainId: options?.chainId, abort: undefined, + timeout: undefined, resolve, reject, channelsCounter: undefined, typeMapping: options?.typeMapping }; + const timeout = options?.timeout; + if (timeout) { + const signal = AbortSignal.timeout(timeout); + value.timeout = { + signal, + listener: () => { + this.#toWrite.remove(node); + value.reject(new TimeoutError()); + } + }; + signal.addEventListener('abort', value.timeout.listener, { once: true }); + } + const signal = options?.abortSignal; if (signal) { value.abort = { @@ -181,6 +203,7 @@ export default class RedisCommandsQueue { args: command.args, chainId, abort: undefined, + timeout: undefined, resolve() { command.resolve(); resolve(); @@ -202,7 +225,7 @@ export default class RedisCommandsQueue { this.decoder.onReply = (reply => { if (Array.isArray(reply)) { if (this.#onPush(reply)) return; - + if (PONG.equals(reply[0] as Buffer)) { const { resolve, typeMapping } = this.#waitingForReply.shift()!, buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer; @@ -250,7 +273,7 @@ export default class RedisCommandsQueue { if (!this.#pubSub.isActive) { this.#resetDecoderCallbacks(); } - + resolve(); }; } @@ -299,6 +322,7 @@ export default class RedisCommandsQueue { args: ['MONITOR'], chainId: options?.chainId, abort: undefined, + timeout: undefined, // using `resolve` instead of using `.then`/`await` to make sure it'll be called before processing the next reply resolve: () => { // after running `MONITOR` only `MONITOR` and `RESET` replies are expected @@ -317,7 +341,7 @@ export default class RedisCommandsQueue { reject, channelsCounter: undefined, typeMapping - }, options?.asap); + }, options?.asap); }); } @@ -340,11 +364,11 @@ export default class RedisCommandsQueue { this.#resetDecoderCallbacks(); this.#resetFallbackOnReply = undefined; this.#pubSub.reset(); - + this.#waitingForReply.shift()!.resolve(reply); return; } - + this.#resetFallbackOnReply!(reply); }) as Decoder['onReply']; @@ -352,6 +376,7 @@ export default class RedisCommandsQueue { args: ['RESET'], chainId, abort: undefined, + timeout: undefined, resolve, reject, channelsCounter: undefined, @@ -376,16 +401,20 @@ export default class RedisCommandsQueue { continue; } - // TODO reuse `toSend` or create new object? + // TODO reuse `toSend` or create new object? (toSend as any).args = undefined; if (toSend.abort) { RedisCommandsQueue.#removeAbortListener(toSend); toSend.abort = undefined; } + if (toSend.timeout) { + RedisCommandsQueue.#removeTimeoutListener(toSend); + toSend.timeout = undefined; + } this.#chainInExecution = toSend.chainId; toSend.chainId = undefined; this.#waitingForReply.push(toSend); - + yield encoded; toSend = this.#toWrite.shift(); } @@ -402,11 +431,18 @@ export default class RedisCommandsQueue { command.abort!.signal.removeEventListener('abort', command.abort!.listener); } + static #removeTimeoutListener(command: CommandToWrite) { + command.timeout!.signal.removeEventListener('abort', command.timeout!.listener); + } + static #flushToWrite(toBeSent: CommandToWrite, err: Error) { if (toBeSent.abort) { RedisCommandsQueue.#removeAbortListener(toBeSent); } - + if (toBeSent.timeout) { + RedisCommandsQueue.#removeTimeoutListener(toBeSent); + } + toBeSent.reject(err); } diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 4f752210dbe..f04d6467062 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -1,9 +1,9 @@ import { strict as assert } from 'node:assert'; import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils'; import RedisClient, { RedisClientOptions, RedisClientType } from '.'; -import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, SocketClosedUnexpectedlyError, WatchError } from '../errors'; +import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, SocketClosedUnexpectedlyError, TimeoutError, WatchError } from '../errors'; import { defineScript } from '../lua-script'; -import { spy } from 'sinon'; +import { spy, stub } from 'sinon'; import { once } from 'node:events'; import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec'; import { RESP_TYPES } from '../RESP/decoder'; @@ -239,30 +239,84 @@ describe('Client', () => { assert.equal(await client.sendCommand(['PING']), 'PONG'); }, GLOBAL.SERVERS.OPEN); - describe('AbortController', () => { - before(function () { - if (!global.AbortController) { - this.skip(); - } + testUtils.testWithClient('Unactivated AbortController should not abort', async client => { + await client.sendCommand(['PING'], { + abortSignal: new AbortController().signal }); + }, GLOBAL.SERVERS.OPEN); - testUtils.testWithClient('success', async client => { - await client.sendCommand(['PING'], { - abortSignal: new AbortController().signal - }); - }, GLOBAL.SERVERS.OPEN); + testUtils.testWithClient('AbortError', async client => { + await blockSetImmediate(async () => { + await assert.rejects(client.sendCommand(['PING'], { + abortSignal: AbortSignal.timeout(5) + }), AbortError); + }) + }, GLOBAL.SERVERS.OPEN); - testUtils.testWithClient('AbortError', client => { - const controller = new AbortController(); - controller.abort(); + testUtils.testWithClient('Timeout with custom timeout config', async client => { + await blockSetImmediate(async () => { + await assert.rejects(client.sendCommand(['PING'], { + timeout: 5 + }), TimeoutError); + }) + }, GLOBAL.SERVERS.OPEN); - return assert.rejects( - client.sendCommand(['PING'], { - abortSignal: controller.signal - }), - AbortError - ); - }, GLOBAL.SERVERS.OPEN); + testUtils.testWithCluster('Timeout with custom timeout config (cluster)', async cluster => { + await blockSetImmediate(async () => { + await assert.rejects(cluster.sendCommand(undefined, true, ['PING'], { + timeout: 5 + }), TimeoutError); + }) + }, GLOBAL.CLUSTERS.OPEN); + + testUtils.testWithClientSentinel('Timeout with custom timeout config (sentinel)', async sentinel => { + await blockSetImmediate(async () => { + await assert.rejects(sentinel.sendCommand(true, ['PING'], { + timeout: 5 + }), TimeoutError); + }) + }, GLOBAL.CLUSTERS.OPEN); + + testUtils.testWithClient('Timeout with global timeout config', async client => { + await blockSetImmediate(async () => { + await assert.rejects(client.ping(), TimeoutError); + await assert.rejects(client.sendCommand(['PING']), TimeoutError); + }); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + commandOptions: { + timeout: 5 + } + } + }); + + testUtils.testWithCluster('Timeout with global timeout config (cluster)', async cluster => { + await blockSetImmediate(async () => { + await assert.rejects(cluster.HSET('key', 'foo', 'value'), TimeoutError); + await assert.rejects(cluster.sendCommand(undefined, true, ['PING']), TimeoutError); + }); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + commandOptions: { + timeout: 5 + } + } + }); + + testUtils.testWithClientSentinel('Timeout with global timeout config (sentinel)', async sentinel => { + await blockSetImmediate(async () => { + await assert.rejects(sentinel.HSET('key', 'foo', 'value'), TimeoutError); + await assert.rejects(sentinel.sendCommand(true, ['PING']), TimeoutError); + }); + }, { + ...GLOBAL.SENTINEL.OPEN, + clientOptions: { + commandOptions: { + timeout: 5 + } + } }); testUtils.testWithClient('undefined and null should not break the client', async client => { @@ -900,3 +954,23 @@ describe('Client', () => { }, GLOBAL.SERVERS.OPEN); }); }); + +/** + * Executes the provided function in a context where setImmediate is stubbed to not do anything. + * This blocks setImmediate callbacks from executing + */ +async function blockSetImmediate(fn: () => Promise) { + let setImmediateStub: any; + + try { + setImmediateStub = stub(global, 'setImmediate'); + setImmediateStub.callsFake(() => { + //Dont call the callback, effectively blocking execution + }); + await fn(); + } finally { + if (setImmediateStub) { + setImmediateStub.restore(); + } + } +} diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index a446ad8e755..128dc599677 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -526,7 +526,7 @@ export default class RedisClient< async #handshake(chainId: symbol, asap: boolean) { const promises = []; const commandsWithErrorHandlers = await this.#getHandshakeCommands(); - + if (asap) commandsWithErrorHandlers.reverse() for (const { cmd, errorHandler } of commandsWithErrorHandlers) { @@ -632,7 +632,7 @@ export default class RedisClient< // since they could be connected to an older version that doesn't support them. } }); - + commands.push({ cmd: [ 'CLIENT', @@ -889,7 +889,13 @@ export default class RedisClient< return Promise.reject(new ClientOfflineError()); } - const promise = this._self.#queue.addCommand(args, options); + // Merge global options with provided options + const opts = { + ...this._self._commandOptions, + ...options + } + + const promise = this._self.#queue.addCommand(args, opts); this._self.#scheduleWrite(); return promise; } diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index c2c251810e3..6d26ac98c9a 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -38,12 +38,12 @@ export interface RedisClusterOptions< // POLICIES extends CommandPolicies = CommandPolicies > extends ClusterCommander { /** - * Should contain details for some of the cluster nodes that the client will use to discover + * Should contain details for some of the cluster nodes that the client will use to discover * the "cluster topology". We recommend including details for at least 3 nodes here. */ rootNodes: Array; /** - * Default values used for every client in the cluster. Use this to specify global values, + * Default values used for every client in the cluster. Use this to specify global values, * for example: ACL credentials, timeouts, TLS configuration etc. */ defaults?: Partial; @@ -68,13 +68,13 @@ export interface RedisClusterOptions< nodeAddressMap?: NodeAddressMap; /** * Client Side Caching configuration for the pool. - * - * Enables Redis Servers and Clients to work together to cache results from commands + * + * Enables Redis Servers and Clients to work together to cache results from commands * sent to a server. The server will notify the client when cached results are no longer valid. * In pooled mode, the cache is shared across all clients in the pool. - * + * * Note: Client Side Caching is only supported with RESP3. - * + * * @example Anonymous cache configuration * ``` * const client = createCluster({ @@ -86,7 +86,7 @@ export interface RedisClusterOptions< * minimum: 5 * }); * ``` - * + * * @example Using a controllable cache * ``` * const cache = new BasicPooledClientSideCache({ @@ -406,7 +406,7 @@ export default class RedisCluster< proxy._commandOptions[key] = value; return proxy as RedisClusterType< M, - F, + F, S, RESP, K extends 'typeMapping' ? V extends TypeMapping ? V : {} : TYPE_MAPPING @@ -489,7 +489,7 @@ export default class RedisCluster< myFn = this._handleAsk(fn); continue; } - + if (err.message.startsWith('MOVED')) { await this._slots.rediscover(client); client = await this._slots.getClient(firstKey, isReadonly); @@ -497,7 +497,7 @@ export default class RedisCluster< } throw err; - } + } } } @@ -508,10 +508,16 @@ export default class RedisCluster< options?: ClusterCommandOptions, // defaultPolicies?: CommandPolicies ): Promise { + + // Merge global options with local options + const opts = { + ...this._self._commandOptions, + ...options + } return this._self._execute( firstKey, isReadonly, - options, + opts, (client, opts) => client.sendCommand(args, opts) ); } diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index b4a794b871a..b3f3bbf0b8d 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -35,7 +35,7 @@ export class RedisSentinelClient< /** * Indicates if the client connection is open - * + * * @returns `true` if the client connection is open, `false` otherwise */ @@ -45,7 +45,7 @@ export class RedisSentinelClient< /** * Indicates if the client connection is ready to accept commands - * + * * @returns `true` if the client connection is ready, `false` otherwise */ get isReady() { @@ -54,7 +54,7 @@ export class RedisSentinelClient< /** * Gets the command options configured for this client - * + * * @returns The command options for this client or `undefined` if none were set */ get commandOptions() { @@ -241,10 +241,10 @@ export class RedisSentinelClient< /** * Releases the client lease back to the pool - * + * * After calling this method, the client instance should no longer be used as it * will be returned to the client pool and may be given to other operations. - * + * * @returns A promise that resolves when the client is ready to be reused, or undefined * if the client was immediately ready * @throws Error if the lease has already been released @@ -274,7 +274,7 @@ export default class RedisSentinel< /** * Indicates if the sentinel connection is open - * + * * @returns `true` if the sentinel connection is open, `false` otherwise */ get isOpen() { @@ -283,7 +283,7 @@ export default class RedisSentinel< /** * Indicates if the sentinel connection is ready to accept commands - * + * * @returns `true` if the sentinel connection is ready, `false` otherwise */ get isReady() { @@ -554,15 +554,15 @@ export default class RedisSentinel< /** * Acquires a master client lease for exclusive operations - * + * * Used when multiple commands need to run on an exclusive client (for example, using `WATCH/MULTI/EXEC`). * The returned client must be released after use with the `release()` method. - * + * * @returns A promise that resolves to a Redis client connected to the master node * @example * ```javascript * const clientLease = await sentinel.acquire(); - * + * * try { * await clientLease.watch('key'); * const resp = await clientLease.multi() @@ -671,7 +671,7 @@ class RedisSentinelInternal< super(); this.#validateOptions(options); - + this.#name = options.name; this.#RESP = options.RESP; @@ -733,7 +733,7 @@ class RedisSentinelInternal< /** * Gets a client lease from the master client pool - * + * * @returns A client info object or a promise that resolves to a client info object * when a client becomes available */ @@ -748,10 +748,10 @@ class RedisSentinelInternal< /** * Releases a client lease back to the pool - * + * * If the client was used for a transaction that might have left it in a dirty state, * it will be reset before being returned to the pool. - * + * * @param clientInfo The client info object representing the client to release * @returns A promise that resolves when the client is ready to be reused, or undefined * if the client was immediately ready or no longer exists @@ -791,10 +791,10 @@ class RedisSentinelInternal< async #connect() { let count = 0; - while (true) { + while (true) { this.#trace("starting connect loop"); - count+=1; + count+=1; if (this.#destroy) { this.#trace("in #connect and want to destroy") return; @@ -847,7 +847,7 @@ class RedisSentinelInternal< try { /* - // force testing of READONLY errors + // force testing of READONLY errors if (clientInfo !== undefined) { if (Math.floor(Math.random() * 10) < 1) { console.log("throwing READONLY error"); @@ -861,7 +861,7 @@ class RedisSentinelInternal< throw err; } - /* + /* rediscover and retry if doing a command against a "master" a) READONLY error (topology has changed) but we haven't been notified yet via pubsub b) client is "not ready" (disconnected), which means topology might have changed, but sentinel might not see it yet @@ -1574,4 +1574,4 @@ export class RedisSentinelFactory extends EventEmitter { } }); } -} \ No newline at end of file +} diff --git a/packages/client/lib/sentinel/utils.ts b/packages/client/lib/sentinel/utils.ts index 7e2404c2f7a..c124981e257 100644 --- a/packages/client/lib/sentinel/utils.ts +++ b/packages/client/lib/sentinel/utils.ts @@ -6,7 +6,7 @@ import { NamespaceProxySentinel, NamespaceProxySentinelClient, ProxySentinel, Pr /* TODO: should use map interface, would need a transform reply probably? as resp2 is list form, which this depends on */ export function parseNode(node: Record): RedisNode | undefined{ - + if (node.flags.includes("s_down") || node.flags.includes("disconnected") || node.flags.includes("failover_in_progress")) { return undefined; } diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 43dd4debfdf..aab1c700f5e 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -179,7 +179,7 @@ export default class TestUtils { this.#VERSION_NUMBERS = numbers; this.#DOCKER_IMAGE = { image: dockerImageName, - version: string, + version: string, mode: "server" }; } @@ -315,7 +315,7 @@ export default class TestUtils { if (passIndex != 0) { password = options.serverArguments[passIndex]; } - + if (this.isVersionGreaterThan(options.minimumDockerVersion)) { const dockerImage = this.#DOCKER_IMAGE; before(function () { @@ -333,18 +333,19 @@ export default class TestUtils { const promises = await dockerPromises; const rootNodes: Array = promises.map(promise => ({ - host: "127.0.0.1", + host: "127.0.0.1", port: promise.port })); const sentinel = createSentinel({ - name: 'mymaster', - sentinelRootNodes: rootNodes, - nodeClientOptions: { + name: 'mymaster', + sentinelRootNodes: rootNodes, + nodeClientOptions: { + commandOptions: options.clientOptions?.commandOptions, password: password || undefined, }, - sentinelClientOptions: { + sentinelClientOptions: { password: password || undefined, }, replicaPoolSize: options?.replicaPoolSize || 0, @@ -507,7 +508,7 @@ export default class TestUtils { it(title, async function () { if (!dockersPromise) return this.skip(); - + const dockers = await dockersPromise, cluster = createCluster({ rootNodes: dockers.map(({ port }) => ({ @@ -580,12 +581,12 @@ export default class TestUtils { const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), appPrefix)); sentinels.push(await spawnSentinelNode(this.#DOCKER_IMAGE, options.serverArguments, masterPort, sentinelName, tmpDir)) - + if (tmpDir) { fs.rmSync(tmpDir, { recursive: true }); } } - + return sentinels } }