Browse Source

mqtt: make sure json topic is unique

mcspr-patch-1
Maxim Prokhorov 3 years ago
parent
commit
c01fbeb8ae
2 changed files with 57 additions and 19 deletions
  1. +57
    -15
      code/espurna/mqtt.cpp
  2. +0
    -4
      code/espurna/mqtt.h

+ 57
- 15
code/espurna/mqtt.cpp View File

@ -101,12 +101,46 @@ String _mqtt_payload_offline;
std::forward_list<mqtt_callback_f> _mqtt_callbacks;
int8_t _mqtt_queue_count { 0u };
std::forward_list<MqttMessage> _mqtt_queue;
Ticker _mqtt_flush_ticker;
// -----------------------------------------------------------------------------
// JSON payload
// -----------------------------------------------------------------------------
struct MqttPayload {
MqttPayload() = delete;
MqttPayload(const MqttPayload&) = default;
// TODO: replace String implementation with Core v3 (or just use newer Core)
// 2.7.x still has basic Arduino String move ctor that is not noexcept
MqttPayload(MqttPayload&& other) noexcept :
_topic(std::move(other._topic)),
_message(std::move(other._message))
{}
template <typename Topic, typename Message>
MqttPayload(Topic&& topic, Message&& message) :
_topic(std::forward<Topic>(topic)),
_message(std::forward<Message>(message))
{}
const String& topic() const {
return _topic;
}
const String& message() const {
return _message;
}
private:
String _topic;
String _message;
};
size_t _mqtt_json_payload_count { 0ul };
std::forward_list<MqttPayload> _mqtt_json_payload;
Ticker _mqtt_json_payload_flush;
// -----------------------------------------------------------------------------
// Private
// Secure client handlers
// -----------------------------------------------------------------------------
#if SECURE_CLIENT == SECURE_CLIENT_AXTLS
@ -144,6 +178,9 @@ SecureClientConfig _mqtt_sc_config {
};
#endif
// -----------------------------------------------------------------------------
// Client configuration & setup
// -----------------------------------------------------------------------------
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
@ -811,9 +848,12 @@ bool mqttSendRaw(const char * topic, const char * message) {
}
void mqttSend(const char * topic, const char * message, bool force, bool retain) {
// TODO: refactor JSON mode to trigger WS-like status payloads instead sending single topic+message?
// (i.e. instead of {"relay/0": "1", ...} have {"relays": ["1"], ...})
// Heartbeat handles periodic status dumps for everything, mqttSend alternative simply notifies the module to send it's status data
if (!force && _mqtt_use_json) {
mqttEnqueue(topic, message);
_mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, mqttFlush);
_mqtt_json_payload_flush.once_ms(MQTT_USE_JSON_DELAY, mqttFlush);
return;
}
@ -851,7 +891,7 @@ void mqttFlush() {
return;
}
if (_mqtt_queue.empty()) {
if (_mqtt_json_payload.empty()) {
return;
}
@ -876,32 +916,34 @@ void mqttFlush() {
root[MQTT_TOPIC_MESSAGE_ID] = (Rtcmem->mqtt)++;
#endif
for (auto& element : _mqtt_queue) {
root[element.topic.c_str()] = element.message.c_str();
for (auto& payload : _mqtt_json_payload) {
root[payload.topic().c_str()] = payload.message().c_str();
}
String output;
root.printTo(output);
jsonBuffer.clear();
_mqtt_queue.clear();
_mqtt_json_payload_count = 0;
_mqtt_json_payload.clear();
_mqtt_queue_count = 0;
mqttSendRaw(_mqtt_topic_json.c_str(), output.c_str(), false);
}
void mqttEnqueue(const char * topic, const char * message) {
void mqttEnqueue(const char* topic, const char* message) {
// Queue is not meant to send message "offline"
// We must prevent the queue does not get full while offline
if (_mqtt.connected()) {
if (_mqtt_queue_count >= MQTT_QUEUE_MAX_SIZE) {
if (_mqtt_json_payload_count >= MQTT_QUEUE_MAX_SIZE) {
mqttFlush();
}
++_mqtt_queue_count;
_mqtt_queue.push_front({
topic, message
_mqtt_json_payload.remove_if([topic](const MqttPayload& payload) {
return payload.topic() == topic;
});
_mqtt_json_payload.emplace_front(topic, message);
++_mqtt_json_payload_count;
}
}


+ 0
- 4
code/espurna/mqtt.h View File

@ -56,10 +56,6 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot
#define MQTT_TOPIC_CMD "cmd"
using mqtt_callback_f = std::function<void(unsigned int type, const char * topic, char * payload)>;
struct MqttMessage {
String topic;
String message;
};
void mqttHeartbeat(heartbeat::Callback);
void mqttRegister(mqtt_callback_f callback);


Loading…
Cancel
Save