From c5db2758de82233ce79f8e762dfa6b6ea0651621 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xose=20P=C3=A9rez?= Date: Mon, 15 Jan 2018 16:37:02 +0100 Subject: [PATCH] New mqttEnqueue API --- code/espurna/config/general.h | 5 + code/espurna/mqtt.ino | 171 ++++++++++++++++++++++------------ 2 files changed, 117 insertions(+), 59 deletions(-) diff --git a/code/espurna/config/general.h b/code/espurna/config/general.h index e8b5c911..fb97335e 100644 --- a/code/espurna/config/general.h +++ b/code/espurna/config/general.h @@ -479,6 +479,11 @@ PROGMEM const char* const custom_reset_string[] = { #define MQTT_USE_JSON_DELAY 100 // Wait this many ms before grouping messages #define MQTT_QUEUE_MAX_SIZE 10 // Size of the MQTT queue when MQTT_USE_JSON is enabled +// These are the properties that will be send when useJson is true +#define MQTT_ENQUEUE_IP 1 +#define MQTT_ENQUEUE_MAC 1 +#define MQTT_ENQUEUE_HOSTNAME 1 +#define MQTT_ENQUEUE_DATETIME 1 // These particles will be concatenated to the MQTT_TOPIC base to form the actual topic #define MQTT_TOPIC_JSON "data" diff --git a/code/espurna/mqtt.ino b/code/espurna/mqtt.ino index d038eaea..90de7202 100644 --- a/code/espurna/mqtt.ino +++ b/code/espurna/mqtt.ino @@ -39,6 +39,7 @@ unsigned char _mqtt_qos = MQTT_QOS; bool _mqtt_retain = MQTT_RETAIN; unsigned char _mqtt_keepalive = MQTT_KEEPALIVE; String _mqtt_topic; +String _mqtt_topic_json; String _mqtt_setter; String _mqtt_getter; bool _mqtt_forward; @@ -63,35 +64,6 @@ Ticker _mqtt_flush_ticker; // Private // ----------------------------------------------------------------------------- -void _mqttFlush() { - - if (_mqtt_queue.size() == 0) return; - - DynamicJsonBuffer jsonBuffer; - JsonObject& root = jsonBuffer.createObject(); - for (unsigned char i=0; i<_mqtt_queue.size(); i++) { - mqtt_message_t element = _mqtt_queue[i]; - root[element.topic] = element.message; - } - #if NTP_SUPPORT - if (ntpConnected()) root[MQTT_TOPIC_TIME] = ntpDateTime(); - #endif - root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname"); - root[MQTT_TOPIC_IP] = getIP(); - - String output; - root.printTo(output); - mqttSendRaw(mqttTopic(MQTT_TOPIC_JSON, false).c_str(), output.c_str()); - - for (unsigned char i = 0; i < _mqtt_queue.size(); i++) { - mqtt_message_t element = _mqtt_queue[i]; - free(element.topic); - free(element.message); - } - _mqtt_queue.clear(); - -} - void _mqttConnect() { // Do not connect if disabled @@ -266,6 +238,7 @@ void _mqttConfigure() { _mqtt_enabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1; } _mqtt_use_json = (getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1); + mqttQueueTopic(MQTT_TOPIC_JSON); _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN; @@ -406,30 +379,6 @@ void _mqttOnMessage(char* topic, char* payload, unsigned int len) { // Public API // ----------------------------------------------------------------------------- -void mqttEnabled(bool status) { - _mqtt_enabled = status; - setSetting("mqttEnabled", status ? 1 : 0); -} - -bool mqttEnabled() { - return _mqtt_enabled; -} - -bool mqttConnected() { - return _mqtt.connected(); -} - -void mqttDisconnect() { - if (_mqtt.connected()) { - DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n")); - _mqtt.disconnect(); - } -} - -bool mqttForward() { - return _mqtt_forward; -} - String mqttTopicKey(char * topic) { String pattern = _mqtt_topic + _mqtt_setter; @@ -463,6 +412,8 @@ String mqttTopic(const char * topic, unsigned int index, bool is_set) { return mqttTopic(buffer, is_set); } +// ----------------------------------------------------------------------------- + void mqttSendRaw(const char * topic, const char * message) { if (_mqtt.connected()) { #if MQTT_USE_ASYNC @@ -475,21 +426,91 @@ void mqttSendRaw(const char * topic, const char * message) { } } +void mqttFlush() { + + if (_mqtt_queue.size() == 0) return; + + DynamicJsonBuffer jsonBuffer; + JsonObject& root = jsonBuffer.createObject(); + + // Add enqueued messages + for (unsigned char i=0; i<_mqtt_queue.size(); i++) { + mqtt_message_t element = _mqtt_queue[i]; + root[element.topic] = element.message; + } + + // Add extra propeties + #if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME + if (ntpConnected()) root[MQTT_TOPIC_TIME] = ntpDateTime(); + #endif + #if MQTT_ENQUEUE_MAC + root[MQTT_TOPIC_MAC] = WiFi.macAddress(); + #endif + #if MQTT_ENQUEUE_HOSTNAME + root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname"); + #endif + #if MQTT_ENQUEUE_IP + root[MQTT_TOPIC_IP] = getIP(); + #endif + + // Send + String output; + root.printTo(output); + mqttSendRaw(_mqtt_topic_json.c_str(), output.c_str()); + + // Clear queue + for (unsigned char i = 0; i < _mqtt_queue.size(); i++) { + mqtt_message_t element = _mqtt_queue[i]; + free(element.topic); + free(element.message); + } + _mqtt_queue.clear(); + +} + +void mqttQueueTopic(const char * topic) { + String t = mqttTopic(topic, false); + if (!t.equals(_mqtt_topic_json)) { + mqttFlush(); + _mqtt_topic_json = t; + } +} + +void mqttEnqueue(const char * topic, const char * message) { + + // Force flusing the queue if the MQTT_QUEUE_MAX_SIZE has been reached + if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) mqttFlush(); + + // Enqueue new message + mqtt_message_t element; + element.topic = strdup(topic); + element.message = strdup(message); + _mqtt_queue.push_back(element); + +} + void mqttSend(const char * topic, const char * message, bool force) { + bool useJson = force ? false : _mqtt_use_json; + + // Equeue message if (useJson) { - if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) _mqttFlush(); + // Set default queue topic + mqttQueueTopic(MQTT_TOPIC_JSON); - mqtt_message_t element; - element.topic = strdup(topic); - element.message = strdup(message); - _mqtt_queue.push_back(element); - _mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, _mqttFlush); + // Enqueue new message + mqttEnqueue(topic, message); + // Reset flush timer + _mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, mqttFlush); + + // Send it right away } else { mqttSendRaw(mqttTopic(topic, false).c_str(), message); + } + } void mqttSend(const char * topic, const char * message) { @@ -506,6 +527,8 @@ void mqttSend(const char * topic, unsigned int index, const char * message) { mqttSend(topic, index, message, false); } +// ----------------------------------------------------------------------------- + void mqttSubscribeRaw(const char * topic) { if (_mqtt.connected() && (strlen(topic) > 0)) { #if MQTT_USE_ASYNC @@ -534,6 +557,36 @@ void mqttUnsubscribeRaw(const char * topic) { } } +void mqttUnsubscribe(const char * topic) { + mqttUnsubscribeRaw(mqttTopic(topic, true).c_str()); +} + +// ----------------------------------------------------------------------------- + +void mqttEnabled(bool status) { + _mqtt_enabled = status; + setSetting("mqttEnabled", status ? 1 : 0); +} + +bool mqttEnabled() { + return _mqtt_enabled; +} + +bool mqttConnected() { + return _mqtt.connected(); +} + +void mqttDisconnect() { + if (_mqtt.connected()) { + DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n")); + _mqtt.disconnect(); + } +} + +bool mqttForward() { + return _mqtt_forward; +} + void mqttRegister(mqtt_callback_f callback) { _mqtt_callbacks.push_back(callback); }