Skip to content

Commit 6e91b89

Browse files
committed
fix(lazer-sdk): implemented function to convert all non-string WS frames to an array buffer, then leverage an isomorphic buffer implementation
1 parent 200237d commit 6e91b89

File tree

6 files changed

+359
-88
lines changed

6 files changed

+359
-88
lines changed

lazer/sdk/js/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
"license": "Apache-2.0",
6060
"dependencies": {
6161
"@isaacs/ttlcache": "^1.4.1",
62+
"buffer": "^6.0.3",
6263
"isomorphic-ws": "^5.0.0",
6364
"ts-log": "^2.2.7",
6465
"ws": "^8.18.0"

lazer/sdk/js/src/buffer/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
// need to disable this as this is actually a polyfill for the browser,
2+
// which we use, conditionally, depending on environment
3+
// eslint-disable-next-line unicorn/prefer-node-protocol
4+
import { Buffer as BrowserBuffer } from 'buffer';
5+
6+
import { envIsBrowser, envIsWorker } from '../util.js';
7+
8+
export const IsomorphicBuffer = envIsBrowser() || envIsWorker() ? BrowserBuffer : Buffer;

lazer/sdk/js/src/client.ts

Lines changed: 48 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import WebSocket from "isomorphic-ws";
22

3+
import { IsomorphicBuffer } from "./buffer/index.js";
34
import type { ParsedPayload, Request, Response } from "./protocol.js";
45
import { BINARY_UPDATE_FORMAT_MAGIC_LE, FORMAT_MAGICS_LE } from "./protocol.js";
56
import type { WebSocketPoolConfig } from "./socket/websocket-pool.js";
67
import { WebSocketPool } from "./socket/websocket-pool.js";
8+
import { convertWebsocketDataToArrayBuffer } from "./util.js";
79

810
export type BinaryResponse = {
911
subscriptionId: number;
@@ -15,17 +17,17 @@ export type BinaryResponse = {
1517
};
1618
export type JsonOrBinaryResponse =
1719
| {
18-
type: "json";
19-
value: Response;
20-
}
20+
type: "json";
21+
value: Response;
22+
}
2123
| { type: "binary"; value: BinaryResponse };
2224

2325
const UINT16_NUM_BYTES = 2;
2426
const UINT32_NUM_BYTES = 4;
2527
const UINT64_NUM_BYTES = 8;
2628

2729
export class PythLazerClient {
28-
private constructor(private readonly wsp: WebSocketPool) {}
30+
private constructor(private readonly wsp: WebSocketPool) { }
2931

3032
/**
3133
* Creates a new PythLazerClient instance.
@@ -46,53 +48,55 @@ export class PythLazerClient {
4648
* or a binary response containing EVM, Solana, or parsed payload data.
4749
*/
4850
addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
49-
this.wsp.addMessageListener((data: WebSocket.Data) => {
50-
if (typeof data == "string") {
51+
this.wsp.addMessageListener(async (data: WebSocket.Data) => {
52+
if (typeof data === "string") {
5153
handler({
5254
type: "json",
5355
value: JSON.parse(data) as Response,
5456
});
55-
} else if (Buffer.isBuffer(data)) {
56-
let pos = 0;
57-
const magic = data.subarray(pos, pos + UINT32_NUM_BYTES).readUint32LE();
58-
pos += UINT32_NUM_BYTES;
59-
if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) {
60-
throw new Error("binary update format magic mismatch");
61-
}
62-
// TODO: some uint64 values may not be representable as Number.
63-
const subscriptionId = Number(
64-
data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(),
65-
);
66-
pos += UINT64_NUM_BYTES;
57+
return;
58+
}
59+
60+
const buff = IsomorphicBuffer.from(await convertWebsocketDataToArrayBuffer(data));
61+
62+
let pos = 0;
63+
const magic = buff.subarray(pos, pos + UINT32_NUM_BYTES).readUint32LE();
64+
pos += UINT32_NUM_BYTES;
65+
if (magic != BINARY_UPDATE_FORMAT_MAGIC_LE) {
66+
throw new Error("binary update format magic mismatch");
67+
}
68+
// TODO: some uint64 values may not be representable as Number.
69+
const subscriptionId = Number(
70+
buff.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE(),
71+
);
72+
pos += UINT64_NUM_BYTES;
6773

68-
const value: BinaryResponse = { subscriptionId };
69-
while (pos < data.length) {
70-
const len = data.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE();
71-
pos += UINT16_NUM_BYTES;
72-
const magic = data
73-
.subarray(pos, pos + UINT32_NUM_BYTES)
74-
.readUint32LE();
75-
if (magic == FORMAT_MAGICS_LE.EVM) {
76-
value.evm = data.subarray(pos, pos + len);
77-
} else if (magic == FORMAT_MAGICS_LE.SOLANA) {
78-
value.solana = data.subarray(pos, pos + len);
79-
} else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) {
80-
value.leEcdsa = data.subarray(pos, pos + len);
81-
} else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) {
82-
value.leUnsigned = data.subarray(pos, pos + len);
83-
} else if (magic == FORMAT_MAGICS_LE.JSON) {
84-
value.parsed = JSON.parse(
85-
data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(),
86-
) as ParsedPayload;
87-
} else {
88-
throw new Error("unknown magic: " + magic.toString());
89-
}
90-
pos += len;
74+
const value: BinaryResponse = { subscriptionId };
75+
while (pos < buff.length) {
76+
const len = buff.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE();
77+
pos += UINT16_NUM_BYTES;
78+
const magic = buff
79+
.subarray(pos, pos + UINT32_NUM_BYTES)
80+
.readUint32LE();
81+
if (magic == FORMAT_MAGICS_LE.EVM) {
82+
value.evm = buff.subarray(pos, pos + len);
83+
} else if (magic == FORMAT_MAGICS_LE.SOLANA) {
84+
value.solana = buff.subarray(pos, pos + len);
85+
} else if (magic == FORMAT_MAGICS_LE.LE_ECDSA) {
86+
value.leEcdsa = buff.subarray(pos, pos + len);
87+
} else if (magic == FORMAT_MAGICS_LE.LE_UNSIGNED) {
88+
value.leUnsigned = buff.subarray(pos, pos + len);
89+
} else if (magic == FORMAT_MAGICS_LE.JSON) {
90+
value.parsed = JSON.parse(
91+
buff.subarray(pos + UINT32_NUM_BYTES, pos + len).toString(),
92+
) as ParsedPayload;
93+
} else {
94+
throw new Error("unknown magic: " + magic.toString());
9195
}
92-
handler({ type: "binary", value });
93-
} else {
94-
throw new TypeError("unexpected event data type");
96+
pos += len;
9597
}
98+
handler({ type: "binary", value });
99+
return;
96100
});
97101
}
98102

lazer/sdk/js/src/socket/websocket-pool.ts

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ import { dummyLogger } from "ts-log";
99
import type { Request, Response } from "../protocol.js";
1010
import type { ResilientWebSocketConfig } from "./resilient-websocket.js";
1111
import { ResilientWebSocket } from "./resilient-websocket.js";
12-
import { envIsBrowser, envIsWorker } from "../util.js";
12+
import { IsomorphicBuffer } from "../buffer/index.js";
13+
import { convertWebsocketDataToArrayBuffer, envIsBrowser, envIsWorker } from "../util.js";
1314

1415
type AddAuthenticationReturnType = {
1516
endpoint: string;
@@ -43,17 +44,17 @@ function addAuthentication(
4344
wsOptions: undefined,
4445
};
4546
}
46-
// Node.js: Add Authorization header
47-
return {
48-
endpoint: url,
49-
wsOptions: {
50-
...wsOptions,
51-
headers: {
52-
...(wsOptions.headers as Record<string, string> | undefined),
53-
Authorization: `Bearer ${token}`,
54-
},
47+
// Node.js: Add Authorization header
48+
return {
49+
endpoint: url,
50+
wsOptions: {
51+
...wsOptions,
52+
headers: {
53+
...(wsOptions.headers as Record<string, string> | undefined),
54+
Authorization: `Bearer ${token}`,
5555
},
56-
};
56+
},
57+
};
5758
}
5859

5960
const DEFAULT_NUM_CONNECTIONS = 4;
@@ -67,11 +68,13 @@ export type WebSocketPoolConfig = {
6768
onError?: (error: ErrorEvent) => void;
6869
};
6970

71+
type WebSocketOnMessageCallback = (data: WebSocket.Data) => Promise<void>;
72+
7073
export class WebSocketPool {
7174
rwsPool: ResilientWebSocket[];
7275
private cache: TTLCache<string, boolean>;
7376
private subscriptions: Map<number, Request>; // id -> subscription Request
74-
private messageListeners: ((event: WebSocket.Data) => void)[];
77+
private messageListeners: WebSocketOnMessageCallback[];
7578
private allConnectionsDownListeners: (() => void)[];
7679
private wasAllDown = true;
7780
private checkConnectionStatesInterval: NodeJS.Timeout;
@@ -148,6 +151,7 @@ export class WebSocketPool {
148151
rws.onError = config.onError;
149152
}
150153
// Handle all client messages ourselves. Dedupe before sending to registered message handlers.
154+
// eslint-disable-next-line @typescript-eslint/no-misused-promises
151155
rws.onMessage = pool.dedupeHandler;
152156
pool.rwsPool.push(rws);
153157
rws.startWebSocket();
@@ -184,15 +188,24 @@ export class WebSocketPool {
184188
}
185189
}
186190

191+
/**
192+
* internal utility function that will choose the best
193+
* parsing mechanism, based on the environment and the
194+
* data type received, and return a key, computed from the data,
195+
* which can be used as a cache lookup.
196+
* this function helps maintain isomorphism for this socket library
197+
*/
198+
private async parseSocketDataToKey(data: WebSocket.Data) {
199+
const buff = IsomorphicBuffer.from(await convertWebsocketDataToArrayBuffer(data));
200+
return buff.toString('hex');
201+
}
202+
187203
/**
188204
* Handles incoming websocket messages by deduplicating identical messages received across
189205
* multiple connections before forwarding to registered handlers
190206
*/
191-
dedupeHandler = (data: WebSocket.Data): void => {
192-
const cacheKey =
193-
typeof data === "string"
194-
? data
195-
: Buffer.from(data as Buffer).toString("hex");
207+
dedupeHandler = async (data: WebSocket.Data): Promise<void> => {
208+
const cacheKey = await this.parseSocketDataToKey(data);
196209

197210
if (this.cache.has(cacheKey)) {
198211
this.logger.debug("Dropping duplicate message");
@@ -205,9 +218,7 @@ export class WebSocketPool {
205218
this.handleErrorMessages(data);
206219
}
207220

208-
for (const handler of this.messageListeners) {
209-
handler(data);
210-
}
221+
await Promise.all(this.messageListeners.map(handler => handler(data)));
211222
};
212223

213224
sendRequest(request: Request) {
@@ -233,7 +244,7 @@ export class WebSocketPool {
233244
this.sendRequest(request);
234245
}
235246

236-
addMessageListener(handler: (data: WebSocket.Data) => void): void {
247+
addMessageListener(handler: WebSocketOnMessageCallback): void {
237248
this.messageListeners.push(handler);
238249
}
239250

lazer/sdk/js/src/util.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
// is specific to node, but also exists in various browser contexts:
66
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/globalThis
77

8+
import type { Data } from 'isomorphic-ws';
9+
810
/**
911
* Utility function for detecting whether the current JS execution
1012
* context is that of a background WebWorker or a ServiceWorker (not the main UI thread)
1113
*/
1214
export function envIsWorker() {
13-
const possiblyInAWorker = typeof WorkerGlobalScope === 'undefined' || typeof self === 'undefined';
15+
const possiblyInAWorker = typeof WorkerGlobalScope !== 'undefined' && typeof self !== 'undefined';
1416
return possiblyInAWorker && self instanceof WorkerGlobalScope;
1517
}
1618

@@ -33,3 +35,27 @@ export function envIsBrowser() {
3335
export function envIsNode() {
3436
return typeof globalThis !== 'undefined' && typeof globalThis.process === 'object';
3537
}
38+
39+
/**
40+
* given a relatively unknown websocket frame data object,
41+
* returns a consistent ArrayBuffer to use in any environment
42+
*/
43+
export async function convertWebsocketDataToArrayBuffer(data: Data): Promise<ArrayBuffer> {
44+
if (typeof data === 'string') {
45+
return new TextEncoder().encode(data).buffer;
46+
}
47+
if (data instanceof Blob) {
48+
// let the uncaught promise exception bubble up if there's an issue
49+
return data.arrayBuffer();
50+
}
51+
if (data instanceof ArrayBuffer) return data;
52+
if (Buffer.isBuffer(data)) {
53+
const arrBuffer = new ArrayBuffer(data.length);
54+
const v = new Uint8Array(arrBuffer);
55+
for (const [i, item] of data.entries()) {
56+
v[i] = item;
57+
}
58+
return arrBuffer;
59+
}
60+
throw new TypeError("unexpected event data type found when convertWebsocketDataToArrayBuffer() to called");
61+
}

0 commit comments

Comments
 (0)