Browse Source

mqtt: clean-up json queueing

- simply use a basic {String, String} struct
- fix MQTT_TOPIC_TIME vs. MQTT_TOPIC_DATETIME
- get rid of unused chaining and indexes
mcspr-patch-1
Maxim Prokhorov 4 years ago
parent
commit
7ba1283024
3 changed files with 69 additions and 128 deletions
  1. +3
    -3
      code/espurna/homeassistant.cpp
  2. +64
    -122
      code/espurna/mqtt.cpp
  3. +2
    -3
      code/espurna/mqtt.h

+ 3
- 3
code/espurna/homeassistant.cpp View File

@ -133,13 +133,13 @@ struct ha_discovery_t {
}
void add(String& topic, String& message) {
auto msg = mqtt_msg_t { std::move(topic), std::move(message) };
auto msg = MqttMessage { std::move(topic), std::move(message) };
_messages.push_back(std::move(msg));
}
// We don't particulary care about the order since names have indexes?
// If we ever do, use iterators to reference elems and pop the String contents instead
mqtt_msg_t& next() {
MqttMessage& next() {
return _messages.back();
}
@ -162,7 +162,7 @@ struct ha_discovery_t {
#endif
Ticker timer;
std::vector<mqtt_msg_t> _messages;
std::vector<MqttMessage> _messages;
unsigned char _retry;
};


+ 64
- 122
code/espurna/mqtt.cpp View File

@ -11,7 +11,6 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot
#if MQTT_SUPPORT
#include <vector>
#include <forward_list>
#include <utility>
#include <Ticker.h>
@ -102,13 +101,8 @@ String _mqtt_payload_offline;
std::forward_list<mqtt_callback_f> _mqtt_callbacks;
struct mqtt_message_t {
static const unsigned char END = 255;
unsigned char parent = END;
char * topic;
char * message = NULL;
};
std::vector<mqtt_message_t> _mqtt_queue;
int8_t _mqtt_queue_count { 0u };
std::forward_list<MqttMessage> _mqtt_queue;
Ticker _mqtt_flush_ticker;
// -----------------------------------------------------------------------------
@ -785,54 +779,45 @@ String mqttTopic(const char * magnitude, unsigned int index, bool is_set) {
// -----------------------------------------------------------------------------
bool mqttSendRaw(const char * topic, const char * message, bool retain) {
constexpr size_t MessageLogMax { 128ul };
if (!_mqtt.connected()) return false;
const unsigned int packetId(
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
if (_mqtt.connected()) {
const unsigned int packetId {
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
_mqtt.publish(topic, _mqtt_qos, retain, message)
#elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
#elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
_mqtt.publish(topic, message, retain, _mqtt_qos)
#elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
#elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
_mqtt.publish(topic, message, retain)
#endif
);
#endif
};
const size_t message_len = strlen(message);
if (message_len > 128) {
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => (%u bytes) (PID %u)\n"), topic, message_len, packetId);
} else {
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %u)\n"), topic, message, packetId);
}
const size_t message_len = strlen(message);
if (message_len > MessageLogMax) {
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => (%u bytes) (PID %u)\n"), topic, message_len, packetId);
} else {
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %u)\n"), topic, message, packetId);
}
return (packetId > 0);
return (packetId > 0);
}
return false;
}
bool mqttSendRaw(const char * topic, const char * message) {
return mqttSendRaw (topic, message, _mqtt_retain);
return mqttSendRaw(topic, message, _mqtt_retain);
}
void mqttSend(const char * topic, const char * message, bool force, bool retain) {
bool useJson = force ? false : _mqtt_use_json;
// Equeue message
if (useJson) {
// Enqueue new message
if (!force && _mqtt_use_json) {
mqttEnqueue(topic, message);
// Reset flush timer
_mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, mqttFlush);
// Send it right away
} else {
mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain);
return;
}
mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain);
}
void mqttSend(const char * topic, const char * message, bool force) {
@ -859,108 +844,65 @@ void mqttSend(const char * topic, unsigned int index, const char * message) {
// -----------------------------------------------------------------------------
unsigned char _mqttBuildTree(JsonObject& root, char parent) {
unsigned char count = 0;
// Add enqueued messages
for (unsigned char i=0; i<_mqtt_queue.size(); i++) {
mqtt_message_t element = _mqtt_queue[i];
if (element.parent == parent) {
++count;
JsonObject& elements = root.createNestedObject(element.topic);
unsigned char num = _mqttBuildTree(elements, i);
if (0 == num) {
if (isNumber(element.message)) {
double value = atof(element.message);
if (value == int(value)) {
root.set(element.topic, int(value));
} else {
root.set(element.topic, value);
}
} else {
root.set(element.topic, element.message);
}
}
}
}
return count;
}
constexpr size_t MqttJsonPayloadBufferSize { 1024ul };
void mqttFlush() {
if (!_mqtt.connected()) {
return;
}
if (!_mqtt.connected()) return;
if (_mqtt_queue.size() == 0) return;
if (_mqtt_queue.empty()) {
return;
}
// Build tree recursively
DynamicJsonBuffer jsonBuffer(1024);
DynamicJsonBuffer jsonBuffer(MqttJsonPayloadBufferSize);
JsonObject& root = jsonBuffer.createObject();
_mqttBuildTree(root, mqtt_message_t::END);
// Add extra propeties
#if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME
if (ntpSynced()) root[MQTT_TOPIC_TIME] = ntpDateTime();
#endif
#if MQTT_ENQUEUE_MAC
root[MQTT_TOPIC_MAC] = WiFi.macAddress();
#endif
#if MQTT_ENQUEUE_HOSTNAME
root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname");
#endif
#if MQTT_ENQUEUE_IP
root[MQTT_TOPIC_IP] = getIP();
#endif
#if MQTT_ENQUEUE_MESSAGE_ID
root[MQTT_TOPIC_MESSAGE_ID] = (Rtcmem->mqtt)++;
#endif
#if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME
if (ntpSynced()) {
root[MQTT_TOPIC_DATETIME] = ntpDateTime();
}
#endif
#if MQTT_ENQUEUE_MAC
root[MQTT_TOPIC_MAC] = WiFi.macAddress();
#endif
#if MQTT_ENQUEUE_HOSTNAME
root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname", getIdentifier());
#endif
#if MQTT_ENQUEUE_IP
root[MQTT_TOPIC_IP] = getIP();
#endif
#if MQTT_ENQUEUE_MESSAGE_ID
root[MQTT_TOPIC_MESSAGE_ID] = (Rtcmem->mqtt)++;
#endif
for (auto& element : _mqtt_queue) {
root[element.topic.c_str()] = element.message.c_str();
}
// Send
String output;
root.printTo(output);
jsonBuffer.clear();
mqttSendRaw(_mqtt_topic_json.c_str(), output.c_str(), false);
// Clear queue
for (unsigned char i = 0; i < _mqtt_queue.size(); i++) {
mqtt_message_t element = _mqtt_queue[i];
free(element.topic);
if (element.message) {
free(element.message);
}
}
jsonBuffer.clear();
_mqtt_queue.clear();
_mqtt_queue_count = 0;
mqttSendRaw(_mqtt_topic_json.c_str(), output.c_str(), false);
}
int8_t mqttEnqueue(const char * topic, const char * message, unsigned char parent) {
void mqttEnqueue(const char * topic, const char * message) {
// Queue is not meant to send message "offline"
// We must prevent the queue does not get full while offline
if (!_mqtt.connected()) return -1;
// Force flusing the queue if the MQTT_QUEUE_MAX_SIZE has been reached
if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) mqttFlush();
int8_t index = _mqtt_queue.size();
if (_mqtt.connected()) {
if (_mqtt_queue_count >= MQTT_QUEUE_MAX_SIZE) {
mqttFlush();
}
// Enqueue new message
mqtt_message_t element;
element.parent = parent;
element.topic = strdup(topic);
if (NULL != message) {
element.message = strdup(message);
++_mqtt_queue_count;
_mqtt_queue.push_front({
topic, message
});
}
_mqtt_queue.push_back(element);
return index;
}
int8_t mqttEnqueue(const char * topic, const char * message) {
return mqttEnqueue(topic, message, mqtt_message_t::END);
}
// -----------------------------------------------------------------------------


+ 2
- 3
code/espurna/mqtt.h View File

@ -56,7 +56,7 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot
#define MQTT_TOPIC_CMD "cmd"
using mqtt_callback_f = std::function<void(unsigned int type, const char * topic, char * payload)>;
struct mqtt_msg_t {
struct MqttMessage {
String topic;
String message;
};
@ -83,8 +83,7 @@ void mqttSend(const char * topic, unsigned int index, const char * message);
void mqttSendStatus();
void mqttFlush();
int8_t mqttEnqueue(const char * topic, const char * message, unsigned char parent);
int8_t mqttEnqueue(const char * topic, const char * message);
void mqttEnqueue(const char* topic, const char* message);
const String& mqttPayloadOnline();
const String& mqttPayloadOffline();


Loading…
Cancel
Save