From b5c4fc4d93186ed0951209fb719e85f531491a3d Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Wed, 9 Aug 2023 22:39:48 -0400 Subject: [PATCH 1/3] Allow custom app logic to trigger when a websocket connects/reconnects Signed-off-by: Peter Broadhurst --- lib/firefly.ts | 3 +++ lib/interfaces.ts | 9 +++++++++ lib/websocket.ts | 9 +++++++++ 3 files changed, 21 insertions(+) diff --git a/lib/firefly.ts b/lib/firefly.ts index 6167bd6..675c2d6 100644 --- a/lib/firefly.ts +++ b/lib/firefly.ts @@ -79,6 +79,7 @@ import { FireFlyDeleteOptions, FireFlyTokenApprovalFilter, FireFlyTokenApprovalResponse, + FireFlyWebSocketConnectCallback, } from './interfaces'; import { FireFlyWebSocket, FireFlyWebSocketCallback } from './websocket'; import HttpBase, { mapConfig } from './http'; @@ -598,6 +599,7 @@ export default class FireFly extends HttpBase { subscriptions: string | string[] | FireFlySubscriptionBase, callback: FireFlyWebSocketCallback, socketOptions?: WebSocket.ClientOptions | http.ClientRequestArgs, + afterConnect?: FireFlyWebSocketConnectCallback, ): FireFlyWebSocket { const options: FireFlyWebSocketOptions = { host: this.options.websocket.host, @@ -609,6 +611,7 @@ export default class FireFly extends HttpBase { reconnectDelay: this.options.websocket.reconnectDelay, heartbeatInterval: this.options.websocket.heartbeatInterval, socketOptions: socketOptions, + afterConnect: afterConnect, }; const handler: FireFlyWebSocketCallback = (socket, event) => { diff --git a/lib/interfaces.ts b/lib/interfaces.ts index 1c0a96c..e6fd09f 100644 --- a/lib/interfaces.ts +++ b/lib/interfaces.ts @@ -61,6 +61,14 @@ export interface FireFlyOptions extends FireFlyOptionsInput { }; } +export interface FireFlyWebSocketSender { + send: (json: JSON) => void; +} + +export interface FireFlyWebSocketConnectCallback { + (sender: FireFlyWebSocketSender): void | Promise; +} + export interface FireFlyWebSocketOptions { host: string; namespace: string; @@ -72,6 +80,7 @@ export interface FireFlyWebSocketOptions { reconnectDelay: number; heartbeatInterval: number; socketOptions?: WebSocket.ClientOptions | http.ClientRequestArgs; + afterConnect?: FireFlyWebSocketConnectCallback; } // Namespace diff --git a/lib/websocket.ts b/lib/websocket.ts index 3b9679a..1b84320 100644 --- a/lib/websocket.ts +++ b/lib/websocket.ts @@ -83,6 +83,9 @@ export class FireFlyWebSocket { ); this.logger.log(`Started listening on subscription ${this.options.namespace}:${name}`); } + if (this.options?.afterConnect !== undefined) { + this.options.afterConnect(this); + } }) .on('error', (err) => { this.logger.error('Error', err.stack); @@ -156,6 +159,12 @@ export class FireFlyWebSocket { } } + send(json: JSON) { + if (this.socket !== undefined) { + this.socket.send(JSON.stringify(json)); + } + } + ack(event: FireFlyEventDelivery) { if (this.socket !== undefined && event.id !== undefined) { this.socket.send( From 029e343ee2311a3ccbb06abd7773d366bbdc873f Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 11 Aug 2023 13:44:20 -0400 Subject: [PATCH 2/3] Add option to wait for complete cleanup of the websocket on close Signed-off-by: Peter Broadhurst --- lib/websocket.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/websocket.ts b/lib/websocket.ts index 1b84320..ee2f48b 100644 --- a/lib/websocket.ts +++ b/lib/websocket.ts @@ -26,7 +26,7 @@ export class FireFlyWebSocket { private readonly logger = new Logger(FireFlyWebSocket.name); private socket?: WebSocket; - private closed = false; + private closed? = () => {}; private pingTimer?: NodeJS.Timeout; private disconnectTimer?: NodeJS.Timeout; private reconnectTimer?: NodeJS.Timeout; @@ -61,7 +61,7 @@ export class FireFlyWebSocket { auth, handshakeTimeout: this.options.heartbeatInterval, })); - this.closed = false; + this.closed = undefined; socket .on('open', () => { @@ -177,8 +177,10 @@ export class FireFlyWebSocket { } } - close() { - this.closed = true; + async close(wait?: boolean): Promise { + const closedPromise = new Promise(resolve => { + this.closed = resolve; + }); this.clearPingTimers(); if (this.socket) { try { @@ -186,6 +188,7 @@ export class FireFlyWebSocket { } catch (e: any) { this.logger.warn(`Failed to clean up websocket: ${e.message}`); } + if (wait) await closedPromise; this.socket = undefined; } } From e396f5d7a7db5157446014fccb59affcc5f33d3b Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 11 Aug 2023 13:50:02 -0400 Subject: [PATCH 3/3] Resolve the close promise Signed-off-by: Peter Broadhurst --- lib/websocket.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/websocket.ts b/lib/websocket.ts index ee2f48b..59377a5 100644 --- a/lib/websocket.ts +++ b/lib/websocket.ts @@ -93,6 +93,7 @@ export class FireFlyWebSocket { .on('close', () => { if (this.closed) { this.logger.log('Closed'); + this.closed(); // do this after all logging } else { this.disconnectDetected = true; this.reconnect('Closed by peer');