Browse Source

mqtt(async): buffer some data (#2181)

* mqtt: enum state

* mqtt: async client buffer with MQTT_MAX_PACKET_SIZE

* mqtt: rework debug messages

* mqtt: MQTT_BUFFER_MAX_SIZE

* mqtt/test: debug log for async callback

* mqtt/test: don't log things we don't handle

* button: fix typo
mcspr-patch-1
Max Prokhorov 4 years ago
committed by GitHub
parent
commit
43c2c41cba
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 229 additions and 108 deletions
  1. +1
    -1
      code/espurna/button.ino
  2. +12
    -0
      code/espurna/config/dependencies.h
  3. +6
    -9
      code/espurna/config/general.h
  4. +3
    -0
      code/espurna/mqtt.h
  5. +206
    -97
      code/espurna/mqtt.ino
  6. +1
    -1
      code/platformio.ini

+ 1
- 1
code/espurna/button.ino View File

@ -677,7 +677,7 @@ void _buttonLoopSonoffDual() {
DEBUG_MSG_P(PSTR("[BUTTON] [LIGHTFOX] Received buttons mask: %u\n"), value);
for (unsigned int i=0; i<_buttons.size(); i++) {
if ((value & (1 << i)) > 0);
if ((value & (1 << i)) > 0) {
buttonEvent(i, button_event_t::Click);
}
}


+ 12
- 0
code/espurna/config/dependencies.h View File

@ -82,6 +82,8 @@
#endif
#if THERMOSTAT_SUPPORT
#undef MQTT_USE_JSON
#define MQTT_USE_JSON 1 // Thermostat depends on group messages in a JSON body
#undef RELAY_SUPPORT
#define RELAY_SUPPORT 1 // Thermostat depends on switches
#endif
@ -195,3 +197,13 @@
#undef BUTTON1_LNGCLICK
#define BUTTON1_LNGCLICK BUTTON_ACTION_TOGGLE
#endif
//------------------------------------------------------------------------------
// We should always set MQTT_MAX_PACKET_SIZE
//
#if MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
#if not defined(MQTT_MAX_PACKET_SIZE)
#warning "MQTT_MAX_PACKET_SIZE should be set in `build_flags = ...` of the environment! Default value is used instead."
#endif
#endif

+ 6
- 9
code/espurna/config/general.h View File

@ -1084,15 +1084,8 @@
#define MQTT_SKIP_TIME 1000 // Skip messages for 1 second anter connection
#endif
#if THERMOSTAT_SUPPORT == 1
#ifndef MQTT_USE_JSON
#define MQTT_USE_JSON 1 // Group messages in a JSON body
#endif
#else
#ifndef MQTT_USE_JSON
#define MQTT_USE_JSON 0 // Don't group messages in a JSON body (default)
#endif
#ifndef MQTT_USE_JSON
#define MQTT_USE_JSON 0 // Don't group messages in a JSON body by default
#endif
#ifndef MQTT_USE_JSON_DELAY
@ -1103,6 +1096,10 @@
#define MQTT_QUEUE_MAX_SIZE 20 // Size of the MQTT queue when MQTT_USE_JSON is enabled
#endif
#ifndef MQTT_BUFFER_MAX_SIZE
#define MQTT_BUFFER_MAX_SIZE 1024 // Size of the MQTT payload buffer for MQTT_MESSAGE_EVENT. Large messages will only be available via MQTT_MESSAGE_RAW_EVENT.
// Note: When using MQTT_LIBRARY_PUBSUBCLIENT, MQTT_MAX_PACKET_SIZE should not be more than this value.
#endif
// These are the properties that will be sent when useJson is true
#ifndef MQTT_ENQUEUE_IP


+ 3
- 0
code/espurna/mqtt.h View File

@ -17,6 +17,9 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot
using mqtt_callback_f = std::function<void(unsigned int type, const char * topic, char * payload)>;
using mqtt_msg_t = std::pair<String, String>; // topic, payload
// TODO: need this prototype for .ino
class AsyncMqttClientMessageProperties;
#if MQTT_SUPPORT
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT


+ 206
- 97
code/espurna/mqtt.ino View File

@ -21,6 +21,7 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot
#include "rpc.h"
#include "ws.h"
#include "libs/AsyncClientHelpers.h"
#include "libs/SecureClientHelpers.h"
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
@ -44,13 +45,13 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot
#endif // SECURE_CLIENT != SECURE_CLIENT_NONE
#if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
#ifdef MQTT_MAX_PACKET_SIZE
MQTTClient _mqtt(MQTT_MAX_PACKET_SIZE);
#else
MQTTClient _mqtt;
#endif
MQTTClient _mqtt(MQTT_BUFFER_MAX_SIZE);
#elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
PubSubClient _mqtt;
PubSubClient _mqtt;
#endif
#endif // MQTT_LIBRARY == MQTT_ASYNCMQTTCLIENT
@ -60,8 +61,8 @@ bool _mqtt_enabled = MQTT_ENABLED;
bool _mqtt_use_json = false;
unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
unsigned long _mqtt_last_connection = 0;
bool _mqtt_connected = false;
bool _mqtt_connecting = false;
AsyncClientState _mqtt_state = AsyncClientState::Disconnected;
bool _mqtt_retain_skipped = false;
bool _mqtt_retain = MQTT_RETAIN;
int _mqtt_qos = MQTT_QOS;
int _mqtt_keepalive = MQTT_KEEPALIVE;
@ -220,7 +221,7 @@ void _mqttConnect() {
if (!_mqtt_enabled) return;
// Do not connect if already connected or still trying to connect
if (_mqtt.connected() || _mqtt_connecting) return;
if (_mqtt.connected() || (_mqtt_state != AsyncClientState::Disconnected)) return;
// Check reconnect interval
if (millis() - _mqtt_last_connection < _mqtt_reconnect_delay) return;
@ -243,7 +244,7 @@ void _mqttConnect() {
DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %ds\n"), _mqtt_keepalive);
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will.c_str());
_mqtt_connecting = true;
_mqtt_state = AsyncClientState::Connecting;
#if SECURE_CLIENT != SECURE_CLIENT_NONE
const bool secure = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED);
@ -401,40 +402,76 @@ void _mqttBackwards() {
}
void _mqttInfo() {
DEBUG_MSG_P(PSTR(
"[MQTT] "
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
"AsyncMqttClient"
#elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
"Arduino-MQTT"
#elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
"PubSubClient"
#endif
", SSL "
#if SECURE_CLIENT != SEURE_CLIENT_NONE
"ENABLED"
#else
"DISABLED"
#endif
", Autoconnect "
#if MQTT_AUTOCONNECT
"ENABLED"
#else
"DISABLED"
#endif
"\n"
));
DEBUG_MSG_P(PSTR("[MQTT] Client %s, %s\n"),
_mqtt_enabled ? "ENABLED" : "DISABLED",
_mqtt.connected() ? "CONNECTED" : "DISCONNECTED"
);
DEBUG_MSG_P(PSTR("[MQTT] Retry %s (Now %u, Last %u, Delay %u, Step %u)\n"),
_mqtt_connecting ? "CONNECTING" : "WAITING",
millis(),
_mqtt_last_connection,
_mqtt_reconnect_delay,
MQTT_RECONNECT_DELAY_STEP
);
// Build information
{
#define __MQTT_INFO_STR(X) #X
#define _MQTT_INFO_STR(X) __MQTT_INFO_STR(X)
DEBUG_MSG_P(PSTR(
"[MQTT] "
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
"AsyncMqttClient"
#elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
"Arduino-MQTT"
#elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
"PubSubClient"
#endif
", SSL "
#if SECURE_CLIENT != SEURE_CLIENT_NONE
"ENABLED"
#else
"DISABLED"
#endif
", Autoconnect "
#if MQTT_AUTOCONNECT
"ENABLED"
#else
"DISABLED"
#endif
", Buffer size " _MQTT_INFO_STR(MQTT_BUFFER_MAX_SIZE) " bytes"
"\n"
));
#undef _MQTT_INFO_STR
#undef __MQTT_INFO_STR
}
// Notify about the general state of the client
{
const __FlashStringHelper* enabled = _mqtt_enabled
? F("ENABLED")
: F("DISABLED");
const __FlashStringHelper* state = nullptr;
switch (_mqtt_state) {
case AsyncClientState::Connecting:
state = F("CONNECTING");
break;
case AsyncClientState::Connected:
state = F("CONNECTED");
break;
case AsyncClientState::Disconnected:
state = F("DISCONNECTED");
break;
case AsyncClientState::Disconnecting:
state = F("DISCONNECTING");
break;
default:
state = F("WAITING");
break;
}
DEBUG_MSG_P(PSTR("[MQTT] Client %s, %s\n"),
String(enabled).c_str(),
String(state).c_str()
);
if (_mqtt_enabled && (_mqtt_state != AsyncClientState::Connected)) {
DEBUG_MSG_P(PSTR("[MQTT] Retrying, Last %u with Delay %u (Step %u)\n"),
_mqtt_last_connection,
_mqtt_reconnect_delay,
MQTT_RECONNECT_DELAY_STEP
);
}
}
}
// -----------------------------------------------------------------------------
@ -533,19 +570,20 @@ void _mqttCallback(unsigned int type, const char * topic, const char * payload)
void _mqttOnConnect() {
DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
_mqtt_last_connection = millis();
_mqtt_connecting = false;
_mqtt_connected = true;
_mqtt_state = AsyncClientState::Connected;
_mqtt_retain_skipped = false;
DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
// Clean subscriptions
mqttUnsubscribeRaw("#");
// Send connect event to subscribers
for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
(_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL);
// Notify all subscribers about the connection
for (auto& callback : _mqtt_callbacks) {
callback(MQTT_CONNECT_EVENT, nullptr, nullptr);
}
}
@ -554,40 +592,88 @@ void _mqttOnDisconnect() {
// Reset reconnection delay
_mqtt_last_connection = millis();
_mqtt_connecting = false;
_mqtt_connected = false;
_mqtt_state = AsyncClientState::Disconnected;
_mqtt_retain_skipped = false;
DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
// Send disconnect event to subscribers
for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
(_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL);
// Notify all subscribers about the disconnect
for (auto& callback : _mqtt_callbacks) {
callback(MQTT_DISCONNECT_EVENT, nullptr, nullptr);
}
}
// Force-skip everything received in a short window right after connecting to avoid syncronization issues.
bool _mqttMaybeSkipRetained(char* topic) {
#if MQTT_SKIP_RETAINED
if (!_mqtt_retain_skipped && (millis() - _mqtt_last_connection < MQTT_SKIP_TIME)) {
DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic);
return true;
}
#endif
_mqtt_retain_skipped = true;
return false;
}
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
// MQTT Broker can sometimes send messages in bulk. Even when message size is less than MQTT_BUFFER_MAX_SIZE, we *could*
// receive a message with `len != total`, this requiring buffering of the received data. Prepare a static memory to store the
// data until `(len + index) == total`.
// TODO: One pending issue is streaming arbitrary data (e.g. binary, for OTA). We always set '\0' and API consumer expects C-String.
// In that case, there could be MQTT_MESSAGE_RAW_EVENT and this callback only trigger on small messages.
// TODO: Current callback model does not allow to pass message length. Instead, implement a topic filter and record all subscriptions. That way we don't need to filter out events and could implement per-event callbacks.
void _mqttOnMessageAsync(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
if (!len || (len > MQTT_BUFFER_MAX_SIZE) || (total > MQTT_BUFFER_MAX_SIZE)) return;
if (_mqttMaybeSkipRetained(topic)) return;
static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0};
memmove(message + index, (char *) payload, len);
// Not done yet
if (total != (len + index)) {
DEBUG_MSG_P(PSTR("[MQTT] Buffered %s => %u / %u bytes\n"), topic, len, total);
return;
}
message[len + index] = '\0';
DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
// Call subscribers with the message buffer
for (auto& callback : _mqtt_callbacks) {
callback(MQTT_MESSAGE_EVENT, topic, message);
}
}
#else
// Sync client already implements buffering, but we still need to add '\0' because API consumer expects C-String :/
// TODO: consider reworking this (and async counterpart), giving callback func length of the message.
void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
if (len == 0) return;
if (!len || (len > MQTT_BUFFER_MAX_SIZE)) return;
if (_mqttMaybeSkipRetained(topic)) return;
char message[len + 1];
strlcpy(message, (char *) payload, len + 1);
static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0};
memmove(message, (char *) payload, len);
message[len] = '\0';
#if MQTT_SKIP_RETAINED
if (millis() - _mqtt_last_connection < MQTT_SKIP_TIME) {
DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s - SKIPPED\n"), topic, message);
return;
}
#endif
DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
// Send message event to subscribers
for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
(_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic, message);
// Call subscribers with the message buffer
for (auto& callback : _mqtt_callbacks) {
callback(MQTT_MESSAGE_EVENT, topic, message);
}
}
#endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
// -----------------------------------------------------------------------------
// Public API
// -----------------------------------------------------------------------------
@ -948,40 +1034,63 @@ void mqttSetup() {
}
#endif // SECURE_CLIENT != SECURE_CLIENT_NONE
_mqtt.onConnect([](bool sessionPresent) {
_mqtt.onMessage(_mqttOnMessageAsync);
_mqtt.onConnect([](bool) {
_mqttOnConnect();
});
_mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {
if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) {
DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n"));
}
if (reason == AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED) {
DEBUG_MSG_P(PSTR("[MQTT] Identifier Rejected\n"));
}
if (reason == AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE) {
DEBUG_MSG_P(PSTR("[MQTT] Server unavailable\n"));
}
if (reason == AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS) {
DEBUG_MSG_P(PSTR("[MQTT] Malformed credentials\n"));
}
if (reason == AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED) {
DEBUG_MSG_P(PSTR("[MQTT] Not authorized\n"));
}
#if SECURE_CLIENT == SECURE_CLIENT_AXTLS
if (reason == AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT) {
DEBUG_MSG_P(PSTR("[MQTT] Bad fingerprint\n"));
}
#endif
_mqttOnDisconnect();
});
_mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
_mqttOnMessage(topic, payload, len);
});
_mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) {
DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %d\n"), packetId);
DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %u\n"), packetId);
});
_mqtt.onPublish([](uint16_t packetId) {
DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %d\n"), packetId);
DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %u\n"), packetId);
});
_mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {
switch (reason) {
case AsyncMqttClientDisconnectReason::TCP_DISCONNECTED:
DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n"));
break;
case AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED:
DEBUG_MSG_P(PSTR("[MQTT] Identifier Rejected\n"));
break;
case AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE:
DEBUG_MSG_P(PSTR("[MQTT] Server unavailable\n"));
break;
case AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS:
DEBUG_MSG_P(PSTR("[MQTT] Malformed credentials\n"));
break;
case AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED:
DEBUG_MSG_P(PSTR("[MQTT] Not authorized\n"));
break;
case AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT:
#if ASYNC_TCP_SSL_ENABLED
DEBUG_MSG_P(PSTR("[MQTT] Bad fingerprint\n"));
#endif
break;
case AsyncMqttClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION:
// This is never used by the AsyncMqttClient source
#if 0
DEBUG_MSG_P(PSTR("[MQTT] Unacceptable protocol version\n"));
#endif
break;
case AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE:
DEBUG_MSG_P(PSTR("[MQTT] Connect packet too big\n"));
break;
}
_mqttOnDisconnect();
});
#elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
@ -1039,7 +1148,7 @@ void mqttLoop() {
} else {
if (_mqtt_connected) {
if (_mqtt_state != AsyncClientState::Disconnected) {
_mqttOnDisconnect();
}


+ 1
- 1
code/platformio.ini View File

@ -77,7 +77,7 @@ board_1m = esp01_1m
board_2m = esp_wroom_02
board_4m = esp12e
build_flags = -g -w -DMQTT_MAX_PACKET_SIZE=1024 -DNO_GLOBAL_EEPROM -DPIO_FRAMEWORK_ARDUINO_LWIP2_HIGHER_BANDWIDTH
build_flags = -g -w -DNO_GLOBAL_EEPROM -DPIO_FRAMEWORK_ARDUINO_LWIP2_HIGHER_BANDWIDTH
build_flags_512k = ${common.build_flags} -Wl,-Teagle.flash.512k0m1s.ld
build_flags_1m0m = ${common.build_flags} -Wl,-Teagle.flash.1m0m1s.ld
build_flags_2m1m = ${common.build_flags} -Wl,-Teagle.flash.2m1m4s.ld


Loading…
Cancel
Save