Browse Source

Added tree structure to mqttQueue

rfm69
Xose Pérez 6 years ago
parent
commit
0062f01ffd
1 changed files with 87 additions and 54 deletions
  1. +87
    -54
      code/espurna/mqtt.ino

+ 87
- 54
code/espurna/mqtt.ino View File

@ -55,8 +55,9 @@ unsigned long _mqtt_connected_at = 0;
std::vector<mqtt_callback_f> _mqtt_callbacks;
typedef struct {
char parent = -1;
char * topic;
char * message;
char * message = NULL;
} mqtt_message_t;
std::vector<mqtt_message_t> _mqtt_queue;
Ticker _mqtt_flush_ticker;
@ -502,20 +503,85 @@ void mqttSendRaw(const char * topic, const char * message) {
mqttSendRaw (topic, message, _mqtt_retain);
}
void mqttFlush() {
void mqttSend(const char * topic, const char * message, bool force, bool retain) {
if (!_mqtt.connected()) return;
if (_mqtt_queue.size() == 0) return;
bool useJson = force ? false : _mqtt_use_json;
DynamicJsonBuffer jsonBuffer;
JsonObject& root = jsonBuffer.createObject();
// Equeue message
if (useJson) {
// Set default queue topic
mqttQueueTopic(MQTT_TOPIC_JSON);
// Enqueue new message
mqttEnqueue(topic, message);
// Reset flush timer
_mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, mqttFlush);
// Send it right away
} else {
mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain);
}
}
void mqttSend(const char * topic, const char * message, bool force) {
mqttSend(topic, message, force, _mqtt_retain);
}
void mqttSend(const char * topic, const char * message) {
mqttSend(topic, message, false);
}
void mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) {
char buffer[strlen(topic)+5];
snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
mqttSend(buffer, message, force, retain);
}
void mqttSend(const char * topic, unsigned int index, const char * message, bool force) {
mqttSend(topic, index, message, force, _mqtt_retain);
}
void mqttSend(const char * topic, unsigned int index, const char * message) {
mqttSend(topic, index, message, false);
}
// -----------------------------------------------------------------------------
unsigned char _mqttBuildTree(JsonObject& root, int parent = -1) {
unsigned char count = 0;
// Add enqueued messages
for (unsigned char i=0; i<_mqtt_queue.size(); i++) {
mqtt_message_t element = _mqtt_queue[i];
root[element.topic] = element.message;
if (element.parent == parent) {
++count;
JsonObject& elements = root.createNestedObject(element.topic);
unsigned char num = _mqttBuildTree(elements, i);
if (0 == num) {
root.set(element.topic, element.message);
}
}
}
return count;
}
void mqttFlush() {
if (!_mqtt.connected()) return;
if (_mqtt_queue.size() == 0) return;
// Build tree recursively
DynamicJsonBuffer jsonBuffer;
JsonObject& root = jsonBuffer.createObject();
_mqttBuildTree(root);
// Add extra propeties
#if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME
if (ntpSynced()) root[MQTT_TOPIC_TIME] = ntpDateTime();
@ -542,7 +608,9 @@ void mqttFlush() {
for (unsigned char i = 0; i < _mqtt_queue.size(); i++) {
mqtt_message_t element = _mqtt_queue[i];
free(element.topic);
free(element.message);
if (element.message) {
free(element.message);
}
}
_mqtt_queue.clear();
@ -556,67 +624,32 @@ void mqttQueueTopic(const char * topic) {
}
}
void mqttEnqueue(const char * topic, const char * message) {
int8_t mqttEnqueue(const char * topic, const char * message, int8_t parent) {
// Queue is not meant to send message "offline"
// We must prevent the queue does not get full while offline
if (!_mqtt.connected()) return;
if (!_mqtt.connected()) return -1;
// Force flusing the queue if the MQTT_QUEUE_MAX_SIZE has been reached
if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) mqttFlush();
int8_t index = _mqtt_queue.size();
// Enqueue new message
mqtt_message_t element;
element.parent = parent;
element.topic = strdup(topic);
element.message = strdup(message);
_mqtt_queue.push_back(element);
}
void mqttSend(const char * topic, const char * message, bool force, bool retain) {
bool useJson = force ? false : _mqtt_use_json;
// Equeue message
if (useJson) {
// Set default queue topic
mqttQueueTopic(MQTT_TOPIC_JSON);
// Enqueue new message
mqttEnqueue(topic, message);
// Reset flush timer
_mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, mqttFlush);
// Send it right away
} else {
mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain);
if (NULL != message) {
element.message = strdup(message);
}
_mqtt_queue.push_back(element);
}
void mqttSend(const char * topic, const char * message, bool force) {
mqttSend(topic, message, force, _mqtt_retain);
}
void mqttSend(const char * topic, const char * message) {
mqttSend(topic, message, false);
}
void mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) {
char buffer[strlen(topic)+5];
snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
mqttSend(buffer, message, force, retain);
}
return index;
void mqttSend(const char * topic, unsigned int index, const char * message, bool force) {
mqttSend(topic, index, message, force, _mqtt_retain);
}
void mqttSend(const char * topic, unsigned int index, const char * message) {
mqttSend(topic, index, message, false);
int8_t mqttEnqueue(const char * topic, const char * message) {
return mqttEnqueue(topic, message, -1);
}
// -----------------------------------------------------------------------------


Loading…
Cancel
Save