From 2be5ff3e47c9414754a70efcf3f6541bf1cdcdfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xose=20P=C3=A9rez?= Date: Fri, 19 Jan 2018 10:18:30 +0100 Subject: [PATCH] Add MQTT message ID when in json mode --- code/espurna/config/general.h | 9 +++++-- code/espurna/mqtt.ino | 51 +++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/code/espurna/config/general.h b/code/espurna/config/general.h index 33a30844..e90759a8 100644 --- a/code/espurna/config/general.h +++ b/code/espurna/config/general.h @@ -143,7 +143,8 @@ #define EEPROM_ENERGY_COUNT 1 // Address for the energy counter (4 bytes) #define EEPROM_CUSTOM_RESET 5 // Address for the reset reason (1 byte) #define EEPROM_CRASH_COUNTER 6 // Address for the crash counter (1 byte) -#define EEPROM_DATA_END 7 // End of custom EEPROM data block +#define EEPROM_MESSAGE_ID 7 // Address for the MQTT message id (4 bytes) +#define EEPROM_DATA_END 11 // End of custom EEPROM data block //------------------------------------------------------------------------------ // HEARTBEAT @@ -490,11 +491,12 @@ PROGMEM const char* const custom_reset_string[] = { #define MQTT_USE_JSON_DELAY 100 // Wait this many ms before grouping messages #define MQTT_QUEUE_MAX_SIZE 10 // Size of the MQTT queue when MQTT_USE_JSON is enabled -// These are the properties that will be send when useJson is true +// These are the properties that will be sent when useJson is true #define MQTT_ENQUEUE_IP 1 #define MQTT_ENQUEUE_MAC 1 #define MQTT_ENQUEUE_HOSTNAME 1 #define MQTT_ENQUEUE_DATETIME 1 +#define MQTT_ENQUEUE_MESSAGE_ID 1 // These particles will be concatenated to the MQTT_TOPIC base to form the actual topic #define MQTT_TOPIC_JSON "data" @@ -511,6 +513,7 @@ PROGMEM const char* const custom_reset_string[] = { #define MQTT_TOPIC_STATUS "status" #define MQTT_TOPIC_MAC "mac" #define MQTT_TOPIC_RSSI "rssi" +#define MQTT_TOPIC_MESSAGE_ID "id" #define MQTT_TOPIC_APP "app" #define MQTT_TOPIC_INTERVAL "interval" #define MQTT_TOPIC_HOSTNAME "host" @@ -541,6 +544,8 @@ PROGMEM const char* const custom_reset_string[] = { #define MQTT_DISCONNECT_EVENT 1 #define MQTT_MESSAGE_EVENT 2 +#define MQTT_MESSAGE_ID_SHIFT 1000 // Store MQTT message id into EEPROM every these many + // Custom get and set postfixes // Use something like "/status" or "/set", with leading slash // Since 1.9.0 the default value is "" for getter and "/set" for setter diff --git a/code/espurna/mqtt.ino b/code/espurna/mqtt.ino index 4661d899..7f33fbb3 100644 --- a/code/espurna/mqtt.ino +++ b/code/espurna/mqtt.ino @@ -8,6 +8,7 @@ Copyright (C) 2016-2018 by Xose PĂ©rez #if MQTT_SUPPORT +#include #include #include #include @@ -244,6 +245,48 @@ void _mqttConfigure() { } +unsigned long _mqttNextMessageId() { + + static unsigned long id = 0; + + // just reboot, get last count from EEPROM + if (id == 0) { + + // read id from EEPROM and shift it + id = EEPROM.read(EEPROM_MESSAGE_ID); + if (id == 0xFF) { + + // There was nothing in EEPROM, + // next message is first message + id = 0; + + } else { + + id = (id << 8) + EEPROM.read(EEPROM_MESSAGE_ID + 1); + id = (id << 8) + EEPROM.read(EEPROM_MESSAGE_ID + 2); + id = (id << 8) + EEPROM.read(EEPROM_MESSAGE_ID + 3); + + // Calculate next block and start from there + id = MQTT_MESSAGE_ID_SHIFT * (1 + (id / MQTT_MESSAGE_ID_SHIFT)); + + } + + } + + // Save to EEPROM every MQTT_MESSAGE_ID_SHIFT + if (id % MQTT_MESSAGE_ID_SHIFT == 0) { + EEPROM.write(EEPROM_MESSAGE_ID + 0, (id >> 24) & 0xFF); + EEPROM.write(EEPROM_MESSAGE_ID + 1, (id >> 16) & 0xFF); + EEPROM.write(EEPROM_MESSAGE_ID + 2, (id >> 8) & 0xFF); + EEPROM.write(EEPROM_MESSAGE_ID + 3, (id >> 0) & 0xFF); + EEPROM.commit(); + } + + id++; + return id; + +} + // ----------------------------------------------------------------------------- // WEB // ----------------------------------------------------------------------------- @@ -428,6 +471,7 @@ void mqttSendRaw(const char * topic, const char * message) { void mqttFlush() { + if (!_mqtt.connected()) return; if (_mqtt_queue.size() == 0) return; DynamicJsonBuffer jsonBuffer; @@ -452,6 +496,9 @@ void mqttFlush() { #if MQTT_ENQUEUE_IP root[MQTT_TOPIC_IP] = getIP(); #endif + #if MQTT_ENQUEUE_MESSAGE_ID + root[MQTT_TOPIC_MESSAGE_ID] = _mqttNextMessageId(); + #endif // Send String output; @@ -478,6 +525,10 @@ void mqttQueueTopic(const char * topic) { void mqttEnqueue(const char * topic, const char * message) { + // Queue is not meant to send message "offline" + // We must prevent the queue does not get full while offline + if (!_mqtt.connected()) return; + // Force flusing the queue if the MQTT_QUEUE_MAX_SIZE has been reached if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) mqttFlush();