From ff89504d395c901001dfff13ddec6ca12732e843 Mon Sep 17 00:00:00 2001 From: Max Prokhorov Date: Wed, 27 May 2020 00:18:51 +0300 Subject: [PATCH] WS refactoring (#2261) * ws: clean-up wsPost implementation - explain ourselves - re-do consts, fix locality - fix shadowing in ctors - more consistent naming - timeout for messages * save 16 bytes temporary * reference reference --- code/espurna/mqtt.cpp | 4 +- code/espurna/ws.cpp | 203 ++++++++++++++++++++++--------------- code/espurna/ws.h | 87 ++++++++++++---- code/espurna/ws_internal.h | 160 +++++++++++++++++------------ 4 files changed, 285 insertions(+), 169 deletions(-) diff --git a/code/espurna/mqtt.cpp b/code/espurna/mqtt.cpp index fedcdbb0..2b5b0838 100644 --- a/code/espurna/mqtt.cpp +++ b/code/espurna/mqtt.cpp @@ -1148,7 +1148,9 @@ void mqttSetup() { .onKeyCheck(_mqttWebSocketOnKeyCheck); mqttRegister([](unsigned int type, const char*, const char*) { - if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) wsPost(_mqttWebSocketOnData); + if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) { + wsPost(_mqttWebSocketOnData); + } }); #endif diff --git a/code/espurna/ws.cpp b/code/espurna/ws.cpp index e451b67a..0f8d0b77 100644 --- a/code/espurna/ws.cpp +++ b/code/espurna/ws.cpp @@ -28,6 +28,64 @@ uint32_t _ws_last_update = 0; // WS callbacks // ----------------------------------------------------------------------------- +std::queue _ws_queue; +ws_callbacks_t _ws_callbacks; + +void wsPost(uint32_t client_id, ws_on_send_callback_f&& cb) { + _ws_queue.emplace(client_id, std::move(cb)); +} + +void wsPost(ws_on_send_callback_f&& cb) { + wsPost(0, std::move(cb)); +} + +void wsPost(uint32_t client_id, const ws_on_send_callback_f& cb) { + _ws_queue.emplace(client_id, cb); +} + +void wsPost(const ws_on_send_callback_f& cb) { + wsPost(0, cb); +} + +template +void _wsPostCallbacks(uint32_t client_id, T&& cbs, WsPostponedCallbacks::Mode mode) { + _ws_queue.emplace(client_id, std::forward(cbs), mode); +} + +void wsPostAll(uint32_t client_id, ws_on_send_callback_list_t&& cbs) { + _wsPostCallbacks(client_id, std::move(cbs), WsPostponedCallbacks::Mode::All); +} + +void wsPostAll(ws_on_send_callback_list_t&& cbs) { + wsPostAll(0, std::move(cbs)); +} + +void wsPostAll(uint32_t client_id, const ws_on_send_callback_list_t& cbs) { + _wsPostCallbacks(client_id, cbs, WsPostponedCallbacks::Mode::All); +} + +void wsPostAll(const ws_on_send_callback_list_t& cbs) { + wsPostAll(0, cbs); +} + +void wsPostSequence(uint32_t client_id, ws_on_send_callback_list_t&& cbs) { + _wsPostCallbacks(client_id, std::move(cbs), WsPostponedCallbacks::Mode::Sequence); +} + +void wsPostSequence(ws_on_send_callback_list_t&& cbs) { + wsPostSequence(0, std::move(cbs)); +} + +void wsPostSequence(uint32_t client_id, const ws_on_send_callback_list_t& cbs) { + _wsPostCallbacks(client_id, cbs, WsPostponedCallbacks::Mode::Sequence); +} + +void wsPostSequence(const ws_on_send_callback_list_t& cbs) { + wsPostSequence(0, cbs); +} + +// ----------------------------------------------------------------------------- + ws_callbacks_t& ws_callbacks_t::onVisible(ws_on_send_callback_f cb) { on_visible.push_back(cb); return *this; @@ -53,14 +111,11 @@ ws_callbacks_t& ws_callbacks_t::onKeyCheck(ws_on_keycheck_callback_f cb) { return *this; } -static ws_callbacks_t _ws_callbacks; -static std::queue _ws_client_data; - // ----------------------------------------------------------------------------- // WS authentication // ----------------------------------------------------------------------------- -ws_ticket_t _ws_tickets[WS_BUFFER_SIZE]; +WsTicket _ws_tickets[WS_BUFFER_SIZE]; void _onAuth(AsyncWebServerRequest *request) { @@ -109,28 +164,30 @@ bool _wsAuth(AsyncWebSocketClient * client) { #if DEBUG_WEB_SUPPORT -ws_debug_t _ws_debug(WS_DEBUG_MSG_BUFFER); +constexpr size_t WsDebugMessagesMax = 8; + +WsDebug _ws_debug(WsDebugMessagesMax); -void ws_debug_t::send(const bool connected) { - if (!connected && flush) { +void WsDebug::send(bool connected) { + if (!connected && _flush) { clear(); return; } - if (!flush) return; + if (!_flush) return; // ref: http://arduinojson.org/v5/assistant/ // {"weblog": {"msg":[...],"pre":[...]}} - DynamicJsonBuffer jsonBuffer(2*JSON_ARRAY_SIZE(messages.size()) + JSON_OBJECT_SIZE(1) + JSON_OBJECT_SIZE(2)); + DynamicJsonBuffer jsonBuffer(2*JSON_ARRAY_SIZE(_messages.size()) + JSON_OBJECT_SIZE(1) + JSON_OBJECT_SIZE(2)); JsonObject& root = jsonBuffer.createObject(); JsonObject& weblog = root.createNestedObject("weblog"); - JsonArray& msg = weblog.createNestedArray("msg"); - JsonArray& pre = weblog.createNestedArray("pre"); + JsonArray& msg_array = weblog.createNestedArray("msg"); + JsonArray& pre_array = weblog.createNestedArray("pre"); - for (auto& message : messages) { - pre.add(message.first.c_str()); - msg.add(message.second.c_str()); + for (auto& msg : _messages) { + pre_array.add(msg.first.c_str()); + msg_array.add(msg.second.c_str()); } wsSend(root); @@ -421,31 +478,6 @@ void _wsOnConnected(JsonObject& root) { root["hbInterval"] = getSetting("hbInterval", HEARTBEAT_INTERVAL); } -void wsSend(JsonObject& root) { - // TODO: avoid serializing twice? - size_t len = root.measureLength(); - AsyncWebSocketMessageBuffer* buffer = _ws.makeBuffer(len); - - if (buffer) { - root.printTo(reinterpret_cast(buffer->get()), len + 1); - _ws.textAll(buffer); - } -} - -void wsSend(uint32_t client_id, JsonObject& root) { - AsyncWebSocketClient* client = _ws.client(client_id); - if (client == nullptr) return; - - // TODO: avoid serializing twice? - size_t len = root.measureLength(); - AsyncWebSocketMessageBuffer* buffer = _ws.makeBuffer(len); - - if (buffer) { - root.printTo(reinterpret_cast(buffer->get()), len + 1); - client->text(buffer); - } -} - void _wsConnected(uint32_t client_id) { const bool changePassword = (USE_PASSWORD && WEB_FORCE_PASS_CHANGE) @@ -515,22 +547,30 @@ void _wsEvent(AsyncWebSocket * server, AsyncWebSocketClient * client, AwsEventTy // TODO: make this generic loop method to queue important ws messages? // or, if something uses ticker / async ctx to send messages, // it needs a retry mechanism built into the callback object -void _wsHandleClientData(const bool connected) { +void _wsHandlePostponedCallbacks(bool connected) { - if (!connected && !_ws_client_data.empty()) { - _ws_client_data.pop(); + if (!connected && !_ws_queue.empty()) { + _ws_queue.pop(); return; } - if (_ws_client_data.empty()) return; - auto& data = _ws_client_data.front(); + if (_ws_queue.empty()) return; + auto& callbacks = _ws_queue.front(); + + // avoid stalling forever when can't send anything + constexpr decltype(ESP.getCycleCount()) WsQueueTimeoutClockCycles = microsecondsToClockCycles(10 * 1000 * 1000); // 10s + if (ESP.getCycleCount() - callbacks.timestamp > WsQueueTimeoutClockCycles) { + _ws_queue.pop(); + return; + } // client_id == 0 means we need to send the message to every client - if (data.client_id) { - AsyncWebSocketClient* ws_client = _ws.client(data.client_id); + if (callbacks.client_id) { + AsyncWebSocketClient* ws_client = _ws.client(callbacks.client_id); + // ...but, we need to check if client is still connected if (!ws_client) { - _ws_client_data.pop(); + _ws_queue.pop(); return; } @@ -544,27 +584,27 @@ void _wsHandleClientData(const bool connected) { // XXX: block allocation will try to create *2 next time, // likely failing and causing wsSend to reference empty objects // XXX: arduinojson6 will not do this, but we may need to use per-callback buffers - constexpr const size_t BUFFER_SIZE = 3192; - DynamicJsonBuffer jsonBuffer(BUFFER_SIZE); + constexpr size_t WsQueueJsonBufferSize = 3192; + DynamicJsonBuffer jsonBuffer(WsQueueJsonBufferSize); JsonObject& root = jsonBuffer.createObject(); - data.send(root); - if (data.client_id) { - wsSend(data.client_id, root); + callbacks.send(root); + if (callbacks.client_id) { + wsSend(callbacks.client_id, root); } else { wsSend(root); } yield(); - if (data.done()) { - _ws_client_data.pop(); + if (callbacks.done()) { + _ws_queue.pop(); } } void _wsLoop() { const bool connected = wsConnected(); _wsDoUpdate(connected); - _wsHandleClientData(connected); + _wsHandlePostponedCallbacks(connected); #if DEBUG_WEB_SUPPORT _ws_debug.send(connected); #endif @@ -586,6 +626,31 @@ ws_callbacks_t& wsRegister() { return _ws_callbacks; } +void wsSend(JsonObject& root) { + // Note: 'measurement' tries to serialize json contents byte-by-byte, + // which is somewhat costly, but likely unavoidable for us. + size_t len = root.measureLength(); + AsyncWebSocketMessageBuffer* buffer = _ws.makeBuffer(len); + + if (buffer) { + root.printTo(reinterpret_cast(buffer->get()), len + 1); + _ws.textAll(buffer); + } +} + +void wsSend(uint32_t client_id, JsonObject& root) { + AsyncWebSocketClient* client = _ws.client(client_id); + if (client == nullptr) return; + + size_t len = root.measureLength(); + AsyncWebSocketMessageBuffer* buffer = _ws.makeBuffer(len); + + if (buffer) { + root.printTo(reinterpret_cast(buffer->get()), len + 1); + client->text(buffer); + } +} + void wsSend(ws_on_send_callback_f callback) { if (_ws.count() > 0) { DynamicJsonBuffer jsonBuffer(512); @@ -630,34 +695,6 @@ void wsSend_P(uint32_t client_id, PGM_P payload) { _ws.text(client_id, buffer); } -void wsPost(const ws_on_send_callback_f& cb) { - _ws_client_data.emplace(cb); -} - -void wsPost(uint32_t client_id, const ws_on_send_callback_f& cb) { - _ws_client_data.emplace(client_id, cb); -} - -void wsPostAll(uint32_t client_id, const ws_on_send_callback_list_t& cbs) { - _ws_client_data.emplace(client_id, cbs, ws_data_t::ALL); -} - -void wsPostAll(const ws_on_send_callback_list_t& cbs) { - _ws_client_data.emplace(0, cbs, ws_data_t::ALL); -} - -void wsPostSequence(uint32_t client_id, const ws_on_send_callback_list_t& cbs) { - _ws_client_data.emplace(client_id, cbs, ws_data_t::SEQUENCE); -} - -void wsPostSequence(uint32_t client_id, ws_on_send_callback_list_t&& cbs) { - _ws_client_data.emplace(client_id, std::forward(cbs), ws_data_t::SEQUENCE); -} - -void wsPostSequence(const ws_on_send_callback_list_t& cbs) { - _ws_client_data.emplace(0, cbs, ws_data_t::SEQUENCE); -} - void wsSetup() { _ws.onEvent(_wsEvent); diff --git a/code/espurna/ws.h b/code/espurna/ws.h index 538f756f..c39802a0 100644 --- a/code/espurna/ws.h +++ b/code/espurna/ws.h @@ -3,6 +3,7 @@ WEBSOCKET MODULE Copyright (C) 2016-2019 by Xose Pérez +Copyright (C) 2019 by Maxim Prokhorov */ @@ -19,6 +20,18 @@ Copyright (C) 2016-2019 by Xose Pérez #include "web.h" #include "utils.h" +// Generalized WS lifetime callbacks. +// Each callback is kept as std::function, thus we can use complex objects, and not just basic function pointers. +// +// Connection start: +// - on_visible will be the very first message sent, callback data will be grouped together +// - on_connected is sent next, but each callback's data will be sent separately +// - on_data is the final one, each callback is executed separately +// +// While connected: +// - on_action will be ran whenever we receive special JSON 'action' payload +// - on_keycheck will be used to determine if we can handle specific settings keys + using ws_on_send_callback_f = std::function; using ws_on_action_callback_f = std::function; using ws_on_keycheck_callback_f = std::function; @@ -28,44 +41,76 @@ using ws_on_action_callback_list_t = std::vector; using ws_on_keycheck_callback_list_t = std::vector; struct ws_callbacks_t { + ws_callbacks_t& onVisible(ws_on_send_callback_f); + ws_callbacks_t& onConnected(ws_on_send_callback_f); + ws_callbacks_t& onData(ws_on_send_callback_f); + ws_callbacks_t& onAction(ws_on_action_callback_f); + ws_callbacks_t& onKeyCheck(ws_on_keycheck_callback_f); + ws_on_send_callback_list_t on_visible; ws_on_send_callback_list_t on_connected; ws_on_send_callback_list_t on_data; ws_on_action_callback_list_t on_action; ws_on_keycheck_callback_list_t on_keycheck; - - ws_callbacks_t& onVisible(ws_on_send_callback_f); - ws_callbacks_t& onConnected(ws_on_send_callback_f); - ws_callbacks_t& onData(ws_on_send_callback_f); - ws_callbacks_t& onAction(ws_on_action_callback_f); - ws_callbacks_t& onKeyCheck(ws_on_keycheck_callback_f); }; -ws_callbacks_t& wsRegister(); +// Postponed debug messages. best-effort, will not be re-scheduled when ws queue is full -void wsSetup(); -void wsSend(uint32_t client_id, const char* data); +bool wsDebugSend(const char* prefix, const char* message); + +// Postponed json messages. schedules callback(s) to be called when resources to do so are available. +// Queued item is removed on client disconnection *or* when internal timeout occurs + +// There are two policies set on how to send the data: +// - All will use the same JsonObject for each callback +// - Sequence will use a different JsonObject for each callback +// Default is All +// +// WARNING: callback lists are taken by reference! make sure that list is ether: +// - std::move(...)'ed to give control of the callback list to us +// - persistent and will be available after the current block ends (global, heap-allocated, etc.) +// de-allocation is not expected e.g. ws_callbacks_t from wsRegister() is never destroyed + +void wsPost(uint32_t client_id, ws_on_send_callback_f&& cb); +void wsPost(ws_on_send_callback_f&& cb); +void wsPost(uint32_t client_id, const ws_on_send_callback_f& cb); +void wsPost(const ws_on_send_callback_f& cb); + +void wsPostAll(uint32_t client_id, ws_on_send_callback_list_t&& cbs); +void wsPostAll(ws_on_send_callback_list_t&& cbs); +void wsPostAll(uint32_t client_id, const ws_on_send_callback_list_t& cbs); +void wsPostAll(const ws_on_send_callback_list_t& cbs); + +void wsPostSequence(uint32_t client_id, ws_on_send_callback_list_t&& cbs); +void wsPostSequence(ws_on_send_callback_list_t&& cbs); +void wsPostSequence(uint32_t client_id, const ws_on_send_callback_list_t& cbs); +void wsPostSequence(const ws_on_send_callback_list_t& cbs); + +// Immmediatly try to serialize and send JsonObject& +// May silently fail when network is busy sending previous requests + +void wsSend(JsonObject& root); void wsSend(uint32_t client_id, JsonObject& root); + void wsSend(JsonObject& root); void wsSend(ws_on_send_callback_f callback); void wsSend(const char* data); +// Immediatly try to serialize and send raw char data +// (also, see above) + void wsSend_P(PGM_P data); void wsSend_P(uint32_t client_id, PGM_P data); -void wsPost(const ws_on_send_callback_f& callback); -void wsPost(uint32_t client_id, const ws_on_send_callback_f& callback); -void wsPost(const ws_on_send_callback_list_t& callbacks); -void wsPost(uint32_t client_id, const ws_on_send_callback_list_t& callbacks); - -void wsPostAll(uint32_t client_id, const ws_on_send_callback_list_t& callbacks); -void wsPostAll(const ws_on_send_callback_list_t& callbacks); - -void wsPostSequence(uint32_t client_id, const ws_on_send_callback_list_t& callbacks); -void wsPostSequence(uint32_t client_id, ws_on_send_callback_list_t&& callbacks); -void wsPostSequence(const ws_on_send_callback_list_t& callbacks); +// Check if any or specific client_id is connected +// Server will try to set unique ID for each client bool wsConnected(); bool wsConnected(uint32_t client_id); -bool wsDebugSend(const char* prefix, const char* message); + +// Access to our module-specific lifetime callbacks. +// Expected usage is through the on() methods + +ws_callbacks_t& wsRegister(); +void wsSetup(); diff --git a/code/espurna/ws_internal.h b/code/espurna/ws_internal.h index 5e4097fe..502f9170 100644 --- a/code/espurna/ws_internal.h +++ b/code/espurna/ws_internal.h @@ -1,14 +1,16 @@ /* -WEBSOCKET MODULE +Part of the WEBSOCKET MODULE Copyright (C) 2016-2019 by Xose Pérez +Copyright (C) 2019 by Maxim Prokhorov */ #pragma once #include "espurna.h" +#include "ws.h" #include @@ -16,13 +18,11 @@ Copyright (C) 2016-2019 by Xose Pérez #include #include -constexpr const size_t WS_DEBUG_MSG_BUFFER = 8; - // ----------------------------------------------------------------------------- // WS authentication // ----------------------------------------------------------------------------- -struct ws_ticket_t { +struct WsTicket { IPAddress ip; unsigned long timestamp = 0; }; @@ -31,115 +31,147 @@ struct ws_ticket_t { // WS callbacks // ----------------------------------------------------------------------------- -struct ws_data_t { +// The idea here is to bind either: +// - constant 'callbacks' list as reference, which was registered via wsRegister() +// - in-place callback / callbacks that will be moved inside this container + +struct WsPostponedCallbacks { + + public: - enum mode_t { - SEQUENCE, - ALL + enum class Mode { + Sequence, + All }; - ws_data_t(const ws_on_send_callback_f& cb) : - storage(new ws_on_send_callback_list_t {cb}), - client_id(0), - mode(ALL), - callbacks(*storage.get()), - current(callbacks.begin()) + WsPostponedCallbacks(uint32_t client_id, ws_on_send_callback_f&& cb) : + client_id(client_id), + timestamp(ESP.getCycleCount()), + _storage(new ws_on_send_callback_list_t {std::move(cb)}), + _callbacks(*_storage.get()), + _current(_callbacks.begin()), + _mode(Mode::All) {} - ws_data_t(uint32_t client_id, const ws_on_send_callback_f& cb) : - storage(new ws_on_send_callback_list_t {cb}), + WsPostponedCallbacks(uint32_t client_id, const ws_on_send_callback_f& cb) : client_id(client_id), - mode(ALL), - callbacks(*storage.get()), - current(callbacks.begin()) + timestamp(ESP.getCycleCount()), + _storage(new ws_on_send_callback_list_t {cb}), + _callbacks(*_storage.get()), + _current(_callbacks.begin()), + _mode(Mode::All) {} - ws_data_t(const uint32_t client_id, ws_on_send_callback_list_t&& callbacks, mode_t mode = SEQUENCE) : - storage(new ws_on_send_callback_list_t(std::move(callbacks))), + template + explicit WsPostponedCallbacks(T&& cb) : + WsPostponedCallbacks(0, std::forward(cb)) + {} + + WsPostponedCallbacks(const uint32_t client_id, const ws_on_send_callback_list_t& cbs, Mode mode = Mode::Sequence) : client_id(client_id), - mode(mode), - callbacks(*storage.get()), - current(callbacks.begin()) + timestamp(ESP.getCycleCount()), + _callbacks(cbs), + _current(_callbacks.begin()), + _mode(mode) {} - ws_data_t(const uint32_t client_id, const ws_on_send_callback_list_t& callbacks, mode_t mode = SEQUENCE) : + WsPostponedCallbacks(const uint32_t client_id, ws_on_send_callback_list_t&& cbs, Mode mode = Mode::All) : client_id(client_id), - mode(mode), - callbacks(callbacks), - current(callbacks.begin()) + timestamp(ESP.getCycleCount()), + _storage(new ws_on_send_callback_list_t(std::move(cbs))), + _callbacks(*_storage.get()), + _current(_callbacks.begin()), + _mode(mode) {} bool done() { - return current == callbacks.end(); + return _current == _callbacks.end(); } void sendAll(JsonObject& root) { - current = callbacks.end(); - for (auto& callback : callbacks) { + _current = _callbacks.end(); + for (auto& callback : _callbacks) { callback(root); } } void sendCurrent(JsonObject& root) { - if (current == callbacks.end()) return; - (*current)(root); - ++current; + if (_current == _callbacks.end()) return; + (*_current)(root); + ++_current; } void send(JsonObject& root) { - switch (mode) { - case SEQUENCE: sendCurrent(root); break; - case ALL: sendAll(root); break; + switch (_mode) { + case Mode::Sequence: + sendCurrent(root); + break; + case Mode::All: + sendAll(root); + break; } } - std::unique_ptr storage; - const uint32_t client_id; - const mode_t mode; - const ws_on_send_callback_list_t& callbacks; - ws_on_send_callback_list_t::const_iterator current; + const decltype(ESP.getCycleCount()) timestamp; + + private: + + std::unique_ptr _storage; + + const ws_on_send_callback_list_t& _callbacks; + ws_on_send_callback_list_t::const_iterator _current; + + const Mode _mode; + }; // ----------------------------------------------------------------------------- // Debug // ----------------------------------------------------------------------------- -using ws_debug_msg_t = std::pair; -using ws_debug_messages_t = std::vector; +struct WsDebug { -struct ws_debug_t { + using Message = std::pair; + using MsgList = std::vector; - ws_debug_t(size_t capacity) : - flush(false), - current(0), - capacity(capacity) + WsDebug(size_t capacity) : + _flush(false), + _current(0), + _capacity(capacity) { - messages.reserve(capacity); + _messages.reserve(_capacity); } void clear() { - messages.clear(); - current = 0; - flush = false; + _messages.clear(); + _current = 0; + _flush = false; } - void add(const char* prefix, const char* message) { - if (current >= capacity) { - flush = true; + template + void add(T&& message) { + if (_current >= _capacity) { + _flush = true; send(wsConnected()); } - messages.emplace(messages.begin() + current, prefix, message); - flush = true; - ++current; + _messages.emplace(_messages.begin() + _current, std::forward(message)); + _flush = true; + ++_current; } - void send(const bool connected); + void add(const char* prefix, const char* message) { + add(std::move(std::make_pair(prefix, message))); + } + + void send(bool connected); + + private: - bool flush; - size_t current; - const size_t capacity; - ws_debug_messages_t messages; + bool _flush; + size_t _current; + const size_t _capacity; + MsgList _messages; };