Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions hyperliquid/websocket_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,50 @@ def ws_msg_to_identifier(ws_msg: WsMsg) -> Optional[str]:
return f'activeAssetData:{ws_msg["data"]["coin"].lower()},{ws_msg["data"]["user"].lower()}'


def identifier_to_subscription(identifier: str) -> dict:
if identifier == "allMids":
return {"type": "allMids"}
elif identifier == "userEvents":
return {"type": "userEvents"}
elif identifier == "orderUpdates":
return {"type": "orderUpdates"}

if identifier.startswith("l2Book:"):
coin = identifier[len("l2Book:") :]
return {"type": "l2Book", "coin": coin}
elif identifier.startswith("trades:"):
coin = identifier[len("trades:") :]
return {"type": "trades", "coin": coin}
elif identifier.startswith("userFills:"):
user = identifier[len("userFills:") :]
return {"type": "userFills", "user": user}
elif identifier.startswith("candle:"):
data = identifier[len("candle:") :]
coin, interval = data.split(",", 1)
return {"type": "candle", "coin": coin, "interval": interval}
elif identifier.startswith("userFundings:"):
user = identifier[len("userFundings:") :]
return {"type": "userFundings", "user": user}
elif identifier.startswith("userNonFundingLedgerUpdates:"):
user = identifier[len("userNonFundingLedgerUpdates:") :]
return {"type": "userNonFundingLedgerUpdates", "user": user}
elif identifier.startswith("webData2:"):
user = identifier[len("webData2:") :]
return {"type": "webData2", "user": user}
elif identifier.startswith("bbo:"):
coin = identifier[len("bbo:") :]
return {"type": "bbo", "coin": coin}
elif identifier.startswith("activeAssetCtx:"):
coin = identifier[len("activeAssetCtx:") :]
return {"type": "activeAssetCtx", "coin": coin}
elif identifier.startswith("activeAssetData:"):
data = identifier[len("activeAssetData:") :]
coin, user = data.split(",", 1)
return {"type": "activeAssetData", "coin": coin, "user": user}

raise ValueError(f"Unknown subscription identifier: {identifier}")


class WebsocketManager(threading.Thread):
def __init__(self, base_url):
super().__init__()
Expand Down Expand Up @@ -160,3 +204,83 @@ def unsubscribe(self, subscription: Subscription, subscription_id: int) -> bool:
self.ws.send(json.dumps({"method": "unsubscribe", "subscription": subscription}))
self.active_subscriptions[identifier] = new_active_subscriptions
return len(active_subscriptions) != len(new_active_subscriptions)


class ReconnectableWebsocketManager(WebsocketManager):
def __init__(self, base_url: str, *args, **kwargs):
# Store base_url for reuse during reconnection
self.base_url = base_url
super().__init__(base_url, *args, **kwargs)
# Override close and error handlers to trigger reconnection
self.ws.on_close = self.on_close
self.ws.on_error = self.on_error
# Lock to protect shared state (queued_subscriptions, active_subscriptions)

def on_open(self, _ws):
logging.debug("on_open")
self.ws_ready = True
# Process queued subscriptions and move them to active_subscriptions
for subscription, active_subscription in self.queued_subscriptions:
self.subscribe(
subscription,
active_subscription.callback,
active_subscription.subscription_id,
)
self.queued_subscriptions.clear()

def _reconnect(self):
# Mark connection as not ready
self.ws_ready = False
# Move all active subscriptions back into the subscription queue
for identifier, active_subscriptions in self.active_subscriptions.items():
subscription = identifier_to_subscription(identifier)
for active_subscription in active_subscriptions:
self.subscribe(
subscription,
active_subscription.callback,
active_subscription.subscription_id,
)
self.active_subscriptions.clear()

def _start_ping_sender(self):
if not self.ping_sender.is_alive():
self.ping_sender = threading.Thread(target=self.send_ping, daemon=True)
self.ping_sender.start()

def on_close(self, ws, close_status_code, close_msg):
logging.debug(f"ReconnectableWebsocketManager on_close: {close_status_code} - {close_msg}")
# If stop_event is set, skip reconnection
if self.stop_event.is_set():
return
self._reconnect()

def on_error(self, ws, error):
logging.debug(f"ReconnectableWebsocketManager on_error: {error}")
# If stop_event is set, skip reconnection
if self.stop_event.is_set():
return
self._reconnect()

def run(self, *, ping_timeout=15, ping_interval=30, reconnect_interval=5):
# Start the ping sender thread (keeps connection alive)
self._start_ping_sender()
# Main loop to maintain the connection and handle reconnections
while not self.stop_event.is_set():
logging.debug("ReconnectableWebsocketManager connecting...")
self.ws.run_forever(ping_timeout=ping_timeout, ping_interval=ping_interval)
logging.debug(
f"ReconnectableWebsocketManager disconnected. Reconnecting in {reconnect_interval} seconds..."
)
# Wait for the reconnection interval or break if stop_event is set
if self.stop_event.wait(reconnect_interval):
break
# Create a new WebSocketApp instance for reconnection
ws_url = "ws" + self.base_url[len("http") :] + "/ws"
self.ws = websocket.WebSocketApp(
ws_url,
on_message=self.on_message,
on_open=self.on_open,
on_close=self.on_close,
on_error=self.on_error,
)
self._start_ping_sender()