Browse Source

Helper methods to aggregate MQTT messages in JSON format

fastled
Xose Pérez 7 years ago
parent
commit
4c5d8d804b
4 changed files with 88 additions and 14 deletions
  1. +3
    -0
      code/espurna/config/general.h
  2. +14
    -12
      code/espurna/espurna.ino
  3. +65
    -0
      code/espurna/mqtt.ino
  4. +6
    -2
      code/espurna/relay.ino

+ 3
- 0
code/espurna/config/general.h View File

@ -201,6 +201,9 @@ PROGMEM const char* const custom_reset_string[] = {
#define MQTT_SKIP_RETAINED 1 #define MQTT_SKIP_RETAINED 1
#define MQTT_SKIP_TIME 1000 #define MQTT_SKIP_TIME 1000
#define MQTT_USE_JSON 1
#define MQTT_TOPIC_JSON "data"
#define MQTT_TOPIC_ACTION "action" #define MQTT_TOPIC_ACTION "action"
#define MQTT_TOPIC_RELAY "relay" #define MQTT_TOPIC_RELAY "relay"
#define MQTT_TOPIC_LED "led" #define MQTT_TOPIC_LED "led"


+ 14
- 12
code/espurna/espurna.ino View File

@ -53,34 +53,34 @@ void heartbeat() {
#if (MQTT_REPORT_INTERVAL) #if (MQTT_REPORT_INTERVAL)
mqttSend(MQTT_TOPIC_INTERVAL, HEARTBEAT_INTERVAL / 1000);
mqttAppend(MQTT_TOPIC_INTERVAL, HEARTBEAT_INTERVAL / 1000);
#endif #endif
#if (MQTT_REPORT_APP) #if (MQTT_REPORT_APP)
mqttSend(MQTT_TOPIC_APP, APP_NAME);
mqttAppend(MQTT_TOPIC_APP, APP_NAME);
#endif #endif
#if (MQTT_REPORT_VERSION) #if (MQTT_REPORT_VERSION)
mqttSend(MQTT_TOPIC_VERSION, APP_VERSION);
mqttAppend(MQTT_TOPIC_VERSION, APP_VERSION);
#endif #endif
#if (MQTT_REPORT_HOSTNAME) #if (MQTT_REPORT_HOSTNAME)
mqttSend(MQTT_TOPIC_HOSTNAME, getSetting("hostname").c_str());
//mqttAppend(MQTT_TOPIC_HOSTNAME, getSetting("hostname").c_str());
#endif #endif
#if (MQTT_REPORT_IP) #if (MQTT_REPORT_IP)
mqttSend(MQTT_TOPIC_IP, getIP().c_str());
mqttAppend(MQTT_TOPIC_IP, getIP().c_str());
#endif #endif
#if (MQTT_REPORT_MAC) #if (MQTT_REPORT_MAC)
mqttSend(MQTT_TOPIC_MAC, WiFi.macAddress().c_str());
mqttAppend(MQTT_TOPIC_MAC, WiFi.macAddress().c_str());
#endif #endif
#if (MQTT_REPORT_RSSI) #if (MQTT_REPORT_RSSI)
mqttSend(MQTT_TOPIC_RSSI, String(WiFi.RSSI()).c_str());
mqttAppend(MQTT_TOPIC_RSSI, String(WiFi.RSSI()).c_str());
#endif #endif
#if (MQTT_REPORT_UPTIME) #if (MQTT_REPORT_UPTIME)
mqttSend(MQTT_TOPIC_UPTIME, String(uptime_seconds).c_str());
mqttAppend(MQTT_TOPIC_UPTIME, String(uptime_seconds).c_str());
#if ENABLE_INFLUXDB #if ENABLE_INFLUXDB
influxDBSend(MQTT_TOPIC_UPTIME, String(uptime_seconds).c_str()); influxDBSend(MQTT_TOPIC_UPTIME, String(uptime_seconds).c_str());
#endif #endif
#endif #endif
#if (MQTT_REPORT_FREEHEAP) #if (MQTT_REPORT_FREEHEAP)
mqttSend(MQTT_TOPIC_FREEHEAP, String(free_heap).c_str());
mqttAppend(MQTT_TOPIC_FREEHEAP, String(free_heap).c_str());
#if ENABLE_INFLUXDB #if ENABLE_INFLUXDB
influxDBSend(MQTT_TOPIC_FREEHEAP, String(free_heap).c_str()); influxDBSend(MQTT_TOPIC_FREEHEAP, String(free_heap).c_str());
#endif #endif
@ -90,18 +90,20 @@ void heartbeat() {
#endif #endif
#if LIGHT_PROVIDER != LIGHT_PROVIDER_NONE #if LIGHT_PROVIDER != LIGHT_PROVIDER_NONE
#if (MQTT_REPORT_COLOR) #if (MQTT_REPORT_COLOR)
mqttSend(MQTT_TOPIC_COLOR, lightColor().c_str());
mqttAppend(MQTT_TOPIC_COLOR, lightColor().c_str());
#endif #endif
#endif #endif
#if (MQTT_REPORT_VCC) #if (MQTT_REPORT_VCC)
#if ENABLE_ADC_VCC #if ENABLE_ADC_VCC
mqttSend(MQTT_TOPIC_VCC, String(ESP.getVcc()).c_str());
mqttAppend(MQTT_TOPIC_VCC, String(ESP.getVcc()).c_str());
#endif #endif
#endif #endif
#if (MQTT_REPORT_STATUS) #if (MQTT_REPORT_STATUS)
mqttSend(MQTT_TOPIC_STATUS, MQTT_STATUS_ONLINE);
mqttAppend(MQTT_TOPIC_STATUS, MQTT_STATUS_ONLINE);
#endif #endif
mqttSend();
} }
void customReset(unsigned char status) { void customReset(unsigned char status) {


+ 65
- 0
code/espurna/mqtt.ino View File

@ -7,6 +7,7 @@ Copyright (C) 2016-2017 by Xose Pérez <xose dot perez at gmail dot com>
*/ */
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
#include <ArduinoJson.h>
#include <vector> #include <vector>
const char *mqtt_user = 0; const char *mqtt_user = 0;
@ -31,6 +32,13 @@ std::vector<void (*)(unsigned int, const char *, const char *)> _mqtt_callbacks;
unsigned long mqttConnectedAt = 0; unsigned long mqttConnectedAt = 0;
#endif #endif
typedef struct {
char * topic;
unsigned char index;
char * message;
} mqtt_message_t;
std::vector<mqtt_message_t> _mqtt_queue;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Public API // Public API
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -80,6 +88,50 @@ void mqttSendRaw(const char * topic, const char * message) {
} }
} }
void mqttSend() {
#if MQTT_USE_JSON
DynamicJsonBuffer jsonBuffer;
JsonObject& root = jsonBuffer.createObject();
for (unsigned char i=0; i<_mqtt_queue.size(); i++) {
mqtt_message_t element = _mqtt_queue[i];
if (element.index < 255) {
String topic = String(element.topic) + String("/") + String(element.index);
root[topic] = element.message;
} else {
root[element.topic] = element.message;
}
}
root["time"] = NTP.getTimeDateString();
root["hostname"] = getSetting("hostname", HOSTNAME);
String output;
root.printTo(output);
String path = mqttTopic + String(MQTT_TOPIC_JSON);
mqttSendRaw(path.c_str(), output.c_str());
#else
String mqttGetter = getSetting("mqttGetter", MQTT_USE_GETTER);
for (unsigned char i=0; i<_mqtt_queue.size(); i++) {
mqtt_message_t element = _mqtt_queue[i];
String path = mqttTopic + String(element.topic);
if (element.index < 255) path += String ("/") + String(element.index);
path += mqttGetter;
mqttSendRaw(path.c_str(), element.message);
}
#endif
for (unsigned char i = 0; i < _mqtt_queue.size(); i++) {
mqtt_message_t element = _mqtt_queue[i];
free(element.topic);
free(element.message);
}
_mqtt_queue.clear();
}
void mqttSend(const char * topic, const char * message) { void mqttSend(const char * topic, const char * message) {
String mqttGetter = getSetting("mqttGetter", MQTT_USE_GETTER); String mqttGetter = getSetting("mqttGetter", MQTT_USE_GETTER);
String path = mqttTopic + String(topic) + mqttGetter; String path = mqttTopic + String(topic) + mqttGetter;
@ -92,6 +144,19 @@ void mqttSend(const char * topic, unsigned int index, const char * message) {
mqttSendRaw(path.c_str(), message); mqttSendRaw(path.c_str(), message);
} }
unsigned int mqttAppend(const char * topic, unsigned int index, const char * message) {
mqtt_message_t element;
element.topic = strdup(topic);
element.index = index;
element.message = strdup(message);
_mqtt_queue.push_back(element);
return _mqtt_queue.size();
}
unsigned int mqttAppend(const char * topic, const char * message) {
return mqttAppend(topic, 255, message);
}
void mqttSubscribeRaw(const char * topic) { void mqttSubscribeRaw(const char * topic) {
if (mqtt.connected() && (strlen(topic) > 0)) { if (mqtt.connected() && (strlen(topic) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic); DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic);


+ 6
- 2
code/espurna/relay.ino View File

@ -381,7 +381,7 @@ void relayDomoticzSetup() {
void relayMQTT(unsigned char id) { void relayMQTT(unsigned char id) {
if (id >= _relays.size()) return; if (id >= _relays.size()) return;
mqttSend(MQTT_TOPIC_RELAY, id, relayStatus(id) ? "1" : "0");
mqttAppend(MQTT_TOPIC_RELAY, id, relayStatus(id) ? "1" : "0");
} }
#if ENABLE_INFLUXDB #if ENABLE_INFLUXDB
@ -405,6 +405,7 @@ void relayMQTTCallback(unsigned int type, const char * topic, const char * paylo
#if not MQTT_REPORT_RELAY #if not MQTT_REPORT_RELAY
relayMQTT(); relayMQTT();
mqttSend();
#endif #endif
char buffer[strlen(MQTT_TOPIC_RELAY) + 3]; char buffer[strlen(MQTT_TOPIC_RELAY) + 3];
@ -523,7 +524,10 @@ void relayLoop(void) {
ledStatus(_relays[id].led - 1, status); ledStatus(_relays[id].led - 1, status);
} }
if (_relays[id].scheduledReport) relayMQTT(id);
if (_relays[id].scheduledReport) {
relayMQTT(id);
mqttSend();
}
if (!recursive) { if (!recursive) {
relayPulse(id); relayPulse(id);
relaySync(id); relaySync(id);


Loading…
Cancel
Save