Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions interface/src/lib/stores/socket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { writable } from 'svelte/store';

function createWebSocket() {
let listeners = new Map<string, Set<(data?: unknown) => void>>();
const { subscribe, set } = writable(false);
const socketEvents = ['open', 'close', 'error', 'message', 'unresponsive'] as const;
type SocketEvent = (typeof socketEvents)[number];
let unresponsiveTimeoutId: number;
let reconnectTimeoutId: number;
let ws: WebSocket;
let socketUrl: string | URL;

function init(url: string | URL) {
socketUrl = url;
connect();
}

function disconnect(reason: SocketEvent, event?: Event) {
ws.close();
set(false);
clearTimeout(unresponsiveTimeoutId);
clearTimeout(reconnectTimeoutId);
listeners.get(reason)?.forEach((listener) => listener(event));
reconnectTimeoutId = setTimeout(connect, 1000);
}

function connect() {
ws = new WebSocket(socketUrl);
ws.onopen = (ev) => {
set(true);
clearTimeout(reconnectTimeoutId);
listeners.get('open')?.forEach((listener) => listener(ev));
for (const event of listeners.keys()) {
if (socketEvents.includes(event as SocketEvent)) continue;
sendEvent('subscribe', event);
}
};
ws.onmessage = (message) => {
resetUnresponsiveCheck();
let data = message.data;

if (data instanceof ArrayBuffer) {
listeners.get('binary')?.forEach((listener) => listener(data));
return;
}
listeners.get('message')?.forEach((listener) => listener(data));
try {
data = JSON.parse(message.data.substring(2));
} catch (error) {
listeners.get('error')?.forEach((listener) => listener(error));
return;
}
listeners.get('json')?.forEach((listener) => listener(data));
const [event, payload] = data;
if (event) listeners.get(event)?.forEach((listener) => listener(payload));
};
ws.onerror = (ev) => disconnect('error', ev);
ws.onclose = (ev) => disconnect('close', ev);
}

function unsubscribe(event: string, listener?: (data: any) => void) {
let eventListeners = listeners.get(event);
if (!eventListeners) return;

if (!eventListeners.size) {
sendEvent('unsubscribe', event);
}
if (listener) {
eventListeners?.delete(listener);
} else {
listeners.delete(event);
}
}

function resetUnresponsiveCheck() {
clearTimeout(unresponsiveTimeoutId);
unresponsiveTimeoutId = setTimeout(() => disconnect('unresponsive'), 2000);
}

function send(msg: unknown) {
if (!ws || ws.readyState !== WebSocket.OPEN) return;
ws.send(JSON.stringify(msg));
}

function sendEvent(event: string, data: unknown) {
send({ event, data });
}

return {
subscribe,
send,
sendEvent,
init,
on: <T>(event: string, listener: (data: T) => void): (() => void) => {
let eventListeners = listeners.get(event);
if (!eventListeners) {
if (!socketEvents.includes(event as SocketEvent)) {
sendEvent('subscribe', event);
}
eventListeners = new Set();
listeners.set(event, eventListeners);
}
eventListeners.add(listener as (data: any) => void);

return () => {
unsubscribe(event, listener);
};
},
off: (event: string, listener?: (data: any) => void) => {
unsubscribe(event, listener);
}
};
}

export const socket = createWebSocket();
117 changes: 55 additions & 62 deletions interface/src/routes/+layout.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { user } from '$lib/stores/user';
import { telemetry } from '$lib/stores/telemetry';
import { analytics } from '$lib/stores/analytics';
import { socket } from '$lib/stores/socket';
import type { userProfile } from '$lib/stores/user';
import { page } from '$app/stores';
import { Modals, closeModal } from 'svelte-modals';
Expand All @@ -22,10 +23,45 @@
if ($user.bearer_token !== '') {
await validateUser($user);
}
connectToEventSource();
user.subscribe((value) => {
if (value.bearer_token !== '') {
const ws_token = $page.data.features.security ? '?access_token=' + $user.bearer_token : ''
socket.init(`ws://${window.location.host}/ws${ws_token}`)
}
});
addEventListeners()
});

onDestroy(() => {
removeEventListeners()
});

const addEventListeners = () => {
socket.on("analytics", handleAnalytics)
socket.on("open", handleOpen)
socket.on("close", handleClose)
socket.on("error", handleError)
socket.on("rssi", handleNetworkStatus)
socket.on("infoToast", handleInfoToast)
socket.on("successToast", handleSuccessToast)
socket.on("warningToast", handleWarningToast)
socket.on("errorToast", handleErrorToast)
socket.on("battery", handleBattery)
socket.on("download_ota", handleOAT)
}

onDestroy(() => disconnectEventSource());
const removeEventListeners = () => {
socket.off("analytics", handleAnalytics)
socket.off("open", handleOpen)
socket.off("close", handleClose)
socket.off("rssi", handleNetworkStatus)
socket.off("infoToast", handleInfoToast)
socket.off("successToast", handleSuccessToast)
socket.off("warningToast", handleWarningToast)
socket.off("errorToast", handleErrorToast)
socket.off("battery", handleBattery)
socket.off("download_ota", handleOAT)
}

async function validateUser(userdata: userProfile) {
try {
Expand All @@ -43,74 +79,31 @@
console.error('Error:', error);
}
}

const handleOpen = () => {
notifications.success('Connection to device established', 5000)
telemetry.setRSSI('found')
}

let menuOpen = false;

let eventSourceUrl = '/events';
let eventSource: EventSource;
let unresponsiveTimeoutId: number;

function connectToEventSource() {
eventSource = new EventSource(eventSourceUrl);

eventSource.addEventListener('open', () => {
notifications.success('Connection to device established', 5000);
telemetry.setRSSI('found'); // Update store and flag as server being available again
});

eventSource.addEventListener('rssi', (event) => {
telemetry.setRSSI(event.data);
resetUnresponsiveCheck();
});

eventSource.addEventListener('error', (event) => {
reconnectEventSource();
});

eventSource.addEventListener('infoToast', (event) => {
notifications.info(event.data, 5000);
});
const handleClose = () => notifications.error('Connection to device established', 5000);

eventSource.addEventListener('successToast', (event) => {
notifications.success(event.data, 5000);
});
const handleError = (data: any) => console.error(data)

eventSource.addEventListener('warningToast', (event) => {
notifications.warning(event.data, 5000);
});
const handleInfoToast = (data: string) => notifications.info(data, 5000)
const handleWarningToast = (data: string) => notifications.warning(data, 5000)
const handleErrorToast = (data: string) => notifications.error(data, 5000)
const handleSuccessToast = (data: string) => notifications.success(data, 5000)

eventSource.addEventListener('errorToast', (event) => {
notifications.error(event.data, 5000);
});

const handleAnalytics = (data: Analytics) => analytics.addData(data)

eventSource.addEventListener('battery', (event) => {
telemetry.setBattery(event.data);
});
const handleNetworkStatus = (data: string) => telemetry.setRSSI(data);

eventSource.addEventListener('download_ota', (event) => {
telemetry.setDownloadOTA(event.data);
});
eventSource.addEventListener('analytics', (event) => {
const data = JSON.parse(event.data) as Analytics;
analytics.addData(data);
});
}
const handleBattery = (data: string) => telemetry.setRSSI(data);

function disconnectEventSource() {
clearTimeout(unresponsiveTimeoutId);
eventSource?.close();
}
const handleOAT = (data: string) => telemetry.setDownloadOTA(data);

function reconnectEventSource() {
notifications.error('Connection to device lost', 5000);
disconnectEventSource();
connectToEventSource();
}

function resetUnresponsiveCheck() {
clearTimeout(unresponsiveTimeoutId);
unresponsiveTimeoutId = setTimeout(() => reconnectEventSource(), 2000);
}
let menuOpen = false;
</script>

<svelte:head>
Expand Down
33 changes: 7 additions & 26 deletions interface/src/routes/demo/Demo.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import Info from '~icons/tabler/info-circle';
import Save from '~icons/tabler/device-floppy';
import Reload from '~icons/tabler/reload';
import { socket } from '$lib/stores/socket';
import type { LightState } from '$lib/types/models';

let lightState: LightState = { led_on: false };
Expand All @@ -31,34 +32,14 @@
return;
}

const ws_token = $page.data.features.security ? '?access_token=' + $user.bearer_token : '';

const lightStateSocket = new WebSocket('ws://' + $page.url.host + '/ws/lightState' + ws_token);

lightStateSocket.onopen = (event) => {
lightStateSocket.send('Hello');
};

lightStateSocket.addEventListener('close', (event) => {
const closeCode = event.code;
const closeReason = event.reason;
console.log('WebSocket closed with code:', closeCode);
console.log('Close reason:', closeReason);
notifications.error('Websocket disconnected', 5000);
});

lightStateSocket.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type != 'id') {
lightState = message;
}
};

onDestroy(() => lightStateSocket.close());

onMount(() => {
socket.on<LightState>("led", (data) => {
lightState = data;
})
getLightstate();
});

onDestroy(() => socket.off("led"));

async function postLightstate() {
try {
Expand Down Expand Up @@ -125,7 +106,7 @@
class="toggle toggle-primary"
bind:checked={lightState.led_on}
on:change={() => {
lightStateSocket.send(JSON.stringify(lightState));
socket.sendEvent("led", lightState);
}}
/>
</label>
Expand Down
9 changes: 4 additions & 5 deletions interface/src/routes/system/status/SystemStatus.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import Stopwatch from '~icons/tabler/24-hours';
import SDK from '~icons/tabler/sdk';
import type { SystemInformation } from '$lib/types/models';
import { socket } from '$lib/stores/socket';

let systemInformation: SystemInformation;

Expand All @@ -44,13 +45,11 @@
return systemInformation;
}

const interval = setInterval(async () => {
getSystemStatus();
}, 5000);
onMount(() => socket.on("analytics", handleSystemData));

onMount(() => getSystemStatus());
onDestroy(() => socket.off("analytics", handleSystemData));

onDestroy(() => clearInterval(interval));
const handleSystemData = (data: Analytics) => systemInformation = { ...systemInformation, ...data };

async function postRestart() {
const response = await fetch('/rest/restart', {
Expand Down
16 changes: 8 additions & 8 deletions lib/framework/AnalyticsService.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
#include <WiFi.h>
#include <ArduinoJson.h>
#include <ESPFS.h>
#include <NotificationEvents.h>
#include <EventSocket.h>

#define MAX_ESP_ANALYTICS_SIZE 1024
#define ANALYTICS_INTERVAL 2000

class AnalyticsService
{
public:
AnalyticsService(NotificationEvents *notificationEvents) : _notificationEvents(notificationEvents){};
AnalyticsService(EventSocket *socket) : _socket(socket){};

void begin()
{
Expand All @@ -40,17 +40,17 @@ class AnalyticsService
};

protected:
NotificationEvents *_notificationEvents;
EventSocket *_socket;

static void _loopImpl(void *_this) { static_cast<AnalyticsService *>(_this)->_loop(); }
void _loop()
{
TickType_t xLastWakeTime;
xLastWakeTime = xTaskGetTickCount();
TickType_t xLastWakeTime = xTaskGetTickCount();
StaticJsonDocument<MAX_ESP_ANALYTICS_SIZE> doc;
char message[MAX_ESP_ANALYTICS_SIZE];
while (1)
{
StaticJsonDocument<MAX_ESP_ANALYTICS_SIZE> doc;
String message;
doc.clear();
doc["uptime"] = millis() / 1000;
doc["free_heap"] = ESP.getFreeHeap();
doc["total_heap"] = ESP.getHeapSize();
Expand All @@ -61,7 +61,7 @@ class AnalyticsService
doc["core_temp"] = temperatureRead();

serializeJson(doc, message);
_notificationEvents->send(message, "analytics", millis());
_socket->emit("analytics", message);

vTaskDelayUntil(&xLastWakeTime, ANALYTICS_INTERVAL / portTICK_PERIOD_MS);
}
Expand Down
Loading