diff --git a/interface/src/lib/stores/socket.ts b/interface/src/lib/stores/socket.ts new file mode 100644 index 00000000..693dcaa6 --- /dev/null +++ b/interface/src/lib/stores/socket.ts @@ -0,0 +1,115 @@ +import { writable } from 'svelte/store'; + +function createWebSocket() { + let listeners = new Map void>>(); + const { subscribe, set } = writable(false); + const socketEvents = ['open', 'close', 'error', 'message', 'unresponsive'] as const; + type SocketEvent = (typeof socketEvents)[number]; + let unresponsiveTimeoutId: number; + let reconnectTimeoutId: number; + let ws: WebSocket; + let socketUrl: string | URL; + + function init(url: string | URL) { + socketUrl = url; + connect(); + } + + function disconnect(reason: SocketEvent, event?: Event) { + ws.close(); + set(false); + clearTimeout(unresponsiveTimeoutId); + clearTimeout(reconnectTimeoutId); + listeners.get(reason)?.forEach((listener) => listener(event)); + reconnectTimeoutId = setTimeout(connect, 1000); + } + + function connect() { + ws = new WebSocket(socketUrl); + ws.onopen = (ev) => { + set(true); + clearTimeout(reconnectTimeoutId); + listeners.get('open')?.forEach((listener) => listener(ev)); + for (const event of listeners.keys()) { + if (socketEvents.includes(event as SocketEvent)) continue; + sendEvent('subscribe', event); + } + }; + ws.onmessage = (message) => { + resetUnresponsiveCheck(); + let data = message.data; + + if (data instanceof ArrayBuffer) { + listeners.get('binary')?.forEach((listener) => listener(data)); + return; + } + listeners.get('message')?.forEach((listener) => listener(data)); + try { + data = JSON.parse(message.data.substring(2)); + } catch (error) { + listeners.get('error')?.forEach((listener) => listener(error)); + return; + } + listeners.get('json')?.forEach((listener) => listener(data)); + const [event, payload] = data; + if (event) listeners.get(event)?.forEach((listener) => listener(payload)); + }; + ws.onerror = (ev) => disconnect('error', ev); + ws.onclose = (ev) => disconnect('close', ev); + } + + function unsubscribe(event: string, listener?: (data: any) => void) { + let eventListeners = listeners.get(event); + if (!eventListeners) return; + + if (!eventListeners.size) { + sendEvent('unsubscribe', event); + } + if (listener) { + eventListeners?.delete(listener); + } else { + listeners.delete(event); + } + } + + function resetUnresponsiveCheck() { + clearTimeout(unresponsiveTimeoutId); + unresponsiveTimeoutId = setTimeout(() => disconnect('unresponsive'), 2000); + } + + function send(msg: unknown) { + if (!ws || ws.readyState !== WebSocket.OPEN) return; + ws.send(JSON.stringify(msg)); + } + + function sendEvent(event: string, data: unknown) { + send({ event, data }); + } + + return { + subscribe, + send, + sendEvent, + init, + on: (event: string, listener: (data: T) => void): (() => void) => { + let eventListeners = listeners.get(event); + if (!eventListeners) { + if (!socketEvents.includes(event as SocketEvent)) { + sendEvent('subscribe', event); + } + eventListeners = new Set(); + listeners.set(event, eventListeners); + } + eventListeners.add(listener as (data: any) => void); + + return () => { + unsubscribe(event, listener); + }; + }, + off: (event: string, listener?: (data: any) => void) => { + unsubscribe(event, listener); + } + }; +} + +export const socket = createWebSocket(); diff --git a/interface/src/routes/+layout.svelte b/interface/src/routes/+layout.svelte index bf5778c8..13262443 100644 --- a/interface/src/routes/+layout.svelte +++ b/interface/src/routes/+layout.svelte @@ -4,6 +4,7 @@ import { user } from '$lib/stores/user'; import { telemetry } from '$lib/stores/telemetry'; import { analytics } from '$lib/stores/analytics'; + import { socket } from '$lib/stores/socket'; import type { userProfile } from '$lib/stores/user'; import { page } from '$app/stores'; import { Modals, closeModal } from 'svelte-modals'; @@ -22,10 +23,45 @@ if ($user.bearer_token !== '') { await validateUser($user); } - connectToEventSource(); + user.subscribe((value) => { + if (value.bearer_token !== '') { + const ws_token = $page.data.features.security ? '?access_token=' + $user.bearer_token : '' + socket.init(`ws://${window.location.host}/ws${ws_token}`) + } + }); + addEventListeners() + }); + + onDestroy(() => { + removeEventListeners() }); + + const addEventListeners = () => { + socket.on("analytics", handleAnalytics) + socket.on("open", handleOpen) + socket.on("close", handleClose) + socket.on("error", handleError) + socket.on("rssi", handleNetworkStatus) + socket.on("infoToast", handleInfoToast) + socket.on("successToast", handleSuccessToast) + socket.on("warningToast", handleWarningToast) + socket.on("errorToast", handleErrorToast) + socket.on("battery", handleBattery) + socket.on("download_ota", handleOAT) + } - onDestroy(() => disconnectEventSource()); + const removeEventListeners = () => { + socket.off("analytics", handleAnalytics) + socket.off("open", handleOpen) + socket.off("close", handleClose) + socket.off("rssi", handleNetworkStatus) + socket.off("infoToast", handleInfoToast) + socket.off("successToast", handleSuccessToast) + socket.off("warningToast", handleWarningToast) + socket.off("errorToast", handleErrorToast) + socket.off("battery", handleBattery) + socket.off("download_ota", handleOAT) + } async function validateUser(userdata: userProfile) { try { @@ -43,74 +79,31 @@ console.error('Error:', error); } } + + const handleOpen = () => { + notifications.success('Connection to device established', 5000) + telemetry.setRSSI('found') + } - let menuOpen = false; - - let eventSourceUrl = '/events'; - let eventSource: EventSource; - let unresponsiveTimeoutId: number; - - function connectToEventSource() { - eventSource = new EventSource(eventSourceUrl); - - eventSource.addEventListener('open', () => { - notifications.success('Connection to device established', 5000); - telemetry.setRSSI('found'); // Update store and flag as server being available again - }); - - eventSource.addEventListener('rssi', (event) => { - telemetry.setRSSI(event.data); - resetUnresponsiveCheck(); - }); - - eventSource.addEventListener('error', (event) => { - reconnectEventSource(); - }); - - eventSource.addEventListener('infoToast', (event) => { - notifications.info(event.data, 5000); - }); + const handleClose = () => notifications.error('Connection to device established', 5000); - eventSource.addEventListener('successToast', (event) => { - notifications.success(event.data, 5000); - }); + const handleError = (data: any) => console.error(data) - eventSource.addEventListener('warningToast', (event) => { - notifications.warning(event.data, 5000); - }); + const handleInfoToast = (data: string) => notifications.info(data, 5000) + const handleWarningToast = (data: string) => notifications.warning(data, 5000) + const handleErrorToast = (data: string) => notifications.error(data, 5000) + const handleSuccessToast = (data: string) => notifications.success(data, 5000) - eventSource.addEventListener('errorToast', (event) => { - notifications.error(event.data, 5000); - }); + + const handleAnalytics = (data: Analytics) => analytics.addData(data) - eventSource.addEventListener('battery', (event) => { - telemetry.setBattery(event.data); - }); + const handleNetworkStatus = (data: string) => telemetry.setRSSI(data); - eventSource.addEventListener('download_ota', (event) => { - telemetry.setDownloadOTA(event.data); - }); - eventSource.addEventListener('analytics', (event) => { - const data = JSON.parse(event.data) as Analytics; - analytics.addData(data); - }); - } + const handleBattery = (data: string) => telemetry.setRSSI(data); - function disconnectEventSource() { - clearTimeout(unresponsiveTimeoutId); - eventSource?.close(); - } + const handleOAT = (data: string) => telemetry.setDownloadOTA(data); - function reconnectEventSource() { - notifications.error('Connection to device lost', 5000); - disconnectEventSource(); - connectToEventSource(); - } - - function resetUnresponsiveCheck() { - clearTimeout(unresponsiveTimeoutId); - unresponsiveTimeoutId = setTimeout(() => reconnectEventSource(), 2000); - } + let menuOpen = false; diff --git a/interface/src/routes/demo/Demo.svelte b/interface/src/routes/demo/Demo.svelte index e1ba44db..70634f6e 100644 --- a/interface/src/routes/demo/Demo.svelte +++ b/interface/src/routes/demo/Demo.svelte @@ -8,6 +8,7 @@ import Info from '~icons/tabler/info-circle'; import Save from '~icons/tabler/device-floppy'; import Reload from '~icons/tabler/reload'; + import { socket } from '$lib/stores/socket'; import type { LightState } from '$lib/types/models'; let lightState: LightState = { led_on: false }; @@ -31,34 +32,14 @@ return; } - const ws_token = $page.data.features.security ? '?access_token=' + $user.bearer_token : ''; - - const lightStateSocket = new WebSocket('ws://' + $page.url.host + '/ws/lightState' + ws_token); - - lightStateSocket.onopen = (event) => { - lightStateSocket.send('Hello'); - }; - - lightStateSocket.addEventListener('close', (event) => { - const closeCode = event.code; - const closeReason = event.reason; - console.log('WebSocket closed with code:', closeCode); - console.log('Close reason:', closeReason); - notifications.error('Websocket disconnected', 5000); - }); - - lightStateSocket.onmessage = (event) => { - const message = JSON.parse(event.data); - if (message.type != 'id') { - lightState = message; - } - }; - - onDestroy(() => lightStateSocket.close()); - onMount(() => { + socket.on("led", (data) => { + lightState = data; + }) getLightstate(); }); + + onDestroy(() => socket.off("led")); async function postLightstate() { try { @@ -125,7 +106,7 @@ class="toggle toggle-primary" bind:checked={lightState.led_on} on:change={() => { - lightStateSocket.send(JSON.stringify(lightState)); + socket.sendEvent("led", lightState); }} /> diff --git a/interface/src/routes/system/status/SystemStatus.svelte b/interface/src/routes/system/status/SystemStatus.svelte index 39ea5315..71c0c585 100644 --- a/interface/src/routes/system/status/SystemStatus.svelte +++ b/interface/src/routes/system/status/SystemStatus.svelte @@ -25,6 +25,7 @@ import Stopwatch from '~icons/tabler/24-hours'; import SDK from '~icons/tabler/sdk'; import type { SystemInformation } from '$lib/types/models'; + import { socket } from '$lib/stores/socket'; let systemInformation: SystemInformation; @@ -44,13 +45,11 @@ return systemInformation; } - const interval = setInterval(async () => { - getSystemStatus(); - }, 5000); + onMount(() => socket.on("analytics", handleSystemData)); - onMount(() => getSystemStatus()); + onDestroy(() => socket.off("analytics", handleSystemData)); - onDestroy(() => clearInterval(interval)); + const handleSystemData = (data: Analytics) => systemInformation = { ...systemInformation, ...data }; async function postRestart() { const response = await fetch('/rest/restart', { diff --git a/lib/framework/AnalyticsService.h b/lib/framework/AnalyticsService.h index 915dcf26..a435065a 100644 --- a/lib/framework/AnalyticsService.h +++ b/lib/framework/AnalyticsService.h @@ -16,7 +16,7 @@ #include #include #include -#include +#include #define MAX_ESP_ANALYTICS_SIZE 1024 #define ANALYTICS_INTERVAL 2000 @@ -24,7 +24,7 @@ class AnalyticsService { public: - AnalyticsService(NotificationEvents *notificationEvents) : _notificationEvents(notificationEvents){}; + AnalyticsService(EventSocket *socket) : _socket(socket){}; void begin() { @@ -40,17 +40,17 @@ class AnalyticsService }; protected: - NotificationEvents *_notificationEvents; + EventSocket *_socket; static void _loopImpl(void *_this) { static_cast(_this)->_loop(); } void _loop() { - TickType_t xLastWakeTime; - xLastWakeTime = xTaskGetTickCount(); + TickType_t xLastWakeTime = xTaskGetTickCount(); + StaticJsonDocument doc; + char message[MAX_ESP_ANALYTICS_SIZE]; while (1) { - StaticJsonDocument doc; - String message; + doc.clear(); doc["uptime"] = millis() / 1000; doc["free_heap"] = ESP.getFreeHeap(); doc["total_heap"] = ESP.getHeapSize(); @@ -61,7 +61,7 @@ class AnalyticsService doc["core_temp"] = temperatureRead(); serializeJson(doc, message); - _notificationEvents->send(message, "analytics", millis()); + _socket->emit("analytics", message); vTaskDelayUntil(&xLastWakeTime, ANALYTICS_INTERVAL / portTICK_PERIOD_MS); } diff --git a/lib/framework/BatteryService.cpp b/lib/framework/BatteryService.cpp index d4003cac..5a8d19b6 100644 --- a/lib/framework/BatteryService.cpp +++ b/lib/framework/BatteryService.cpp @@ -13,16 +13,16 @@ #include -BatteryService::BatteryService(NotificationEvents *notificationEvents) : _notificationEvents(notificationEvents) +BatteryService::BatteryService(EventSocket *socket) : _socket(socket) { } void BatteryService::batteryEvent() { StaticJsonDocument<32> doc; - String message; + char message[32]; doc["soc"] = _lastSOC; doc["charging"] = _isCharging; serializeJson(doc, message); - _notificationEvents->send(message, "battery", millis()); + _socket->emit("battery", message); } diff --git a/lib/framework/BatteryService.h b/lib/framework/BatteryService.h index ff021a6c..3e4c7d3a 100644 --- a/lib/framework/BatteryService.h +++ b/lib/framework/BatteryService.h @@ -13,13 +13,13 @@ * the terms of the LGPL v3 license. See the LICENSE file for details. **/ +#include #include -#include class BatteryService { public: - BatteryService(NotificationEvents *notificationEvents); + BatteryService(EventSocket *socket); void updateSOC(float stateOfCharge) { @@ -35,7 +35,7 @@ class BatteryService private: void batteryEvent(); - NotificationEvents *_notificationEvents; + EventSocket *_socket; int _lastSOC = 100; boolean _isCharging = false; }; diff --git a/lib/framework/DownloadFirmwareService.cpp b/lib/framework/DownloadFirmwareService.cpp index 110522a7..2543735d 100644 --- a/lib/framework/DownloadFirmwareService.cpp +++ b/lib/framework/DownloadFirmwareService.cpp @@ -15,7 +15,7 @@ extern const uint8_t rootca_crt_bundle_start[] asm("_binary_src_certs_x509_crt_bundle_bin_start"); -static NotificationEvents *_notificationEvents = nullptr; +static EventSocket *_socket = nullptr; static int previousProgress = 0; StaticJsonDocument<128> doc; @@ -24,7 +24,7 @@ void update_started() String output; doc["status"] = "preparing"; serializeJson(doc, output); - _notificationEvents->send(output, "download_ota", millis()); + _socket->emit("download_ota", output.c_str()); } void update_progress(int currentBytes, int totalBytes) @@ -35,8 +35,7 @@ void update_progress(int currentBytes, int totalBytes) if (progress > previousProgress) { doc["progress"] = progress; - serializeJson(doc, output); - _notificationEvents->send(output, "download_ota", millis()); + _socket->emit("download_ota", output.c_str()); ESP_LOGV("Download OTA", "HTTP update process at %d of %d bytes... (%d %%)", currentBytes, totalBytes, progress); } previousProgress = progress; @@ -47,7 +46,7 @@ void update_finished() String output; doc["status"] = "finished"; serializeJson(doc, output); - _notificationEvents->send(output, "download_ota", millis()); + _socket->emit("download_ota", output.c_str()); // delay to allow the event to be sent out vTaskDelay(100 / portTICK_PERIOD_MS); @@ -77,7 +76,7 @@ void updateTask(void *param) doc["status"] = "error"; doc["error"] = httpUpdate.getLastErrorString().c_str(); serializeJson(doc, output); - _notificationEvents->send(output, "download_ota", millis()); + _socket->emit("download_ota", output.c_str()); ESP_LOGE("Download OTA", "HTTP Update failed with error (%d): %s", httpUpdate.getLastError(), httpUpdate.getLastErrorString().c_str()); #ifdef SERIAL_INFO @@ -89,7 +88,7 @@ void updateTask(void *param) doc["status"] = "error"; doc["error"] = "Update failed, has same firmware version"; serializeJson(doc, output); - _notificationEvents->send(output, "download_ota", millis()); + _socket->emit("download_ota", output.c_str()); ESP_LOGE("Download OTA", "HTTP Update failed, has same firmware version"); #ifdef SERIAL_INFO @@ -106,10 +105,9 @@ void updateTask(void *param) vTaskDelete(NULL); } -DownloadFirmwareService::DownloadFirmwareService(PsychicHttpServer *server, SecurityManager *securityManager, NotificationEvents *notificationEvents) : _server(server), - _securityManager(securityManager), - _notificationEvents(notificationEvents) - +DownloadFirmwareService::DownloadFirmwareService(PsychicHttpServer *server, SecurityManager *securityManager, + EventSocket *socket) + : _server(server), _securityManager(securityManager), _socket(socket) { } @@ -143,7 +141,8 @@ esp_err_t DownloadFirmwareService::downloadUpdate(PsychicRequest *request, JsonV String output; serializeJson(doc, output); - _notificationEvents->send(output, "download_ota", millis()); + + _socket->emit("download_ota", output.c_str()); if (xTaskCreatePinnedToCore( &updateTask, // Function that should be called diff --git a/lib/framework/DownloadFirmwareService.h b/lib/framework/DownloadFirmwareService.h index 652528a2..f06d799c 100644 --- a/lib/framework/DownloadFirmwareService.h +++ b/lib/framework/DownloadFirmwareService.h @@ -18,9 +18,9 @@ #include #include +#include #include #include -#include #include #include @@ -32,13 +32,13 @@ class DownloadFirmwareService { public: - DownloadFirmwareService(PsychicHttpServer *server, SecurityManager *securityManager, NotificationEvents *notificationEvents); + DownloadFirmwareService(PsychicHttpServer *server, SecurityManager *securityManager, EventSocket *socket); void begin(); private: SecurityManager *_securityManager; PsychicHttpServer *_server; - NotificationEvents *_notificationEvents; + EventSocket *_socket; esp_err_t downloadUpdate(PsychicRequest *request, JsonVariant &json); }; diff --git a/lib/framework/ESP32SvelteKit.cpp b/lib/framework/ESP32SvelteKit.cpp index 25783611..e11b40c0 100644 --- a/lib/framework/ESP32SvelteKit.cpp +++ b/lib/framework/ESP32SvelteKit.cpp @@ -6,7 +6,7 @@ * https://github.com/theelims/ESP32-sveltekit * * Copyright (C) 2018 - 2023 rjwats - * Copyright (C) 2023 theelims + * Copyright (C) 2024 theelims * * All Rights Reserved. This software may be modified and distributed under * the terms of the LGPL v3 license. See the LICENSE file for details. @@ -18,12 +18,12 @@ ESP32SvelteKit::ESP32SvelteKit(PsychicHttpServer *server, unsigned int numberEnd _numberEndpoints(numberEndpoints), _featureService(server), _securitySettingsService(server, &ESPFS), - _wifiSettingsService(server, &ESPFS, &_securitySettingsService, &_notificationEvents), + _wifiSettingsService(server, &ESPFS, &_securitySettingsService, &_socket), _wifiScanner(server, &_securitySettingsService), _wifiStatus(server, &_securitySettingsService), _apSettingsService(server, &ESPFS, &_securitySettingsService), _apStatus(server, &_securitySettingsService, &_apSettingsService), - _notificationEvents(server), + _socket(server, &_securitySettingsService), #if FT_ENABLED(FT_NTP) _ntpSettingsService(server, &ESPFS, &_securitySettingsService), _ntpStatus(server, &_securitySettingsService), @@ -32,7 +32,7 @@ ESP32SvelteKit::ESP32SvelteKit(PsychicHttpServer *server, unsigned int numberEnd _uploadFirmwareService(server, &_securitySettingsService), #endif #if FT_ENABLED(FT_DOWNLOAD_FIRMWARE) - _downloadFirmwareService(server, &_securitySettingsService, &_notificationEvents), + _downloadFirmwareService(server, &_securitySettingsService, &_socket), #endif #if FT_ENABLED(FT_MQTT) _mqttSettingsService(server, &ESPFS, &_securitySettingsService), @@ -45,10 +45,10 @@ ESP32SvelteKit::ESP32SvelteKit(PsychicHttpServer *server, unsigned int numberEnd _sleepService(server, &_securitySettingsService), #endif #if FT_ENABLED(FT_BATTERY) - _batteryService(&_notificationEvents), + _batteryService(&_socket), #endif #if FT_ENABLED(FT_ANALYTICS) - _analyticsService(&_notificationEvents), + _analyticsService(&_socket), #endif _restartService(server, &_securitySettingsService), _factoryResetService(server, &ESPFS, &_securitySettingsService), @@ -139,7 +139,7 @@ void ESP32SvelteKit::begin() // Start the services _apStatus.begin(); - _notificationEvents.begin(); + _socket.begin(); _apSettingsService.begin(); _factoryResetService.begin(); _featureService.begin(); diff --git a/lib/framework/ESP32SvelteKit.h b/lib/framework/ESP32SvelteKit.h index 339d30b7..6f41932f 100644 --- a/lib/framework/ESP32SvelteKit.h +++ b/lib/framework/ESP32SvelteKit.h @@ -27,9 +27,9 @@ #include #include #include +#include #include #include -#include #include #include #include @@ -85,9 +85,9 @@ class ESP32SvelteKit return &_securitySettingsService; } - NotificationEvents *getNotificationEvents() + EventSocket *getSocket() { - return &_notificationEvents; + return &_socket; } #if FT_ENABLED(FT_SECURITY) @@ -170,7 +170,7 @@ class ESP32SvelteKit WiFiStatus _wifiStatus; APSettingsService _apSettingsService; APStatus _apStatus; - NotificationEvents _notificationEvents; + EventSocket _socket; #if FT_ENABLED(FT_NTP) NTPSettingsService _ntpSettingsService; NTPStatus _ntpStatus; diff --git a/lib/framework/EventSocket.cpp b/lib/framework/EventSocket.cpp new file mode 100644 index 00000000..d2f7b600 --- /dev/null +++ b/lib/framework/EventSocket.cpp @@ -0,0 +1,155 @@ +#include + +SemaphoreHandle_t clientSubscriptionsMutex = xSemaphoreCreateMutex(); + +EventSocket::EventSocket(PsychicHttpServer *server, SecurityManager *securityManager, + AuthenticationPredicate authenticationPredicate) + : _server(server), _securityManager(securityManager), _authenticationPredicate(authenticationPredicate), + _bufferSize(1024) +{ +} + +void EventSocket::begin() +{ + _socket.setFilter(_securityManager->filterRequest(_authenticationPredicate)); + _socket.onOpen((std::bind(&EventSocket::onWSOpen, this, std::placeholders::_1))); + _socket.onClose(std::bind(&EventSocket::onWSClose, this, std::placeholders::_1)); + _socket.onFrame(std::bind(&EventSocket::onFrame, this, std::placeholders::_1, std::placeholders::_2)); + _server->on(EVENT_SERVICE_PATH, &_eventSource); + _server->on(WS_EVENT_SERVICE_PATH, &_socket); + + ESP_LOGV("Socket", "Registered socket Source endpoint: %s", EVENT_SERVICE_PATH); +} + +void EventSocket::onWSOpen(PsychicWebSocketClient *client) +{ + ESP_LOGI("WebSocketServer", "ws[%s][%u] connect", client->remoteIP().toString(), client->socket()); +} + +void EventSocket::onWSClose(PsychicWebSocketClient *client) +{ + for (auto &event_subscriptions : client_subscriptions) + { + event_subscriptions.second.remove(client->socket()); + } + ESP_LOGI("WebSocketServer", "ws[%s][%u] disconnect", client->remoteIP().toString(), client->socket()); +} + +esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame *frame) +{ + ESP_LOGV("WebSocketServer", "ws[%s][%u] opcode[%d]", request->client()->remoteIP().toString(), + request->client()->socket(), frame->type); + + if (frame->type == HTTPD_WS_TYPE_TEXT) + { + ESP_LOGV("WebSocketServer", "ws[%s][%u] request: %s", request->client()->remoteIP().toString(), + request->client()->socket(), (char *)frame->payload); + + DynamicJsonDocument doc = DynamicJsonDocument(_bufferSize); + DeserializationError error = deserializeJson(doc, (char *)frame->payload, frame->len); + + if (!error && doc.is()) + { + String event = doc["event"]; + if (event == "subscribe") + { + client_subscriptions[doc["data"]].push_back(request->client()->socket()); + } + else if (event == "unsubscribe") + { + client_subscriptions[doc["data"]].remove(request->client()->socket()); + } + else + { + JsonObject jsonObject = doc["data"].as(); + handleCallbacks(event, jsonObject, request->client()->socket()); + } + return ESP_OK; + } + } + return ESP_OK; +} + +void EventSocket::emit(String event, String payload) +{ + emit(event.c_str(), payload.c_str(), ""); +} + +void EventSocket::emit(const char *event, const char *payload) +{ + emit(event, payload, ""); +} + +void EventSocket::emit(const char *event, const char *payload, const char *originId) +{ + int originSubscriptionId = originId[0] ? atoi(originId) : -1; + if (_eventSource.count() > 0) + { + _eventSource.send(payload, event, millis()); + } + xSemaphoreTake(clientSubscriptionsMutex, portMAX_DELAY); + auto &subscriptions = client_subscriptions[event]; + if (subscriptions.empty()) + { + xSemaphoreGive(clientSubscriptionsMutex); + return; + } + String msg = "42[\"" + String(event) + "\"," + String(payload) + "]"; + + for (int subscription : client_subscriptions[event]) + { + if (subscription == originSubscriptionId) + continue; + auto *client = _socket.getClient(subscription); + if (!client) + { + subscriptions.remove(subscription); + continue; + } + ESP_LOGV("WebSocketServer", "Emitting event: %s to %s, Message: %s", event, client->remoteIP().toString(), + msg.c_str()); + client->sendMessage(msg.c_str()); + } + xSemaphoreGive(clientSubscriptionsMutex); +} + +void EventSocket::pushNotification(String message, pushEvent event) +{ + String eventType; + switch (event) + { + case (PUSHERROR): + eventType = "errorToast"; + break; + case (PUSHWARNING): + eventType = "warningToast"; + break; + case (PUSHINFO): + eventType = "infoToast"; + break; + case (PUSHSUCCESS): + eventType = "successToast"; + break; + default: + return; + } + emit(eventType.c_str(), message.c_str()); +} + +void EventSocket::handleCallbacks(String event, JsonObject &jsonObject, int originId) +{ + for (auto &callback : event_callbacks[event]) + { + callback(jsonObject, originId); + } +} + +void EventSocket::on(String event, EventCallback callback) +{ + event_callbacks[event].push_back(callback); +} + +void EventSocket::broadcast(String message) +{ + _socket.sendAll(message.c_str()); +} diff --git a/lib/framework/EventSocket.h b/lib/framework/EventSocket.h new file mode 100644 index 00000000..391ec894 --- /dev/null +++ b/lib/framework/EventSocket.h @@ -0,0 +1,59 @@ +#ifndef Socket_h +#define Socket_h + +#include +#include +#include +#include + +#define EVENT_SERVICE_PATH "/events" +#define WS_EVENT_SERVICE_PATH "/ws" + +typedef std::function EventCallback; + +enum pushEvent +{ + PUSHERROR, + PUSHWARNING, + PUSHINFO, + PUSHSUCCESS +}; + +class EventSocket +{ + public: + EventSocket(PsychicHttpServer *server, SecurityManager *_securityManager, + AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_AUTHENTICATED); + + void begin(); + + void on(String event, EventCallback callback); + + void emit(String event, String payload); + + void emit(const char *event, const char *payload); + + void emit(const char *event, const char *payload, const char *originId); + + void pushNotification(String message, pushEvent event); + + void broadcast(String message); + + private: + PsychicHttpServer *_server; + PsychicWebSocketHandler _socket; + SecurityManager *_securityManager; + AuthenticationPredicate _authenticationPredicate; + PsychicEventSource _eventSource; + + std::map> client_subscriptions; + std::map> event_callbacks; + void handleCallbacks(String event, JsonObject &jsonObject, int originId); + + size_t _bufferSize; + void onWSOpen(PsychicWebSocketClient *client); + void onWSClose(PsychicWebSocketClient *client); + esp_err_t onFrame(PsychicWebSocketRequest *request, httpd_ws_frame *frame); +}; + +#endif diff --git a/lib/framework/NotificationEvents.cpp b/lib/framework/NotificationEvents.cpp deleted file mode 100644 index acdc6566..00000000 --- a/lib/framework/NotificationEvents.cpp +++ /dev/null @@ -1,65 +0,0 @@ -#pragma once - -/** - * ESP32 SvelteKit - * - * A simple, secure and extensible framework for IoT projects for ESP32 platforms - * with responsive Sveltekit front-end built with TailwindCSS and DaisyUI. - * https://github.com/theelims/ESP32-sveltekit - * - * Copyright (C) 2023 theelims - * - * All Rights Reserved. This software may be modified and distributed under - * the terms of the LGPL v3 license. See the LICENSE file for details. - **/ - -#include - -NotificationEvents::NotificationEvents(PsychicHttpServer *server) : _server(server) -{ -} - -void NotificationEvents::begin() -{ - _eventSource.onOpen([&](PsychicEventSourceClient *client) { // client->send("hello", NULL, millis(), 1000); -#ifdef SERIAL_INFO - Serial.printf("New client connected to Event Source: #%u connected from %s\n", client->socket(), client->remoteIP().toString()); -#endif - }); - _eventSource.onClose([&](PsychicEventSourceClient *client) { // client->send("hello", NULL, millis(), 1000); -#ifdef SERIAL_INFO - Serial.printf("Client closed connection to Event Source: #%u connected from %s\n", client->socket(), client->remoteIP().toString()); -#endif - }); - _server->on(EVENT_NOTIFICATION_SERVICE_PATH, &_eventSource); - - ESP_LOGV("NotificationEvents", "Registered Event Source endpoint: %s", EVENT_NOTIFICATION_SERVICE_PATH); -} - -void NotificationEvents::pushNotification(String message, pushEvent event, int id) -{ - String eventType; - switch (event) - { - case (PUSHERROR): - eventType = "errorToast"; - break; - case (PUSHWARNING): - eventType = "warningToast"; - break; - case (PUSHINFO): - eventType = "infoToast"; - break; - case (PUSHSUCCESS): - eventType = "successToast"; - break; - default: - return; - } - _eventSource.send(message.c_str(), eventType.c_str(), id); -} - -void NotificationEvents::send(String message, String event, int id) -{ - _eventSource.send(message.c_str(), event.c_str(), id); -} diff --git a/lib/framework/NotificationEvents.h b/lib/framework/NotificationEvents.h deleted file mode 100644 index d33193e9..00000000 --- a/lib/framework/NotificationEvents.h +++ /dev/null @@ -1,45 +0,0 @@ -#pragma once - -/** - * ESP32 SvelteKit - * - * A simple, secure and extensible framework for IoT projects for ESP32 platforms - * with responsive Sveltekit front-end built with TailwindCSS and DaisyUI. - * https://github.com/theelims/ESP32-sveltekit - * - * Copyright (C) 2023 theelims - * - * All Rights Reserved. This software may be modified and distributed under - * the terms of the LGPL v3 license. See the LICENSE file for details. - **/ - -#include - -#include -#include - -#define EVENT_NOTIFICATION_SERVICE_PATH "/events" - -enum pushEvent -{ - PUSHERROR, - PUSHWARNING, - PUSHINFO, - PUSHSUCCESS -}; - -class NotificationEvents -{ -protected: - PsychicHttpServer *_server; - PsychicEventSource _eventSource; - -public: - NotificationEvents(PsychicHttpServer *server); - - void begin(); - - void pushNotification(String message, pushEvent event, int id = 0); - - void send(String message, String event, int id = 0); -}; diff --git a/lib/framework/WebSocketServer.h b/lib/framework/WebSocketServer.h index dc7e8e7a..3e5e2254 100644 --- a/lib/framework/WebSocketServer.h +++ b/lib/framework/WebSocketServer.h @@ -15,148 +15,48 @@ * the terms of the LGPL v3 license. See the LICENSE file for details. **/ -#include +#include #include #include - -#define WEB_SOCKET_CLIENT_ID_MSG_SIZE 128 - -#define WEB_SOCKET_ORIGIN "wsserver" -#define WEB_SOCKET_ORIGIN_CLIENT_ID_PREFIX "wsserver:" +#include template class WebSocketServer { public: - WebSocketServer(JsonStateReader stateReader, - JsonStateUpdater stateUpdater, - StatefulService *statefulService, - PsychicHttpServer *server, - const char *webSocketPath, - SecurityManager *securityManager, - AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN, - size_t bufferSize = DEFAULT_BUFFER_SIZE) : _stateReader(stateReader), - _stateUpdater(stateUpdater), - _statefulService(statefulService), - _server(server), - _bufferSize(bufferSize), - _webSocketPath(webSocketPath), - _authenticationPredicate(authenticationPredicate), - _securityManager(securityManager) - { - _statefulService->addUpdateHandler( - [&](const String &originId) - { transmitData(nullptr, originId); }, - false); - } - - void begin() - { - _webSocket.setFilter(_securityManager->filterRequest(_authenticationPredicate)); - _webSocket.onOpen(std::bind(&WebSocketServer::onWSOpen, - this, - std::placeholders::_1)); - _webSocket.onClose(std::bind(&WebSocketServer::onWSClose, - this, - std::placeholders::_1)); - _webSocket.onFrame(std::bind(&WebSocketServer::onWSFrame, - this, - std::placeholders::_1, - std::placeholders::_2)); - _server->on(_webSocketPath.c_str(), &_webSocket); - - ESP_LOGV("WebSocketServer", "Registered WebSocket handler: %s", _webSocketPath.c_str()); - } - - void onWSOpen(PsychicWebSocketClient *client) - { - - // when a client connects, we transmit it's id and the current payload - transmitId(client); - transmitData(client, WEB_SOCKET_ORIGIN); - ESP_LOGI("WebSocketServer", "ws[%s][%u] connect", client->remoteIP().toString(), client->socket()); - } - - void onWSClose(PsychicWebSocketClient *client) - { - ESP_LOGI("WebSocketServer", "ws[%s][%u] disconnect", client->remoteIP().toString(), client->socket()); - } - - esp_err_t onWSFrame(PsychicWebSocketRequest *request, httpd_ws_frame *frame) - { - ESP_LOGV("WebSocketServer", "ws[%s][%u] opcode[%d]", request->client()->remoteIP().toString(), request->client()->socket(), frame->type); - - if (frame->type == HTTPD_WS_TYPE_TEXT) - { - ESP_LOGV("WebSocketServer", "ws[%s][%u] request: %s", request->client()->remoteIP().toString(), request->client()->socket(), (char *)frame->payload); - - DynamicJsonDocument jsonDocument = DynamicJsonDocument(_bufferSize); - DeserializationError error = deserializeJson(jsonDocument, (char *)frame->payload, frame->len); - - if (!error && jsonDocument.is()) - { - JsonObject jsonObject = jsonDocument.as(); - _statefulService->update(jsonObject, _stateUpdater, clientId(request->client())); - return ESP_OK; - } - } - return ESP_OK; - } - - String clientId(PsychicWebSocketClient *client) - { - return WEB_SOCKET_ORIGIN_CLIENT_ID_PREFIX + String(client->socket()); - } + WebSocketServer(JsonStateReader stateReader, JsonStateUpdater stateUpdater, StatefulService *statefulService, + EventSocket *socket, const char *event, size_t bufferSize = DEFAULT_BUFFER_SIZE) + : _stateReader(stateReader), _stateUpdater(stateUpdater), _statefulService(statefulService), _socket(socket), + _bufferSize(bufferSize), _event(event) + { + _socket->on(event, std::bind(&WebSocketServer::updateState, this, std::placeholders::_1, std::placeholders::_2)); + _statefulService->addUpdateHandler([&](const String &originId) { syncState(originId); }, false); + } private: JsonStateReader _stateReader; JsonStateUpdater _stateUpdater; StatefulService *_statefulService; - AuthenticationPredicate _authenticationPredicate; - SecurityManager *_securityManager; - PsychicHttpServer *_server; - PsychicWebSocketHandler _webSocket; - String _webSocketPath; + EventSocket *_socket; + const char * _event; size_t _bufferSize; - void transmitId(PsychicWebSocketClient *client) + void updateState(JsonObject &root, int originId) { - DynamicJsonDocument jsonDocument = DynamicJsonDocument(WEB_SOCKET_CLIENT_ID_MSG_SIZE); - JsonObject root = jsonDocument.to(); - root["type"] = "id"; - root["id"] = clientId(client); - - // serialize the json to a string - String buffer; - serializeJson(jsonDocument, buffer); - client->sendMessage(buffer.c_str()); + if (_statefulService->updateWithoutPropagation(root, _stateUpdater) == StateUpdateResult::CHANGED) + { + _statefulService->callUpdateHandlers(String(originId)); + } } - /** - * Broadcasts the payload to the destination, if provided. Otherwise broadcasts to all clients except the origin, if - * specified. - * - * Original implementation sent clients their own IDs so they could ignore updates they initiated. This approach - * simplifies the client and the server implementation but may not be sufficient for all use-cases. - */ - void transmitData(PsychicWebSocketClient *client, const String &originId) + void syncState(const String &originId) { - DynamicJsonDocument jsonDocument = DynamicJsonDocument(_bufferSize); + DynamicJsonDocument jsonDocument{_bufferSize}; JsonObject root = jsonDocument.to(); - String buffer; - + String output; _statefulService->read(root, _stateReader); - - // serialize the json to a string - serializeJson(jsonDocument, buffer); - if (client) - { - client->sendMessage(buffer.c_str()); - } - else - { - _webSocket.sendAll(buffer.c_str()); - } + serializeJson(root, output); + _socket->emit(_event, output.c_str(), originId.c_str()); } }; diff --git a/lib/framework/WiFiSettingsService.cpp b/lib/framework/WiFiSettingsService.cpp index 97bb5df9..110391ba 100644 --- a/lib/framework/WiFiSettingsService.cpp +++ b/lib/framework/WiFiSettingsService.cpp @@ -14,23 +14,13 @@ #include -WiFiSettingsService::WiFiSettingsService(PsychicHttpServer *server, FS *fs, SecurityManager *securityManager, NotificationEvents *notificationEvents) : _server(server), - _securityManager(securityManager), - _httpEndpoint(WiFiSettings::read, - WiFiSettings::update, - this, - server, - WIFI_SETTINGS_SERVICE_PATH, - securityManager, - AuthenticationPredicates::IS_ADMIN, - WIFI_SETTINGS_BUFFER_SIZE), - _fsPersistence(WiFiSettings::read, - WiFiSettings::update, - this, - fs, - WIFI_SETTINGS_FILE), - _lastConnectionAttempt(0), - _notificationEvents(notificationEvents) +WiFiSettingsService::WiFiSettingsService(PsychicHttpServer *server, FS *fs, SecurityManager *securityManager, + EventSocket *socket) + : _server(server), _securityManager(securityManager), + _httpEndpoint(WiFiSettings::read, WiFiSettings::update, this, server, WIFI_SETTINGS_SERVICE_PATH, securityManager, + AuthenticationPredicates::IS_ADMIN, WIFI_SETTINGS_BUFFER_SIZE), + _fsPersistence(WiFiSettings::read, WiFiSettings::update, this, fs, WIFI_SETTINGS_FILE), _lastConnectionAttempt(0), + _socket(socket) { addUpdateHandler([&](const String &originId) { reconfigureWiFiConnection(); }, @@ -220,16 +210,9 @@ void WiFiSettingsService::configureNetwork(wifi_settings_t &network) void WiFiSettingsService::updateRSSI() { - // if WiFi is disconnected send disconnect - if (WiFi.isConnected()) - { - String rssi = String(WiFi.RSSI()); - _notificationEvents->send(rssi, "rssi", millis()); - } - else - { - _notificationEvents->send("disconnected", "rssi", millis()); - } + char buffer[16]; + snprintf(buffer, sizeof(buffer), WiFi.isConnected() ? "%d" : "disconnected", WiFi.RSSI()); + _socket->emit("rssi", buffer); } void WiFiSettingsService::onStationModeDisconnected(WiFiEvent_t event, WiFiEventInfo_t info) diff --git a/lib/framework/WiFiSettingsService.h b/lib/framework/WiFiSettingsService.h index c946fb09..62dc018b 100644 --- a/lib/framework/WiFiSettingsService.h +++ b/lib/framework/WiFiSettingsService.h @@ -19,10 +19,10 @@ #include #include #include +#include #include #include #include -#include #include #include #include @@ -201,7 +201,7 @@ class WiFiSettings class WiFiSettingsService : public StatefulService { public: - WiFiSettingsService(PsychicHttpServer *server, FS *fs, SecurityManager *securityManager, NotificationEvents *notificationEvents); + WiFiSettingsService(PsychicHttpServer *server, FS *fs, SecurityManager *securityManager, EventSocket *socket); void initWiFi(); void begin(); @@ -213,7 +213,7 @@ class WiFiSettingsService : public StatefulService SecurityManager *_securityManager; HttpEndpoint _httpEndpoint; FSPersistence _fsPersistence; - NotificationEvents *_notificationEvents; + EventSocket *_socket; unsigned long _lastConnectionAttempt; unsigned long _lastRssiUpdate; diff --git a/platformio.ini b/platformio.ini index 66a61d07..a22363c5 100644 --- a/platformio.ini +++ b/platformio.ini @@ -26,7 +26,7 @@ build_flags = -D APP_NAME=\"ESP32-Sveltekit\" ; Must only contain characters from [a-zA-Z0-9-_] as this is converted into a filename -D APP_VERSION=\"0.3.0\" ; semver compatible version string ; Uncomment to receive log messages from the ESP Arduino Core - -D CORE_DEBUG_LEVEL=5 + -D CORE_DEBUG_LEVEL=4 ; Move all networking stuff to the protocol core 0 and leave business logic on application core 1 -D ESP32SVELTEKIT_RUNNING_CORE=0 ; Uncomment EMBED_WWW to embed the WWW data in the firmware binary diff --git a/src/LightStateService.cpp b/src/LightStateService.cpp index a11b7c26..d2ba5854 100644 --- a/src/LightStateService.cpp +++ b/src/LightStateService.cpp @@ -14,30 +14,14 @@ #include -LightStateService::LightStateService(PsychicHttpServer *server, - SecurityManager *securityManager, - PsychicMqttClient *mqttClient, - LightMqttSettingsService *lightMqttSettingsService) : _httpEndpoint(LightState::read, - LightState::update, - this, - server, - LIGHT_SETTINGS_ENDPOINT_PATH, - securityManager, - AuthenticationPredicates::IS_AUTHENTICATED), - _mqttPubSub(LightState::homeAssistRead, LightState::homeAssistUpdate, this, mqttClient), - _webSocketServer(LightState::read, - LightState::update, - this, - server, - LIGHT_SETTINGS_SOCKET_PATH, - securityManager, - AuthenticationPredicates::IS_AUTHENTICATED), - _mqttClient(mqttClient), - _lightMqttSettingsService(lightMqttSettingsService) -/* _webSocketClient(LightState::read, - LightState::update, - this, - LIGHT_SETTINGS_SOCKET_PATH)*/ +LightStateService::LightStateService(PsychicHttpServer *server, EventSocket *socket, SecurityManager *securityManager, + PsychicMqttClient *mqttClient, LightMqttSettingsService *lightMqttSettingsService) + : _httpEndpoint(LightState::read, LightState::update, this, server, LIGHT_SETTINGS_ENDPOINT_PATH, securityManager, + AuthenticationPredicates::IS_AUTHENTICATED), + _webSocketServer(LightState::read, LightState::update, this, socket, LIGHT_SETTINGS_EVENT, + LIGHT_SETTINGS_MAX_BUFFER_SIZE), + _mqttPubSub(LightState::homeAssistRead, LightState::homeAssistUpdate, this, mqttClient), _mqttClient(mqttClient), + _lightMqttSettingsService(lightMqttSettingsService), _socket(socket) { // configure led to be output pinMode(LED_BUILTIN, OUTPUT); @@ -59,7 +43,6 @@ LightStateService::LightStateService(PsychicHttpServer *server, void LightStateService::begin() { _httpEndpoint.begin(); - _webSocketServer.begin(); _state.ledOn = DEFAULT_LED_STATE; onConfigUpdated(); } @@ -99,3 +82,4 @@ void LightStateService::registerConfig() _mqttPubSub.configureTopics(pubTopic, subTopic); } + diff --git a/src/LightStateService.h b/src/LightStateService.h index 4c7e2979..caa49f38 100644 --- a/src/LightStateService.h +++ b/src/LightStateService.h @@ -17,17 +17,18 @@ #include +#include #include #include #include -// #include #define DEFAULT_LED_STATE false #define OFF_STATE "OFF" #define ON_STATE "ON" #define LIGHT_SETTINGS_ENDPOINT_PATH "/rest/lightState" -#define LIGHT_SETTINGS_SOCKET_PATH "/ws/lightState" +#define LIGHT_SETTINGS_EVENT "led" +#define LIGHT_SETTINGS_MAX_BUFFER_SIZE 256 class LightState { @@ -81,19 +82,17 @@ class LightState class LightStateService : public StatefulService { public: - LightStateService(PsychicHttpServer *server, - SecurityManager *securityManager, - PsychicMqttClient *mqttClient, - LightMqttSettingsService *lightMqttSettingsService); - void begin(); + LightStateService(PsychicHttpServer *server, EventSocket *socket, SecurityManager *securityManager, + PsychicMqttClient *mqttClient, LightMqttSettingsService *lightMqttSettingsService); + void begin(); private: HttpEndpoint _httpEndpoint; - MqttPubSub _mqttPubSub; WebSocketServer _webSocketServer; - // WebSocketClient _webSocketClient; + MqttPubSub _mqttPubSub; PsychicMqttClient *_mqttClient; LightMqttSettingsService *_lightMqttSettingsService; + EventSocket *_socket; void registerConfig(); void onConfigUpdated(); diff --git a/src/main.cpp b/src/main.cpp index 326f3634..64b67bcd 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -26,10 +26,13 @@ ESP32SvelteKit esp32sveltekit(&server, 120); LightMqttSettingsService lightMqttSettingsService = LightMqttSettingsService(&server, esp32sveltekit.getFS(), esp32sveltekit.getSecurityManager()); -LightStateService lightStateService = LightStateService(&server, - esp32sveltekit.getSecurityManager(), - esp32sveltekit.getMqttClient(), - &lightMqttSettingsService); +LightStateService lightStateService = LightStateService( + &server, + esp32sveltekit.getSocket(), + esp32sveltekit.getSecurityManager(), + esp32sveltekit.getMqttClient(), + &lightMqttSettingsService +); void setup() {