From 43c2c41cba95c1fd8e702f4bf1609b6e4d10936e Mon Sep 17 00:00:00 2001 From: Max Prokhorov Date: Fri, 20 Mar 2020 21:07:02 +0300 Subject: [PATCH] mqtt(async): buffer some data (#2181) * mqtt: enum state * mqtt: async client buffer with MQTT_MAX_PACKET_SIZE * mqtt: rework debug messages * mqtt: MQTT_BUFFER_MAX_SIZE * mqtt/test: debug log for async callback * mqtt/test: don't log things we don't handle * button: fix typo --- code/espurna/button.ino | 2 +- code/espurna/config/dependencies.h | 12 ++ code/espurna/config/general.h | 15 +- code/espurna/mqtt.h | 3 + code/espurna/mqtt.ino | 303 ++++++++++++++++++++--------- code/platformio.ini | 2 +- 6 files changed, 229 insertions(+), 108 deletions(-) diff --git a/code/espurna/button.ino b/code/espurna/button.ino index f60bd660..ff311352 100644 --- a/code/espurna/button.ino +++ b/code/espurna/button.ino @@ -677,7 +677,7 @@ void _buttonLoopSonoffDual() { DEBUG_MSG_P(PSTR("[BUTTON] [LIGHTFOX] Received buttons mask: %u\n"), value); for (unsigned int i=0; i<_buttons.size(); i++) { - if ((value & (1 << i)) > 0); + if ((value & (1 << i)) > 0) { buttonEvent(i, button_event_t::Click); } } diff --git a/code/espurna/config/dependencies.h b/code/espurna/config/dependencies.h index bb25db23..b657f479 100644 --- a/code/espurna/config/dependencies.h +++ b/code/espurna/config/dependencies.h @@ -82,6 +82,8 @@ #endif #if THERMOSTAT_SUPPORT +#undef MQTT_USE_JSON +#define MQTT_USE_JSON 1 // Thermostat depends on group messages in a JSON body #undef RELAY_SUPPORT #define RELAY_SUPPORT 1 // Thermostat depends on switches #endif @@ -195,3 +197,13 @@ #undef BUTTON1_LNGCLICK #define BUTTON1_LNGCLICK BUTTON_ACTION_TOGGLE #endif + +//------------------------------------------------------------------------------ +// We should always set MQTT_MAX_PACKET_SIZE +// + +#if MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT +#if not defined(MQTT_MAX_PACKET_SIZE) +#warning "MQTT_MAX_PACKET_SIZE should be set in `build_flags = ...` of the environment! Default value is used instead." +#endif +#endif diff --git a/code/espurna/config/general.h b/code/espurna/config/general.h index 856ba74b..038c91eb 100644 --- a/code/espurna/config/general.h +++ b/code/espurna/config/general.h @@ -1084,15 +1084,8 @@ #define MQTT_SKIP_TIME 1000 // Skip messages for 1 second anter connection #endif - -#if THERMOSTAT_SUPPORT == 1 - #ifndef MQTT_USE_JSON - #define MQTT_USE_JSON 1 // Group messages in a JSON body - #endif -#else - #ifndef MQTT_USE_JSON - #define MQTT_USE_JSON 0 // Don't group messages in a JSON body (default) - #endif +#ifndef MQTT_USE_JSON +#define MQTT_USE_JSON 0 // Don't group messages in a JSON body by default #endif #ifndef MQTT_USE_JSON_DELAY @@ -1103,6 +1096,10 @@ #define MQTT_QUEUE_MAX_SIZE 20 // Size of the MQTT queue when MQTT_USE_JSON is enabled #endif +#ifndef MQTT_BUFFER_MAX_SIZE +#define MQTT_BUFFER_MAX_SIZE 1024 // Size of the MQTT payload buffer for MQTT_MESSAGE_EVENT. Large messages will only be available via MQTT_MESSAGE_RAW_EVENT. + // Note: When using MQTT_LIBRARY_PUBSUBCLIENT, MQTT_MAX_PACKET_SIZE should not be more than this value. +#endif // These are the properties that will be sent when useJson is true #ifndef MQTT_ENQUEUE_IP diff --git a/code/espurna/mqtt.h b/code/espurna/mqtt.h index 0c79d08c..1fc73461 100644 --- a/code/espurna/mqtt.h +++ b/code/espurna/mqtt.h @@ -17,6 +17,9 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot using mqtt_callback_f = std::function; using mqtt_msg_t = std::pair; // topic, payload +// TODO: need this prototype for .ino +class AsyncMqttClientMessageProperties; + #if MQTT_SUPPORT #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT diff --git a/code/espurna/mqtt.ino b/code/espurna/mqtt.ino index 06c79f67..8d1d82b6 100644 --- a/code/espurna/mqtt.ino +++ b/code/espurna/mqtt.ino @@ -21,6 +21,7 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot #include "rpc.h" #include "ws.h" +#include "libs/AsyncClientHelpers.h" #include "libs/SecureClientHelpers.h" #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT @@ -44,13 +45,13 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot #endif // SECURE_CLIENT != SECURE_CLIENT_NONE #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT -#ifdef MQTT_MAX_PACKET_SIZE - MQTTClient _mqtt(MQTT_MAX_PACKET_SIZE); -#else - MQTTClient _mqtt; -#endif + + MQTTClient _mqtt(MQTT_BUFFER_MAX_SIZE); + #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT - PubSubClient _mqtt; + + PubSubClient _mqtt; + #endif #endif // MQTT_LIBRARY == MQTT_ASYNCMQTTCLIENT @@ -60,8 +61,8 @@ bool _mqtt_enabled = MQTT_ENABLED; bool _mqtt_use_json = false; unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN; unsigned long _mqtt_last_connection = 0; -bool _mqtt_connected = false; -bool _mqtt_connecting = false; +AsyncClientState _mqtt_state = AsyncClientState::Disconnected; +bool _mqtt_retain_skipped = false; bool _mqtt_retain = MQTT_RETAIN; int _mqtt_qos = MQTT_QOS; int _mqtt_keepalive = MQTT_KEEPALIVE; @@ -220,7 +221,7 @@ void _mqttConnect() { if (!_mqtt_enabled) return; // Do not connect if already connected or still trying to connect - if (_mqtt.connected() || _mqtt_connecting) return; + if (_mqtt.connected() || (_mqtt_state != AsyncClientState::Disconnected)) return; // Check reconnect interval if (millis() - _mqtt_last_connection < _mqtt_reconnect_delay) return; @@ -243,7 +244,7 @@ void _mqttConnect() { DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %ds\n"), _mqtt_keepalive); DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will.c_str()); - _mqtt_connecting = true; + _mqtt_state = AsyncClientState::Connecting; #if SECURE_CLIENT != SECURE_CLIENT_NONE const bool secure = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED); @@ -401,40 +402,76 @@ void _mqttBackwards() { } void _mqttInfo() { - DEBUG_MSG_P(PSTR( - "[MQTT] " - #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT - "AsyncMqttClient" - #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT - "Arduino-MQTT" - #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT - "PubSubClient" - #endif - ", SSL " - #if SECURE_CLIENT != SEURE_CLIENT_NONE - "ENABLED" - #else - "DISABLED" - #endif - ", Autoconnect " - #if MQTT_AUTOCONNECT - "ENABLED" - #else - "DISABLED" - #endif - "\n" - )); - DEBUG_MSG_P(PSTR("[MQTT] Client %s, %s\n"), - _mqtt_enabled ? "ENABLED" : "DISABLED", - _mqtt.connected() ? "CONNECTED" : "DISCONNECTED" - ); - DEBUG_MSG_P(PSTR("[MQTT] Retry %s (Now %u, Last %u, Delay %u, Step %u)\n"), - _mqtt_connecting ? "CONNECTING" : "WAITING", - millis(), - _mqtt_last_connection, - _mqtt_reconnect_delay, - MQTT_RECONNECT_DELAY_STEP - ); + // Build information + { + #define __MQTT_INFO_STR(X) #X + #define _MQTT_INFO_STR(X) __MQTT_INFO_STR(X) + DEBUG_MSG_P(PSTR( + "[MQTT] " + #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT + "AsyncMqttClient" + #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT + "Arduino-MQTT" + #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT + "PubSubClient" + #endif + ", SSL " + #if SECURE_CLIENT != SEURE_CLIENT_NONE + "ENABLED" + #else + "DISABLED" + #endif + ", Autoconnect " + #if MQTT_AUTOCONNECT + "ENABLED" + #else + "DISABLED" + #endif + ", Buffer size " _MQTT_INFO_STR(MQTT_BUFFER_MAX_SIZE) " bytes" + "\n" + )); + #undef _MQTT_INFO_STR + #undef __MQTT_INFO_STR + } + + // Notify about the general state of the client + { + const __FlashStringHelper* enabled = _mqtt_enabled + ? F("ENABLED") + : F("DISABLED"); + + const __FlashStringHelper* state = nullptr; + switch (_mqtt_state) { + case AsyncClientState::Connecting: + state = F("CONNECTING"); + break; + case AsyncClientState::Connected: + state = F("CONNECTED"); + break; + case AsyncClientState::Disconnected: + state = F("DISCONNECTED"); + break; + case AsyncClientState::Disconnecting: + state = F("DISCONNECTING"); + break; + default: + state = F("WAITING"); + break; + } + + DEBUG_MSG_P(PSTR("[MQTT] Client %s, %s\n"), + String(enabled).c_str(), + String(state).c_str() + ); + + if (_mqtt_enabled && (_mqtt_state != AsyncClientState::Connected)) { + DEBUG_MSG_P(PSTR("[MQTT] Retrying, Last %u with Delay %u (Step %u)\n"), + _mqtt_last_connection, + _mqtt_reconnect_delay, + MQTT_RECONNECT_DELAY_STEP + ); + } + } } // ----------------------------------------------------------------------------- @@ -533,19 +570,20 @@ void _mqttCallback(unsigned int type, const char * topic, const char * payload) void _mqttOnConnect() { - DEBUG_MSG_P(PSTR("[MQTT] Connected!\n")); _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN; _mqtt_last_connection = millis(); - _mqtt_connecting = false; - _mqtt_connected = true; + _mqtt_state = AsyncClientState::Connected; + _mqtt_retain_skipped = false; + + DEBUG_MSG_P(PSTR("[MQTT] Connected!\n")); // Clean subscriptions mqttUnsubscribeRaw("#"); - // Send connect event to subscribers - for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) { - (_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL); + // Notify all subscribers about the connection + for (auto& callback : _mqtt_callbacks) { + callback(MQTT_CONNECT_EVENT, nullptr, nullptr); } } @@ -554,40 +592,88 @@ void _mqttOnDisconnect() { // Reset reconnection delay _mqtt_last_connection = millis(); - _mqtt_connecting = false; - _mqtt_connected = false; + _mqtt_state = AsyncClientState::Disconnected; + _mqtt_retain_skipped = false; DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n")); - // Send disconnect event to subscribers - for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) { - (_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL); + // Notify all subscribers about the disconnect + for (auto& callback : _mqtt_callbacks) { + callback(MQTT_DISCONNECT_EVENT, nullptr, nullptr); + } + +} + +// Force-skip everything received in a short window right after connecting to avoid syncronization issues. + +bool _mqttMaybeSkipRetained(char* topic) { + #if MQTT_SKIP_RETAINED + if (!_mqtt_retain_skipped && (millis() - _mqtt_last_connection < MQTT_SKIP_TIME)) { + DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic); + return true; + } + #endif + + _mqtt_retain_skipped = true; + return false; +} + +#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT + +// MQTT Broker can sometimes send messages in bulk. Even when message size is less than MQTT_BUFFER_MAX_SIZE, we *could* +// receive a message with `len != total`, this requiring buffering of the received data. Prepare a static memory to store the +// data until `(len + index) == total`. +// TODO: One pending issue is streaming arbitrary data (e.g. binary, for OTA). We always set '\0' and API consumer expects C-String. +// In that case, there could be MQTT_MESSAGE_RAW_EVENT and this callback only trigger on small messages. +// TODO: Current callback model does not allow to pass message length. Instead, implement a topic filter and record all subscriptions. That way we don't need to filter out events and could implement per-event callbacks. + +void _mqttOnMessageAsync(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { + if (!len || (len > MQTT_BUFFER_MAX_SIZE) || (total > MQTT_BUFFER_MAX_SIZE)) return; + if (_mqttMaybeSkipRetained(topic)) return; + + static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0}; + memmove(message + index, (char *) payload, len); + + // Not done yet + if (total != (len + index)) { + DEBUG_MSG_P(PSTR("[MQTT] Buffered %s => %u / %u bytes\n"), topic, len, total); + return; + } + message[len + index] = '\0'; + DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message); + + // Call subscribers with the message buffer + for (auto& callback : _mqtt_callbacks) { + callback(MQTT_MESSAGE_EVENT, topic, message); } } +#else + +// Sync client already implements buffering, but we still need to add '\0' because API consumer expects C-String :/ +// TODO: consider reworking this (and async counterpart), giving callback func length of the message. + void _mqttOnMessage(char* topic, char* payload, unsigned int len) { - if (len == 0) return; + if (!len || (len > MQTT_BUFFER_MAX_SIZE)) return; + if (_mqttMaybeSkipRetained(topic)) return; - char message[len + 1]; - strlcpy(message, (char *) payload, len + 1); + static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0}; + memmove(message, (char *) payload, len); + message[len] = '\0'; - #if MQTT_SKIP_RETAINED - if (millis() - _mqtt_last_connection < MQTT_SKIP_TIME) { - DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s - SKIPPED\n"), topic, message); - return; - } - #endif DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message); - // Send message event to subscribers - for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) { - (_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic, message); + // Call subscribers with the message buffer + for (auto& callback : _mqtt_callbacks) { + callback(MQTT_MESSAGE_EVENT, topic, message); } } +#endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT + // ----------------------------------------------------------------------------- // Public API // ----------------------------------------------------------------------------- @@ -948,40 +1034,63 @@ void mqttSetup() { } #endif // SECURE_CLIENT != SECURE_CLIENT_NONE - _mqtt.onConnect([](bool sessionPresent) { + _mqtt.onMessage(_mqttOnMessageAsync); + + _mqtt.onConnect([](bool) { _mqttOnConnect(); }); - _mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) { - if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) { - DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n")); - } - if (reason == AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED) { - DEBUG_MSG_P(PSTR("[MQTT] Identifier Rejected\n")); - } - if (reason == AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE) { - DEBUG_MSG_P(PSTR("[MQTT] Server unavailable\n")); - } - if (reason == AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS) { - DEBUG_MSG_P(PSTR("[MQTT] Malformed credentials\n")); - } - if (reason == AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED) { - DEBUG_MSG_P(PSTR("[MQTT] Not authorized\n")); - } - #if SECURE_CLIENT == SECURE_CLIENT_AXTLS - if (reason == AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT) { - DEBUG_MSG_P(PSTR("[MQTT] Bad fingerprint\n")); - } - #endif - _mqttOnDisconnect(); - }); - _mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { - _mqttOnMessage(topic, payload, len); - }); + _mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) { - DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %d\n"), packetId); + DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %u\n"), packetId); }); _mqtt.onPublish([](uint16_t packetId) { - DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %d\n"), packetId); + DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %u\n"), packetId); + }); + + _mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) { + + switch (reason) { + case AsyncMqttClientDisconnectReason::TCP_DISCONNECTED: + DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n")); + break; + + case AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED: + DEBUG_MSG_P(PSTR("[MQTT] Identifier Rejected\n")); + break; + + case AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE: + DEBUG_MSG_P(PSTR("[MQTT] Server unavailable\n")); + break; + + case AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS: + DEBUG_MSG_P(PSTR("[MQTT] Malformed credentials\n")); + break; + + case AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED: + DEBUG_MSG_P(PSTR("[MQTT] Not authorized\n")); + break; + + case AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT: + #if ASYNC_TCP_SSL_ENABLED + DEBUG_MSG_P(PSTR("[MQTT] Bad fingerprint\n")); + #endif + break; + + case AsyncMqttClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION: + // This is never used by the AsyncMqttClient source + #if 0 + DEBUG_MSG_P(PSTR("[MQTT] Unacceptable protocol version\n")); + #endif + break; + + case AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE: + DEBUG_MSG_P(PSTR("[MQTT] Connect packet too big\n")); + break; + + } + + _mqttOnDisconnect(); + }); #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT @@ -1039,7 +1148,7 @@ void mqttLoop() { } else { - if (_mqtt_connected) { + if (_mqtt_state != AsyncClientState::Disconnected) { _mqttOnDisconnect(); } diff --git a/code/platformio.ini b/code/platformio.ini index 7276f967..83c95a6c 100644 --- a/code/platformio.ini +++ b/code/platformio.ini @@ -77,7 +77,7 @@ board_1m = esp01_1m board_2m = esp_wroom_02 board_4m = esp12e -build_flags = -g -w -DMQTT_MAX_PACKET_SIZE=1024 -DNO_GLOBAL_EEPROM -DPIO_FRAMEWORK_ARDUINO_LWIP2_HIGHER_BANDWIDTH +build_flags = -g -w -DNO_GLOBAL_EEPROM -DPIO_FRAMEWORK_ARDUINO_LWIP2_HIGHER_BANDWIDTH build_flags_512k = ${common.build_flags} -Wl,-Teagle.flash.512k0m1s.ld build_flags_1m0m = ${common.build_flags} -Wl,-Teagle.flash.1m0m1s.ld build_flags_2m1m = ${common.build_flags} -Wl,-Teagle.flash.2m1m4s.ld