Skip to content

Commit 487ae1e

Browse files
authored
Merge pull request #3139 from pyth-network/bduran/lazer-js-sdk/browser-support
fix(lazer-sdk): implemented function to convert all non-string WS frames to an array buffer, then leverage an isomorphic buffer implementation
2 parents 6344c00 + f7277e9 commit 487ae1e

File tree

12 files changed

+286
-66
lines changed

12 files changed

+286
-66
lines changed

lazer/sdk/js/README.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,60 @@
33
## Contributing & Development
44

55
See [contributing.md](docs/contributing/contributing.md) for information on how to develop or contribute to this project!
6+
7+
## How to use
8+
9+
```javascript
10+
import { PythLazerClient } from "@pythnetwork/pyth-lazer-sdk";
11+
12+
const c = await PythLazerClient.create({
13+
token: "YOUR-AUTH-TOKEN-HERE",
14+
logger: console, // Optionally log operations (to the console in this case.)
15+
webSocketPoolConfig: {
16+
numConnections: 4, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 4.
17+
onError: (error) => {
18+
console.error("⛔️ WebSocket error:", error.message);
19+
},
20+
// Optional configuration for resilient WebSocket connections
21+
rwsConfig: {
22+
heartbeatTimeoutDurationMs: 5000, // Optional heartbeat timeout duration in milliseconds
23+
maxRetryDelayMs: 1000, // Optional maximum retry delay in milliseconds
24+
logAfterRetryCount: 10, // Optional log after how many retries
25+
},
26+
},
27+
});
28+
29+
c.addMessageListener((message) => {
30+
console.info("received the following from the Lazer stream:", message);
31+
});
32+
33+
// Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down)
34+
// The connections may still try to reconnect in the background. To shut down the client completely, call shutdown().
35+
c.addAllConnectionsDownListener(() => {
36+
console.error("All connections are down!");
37+
});
38+
39+
// Create and remove one or more subscriptions on the fly
40+
c.subscribe({
41+
type: "subscribe",
42+
subscriptionId: 1,
43+
priceFeedIds: [1, 2],
44+
properties: ["price"],
45+
formats: ["solana"],
46+
deliveryFormat: "binary",
47+
channel: "fixed_rate@200ms",
48+
parsed: false,
49+
jsonBinaryEncoding: "base64",
50+
});
51+
c.subscribe({
52+
type: "subscribe",
53+
subscriptionId: 2,
54+
priceFeedIds: [1, 2, 3, 4, 5],
55+
properties: ["price", "exponent", "publisherCount", "confidence"],
56+
formats: ["evm"],
57+
deliveryFormat: "json",
58+
channel: "fixed_rate@200ms",
59+
parsed: true,
60+
jsonBinaryEncoding: "hex",
61+
});
62+
```

lazer/sdk/js/package.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
{
22
"name": "@pythnetwork/pyth-lazer-sdk",
3-
"version": "4.0.0",
3+
"version": "5.0.0",
44
"description": "Pyth Lazer SDK",
5+
"engines": {
6+
"node": ">=22"
7+
},
58
"publishConfig": {
69
"access": "public"
710
},
@@ -61,7 +64,7 @@
6164
"license": "Apache-2.0",
6265
"dependencies": {
6366
"@isaacs/ttlcache": "^1.4.1",
64-
"cross-fetch": "^4.0.0",
67+
"buffer": "^6.0.3",
6568
"isomorphic-ws": "^5.0.0",
6669
"ts-log": "^2.2.7",
6770
"ws": "^8.18.0"

lazer/sdk/js/src/client.ts

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import fetch from "cross-fetch";
21
import WebSocket from "isomorphic-ws";
32
import type { Logger } from "ts-log";
43
import { dummyLogger } from "ts-log";
@@ -20,6 +19,7 @@ import type {
2019
import { BINARY_UPDATE_FORMAT_MAGIC_LE, FORMAT_MAGICS_LE } from "./protocol.js";
2120
import type { WebSocketPoolConfig } from "./socket/websocket-pool.js";
2221
import { WebSocketPool } from "./socket/websocket-pool.js";
22+
import { bufferFromWebsocketData } from "./util/buffer-util.js";
2323

2424
export type BinaryResponse = {
2525
subscriptionId: number;
@@ -113,53 +113,56 @@ export class PythLazerClient {
113113
*/
114114
addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
115115
const wsp = this.getWebSocketPool();
116-
wsp.addMessageListener((data: WebSocket.Data) => {
116+
wsp.addMessageListener(async (data: WebSocket.Data) => {
117117
if (typeof data == "string") {
118118
handler({
119119
type: "json",
120120
value: JSON.parse(data) as Response,
121121
});
122-
} else if (Buffer.isBuffer(data)) {
123-
let pos = 0;
124-
const magic = data.subarray(pos, pos + UINT32_NUM_BYTES).readUint32LE();
125-
pos += UINT32_NUM_BYTES;
126-
if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) {
127-
throw new Error("binary update format magic mismatch");
128-
}
129-
// TODO: some uint64 values may not be representable as Number.
130-
const subscriptionId = Number(
131-
data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(),
132-
);
133-
pos += UINT64_NUM_BYTES;
122+
return;
123+
}
124+
const buffData = await bufferFromWebsocketData(data);
125+
let pos = 0;
126+
const magic = buffData
127+
.subarray(pos, pos + UINT32_NUM_BYTES)
128+
.readUint32LE();
129+
pos += UINT32_NUM_BYTES;
130+
if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) {
131+
throw new Error("binary update format magic mismatch");
132+
}
133+
// TODO: some uint64 values may not be representable as Number.
134+
const subscriptionId = Number(
135+
buffData.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(),
136+
);
137+
pos += UINT64_NUM_BYTES;
134138

135-
const value: BinaryResponse = { subscriptionId };
136-
while (pos < data.length) {
137-
const len = data.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE();
138-
pos += UINT16_NUM_BYTES;
139-
const magic = data
140-
.subarray(pos, pos + UINT32_NUM_BYTES)
141-
.readUint32LE();
142-
if (magic == FORMAT_MAGICS_LE.EVM) {
143-
value.evm = data.subarray(pos, pos + len);
144-
} else if (magic == FORMAT_MAGICS_LE.SOLANA) {
145-
value.solana = data.subarray(pos, pos + len);
146-
} else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) {
147-
value.leEcdsa = data.subarray(pos, pos + len);
148-
} else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) {
149-
value.leUnsigned = data.subarray(pos, pos + len);
150-
} else if (magic == FORMAT_MAGICS_LE.JSON) {
151-
value.parsed = JSON.parse(
152-
data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(),
153-
) as ParsedPayload;
154-
} else {
155-
throw new Error("unknown magic: " + magic.toString());
156-
}
157-
pos += len;
139+
const value: BinaryResponse = { subscriptionId };
140+
while (pos < buffData.length) {
141+
const len = buffData
142+
.subarray(pos, pos + UINT16_NUM_BYTES)
143+
.readUint16BE();
144+
pos += UINT16_NUM_BYTES;
145+
const magic = buffData
146+
.subarray(pos, pos + UINT32_NUM_BYTES)
147+
.readUint32LE();
148+
if (magic == FORMAT_MAGICS_LE.EVM) {
149+
value.evm = buffData.subarray(pos, pos + len);
150+
} else if (magic == FORMAT_MAGICS_LE.SOLANA) {
151+
value.solana = buffData.subarray(pos, pos + len);
152+
} else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) {
153+
value.leEcdsa = buffData.subarray(pos, pos + len);
154+
} else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) {
155+
value.leUnsigned = buffData.subarray(pos, pos + len);
156+
} else if (magic == FORMAT_MAGICS_LE.JSON) {
157+
value.parsed = JSON.parse(
158+
buffData.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(),
159+
) as ParsedPayload;
160+
} else {
161+
throw new Error(`unknown magic: ${magic.toString()}`);
158162
}
159-
handler({ type: "binary", value });
160-
} else {
161-
throw new TypeError("unexpected event data type");
163+
pos += len;
162164
}
165+
handler({ type: "binary", value });
163166
});
164167
}
165168

lazer/sdk/js/src/protocol.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,3 +158,7 @@ export type JsonUpdate = {
158158
leEcdsa?: JsonBinaryData;
159159
leUnsigned?: JsonBinaryData;
160160
};
161+
162+
export enum CustomSocketClosureCodes {
163+
CLIENT_TIMEOUT_BUT_RECONNECTING = 4000,
164+
}

lazer/sdk/js/src/socket/resilient-websocket.ts

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import WebSocket from "isomorphic-ws";
55
import type { Logger } from "ts-log";
66
import { dummyLogger } from "ts-log";
77

8+
import { CustomSocketClosureCodes } from "../protocol.js";
9+
import { envIsBrowserOrWorker } from "../util/env-util.js";
10+
811
const DEFAULT_HEARTBEAT_TIMEOUT_DURATION_MS = 5000; // 5 seconds
912
const DEFAULT_MAX_RETRY_DELAY_MS = 1000; // 1 second'
1013
const DEFAULT_LOG_AFTER_RETRY_COUNT = 10;
@@ -18,6 +21,21 @@ export type ResilientWebSocketConfig = {
1821
logAfterRetryCount?: number;
1922
};
2023

24+
/**
25+
* the isomorphic-ws package ships with some slightly-erroneous typings.
26+
* namely, it returns a WebSocket with typings that indicate the "terminate()" function
27+
* is available on all platforms.
28+
* Given that, under the hood, it is using the globalThis.WebSocket class, if it's available,
29+
* and falling back to using the https://www.npmjs.com/package/ws package, this
30+
* means there are API differences between the native WebSocket (the one in a web browser)
31+
* and the server-side version from the "ws" package.
32+
*
33+
* This type creates a WebSocket type reference we use to indicate the unknown
34+
* nature of the env in which is code is run.
35+
*/
36+
type UnsafeWebSocket = Omit<WebSocket, "terminate"> &
37+
Partial<Pick<WebSocket, "terminate">>;
38+
2139
export class ResilientWebSocket {
2240
private endpoint: string;
2341
private wsOptions?: ClientOptions | ClientRequestArgs | undefined;
@@ -26,7 +44,7 @@ export class ResilientWebSocket {
2644
private maxRetryDelayMs: number;
2745
private logAfterRetryCount: number;
2846

29-
wsClient: undefined | WebSocket;
47+
wsClient: UnsafeWebSocket | undefined;
3048
wsUserClosed = false;
3149
private wsFailedAttempts: number;
3250
private heartbeatTimeout?: NodeJS.Timeout | undefined;
@@ -106,7 +124,13 @@ export class ResilientWebSocket {
106124
this.retryTimeout = undefined;
107125
}
108126

109-
this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
127+
// browser constructor supports a different 2nd argument for the constructor,
128+
// so we need to ensure it's not included if we're running in that environment:
129+
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#protocols
130+
this.wsClient = new WebSocket(
131+
this.endpoint,
132+
envIsBrowserOrWorker() ? undefined : this.wsOptions,
133+
);
110134

111135
this.wsClient.addEventListener("open", () => {
112136
this.logger.info("WebSocket connection established");
@@ -154,8 +178,21 @@ export class ResilientWebSocket {
154178
}
155179

156180
this.heartbeatTimeout = setTimeout(() => {
157-
this.logger.warn("Connection timed out. Reconnecting...");
158-
this.wsClient?.terminate();
181+
const warnMsg = "Connection timed out. Reconnecting...";
182+
this.logger.warn(warnMsg);
183+
if (this.wsClient) {
184+
if (typeof this.wsClient.terminate === "function") {
185+
this.wsClient.terminate();
186+
} else {
187+
// terminate is an implementation detail of the node-friendly
188+
// https://www.npmjs.com/package/ws package, but is not a native WebSocket API,
189+
// so we have to use the close method
190+
this.wsClient.close(
191+
CustomSocketClosureCodes.CLIENT_TIMEOUT_BUT_RECONNECTING,
192+
warnMsg,
193+
);
194+
}
195+
}
159196
this.handleReconnect();
160197
}, this.heartbeatTimeoutDurationMs);
161198
}

lazer/sdk/js/src/socket/websocket-pool.ts

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,18 @@ import {
1111
DEFAULT_STREAM_SERVICE_0_URL,
1212
DEFAULT_STREAM_SERVICE_1_URL,
1313
} from "../constants.js";
14+
import {
15+
addAuthTokenToWebSocketUrl,
16+
bufferFromWebsocketData,
17+
envIsBrowserOrWorker,
18+
} from "../util/index.js";
1419

1520
const DEFAULT_NUM_CONNECTIONS = 4;
1621

22+
type WebSocketOnMessageCallback = (
23+
data: WebSocket.Data,
24+
) => void | Promise<void>;
25+
1726
export type WebSocketPoolConfig = {
1827
urls?: string[];
1928
numConnections?: number;
@@ -25,7 +34,7 @@ export class WebSocketPool {
2534
rwsPool: ResilientWebSocket[];
2635
private cache: TTLCache<string, boolean>;
2736
private subscriptions: Map<number, Request>; // id -> subscription Request
28-
private messageListeners: ((event: WebSocket.Data) => void)[];
37+
private messageListeners: WebSocketOnMessageCallback[];
2938
private allConnectionsDownListeners: (() => void)[];
3039
private wasAllDown = true;
3140
private checkConnectionStatesInterval: NodeJS.Timeout;
@@ -65,16 +74,19 @@ export class WebSocketPool {
6574
const numConnections = config.numConnections ?? DEFAULT_NUM_CONNECTIONS;
6675

6776
for (let i = 0; i < numConnections; i++) {
68-
const url = urls[i % urls.length];
77+
const baseUrl = urls[i % urls.length];
78+
const isBrowser = envIsBrowserOrWorker();
79+
const url = isBrowser
80+
? addAuthTokenToWebSocketUrl(baseUrl, token)
81+
: baseUrl;
6982
if (!url) {
7083
throw new Error(`URLs must not be null or empty`);
7184
}
72-
const wsOptions = {
85+
const wsOptions: ResilientWebSocketConfig["wsOptions"] = {
7386
...config.rwsConfig?.wsOptions,
74-
headers: {
75-
Authorization: `Bearer ${token}`,
76-
},
87+
headers: isBrowser ? undefined : { Authorization: `Bearer ${token}` },
7788
};
89+
7890
const rws = new ResilientWebSocket({
7991
...config.rwsConfig,
8092
endpoint: url,
@@ -104,7 +116,12 @@ export class WebSocketPool {
104116
rws.onError = config.onError;
105117
}
106118
// Handle all client messages ourselves. Dedupe before sending to registered message handlers.
107-
rws.onMessage = pool.dedupeHandler;
119+
rws.onMessage = (data) => {
120+
pool.dedupeHandler(data).catch((error: unknown) => {
121+
const errMsg = `An error occurred in the WebSocket pool's dedupeHandler: ${error instanceof Error ? error.message : String(error)}`;
122+
throw new Error(errMsg);
123+
});
124+
};
108125
pool.rwsPool.push(rws);
109126
rws.startWebSocket();
110127
}
@@ -140,15 +157,18 @@ export class WebSocketPool {
140157
}
141158
}
142159

160+
private async constructCacheKeyFromWebsocketData(data: WebSocket.Data) {
161+
if (typeof data === "string") return data;
162+
const buff = await bufferFromWebsocketData(data);
163+
return buff.toString("hex");
164+
}
165+
143166
/**
144167
* Handles incoming websocket messages by deduplicating identical messages received across
145168
* multiple connections before forwarding to registered handlers
146169
*/
147-
dedupeHandler = (data: WebSocket.Data): void => {
148-
const cacheKey =
149-
typeof data === "string"
150-
? data
151-
: Buffer.from(data as Buffer).toString("hex");
170+
dedupeHandler = async (data: WebSocket.Data): Promise<void> => {
171+
const cacheKey = await this.constructCacheKeyFromWebsocketData(data);
152172

153173
if (this.cache.has(cacheKey)) {
154174
this.logger.debug("Dropping duplicate message");
@@ -161,9 +181,7 @@ export class WebSocketPool {
161181
this.handleErrorMessages(data);
162182
}
163183

164-
for (const handler of this.messageListeners) {
165-
handler(data);
166-
}
184+
await Promise.all(this.messageListeners.map((handler) => handler(data)));
167185
};
168186

169187
sendRequest(request: Request) {
@@ -189,7 +207,7 @@ export class WebSocketPool {
189207
this.sendRequest(request);
190208
}
191209

192-
addMessageListener(handler: (data: WebSocket.Data) => void): void {
210+
addMessageListener(handler: WebSocketOnMessageCallback): void {
193211
this.messageListeners.push(handler);
194212
}
195213

0 commit comments

Comments
 (0)