Browse Source

WS refactoring (#2261)

* ws: clean-up wsPost implementation

- explain ourselves
- re-do consts, fix locality
- fix shadowing in ctors
- more consistent naming
- timeout for messages

* save 16 bytes temporary

* reference reference
mcspr-patch-1
Max Prokhorov 4 years ago
committed by GitHub
parent
commit
ff89504d39
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 285 additions and 169 deletions
  1. +3
    -1
      code/espurna/mqtt.cpp
  2. +120
    -83
      code/espurna/ws.cpp
  3. +66
    -21
      code/espurna/ws.h
  4. +96
    -64
      code/espurna/ws_internal.h

+ 3
- 1
code/espurna/mqtt.cpp View File

@ -1148,7 +1148,9 @@ void mqttSetup() {
.onKeyCheck(_mqttWebSocketOnKeyCheck);
mqttRegister([](unsigned int type, const char*, const char*) {
if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) wsPost(_mqttWebSocketOnData);
if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) {
wsPost(_mqttWebSocketOnData);
}
});
#endif


+ 120
- 83
code/espurna/ws.cpp View File

@ -28,6 +28,64 @@ uint32_t _ws_last_update = 0;
// WS callbacks
// -----------------------------------------------------------------------------
std::queue<WsPostponedCallbacks> _ws_queue;
ws_callbacks_t _ws_callbacks;
void wsPost(uint32_t client_id, ws_on_send_callback_f&& cb) {
_ws_queue.emplace(client_id, std::move(cb));
}
void wsPost(ws_on_send_callback_f&& cb) {
wsPost(0, std::move(cb));
}
void wsPost(uint32_t client_id, const ws_on_send_callback_f& cb) {
_ws_queue.emplace(client_id, cb);
}
void wsPost(const ws_on_send_callback_f& cb) {
wsPost(0, cb);
}
template <typename T>
void _wsPostCallbacks(uint32_t client_id, T&& cbs, WsPostponedCallbacks::Mode mode) {
_ws_queue.emplace(client_id, std::forward<T>(cbs), mode);
}
void wsPostAll(uint32_t client_id, ws_on_send_callback_list_t&& cbs) {
_wsPostCallbacks(client_id, std::move(cbs), WsPostponedCallbacks::Mode::All);
}
void wsPostAll(ws_on_send_callback_list_t&& cbs) {
wsPostAll(0, std::move(cbs));
}
void wsPostAll(uint32_t client_id, const ws_on_send_callback_list_t& cbs) {
_wsPostCallbacks(client_id, cbs, WsPostponedCallbacks::Mode::All);
}
void wsPostAll(const ws_on_send_callback_list_t& cbs) {
wsPostAll(0, cbs);
}
void wsPostSequence(uint32_t client_id, ws_on_send_callback_list_t&& cbs) {
_wsPostCallbacks(client_id, std::move(cbs), WsPostponedCallbacks::Mode::Sequence);
}
void wsPostSequence(ws_on_send_callback_list_t&& cbs) {
wsPostSequence(0, std::move(cbs));
}
void wsPostSequence(uint32_t client_id, const ws_on_send_callback_list_t& cbs) {
_wsPostCallbacks(client_id, cbs, WsPostponedCallbacks::Mode::Sequence);
}
void wsPostSequence(const ws_on_send_callback_list_t& cbs) {
wsPostSequence(0, cbs);
}
// -----------------------------------------------------------------------------
ws_callbacks_t& ws_callbacks_t::onVisible(ws_on_send_callback_f cb) {
on_visible.push_back(cb);
return *this;
@ -53,14 +111,11 @@ ws_callbacks_t& ws_callbacks_t::onKeyCheck(ws_on_keycheck_callback_f cb) {
return *this;
}
static ws_callbacks_t _ws_callbacks;
static std::queue<ws_data_t> _ws_client_data;
// -----------------------------------------------------------------------------
// WS authentication
// -----------------------------------------------------------------------------
ws_ticket_t _ws_tickets[WS_BUFFER_SIZE];
WsTicket _ws_tickets[WS_BUFFER_SIZE];
void _onAuth(AsyncWebServerRequest *request) {
@ -109,28 +164,30 @@ bool _wsAuth(AsyncWebSocketClient * client) {
#if DEBUG_WEB_SUPPORT
ws_debug_t _ws_debug(WS_DEBUG_MSG_BUFFER);
constexpr size_t WsDebugMessagesMax = 8;
WsDebug _ws_debug(WsDebugMessagesMax);
void ws_debug_t::send(const bool connected) {
if (!connected && flush) {
void WsDebug::send(bool connected) {
if (!connected && _flush) {
clear();
return;
}
if (!flush) return;
if (!_flush) return;
// ref: http://arduinojson.org/v5/assistant/
// {"weblog": {"msg":[...],"pre":[...]}}
DynamicJsonBuffer jsonBuffer(2*JSON_ARRAY_SIZE(messages.size()) + JSON_OBJECT_SIZE(1) + JSON_OBJECT_SIZE(2));
DynamicJsonBuffer jsonBuffer(2*JSON_ARRAY_SIZE(_messages.size()) + JSON_OBJECT_SIZE(1) + JSON_OBJECT_SIZE(2));
JsonObject& root = jsonBuffer.createObject();
JsonObject& weblog = root.createNestedObject("weblog");
JsonArray& msg = weblog.createNestedArray("msg");
JsonArray& pre = weblog.createNestedArray("pre");
JsonArray& msg_array = weblog.createNestedArray("msg");
JsonArray& pre_array = weblog.createNestedArray("pre");
for (auto& message : messages) {
pre.add(message.first.c_str());
msg.add(message.second.c_str());
for (auto& msg : _messages) {
pre_array.add(msg.first.c_str());
msg_array.add(msg.second.c_str());
}
wsSend(root);
@ -421,31 +478,6 @@ void _wsOnConnected(JsonObject& root) {
root["hbInterval"] = getSetting("hbInterval", HEARTBEAT_INTERVAL);
}
void wsSend(JsonObject& root) {
// TODO: avoid serializing twice?
size_t len = root.measureLength();
AsyncWebSocketMessageBuffer* buffer = _ws.makeBuffer(len);
if (buffer) {
root.printTo(reinterpret_cast<char*>(buffer->get()), len + 1);
_ws.textAll(buffer);
}
}
void wsSend(uint32_t client_id, JsonObject& root) {
AsyncWebSocketClient* client = _ws.client(client_id);
if (client == nullptr) return;
// TODO: avoid serializing twice?
size_t len = root.measureLength();
AsyncWebSocketMessageBuffer* buffer = _ws.makeBuffer(len);
if (buffer) {
root.printTo(reinterpret_cast<char*>(buffer->get()), len + 1);
client->text(buffer);
}
}
void _wsConnected(uint32_t client_id) {
const bool changePassword = (USE_PASSWORD && WEB_FORCE_PASS_CHANGE)
@ -515,22 +547,30 @@ void _wsEvent(AsyncWebSocket * server, AsyncWebSocketClient * client, AwsEventTy
// TODO: make this generic loop method to queue important ws messages?
// or, if something uses ticker / async ctx to send messages,
// it needs a retry mechanism built into the callback object
void _wsHandleClientData(const bool connected) {
void _wsHandlePostponedCallbacks(bool connected) {
if (!connected && !_ws_client_data.empty()) {
_ws_client_data.pop();
if (!connected && !_ws_queue.empty()) {
_ws_queue.pop();
return;
}
if (_ws_client_data.empty()) return;
auto& data = _ws_client_data.front();
if (_ws_queue.empty()) return;
auto& callbacks = _ws_queue.front();
// avoid stalling forever when can't send anything
constexpr decltype(ESP.getCycleCount()) WsQueueTimeoutClockCycles = microsecondsToClockCycles(10 * 1000 * 1000); // 10s
if (ESP.getCycleCount() - callbacks.timestamp > WsQueueTimeoutClockCycles) {
_ws_queue.pop();
return;
}
// client_id == 0 means we need to send the message to every client
if (data.client_id) {
AsyncWebSocketClient* ws_client = _ws.client(data.client_id);
if (callbacks.client_id) {
AsyncWebSocketClient* ws_client = _ws.client(callbacks.client_id);
// ...but, we need to check if client is still connected
if (!ws_client) {
_ws_client_data.pop();
_ws_queue.pop();
return;
}
@ -544,27 +584,27 @@ void _wsHandleClientData(const bool connected) {
// XXX: block allocation will try to create *2 next time,
// likely failing and causing wsSend to reference empty objects
// XXX: arduinojson6 will not do this, but we may need to use per-callback buffers
constexpr const size_t BUFFER_SIZE = 3192;
DynamicJsonBuffer jsonBuffer(BUFFER_SIZE);
constexpr size_t WsQueueJsonBufferSize = 3192;
DynamicJsonBuffer jsonBuffer(WsQueueJsonBufferSize);
JsonObject& root = jsonBuffer.createObject();
data.send(root);
if (data.client_id) {
wsSend(data.client_id, root);
callbacks.send(root);
if (callbacks.client_id) {
wsSend(callbacks.client_id, root);
} else {
wsSend(root);
}
yield();
if (data.done()) {
_ws_client_data.pop();
if (callbacks.done()) {
_ws_queue.pop();
}
}
void _wsLoop() {
const bool connected = wsConnected();
_wsDoUpdate(connected);
_wsHandleClientData(connected);
_wsHandlePostponedCallbacks(connected);
#if DEBUG_WEB_SUPPORT
_ws_debug.send(connected);
#endif
@ -586,6 +626,31 @@ ws_callbacks_t& wsRegister() {
return _ws_callbacks;
}
void wsSend(JsonObject& root) {
// Note: 'measurement' tries to serialize json contents byte-by-byte,
// which is somewhat costly, but likely unavoidable for us.
size_t len = root.measureLength();
AsyncWebSocketMessageBuffer* buffer = _ws.makeBuffer(len);
if (buffer) {
root.printTo(reinterpret_cast<char*>(buffer->get()), len + 1);
_ws.textAll(buffer);
}
}
void wsSend(uint32_t client_id, JsonObject& root) {
AsyncWebSocketClient* client = _ws.client(client_id);
if (client == nullptr) return;
size_t len = root.measureLength();
AsyncWebSocketMessageBuffer* buffer = _ws.makeBuffer(len);
if (buffer) {
root.printTo(reinterpret_cast<char*>(buffer->get()), len + 1);
client->text(buffer);
}
}
void wsSend(ws_on_send_callback_f callback) {
if (_ws.count() > 0) {
DynamicJsonBuffer jsonBuffer(512);
@ -630,34 +695,6 @@ void wsSend_P(uint32_t client_id, PGM_P payload) {
_ws.text(client_id, buffer);
}
void wsPost(const ws_on_send_callback_f& cb) {
_ws_client_data.emplace(cb);
}
void wsPost(uint32_t client_id, const ws_on_send_callback_f& cb) {
_ws_client_data.emplace(client_id, cb);
}
void wsPostAll(uint32_t client_id, const ws_on_send_callback_list_t& cbs) {
_ws_client_data.emplace(client_id, cbs, ws_data_t::ALL);
}
void wsPostAll(const ws_on_send_callback_list_t& cbs) {
_ws_client_data.emplace(0, cbs, ws_data_t::ALL);
}
void wsPostSequence(uint32_t client_id, const ws_on_send_callback_list_t& cbs) {
_ws_client_data.emplace(client_id, cbs, ws_data_t::SEQUENCE);
}
void wsPostSequence(uint32_t client_id, ws_on_send_callback_list_t&& cbs) {
_ws_client_data.emplace(client_id, std::forward<ws_on_send_callback_list_t>(cbs), ws_data_t::SEQUENCE);
}
void wsPostSequence(const ws_on_send_callback_list_t& cbs) {
_ws_client_data.emplace(0, cbs, ws_data_t::SEQUENCE);
}
void wsSetup() {
_ws.onEvent(_wsEvent);


+ 66
- 21
code/espurna/ws.h View File

@ -3,6 +3,7 @@
WEBSOCKET MODULE
Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
Copyright (C) 2019 by Maxim Prokhorov <prokhorov dot max at outlook dot com>
*/
@ -19,6 +20,18 @@ Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
#include "web.h"
#include "utils.h"
// Generalized WS lifetime callbacks.
// Each callback is kept as std::function, thus we can use complex objects, and not just basic function pointers.
//
// Connection start:
// - on_visible will be the very first message sent, callback data will be grouped together
// - on_connected is sent next, but each callback's data will be sent separately
// - on_data is the final one, each callback is executed separately
//
// While connected:
// - on_action will be ran whenever we receive special JSON 'action' payload
// - on_keycheck will be used to determine if we can handle specific settings keys
using ws_on_send_callback_f = std::function<void(JsonObject& root)>;
using ws_on_action_callback_f = std::function<void(uint32_t client_id, const char * action, JsonObject& data)>;
using ws_on_keycheck_callback_f = std::function<bool(const char * key, JsonVariant& value)>;
@ -28,44 +41,76 @@ using ws_on_action_callback_list_t = std::vector<ws_on_action_callback_f>;
using ws_on_keycheck_callback_list_t = std::vector<ws_on_keycheck_callback_f>;
struct ws_callbacks_t {
ws_callbacks_t& onVisible(ws_on_send_callback_f);
ws_callbacks_t& onConnected(ws_on_send_callback_f);
ws_callbacks_t& onData(ws_on_send_callback_f);
ws_callbacks_t& onAction(ws_on_action_callback_f);
ws_callbacks_t& onKeyCheck(ws_on_keycheck_callback_f);
ws_on_send_callback_list_t on_visible;
ws_on_send_callback_list_t on_connected;
ws_on_send_callback_list_t on_data;
ws_on_action_callback_list_t on_action;
ws_on_keycheck_callback_list_t on_keycheck;
ws_callbacks_t& onVisible(ws_on_send_callback_f);
ws_callbacks_t& onConnected(ws_on_send_callback_f);
ws_callbacks_t& onData(ws_on_send_callback_f);
ws_callbacks_t& onAction(ws_on_action_callback_f);
ws_callbacks_t& onKeyCheck(ws_on_keycheck_callback_f);
};
ws_callbacks_t& wsRegister();
// Postponed debug messages. best-effort, will not be re-scheduled when ws queue is full
void wsSetup();
void wsSend(uint32_t client_id, const char* data);
bool wsDebugSend(const char* prefix, const char* message);
// Postponed json messages. schedules callback(s) to be called when resources to do so are available.
// Queued item is removed on client disconnection *or* when internal timeout occurs
// There are two policies set on how to send the data:
// - All will use the same JsonObject for each callback
// - Sequence will use a different JsonObject for each callback
// Default is All
//
// WARNING: callback lists are taken by reference! make sure that list is ether:
// - std::move(...)'ed to give control of the callback list to us
// - persistent and will be available after the current block ends (global, heap-allocated, etc.)
// de-allocation is not expected e.g. ws_callbacks_t from wsRegister() is never destroyed
void wsPost(uint32_t client_id, ws_on_send_callback_f&& cb);
void wsPost(ws_on_send_callback_f&& cb);
void wsPost(uint32_t client_id, const ws_on_send_callback_f& cb);
void wsPost(const ws_on_send_callback_f& cb);
void wsPostAll(uint32_t client_id, ws_on_send_callback_list_t&& cbs);
void wsPostAll(ws_on_send_callback_list_t&& cbs);
void wsPostAll(uint32_t client_id, const ws_on_send_callback_list_t& cbs);
void wsPostAll(const ws_on_send_callback_list_t& cbs);
void wsPostSequence(uint32_t client_id, ws_on_send_callback_list_t&& cbs);
void wsPostSequence(ws_on_send_callback_list_t&& cbs);
void wsPostSequence(uint32_t client_id, const ws_on_send_callback_list_t& cbs);
void wsPostSequence(const ws_on_send_callback_list_t& cbs);
// Immmediatly try to serialize and send JsonObject&
// May silently fail when network is busy sending previous requests
void wsSend(JsonObject& root);
void wsSend(uint32_t client_id, JsonObject& root);
void wsSend(JsonObject& root);
void wsSend(ws_on_send_callback_f callback);
void wsSend(const char* data);
// Immediatly try to serialize and send raw char data
// (also, see above)
void wsSend_P(PGM_P data);
void wsSend_P(uint32_t client_id, PGM_P data);
void wsPost(const ws_on_send_callback_f& callback);
void wsPost(uint32_t client_id, const ws_on_send_callback_f& callback);
void wsPost(const ws_on_send_callback_list_t& callbacks);
void wsPost(uint32_t client_id, const ws_on_send_callback_list_t& callbacks);
void wsPostAll(uint32_t client_id, const ws_on_send_callback_list_t& callbacks);
void wsPostAll(const ws_on_send_callback_list_t& callbacks);
void wsPostSequence(uint32_t client_id, const ws_on_send_callback_list_t& callbacks);
void wsPostSequence(uint32_t client_id, ws_on_send_callback_list_t&& callbacks);
void wsPostSequence(const ws_on_send_callback_list_t& callbacks);
// Check if any or specific client_id is connected
// Server will try to set unique ID for each client
bool wsConnected();
bool wsConnected(uint32_t client_id);
bool wsDebugSend(const char* prefix, const char* message);
// Access to our module-specific lifetime callbacks.
// Expected usage is through the on() methods
ws_callbacks_t& wsRegister();
void wsSetup();

+ 96
- 64
code/espurna/ws_internal.h View File

@ -1,14 +1,16 @@
/*
WEBSOCKET MODULE
Part of the WEBSOCKET MODULE
Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
Copyright (C) 2019 by Maxim Prokhorov <prokhorov dot max at outlook dot com>
*/
#pragma once
#include "espurna.h"
#include "ws.h"
#include <IPAddress.h>
@ -16,13 +18,11 @@ Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
#include <memory>
#include <vector>
constexpr const size_t WS_DEBUG_MSG_BUFFER = 8;
// -----------------------------------------------------------------------------
// WS authentication
// -----------------------------------------------------------------------------
struct ws_ticket_t {
struct WsTicket {
IPAddress ip;
unsigned long timestamp = 0;
};
@ -31,115 +31,147 @@ struct ws_ticket_t {
// WS callbacks
// -----------------------------------------------------------------------------
struct ws_data_t {
// The idea here is to bind either:
// - constant 'callbacks' list as reference, which was registered via wsRegister()
// - in-place callback / callbacks that will be moved inside this container
struct WsPostponedCallbacks {
public:
enum mode_t {
SEQUENCE,
ALL
enum class Mode {
Sequence,
All
};
ws_data_t(const ws_on_send_callback_f& cb) :
storage(new ws_on_send_callback_list_t {cb}),
client_id(0),
mode(ALL),
callbacks(*storage.get()),
current(callbacks.begin())
WsPostponedCallbacks(uint32_t client_id, ws_on_send_callback_f&& cb) :
client_id(client_id),
timestamp(ESP.getCycleCount()),
_storage(new ws_on_send_callback_list_t {std::move(cb)}),
_callbacks(*_storage.get()),
_current(_callbacks.begin()),
_mode(Mode::All)
{}
ws_data_t(uint32_t client_id, const ws_on_send_callback_f& cb) :
storage(new ws_on_send_callback_list_t {cb}),
WsPostponedCallbacks(uint32_t client_id, const ws_on_send_callback_f& cb) :
client_id(client_id),
mode(ALL),
callbacks(*storage.get()),
current(callbacks.begin())
timestamp(ESP.getCycleCount()),
_storage(new ws_on_send_callback_list_t {cb}),
_callbacks(*_storage.get()),
_current(_callbacks.begin()),
_mode(Mode::All)
{}
ws_data_t(const uint32_t client_id, ws_on_send_callback_list_t&& callbacks, mode_t mode = SEQUENCE) :
storage(new ws_on_send_callback_list_t(std::move(callbacks))),
template <typename T>
explicit WsPostponedCallbacks(T&& cb) :
WsPostponedCallbacks(0, std::forward<T>(cb))
{}
WsPostponedCallbacks(const uint32_t client_id, const ws_on_send_callback_list_t& cbs, Mode mode = Mode::Sequence) :
client_id(client_id),
mode(mode),
callbacks(*storage.get()),
current(callbacks.begin())
timestamp(ESP.getCycleCount()),
_callbacks(cbs),
_current(_callbacks.begin()),
_mode(mode)
{}
ws_data_t(const uint32_t client_id, const ws_on_send_callback_list_t& callbacks, mode_t mode = SEQUENCE) :
WsPostponedCallbacks(const uint32_t client_id, ws_on_send_callback_list_t&& cbs, Mode mode = Mode::All) :
client_id(client_id),
mode(mode),
callbacks(callbacks),
current(callbacks.begin())
timestamp(ESP.getCycleCount()),
_storage(new ws_on_send_callback_list_t(std::move(cbs))),
_callbacks(*_storage.get()),
_current(_callbacks.begin()),
_mode(mode)
{}
bool done() {
return current == callbacks.end();
return _current == _callbacks.end();
}
void sendAll(JsonObject& root) {
current = callbacks.end();
for (auto& callback : callbacks) {
_current = _callbacks.end();
for (auto& callback : _callbacks) {
callback(root);
}
}
void sendCurrent(JsonObject& root) {
if (current == callbacks.end()) return;
(*current)(root);
++current;
if (_current == _callbacks.end()) return;
(*_current)(root);
++_current;
}
void send(JsonObject& root) {
switch (mode) {
case SEQUENCE: sendCurrent(root); break;
case ALL: sendAll(root); break;
switch (_mode) {
case Mode::Sequence:
sendCurrent(root);
break;
case Mode::All:
sendAll(root);
break;
}
}
std::unique_ptr<ws_on_send_callback_list_t> storage;
const uint32_t client_id;
const mode_t mode;
const ws_on_send_callback_list_t& callbacks;
ws_on_send_callback_list_t::const_iterator current;
const decltype(ESP.getCycleCount()) timestamp;
private:
std::unique_ptr<ws_on_send_callback_list_t> _storage;
const ws_on_send_callback_list_t& _callbacks;
ws_on_send_callback_list_t::const_iterator _current;
const Mode _mode;
};
// -----------------------------------------------------------------------------
// Debug
// -----------------------------------------------------------------------------
using ws_debug_msg_t = std::pair<String, String>;
using ws_debug_messages_t = std::vector<ws_debug_msg_t>;
struct WsDebug {
struct ws_debug_t {
using Message = std::pair<String, String>;
using MsgList = std::vector<Message>;
ws_debug_t(size_t capacity) :
flush(false),
current(0),
capacity(capacity)
WsDebug(size_t capacity) :
_flush(false),
_current(0),
_capacity(capacity)
{
messages.reserve(capacity);
_messages.reserve(_capacity);
}
void clear() {
messages.clear();
current = 0;
flush = false;
_messages.clear();
_current = 0;
_flush = false;
}
void add(const char* prefix, const char* message) {
if (current >= capacity) {
flush = true;
template <typename T = Message>
void add(T&& message) {
if (_current >= _capacity) {
_flush = true;
send(wsConnected());
}
messages.emplace(messages.begin() + current, prefix, message);
flush = true;
++current;
_messages.emplace(_messages.begin() + _current, std::forward<T>(message));
_flush = true;
++_current;
}
void send(const bool connected);
void add(const char* prefix, const char* message) {
add(std::move(std::make_pair(prefix, message)));
}
void send(bool connected);
private:
bool flush;
size_t current;
const size_t capacity;
ws_debug_messages_t messages;
bool _flush;
size_t _current;
const size_t _capacity;
MsgList _messages;
};

Loading…
Cancel
Save