From 4c5d8d804b49e195fe1044da9cc89f8f239915b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xose=20P=C3=A9rez?= Date: Wed, 19 Jul 2017 01:51:54 +0200 Subject: [PATCH] Helper methods to aggregate MQTT messages in JSON format --- code/espurna/config/general.h | 3 ++ code/espurna/espurna.ino | 26 +++++++------- code/espurna/mqtt.ino | 65 +++++++++++++++++++++++++++++++++++ code/espurna/relay.ino | 8 +++-- 4 files changed, 88 insertions(+), 14 deletions(-) diff --git a/code/espurna/config/general.h b/code/espurna/config/general.h index 432c1e51..89e35bf7 100644 --- a/code/espurna/config/general.h +++ b/code/espurna/config/general.h @@ -201,6 +201,9 @@ PROGMEM const char* const custom_reset_string[] = { #define MQTT_SKIP_RETAINED 1 #define MQTT_SKIP_TIME 1000 +#define MQTT_USE_JSON 1 + +#define MQTT_TOPIC_JSON "data" #define MQTT_TOPIC_ACTION "action" #define MQTT_TOPIC_RELAY "relay" #define MQTT_TOPIC_LED "led" diff --git a/code/espurna/espurna.ino b/code/espurna/espurna.ino index 3302ddf8..62725a1b 100644 --- a/code/espurna/espurna.ino +++ b/code/espurna/espurna.ino @@ -53,34 +53,34 @@ void heartbeat() { #if (MQTT_REPORT_INTERVAL) - mqttSend(MQTT_TOPIC_INTERVAL, HEARTBEAT_INTERVAL / 1000); + mqttAppend(MQTT_TOPIC_INTERVAL, HEARTBEAT_INTERVAL / 1000); #endif #if (MQTT_REPORT_APP) - mqttSend(MQTT_TOPIC_APP, APP_NAME); + mqttAppend(MQTT_TOPIC_APP, APP_NAME); #endif #if (MQTT_REPORT_VERSION) - mqttSend(MQTT_TOPIC_VERSION, APP_VERSION); + mqttAppend(MQTT_TOPIC_VERSION, APP_VERSION); #endif #if (MQTT_REPORT_HOSTNAME) - mqttSend(MQTT_TOPIC_HOSTNAME, getSetting("hostname").c_str()); + //mqttAppend(MQTT_TOPIC_HOSTNAME, getSetting("hostname").c_str()); #endif #if (MQTT_REPORT_IP) - mqttSend(MQTT_TOPIC_IP, getIP().c_str()); + mqttAppend(MQTT_TOPIC_IP, getIP().c_str()); #endif #if (MQTT_REPORT_MAC) - mqttSend(MQTT_TOPIC_MAC, WiFi.macAddress().c_str()); + mqttAppend(MQTT_TOPIC_MAC, WiFi.macAddress().c_str()); #endif #if (MQTT_REPORT_RSSI) - mqttSend(MQTT_TOPIC_RSSI, String(WiFi.RSSI()).c_str()); + mqttAppend(MQTT_TOPIC_RSSI, String(WiFi.RSSI()).c_str()); #endif #if (MQTT_REPORT_UPTIME) - mqttSend(MQTT_TOPIC_UPTIME, String(uptime_seconds).c_str()); + mqttAppend(MQTT_TOPIC_UPTIME, String(uptime_seconds).c_str()); #if ENABLE_INFLUXDB influxDBSend(MQTT_TOPIC_UPTIME, String(uptime_seconds).c_str()); #endif #endif #if (MQTT_REPORT_FREEHEAP) - mqttSend(MQTT_TOPIC_FREEHEAP, String(free_heap).c_str()); + mqttAppend(MQTT_TOPIC_FREEHEAP, String(free_heap).c_str()); #if ENABLE_INFLUXDB influxDBSend(MQTT_TOPIC_FREEHEAP, String(free_heap).c_str()); #endif @@ -90,18 +90,20 @@ void heartbeat() { #endif #if LIGHT_PROVIDER != LIGHT_PROVIDER_NONE #if (MQTT_REPORT_COLOR) - mqttSend(MQTT_TOPIC_COLOR, lightColor().c_str()); + mqttAppend(MQTT_TOPIC_COLOR, lightColor().c_str()); #endif #endif #if (MQTT_REPORT_VCC) #if ENABLE_ADC_VCC - mqttSend(MQTT_TOPIC_VCC, String(ESP.getVcc()).c_str()); + mqttAppend(MQTT_TOPIC_VCC, String(ESP.getVcc()).c_str()); #endif #endif #if (MQTT_REPORT_STATUS) - mqttSend(MQTT_TOPIC_STATUS, MQTT_STATUS_ONLINE); + mqttAppend(MQTT_TOPIC_STATUS, MQTT_STATUS_ONLINE); #endif + mqttSend(); + } void customReset(unsigned char status) { diff --git a/code/espurna/mqtt.ino b/code/espurna/mqtt.ino index 67ba5a86..8d105001 100644 --- a/code/espurna/mqtt.ino +++ b/code/espurna/mqtt.ino @@ -7,6 +7,7 @@ Copyright (C) 2016-2017 by Xose PĂ©rez */ #include +#include #include const char *mqtt_user = 0; @@ -31,6 +32,13 @@ std::vector _mqtt_callbacks; unsigned long mqttConnectedAt = 0; #endif +typedef struct { + char * topic; + unsigned char index; + char * message; +} mqtt_message_t; +std::vector _mqtt_queue; + // ----------------------------------------------------------------------------- // Public API // ----------------------------------------------------------------------------- @@ -80,6 +88,50 @@ void mqttSendRaw(const char * topic, const char * message) { } } +void mqttSend() { + + #if MQTT_USE_JSON + + DynamicJsonBuffer jsonBuffer; + JsonObject& root = jsonBuffer.createObject(); + for (unsigned char i=0; i<_mqtt_queue.size(); i++) { + mqtt_message_t element = _mqtt_queue[i]; + if (element.index < 255) { + String topic = String(element.topic) + String("/") + String(element.index); + root[topic] = element.message; + } else { + root[element.topic] = element.message; + } + } + + root["time"] = NTP.getTimeDateString(); + root["hostname"] = getSetting("hostname", HOSTNAME); + + String output; + root.printTo(output); + String path = mqttTopic + String(MQTT_TOPIC_JSON); + mqttSendRaw(path.c_str(), output.c_str()); + + #else + String mqttGetter = getSetting("mqttGetter", MQTT_USE_GETTER); + for (unsigned char i=0; i<_mqtt_queue.size(); i++) { + mqtt_message_t element = _mqtt_queue[i]; + String path = mqttTopic + String(element.topic); + if (element.index < 255) path += String ("/") + String(element.index); + path += mqttGetter; + mqttSendRaw(path.c_str(), element.message); + } + #endif + + for (unsigned char i = 0; i < _mqtt_queue.size(); i++) { + mqtt_message_t element = _mqtt_queue[i]; + free(element.topic); + free(element.message); + } + _mqtt_queue.clear(); + +} + void mqttSend(const char * topic, const char * message) { String mqttGetter = getSetting("mqttGetter", MQTT_USE_GETTER); String path = mqttTopic + String(topic) + mqttGetter; @@ -92,6 +144,19 @@ void mqttSend(const char * topic, unsigned int index, const char * message) { mqttSendRaw(path.c_str(), message); } +unsigned int mqttAppend(const char * topic, unsigned int index, const char * message) { + mqtt_message_t element; + element.topic = strdup(topic); + element.index = index; + element.message = strdup(message); + _mqtt_queue.push_back(element); + return _mqtt_queue.size(); +} + +unsigned int mqttAppend(const char * topic, const char * message) { + return mqttAppend(topic, 255, message); +} + void mqttSubscribeRaw(const char * topic) { if (mqtt.connected() && (strlen(topic) > 0)) { DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic); diff --git a/code/espurna/relay.ino b/code/espurna/relay.ino index 810bead1..f1b24228 100644 --- a/code/espurna/relay.ino +++ b/code/espurna/relay.ino @@ -381,7 +381,7 @@ void relayDomoticzSetup() { void relayMQTT(unsigned char id) { if (id >= _relays.size()) return; - mqttSend(MQTT_TOPIC_RELAY, id, relayStatus(id) ? "1" : "0"); + mqttAppend(MQTT_TOPIC_RELAY, id, relayStatus(id) ? "1" : "0"); } #if ENABLE_INFLUXDB @@ -405,6 +405,7 @@ void relayMQTTCallback(unsigned int type, const char * topic, const char * paylo #if not MQTT_REPORT_RELAY relayMQTT(); + mqttSend(); #endif char buffer[strlen(MQTT_TOPIC_RELAY) + 3]; @@ -523,7 +524,10 @@ void relayLoop(void) { ledStatus(_relays[id].led - 1, status); } - if (_relays[id].scheduledReport) relayMQTT(id); + if (_relays[id].scheduledReport) { + relayMQTT(id); + mqttSend(); + } if (!recursive) { relayPulse(id); relaySync(id);