diff --git a/lazer/sdk/js/README.md b/lazer/sdk/js/README.md index 65e587c6c9..356cc8a8c3 100644 --- a/lazer/sdk/js/README.md +++ b/lazer/sdk/js/README.md @@ -3,3 +3,60 @@ ## Contributing & Development See [contributing.md](docs/contributing/contributing.md) for information on how to develop or contribute to this project! + +## How to use + +```javascript +import { PythLazerClient } from "@pythnetwork/pyth-lazer-sdk"; + +const c = await PythLazerClient.create({ + token: "YOUR-AUTH-TOKEN-HERE", + logger: console, // Optionally log operations (to the console in this case.) + webSocketPoolConfig: { + numConnections: 4, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 4. + onError: (error) => { + console.error("⛔️ WebSocket error:", error.message); + }, + // Optional configuration for resilient WebSocket connections + rwsConfig: { + heartbeatTimeoutDurationMs: 5000, // Optional heartbeat timeout duration in milliseconds + maxRetryDelayMs: 1000, // Optional maximum retry delay in milliseconds + logAfterRetryCount: 10, // Optional log after how many retries + }, + }, +}); + +c.addMessageListener((message) => { + console.info("received the following from the Lazer stream:", message); +}); + +// Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down) +// The connections may still try to reconnect in the background. To shut down the client completely, call shutdown(). +c.addAllConnectionsDownListener(() => { + console.error("All connections are down!"); +}); + +// Create and remove one or more subscriptions on the fly +c.subscribe({ + type: "subscribe", + subscriptionId: 1, + priceFeedIds: [1, 2], + properties: ["price"], + formats: ["solana"], + deliveryFormat: "binary", + channel: "fixed_rate@200ms", + parsed: false, + jsonBinaryEncoding: "base64", +}); +c.subscribe({ + type: "subscribe", + subscriptionId: 2, + priceFeedIds: [1, 2, 3, 4, 5], + properties: ["price", "exponent", "publisherCount", "confidence"], + formats: ["evm"], + deliveryFormat: "json", + channel: "fixed_rate@200ms", + parsed: true, + jsonBinaryEncoding: "hex", +}); +``` diff --git a/lazer/sdk/js/package.json b/lazer/sdk/js/package.json index 809427f2d0..bc18b8d0c5 100644 --- a/lazer/sdk/js/package.json +++ b/lazer/sdk/js/package.json @@ -1,7 +1,10 @@ { "name": "@pythnetwork/pyth-lazer-sdk", - "version": "4.0.0", + "version": "5.0.0", "description": "Pyth Lazer SDK", + "engines": { + "node": ">=22" + }, "publishConfig": { "access": "public" }, @@ -61,7 +64,7 @@ "license": "Apache-2.0", "dependencies": { "@isaacs/ttlcache": "^1.4.1", - "cross-fetch": "^4.0.0", + "buffer": "^6.0.3", "isomorphic-ws": "^5.0.0", "ts-log": "^2.2.7", "ws": "^8.18.0" diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index 56c220955a..ec87689efb 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -1,4 +1,3 @@ -import fetch from "cross-fetch"; import WebSocket from "isomorphic-ws"; import type { Logger } from "ts-log"; import { dummyLogger } from "ts-log"; @@ -20,6 +19,7 @@ import type { import { BINARY_UPDATE_FORMAT_MAGIC_LE, FORMAT_MAGICS_LE } from "./protocol.js"; import type { WebSocketPoolConfig } from "./socket/websocket-pool.js"; import { WebSocketPool } from "./socket/websocket-pool.js"; +import { bufferFromWebsocketData } from "./util/buffer-util.js"; export type BinaryResponse = { subscriptionId: number; @@ -113,53 +113,56 @@ export class PythLazerClient { */ addMessageListener(handler: (event: JsonOrBinaryResponse) => void) { const wsp = this.getWebSocketPool(); - wsp.addMessageListener((data: WebSocket.Data) => { + wsp.addMessageListener(async (data: WebSocket.Data) => { if (typeof data == "string") { handler({ type: "json", value: JSON.parse(data) as Response, }); - } else if (Buffer.isBuffer(data)) { - let pos = 0; - const magic = data.subarray(pos, pos + UINT32_NUM_BYTES).readUint32LE(); - pos += UINT32_NUM_BYTES; - if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) { - throw new Error("binary update format magic mismatch"); - } - // TODO: some uint64 values may not be representable as Number. - const subscriptionId = Number( - data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(), - ); - pos += UINT64_NUM_BYTES; + return; + } + const buffData = await bufferFromWebsocketData(data); + let pos = 0; + const magic = buffData + .subarray(pos, pos + UINT32_NUM_BYTES) + .readUint32LE(); + pos += UINT32_NUM_BYTES; + if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) { + throw new Error("binary update format magic mismatch"); + } + // TODO: some uint64 values may not be representable as Number. + const subscriptionId = Number( + buffData.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(), + ); + pos += UINT64_NUM_BYTES; - const value: BinaryResponse = { subscriptionId }; - while (pos < data.length) { - const len = data.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE(); - pos += UINT16_NUM_BYTES; - const magic = data - .subarray(pos, pos + UINT32_NUM_BYTES) - .readUint32LE(); - if (magic == FORMAT_MAGICS_LE.EVM) { - value.evm = data.subarray(pos, pos + len); - } else if (magic == FORMAT_MAGICS_LE.SOLANA) { - value.solana = data.subarray(pos, pos + len); - } else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) { - value.leEcdsa = data.subarray(pos, pos + len); - } else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) { - value.leUnsigned = data.subarray(pos, pos + len); - } else if (magic == FORMAT_MAGICS_LE.JSON) { - value.parsed = JSON.parse( - data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(), - ) as ParsedPayload; - } else { - throw new Error("unknown magic: " + magic.toString()); - } - pos += len; + const value: BinaryResponse = { subscriptionId }; + while (pos < buffData.length) { + const len = buffData + .subarray(pos, pos + UINT16_NUM_BYTES) + .readUint16BE(); + pos += UINT16_NUM_BYTES; + const magic = buffData + .subarray(pos, pos + UINT32_NUM_BYTES) + .readUint32LE(); + if (magic == FORMAT_MAGICS_LE.EVM) { + value.evm = buffData.subarray(pos, pos + len); + } else if (magic == FORMAT_MAGICS_LE.SOLANA) { + value.solana = buffData.subarray(pos, pos + len); + } else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) { + value.leEcdsa = buffData.subarray(pos, pos + len); + } else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) { + value.leUnsigned = buffData.subarray(pos, pos + len); + } else if (magic == FORMAT_MAGICS_LE.JSON) { + value.parsed = JSON.parse( + buffData.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(), + ) as ParsedPayload; + } else { + throw new Error(`unknown magic: ${magic.toString()}`); } - handler({ type: "binary", value }); - } else { - throw new TypeError("unexpected event data type"); + pos += len; } + handler({ type: "binary", value }); }); } diff --git a/lazer/sdk/js/src/protocol.ts b/lazer/sdk/js/src/protocol.ts index 30a1bb83ab..2050b46e25 100644 --- a/lazer/sdk/js/src/protocol.ts +++ b/lazer/sdk/js/src/protocol.ts @@ -158,3 +158,7 @@ export type JsonUpdate = { leEcdsa?: JsonBinaryData; leUnsigned?: JsonBinaryData; }; + +export enum CustomSocketClosureCodes { + CLIENT_TIMEOUT_BUT_RECONNECTING = 4000, +} diff --git a/lazer/sdk/js/src/socket/resilient-websocket.ts b/lazer/sdk/js/src/socket/resilient-websocket.ts index 6131100d97..b23af3775e 100644 --- a/lazer/sdk/js/src/socket/resilient-websocket.ts +++ b/lazer/sdk/js/src/socket/resilient-websocket.ts @@ -5,6 +5,9 @@ import WebSocket from "isomorphic-ws"; import type { Logger } from "ts-log"; import { dummyLogger } from "ts-log"; +import { CustomSocketClosureCodes } from "../protocol.js"; +import { envIsBrowserOrWorker } from "../util/env-util.js"; + const DEFAULT_HEARTBEAT_TIMEOUT_DURATION_MS = 5000; // 5 seconds const DEFAULT_MAX_RETRY_DELAY_MS = 1000; // 1 second' const DEFAULT_LOG_AFTER_RETRY_COUNT = 10; @@ -18,6 +21,21 @@ export type ResilientWebSocketConfig = { logAfterRetryCount?: number; }; +/** + * the isomorphic-ws package ships with some slightly-erroneous typings. + * namely, it returns a WebSocket with typings that indicate the "terminate()" function + * is available on all platforms. + * Given that, under the hood, it is using the globalThis.WebSocket class, if it's available, + * and falling back to using the https://www.npmjs.com/package/ws package, this + * means there are API differences between the native WebSocket (the one in a web browser) + * and the server-side version from the "ws" package. + * + * This type creates a WebSocket type reference we use to indicate the unknown + * nature of the env in which is code is run. + */ +type UnsafeWebSocket = Omit & + Partial>; + export class ResilientWebSocket { private endpoint: string; private wsOptions?: ClientOptions | ClientRequestArgs | undefined; @@ -26,7 +44,7 @@ export class ResilientWebSocket { private maxRetryDelayMs: number; private logAfterRetryCount: number; - wsClient: undefined | WebSocket; + wsClient: UnsafeWebSocket | undefined; wsUserClosed = false; private wsFailedAttempts: number; private heartbeatTimeout?: NodeJS.Timeout | undefined; @@ -106,7 +124,13 @@ export class ResilientWebSocket { this.retryTimeout = undefined; } - this.wsClient = new WebSocket(this.endpoint, this.wsOptions); + // browser constructor supports a different 2nd argument for the constructor, + // so we need to ensure it's not included if we're running in that environment: + // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#protocols + this.wsClient = new WebSocket( + this.endpoint, + envIsBrowserOrWorker() ? undefined : this.wsOptions, + ); this.wsClient.addEventListener("open", () => { this.logger.info("WebSocket connection established"); @@ -154,8 +178,21 @@ export class ResilientWebSocket { } this.heartbeatTimeout = setTimeout(() => { - this.logger.warn("Connection timed out. Reconnecting..."); - this.wsClient?.terminate(); + const warnMsg = "Connection timed out. Reconnecting..."; + this.logger.warn(warnMsg); + if (this.wsClient) { + if (typeof this.wsClient.terminate === "function") { + this.wsClient.terminate(); + } else { + // terminate is an implementation detail of the node-friendly + // https://www.npmjs.com/package/ws package, but is not a native WebSocket API, + // so we have to use the close method + this.wsClient.close( + CustomSocketClosureCodes.CLIENT_TIMEOUT_BUT_RECONNECTING, + warnMsg, + ); + } + } this.handleReconnect(); }, this.heartbeatTimeoutDurationMs); } diff --git a/lazer/sdk/js/src/socket/websocket-pool.ts b/lazer/sdk/js/src/socket/websocket-pool.ts index f56a91ec24..eee5ddb3bc 100644 --- a/lazer/sdk/js/src/socket/websocket-pool.ts +++ b/lazer/sdk/js/src/socket/websocket-pool.ts @@ -11,9 +11,18 @@ import { DEFAULT_STREAM_SERVICE_0_URL, DEFAULT_STREAM_SERVICE_1_URL, } from "../constants.js"; +import { + addAuthTokenToWebSocketUrl, + bufferFromWebsocketData, + envIsBrowserOrWorker, +} from "../util/index.js"; const DEFAULT_NUM_CONNECTIONS = 4; +type WebSocketOnMessageCallback = ( + data: WebSocket.Data, +) => void | Promise; + export type WebSocketPoolConfig = { urls?: string[]; numConnections?: number; @@ -25,7 +34,7 @@ export class WebSocketPool { rwsPool: ResilientWebSocket[]; private cache: TTLCache; private subscriptions: Map; // id -> subscription Request - private messageListeners: ((event: WebSocket.Data) => void)[]; + private messageListeners: WebSocketOnMessageCallback[]; private allConnectionsDownListeners: (() => void)[]; private wasAllDown = true; private checkConnectionStatesInterval: NodeJS.Timeout; @@ -65,16 +74,19 @@ export class WebSocketPool { const numConnections = config.numConnections ?? DEFAULT_NUM_CONNECTIONS; for (let i = 0; i < numConnections; i++) { - const url = urls[i % urls.length]; + const baseUrl = urls[i % urls.length]; + const isBrowser = envIsBrowserOrWorker(); + const url = isBrowser + ? addAuthTokenToWebSocketUrl(baseUrl, token) + : baseUrl; if (!url) { throw new Error(`URLs must not be null or empty`); } - const wsOptions = { + const wsOptions: ResilientWebSocketConfig["wsOptions"] = { ...config.rwsConfig?.wsOptions, - headers: { - Authorization: `Bearer ${token}`, - }, + headers: isBrowser ? undefined : { Authorization: `Bearer ${token}` }, }; + const rws = new ResilientWebSocket({ ...config.rwsConfig, endpoint: url, @@ -104,7 +116,12 @@ export class WebSocketPool { rws.onError = config.onError; } // Handle all client messages ourselves. Dedupe before sending to registered message handlers. - rws.onMessage = pool.dedupeHandler; + rws.onMessage = (data) => { + pool.dedupeHandler(data).catch((error: unknown) => { + const errMsg = `An error occurred in the WebSocket pool's dedupeHandler: ${error instanceof Error ? error.message : String(error)}`; + throw new Error(errMsg); + }); + }; pool.rwsPool.push(rws); rws.startWebSocket(); } @@ -140,15 +157,18 @@ export class WebSocketPool { } } + private async constructCacheKeyFromWebsocketData(data: WebSocket.Data) { + if (typeof data === "string") return data; + const buff = await bufferFromWebsocketData(data); + return buff.toString("hex"); + } + /** * Handles incoming websocket messages by deduplicating identical messages received across * multiple connections before forwarding to registered handlers */ - dedupeHandler = (data: WebSocket.Data): void => { - const cacheKey = - typeof data === "string" - ? data - : Buffer.from(data as Buffer).toString("hex"); + dedupeHandler = async (data: WebSocket.Data): Promise => { + const cacheKey = await this.constructCacheKeyFromWebsocketData(data); if (this.cache.has(cacheKey)) { this.logger.debug("Dropping duplicate message"); @@ -161,9 +181,7 @@ export class WebSocketPool { this.handleErrorMessages(data); } - for (const handler of this.messageListeners) { - handler(data); - } + await Promise.all(this.messageListeners.map((handler) => handler(data))); }; sendRequest(request: Request) { @@ -189,7 +207,7 @@ export class WebSocketPool { this.sendRequest(request); } - addMessageListener(handler: (data: WebSocket.Data) => void): void { + addMessageListener(handler: WebSocketOnMessageCallback): void { this.messageListeners.push(handler); } diff --git a/lazer/sdk/js/src/util/buffer-util.ts b/lazer/sdk/js/src/util/buffer-util.ts new file mode 100644 index 0000000000..8f9df33a6d --- /dev/null +++ b/lazer/sdk/js/src/util/buffer-util.ts @@ -0,0 +1,38 @@ +// the linting rules don't allow importing anything that might clash with +// a global, top-level import. we disable this rule because we need this +// imported from our installed dependency +// eslint-disable-next-line unicorn/prefer-node-protocol +import { Buffer as BrowserBuffer } from "buffer"; + +import type { Data } from "isomorphic-ws"; + +const BufferClassToUse = + "Buffer" in globalThis ? globalThis.Buffer : BrowserBuffer; + +/** + * given a relatively unknown websocket frame data object, + * returns a valid Buffer instance that is safe to use + * isomorphically in any JS runtime environment + */ +export async function bufferFromWebsocketData(data: Data): Promise { + if (typeof data === "string") { + return BufferClassToUse.from(new TextEncoder().encode(data).buffer); + } + + if (data instanceof BufferClassToUse) return data; + + if (data instanceof Blob) { + // let the uncaught promise exception bubble up if there's an issue + return BufferClassToUse.from(await data.arrayBuffer()); + } + + if (data instanceof ArrayBuffer) return BufferClassToUse.from(data); + + if (Array.isArray(data)) { + // an array of buffers is highly unlikely, but it is a possibility + // indicated by the WebSocket Data interface + return BufferClassToUse.concat(data); + } + + return data; +} diff --git a/lazer/sdk/js/src/util/env-util.ts b/lazer/sdk/js/src/util/env-util.ts new file mode 100644 index 0000000000..78e763ce02 --- /dev/null +++ b/lazer/sdk/js/src/util/env-util.ts @@ -0,0 +1,35 @@ +// we create this local-only type, which has assertions made to indicate +// that we do not know and cannot guarantee which JS environment we are in +const g = globalThis as Partial<{ + self: typeof globalThis.self; + window: typeof globalThis.window; +}>; + +/** + * Detects if this code is running within any Service or WebWorker context. + * @returns true if in a worker of some kind, false if otherwise + */ +export function envIsServiceOrWebWorker() { + return ( + typeof WorkerGlobalScope !== "undefined" && + g.self instanceof WorkerGlobalScope + ); +} + +/** + * Detects if the code is running in a regular DOM or Web Worker context. + * @returns true if running in a DOM or Web Worker context, false if running in Node.js + */ +export function envIsBrowser() { + return g.window !== undefined; +} + +/** + * a convenience method that returns whether or not + * this code is executing in some type of browser-centric environment + * + * @returns true if in the browser's main UI thread or in a worker, false if otherwise + */ +export function envIsBrowserOrWorker() { + return envIsServiceOrWebWorker() || envIsBrowser(); +} diff --git a/lazer/sdk/js/src/util/index.ts b/lazer/sdk/js/src/util/index.ts new file mode 100644 index 0000000000..e4155c22ab --- /dev/null +++ b/lazer/sdk/js/src/util/index.ts @@ -0,0 +1,3 @@ +export * from "./buffer-util.js"; +export * from "./env-util.js"; +export * from "./url-util.js"; diff --git a/lazer/sdk/js/src/util/url-util.ts b/lazer/sdk/js/src/util/url-util.ts new file mode 100644 index 0000000000..db835e5599 --- /dev/null +++ b/lazer/sdk/js/src/util/url-util.ts @@ -0,0 +1,19 @@ +const ACCESS_TOKEN_QUERY_PARAM_KEY = "ACCESS_TOKEN"; + +/** + * Given a URL to a hosted lazer stream service and a possible auth token, + * appends the auth token as a query parameter and returns the URL with the token + * contained within. + * If the URL provided is nullish, it is returned as-is (in the same nullish format). + * If the token is nullish, the baseUrl given is returned, instead. + */ +export function addAuthTokenToWebSocketUrl( + baseUrl: string | null | undefined, + authToken: string | null | undefined, +) { + if (!baseUrl || !authToken) return baseUrl; + const parsedUrl = new URL(baseUrl); + parsedUrl.searchParams.set(ACCESS_TOKEN_QUERY_PARAM_KEY, authToken); + + return parsedUrl.toString(); +} diff --git a/lazer/sdk/js/tsconfig.json b/lazer/sdk/js/tsconfig.json index 32a3705250..e4af8b1248 100644 --- a/lazer/sdk/js/tsconfig.json +++ b/lazer/sdk/js/tsconfig.json @@ -1,4 +1,7 @@ { "extends": "@cprussin/tsconfig/base.json", - "exclude": ["node_modules", "dist"] + "exclude": ["node_modules", "dist"], + "compilerOptions": { + "lib": ["DOM", "DOM.Iterable", "WebWorker"] + } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2990378003..8ecf2e92cf 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2077,9 +2077,9 @@ importers: '@isaacs/ttlcache': specifier: ^1.4.1 version: 1.4.1 - cross-fetch: - specifier: ^4.0.0 - version: 4.1.0(encoding@0.1.13) + buffer: + specifier: ^6.0.3 + version: 6.0.3 isomorphic-ws: specifier: ^5.0.0 version: 5.0.0(ws@8.18.1(bufferutil@4.0.9)(utf-8-validate@6.0.3))