From 0062f01ffddc59b9838583a4fc8573340ae207b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xose=20P=C3=A9rez?= Date: Thu, 22 Mar 2018 22:00:12 +0100 Subject: [PATCH] Added tree structure to mqttQueue --- code/espurna/mqtt.ino | 141 ++++++++++++++++++++++++++---------------- 1 file changed, 87 insertions(+), 54 deletions(-) diff --git a/code/espurna/mqtt.ino b/code/espurna/mqtt.ino index 0e6d1d3a..722897b8 100644 --- a/code/espurna/mqtt.ino +++ b/code/espurna/mqtt.ino @@ -55,8 +55,9 @@ unsigned long _mqtt_connected_at = 0; std::vector _mqtt_callbacks; typedef struct { + char parent = -1; char * topic; - char * message; + char * message = NULL; } mqtt_message_t; std::vector _mqtt_queue; Ticker _mqtt_flush_ticker; @@ -502,20 +503,85 @@ void mqttSendRaw(const char * topic, const char * message) { mqttSendRaw (topic, message, _mqtt_retain); } -void mqttFlush() { +void mqttSend(const char * topic, const char * message, bool force, bool retain) { - if (!_mqtt.connected()) return; - if (_mqtt_queue.size() == 0) return; + bool useJson = force ? false : _mqtt_use_json; - DynamicJsonBuffer jsonBuffer; - JsonObject& root = jsonBuffer.createObject(); + // Equeue message + if (useJson) { + + // Set default queue topic + mqttQueueTopic(MQTT_TOPIC_JSON); + + // Enqueue new message + mqttEnqueue(topic, message); + + // Reset flush timer + _mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, mqttFlush); + + // Send it right away + } else { + mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain); + + } + +} + +void mqttSend(const char * topic, const char * message, bool force) { + mqttSend(topic, message, force, _mqtt_retain); +} + +void mqttSend(const char * topic, const char * message) { + mqttSend(topic, message, false); +} + +void mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) { + char buffer[strlen(topic)+5]; + snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index); + mqttSend(buffer, message, force, retain); +} + +void mqttSend(const char * topic, unsigned int index, const char * message, bool force) { + mqttSend(topic, index, message, force, _mqtt_retain); +} + +void mqttSend(const char * topic, unsigned int index, const char * message) { + mqttSend(topic, index, message, false); +} + +// ----------------------------------------------------------------------------- + +unsigned char _mqttBuildTree(JsonObject& root, int parent = -1) { + + unsigned char count = 0; // Add enqueued messages for (unsigned char i=0; i<_mqtt_queue.size(); i++) { mqtt_message_t element = _mqtt_queue[i]; - root[element.topic] = element.message; + if (element.parent == parent) { + ++count; + JsonObject& elements = root.createNestedObject(element.topic); + unsigned char num = _mqttBuildTree(elements, i); + if (0 == num) { + root.set(element.topic, element.message); + } + } } + return count; + +} + +void mqttFlush() { + + if (!_mqtt.connected()) return; + if (_mqtt_queue.size() == 0) return; + + // Build tree recursively + DynamicJsonBuffer jsonBuffer; + JsonObject& root = jsonBuffer.createObject(); + _mqttBuildTree(root); + // Add extra propeties #if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME if (ntpSynced()) root[MQTT_TOPIC_TIME] = ntpDateTime(); @@ -542,7 +608,9 @@ void mqttFlush() { for (unsigned char i = 0; i < _mqtt_queue.size(); i++) { mqtt_message_t element = _mqtt_queue[i]; free(element.topic); - free(element.message); + if (element.message) { + free(element.message); + } } _mqtt_queue.clear(); @@ -556,67 +624,32 @@ void mqttQueueTopic(const char * topic) { } } -void mqttEnqueue(const char * topic, const char * message) { +int8_t mqttEnqueue(const char * topic, const char * message, int8_t parent) { // Queue is not meant to send message "offline" // We must prevent the queue does not get full while offline - if (!_mqtt.connected()) return; + if (!_mqtt.connected()) return -1; // Force flusing the queue if the MQTT_QUEUE_MAX_SIZE has been reached if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) mqttFlush(); + int8_t index = _mqtt_queue.size(); + // Enqueue new message mqtt_message_t element; + element.parent = parent; element.topic = strdup(topic); - element.message = strdup(message); - _mqtt_queue.push_back(element); - -} - -void mqttSend(const char * topic, const char * message, bool force, bool retain) { - - bool useJson = force ? false : _mqtt_use_json; - - // Equeue message - if (useJson) { - - // Set default queue topic - mqttQueueTopic(MQTT_TOPIC_JSON); - - // Enqueue new message - mqttEnqueue(topic, message); - - // Reset flush timer - _mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, mqttFlush); - - // Send it right away - } else { - mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain); - + if (NULL != message) { + element.message = strdup(message); } + _mqtt_queue.push_back(element); -} - -void mqttSend(const char * topic, const char * message, bool force) { - mqttSend(topic, message, force, _mqtt_retain); -} - -void mqttSend(const char * topic, const char * message) { - mqttSend(topic, message, false); -} - -void mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) { - char buffer[strlen(topic)+5]; - snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index); - mqttSend(buffer, message, force, retain); -} + return index; -void mqttSend(const char * topic, unsigned int index, const char * message, bool force) { - mqttSend(topic, index, message, force, _mqtt_retain); } -void mqttSend(const char * topic, unsigned int index, const char * message) { - mqttSend(topic, index, message, false); +int8_t mqttEnqueue(const char * topic, const char * message) { + return mqttEnqueue(topic, message, -1); } // -----------------------------------------------------------------------------