From 59269789dc80308e9afc1e4b3051d9d33e13bf8f Mon Sep 17 00:00:00 2001 From: Maxim Prokhorov Date: Fri, 19 Feb 2021 17:33:25 +0300 Subject: [PATCH] mqtt: improve qos support an additional callbacks - return PID when sending and subscribing, allow to subscribe to a special event when receiving acknowledgement from the broker (separate from normal lifecycle callback, since we allow a generic callable) - rework HomeAssistant to publish with QoS 1 and wait until the broker responds. lwmqtt sync library already did that, so this change only affects async library - re-add `ha.send` command - don't send HomeAssistant config when disabled, unless requested - some more comments --- code/espurna/homeassistant.cpp | 241 ++++++++++++++++++++--------- code/espurna/mqtt.cpp | 271 +++++++++++++++++++++------------ code/espurna/mqtt.h | 34 +++-- 3 files changed, 355 insertions(+), 191 deletions(-) diff --git a/code/espurna/homeassistant.cpp b/code/espurna/homeassistant.cpp index f672239d..a87a41e2 100644 --- a/code/espurna/homeassistant.cpp +++ b/code/espurna/homeassistant.cpp @@ -629,22 +629,20 @@ private: #endif -// Reworked discovery class. Continiously schedules itself until we have no more entities to send. -// Topic and message are generated on demand and most of JSON payload is cached for re-use. -// (both, to avoid manually generating JSON and to avoid possible UTF8 issues when concatenating char raw strings) +// Reworked discovery class. Try to send and wait for MQTT QoS 1 publish ACK to continue. +// Topic and message are generated on demand and most of JSON payload is cached for re-use to save RAM. class DiscoveryTask { public: using Entity = std::unique_ptr; using Entities = std::forward_list; - using Action = std::function; - static constexpr int Retries { 5 }; + static constexpr unsigned long WaitShortMs { 100ul }; + static constexpr unsigned long WaitLongMs { 1000ul }; - DiscoveryTask(bool enabled, Action action) : - _enabled(enabled), - _action(action) + DiscoveryTask(bool enabled) : + _enabled(enabled) {} void add(Entity&& entity) { @@ -656,126 +654,205 @@ public: _entities.push_front(std::make_unique(_ctx)); } + bool retry() { + if (_retry < 0) { + return false; + } + + return (--_retry < 0); + } + Context& context() { return _ctx; } - bool empty() const { + bool done() const { return _entities.empty(); } - bool operator()() { - if (!mqttConnected() || _entities.empty()) { - return false; - } - - auto& entity = _entities.front(); - if (!entity->ok()) { - _entities.pop_front(); - _ctx.reset(); - return true; + bool ok() const { + if ((_retry > 0) && !_entities.empty()) { + auto& entity = _entities.front(); + return entity->ok(); } - const auto* topic = entity->topic().c_str(); - const auto* msg = _enabled - ? entity->message().c_str() - : ""; + return false; + } - auto res = _action(topic, msg); - if (!res) { - if (--_retry < 0) { - DEBUG_MSG_P(PSTR("[HASS] Discovery failed after %d retries\n"), Retries); - return false; + template + bool send(T&& action) { + while (!_entities.empty()) { + auto& entity = _entities.front(); + if (!entity->ok()) { + _entities.pop_front(); + _ctx.reset(); + continue; } - DEBUG_MSG_P(PSTR("[HASS] Sending failed, retrying %d / %d\n"), (Retries - _retry), Retries); - return true; - } - - _retry = Retries; - if (entity->next()) { - return true; - } + const auto* topic = entity->topic().c_str(); + const auto* msg = _enabled + ? entity->message().c_str() + : ""; + + if (action(topic, msg)) { + if (!entity->next()) { + _retry = Retries; + _entities.pop_front(); + _ctx.reset(); + } + return true; + } - _entities.pop_front(); - if (!_entities.empty()) { - _ctx.reset(); - return true; + return false; } - + return false; } private: bool _enabled { false }; + int _retry { Retries }; Context _ctx { makeContext() }; - Action _action; Entities _entities; }; namespace internal { -constexpr unsigned long interval { 100ul }; +using TaskPtr = std::shared_ptr; +using FlagPtr = std::shared_ptr; + bool retain { false }; bool enabled { false }; -bool sent { false }; + +enum class State { + Initial, + Pending, + Sent +}; + +State state { State::Initial }; Ticker timer; -} // namespace internal +void send(TaskPtr ptr, FlagPtr flag_ptr); + +void stop(bool done) { + timer.detach(); + state = done ? State::Sent : State::Pending; +} -bool mqttSend(const String& topic, const String& message) { - return ::mqttSendRaw(topic.c_str(), message.c_str(), internal::retain) > 0; +void schedule(unsigned long wait, TaskPtr ptr, FlagPtr flag_ptr) { + internal::timer.once_ms_scheduled(wait, [ptr, flag_ptr]() { + send(ptr, flag_ptr); + }); } -bool enabled() { - return internal::enabled; +void schedule(TaskPtr ptr, FlagPtr flag_ptr) { + schedule(DiscoveryTask::WaitShortMs, ptr, flag_ptr); } -void publishDiscovery() { - static bool busy { false }; - if (busy) { +void schedule(TaskPtr ptr) { + schedule(DiscoveryTask::WaitShortMs, ptr, std::make_shared(true)); +} + +void send(TaskPtr ptr, FlagPtr flag_ptr) { + auto& task = *ptr; + if (!mqttConnected() || task.done()) { + stop(true); return; } - if (internal::sent) { + auto& flag = *flag_ptr; + if (!flag) { + if (task.retry()) { + schedule(ptr, flag_ptr); + } else { + stop(false); + } return; } - bool current = internal::enabled; - internal::enabled = getSetting("haEnabled", 1 == HOMEASSISTANT_ENABLED); - internal::retain = getSetting("haRetain", 1 == HOMEASSISTANT_RETAIN); + uint16_t pid { 0u }; + auto res = task.send([&](const char* topic, const char* message) { + pid = ::mqttSendRaw(topic, message, internal::retain, 1); + return pid > 0; + }); + +#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT + // - async fails when disconneted and when it's buffers are filled, which should be resolved after $LATENCY + // and the time it takes for the lwip to process it. future versions use queue, but could still fail when low on RAM + // - lwmqtt will fail when disconnected (already checked above) and *will* disconnect in case publish fails. publish funciton will + // wait for the puback all by itself. not tested. + // - pubsub will fail when it can't buffer the payload *or* the underlying wificlient fails. also not tested. + + if (res) { + flag = false; + mqttOnPublish(pid, [flag_ptr]() { + (*flag_ptr) = true; + }); + } +#endif + + auto wait = res + ? DiscoveryTask::WaitShortMs + : DiscoveryTask::WaitLongMs; + + if (res || task.retry()) { + schedule(wait, ptr, flag_ptr); + return; + } + + if (task.done()) { + stop(true); + return; + } +} + +} // namespace internal + +void publishDiscovery() { + if (!mqttConnected() || internal::timer.active() || (internal::state != internal::State::Pending)) { + return; + } - if (current != internal::enabled) { - auto task = std::make_shared(internal::enabled, homeassistant::mqttSend); + auto task = std::make_shared(internal::enabled); #if LIGHT_PROVIDER != LIGHT_PROVIDER_NONE - task->add(); + task->add(); #endif #if RELAY_SUPPORT - task->add(); + task->add(); #endif #if SENSOR_SUPPORT - task->add(); + task->add(); #endif - if (task->empty()) { - return; - } + // only happens when nothing is configured to do the add() + if (task->done()) { + return; + } - internal::timer.attach_ms(internal::interval, [task]() { - if (!(*task)()) { - internal::timer.detach(); - internal::sent = true; - busy = false; - } - }); + internal::schedule(task); +} + +void configure() { + bool current = internal::enabled; + internal::enabled = getSetting("haEnabled", 1 == HOMEASSISTANT_ENABLED); + internal::retain = getSetting("haRetain", 1 == HOMEASSISTANT_RETAIN); + + if (internal::enabled != current) { + internal::state = internal::State::Pending; } + + homeassistant::publishDiscovery(); } void mqttCallback(unsigned int type, const char* topic, char* payload) { if (MQTT_DISCONNECT_EVENT == type) { - internal::sent = false; + if (internal::state == internal::State::Sent) { + internal::state = internal::State::Pending; + } + internal::timer.detach(); return; } @@ -822,6 +899,12 @@ bool onKeyCheck(const char* key, JsonVariant& value) { } // namespace web } // namespace homeassistant +// This module does not implement .yaml generation, since we can't: +// - use unique_id in the device config +// - have abbreviated keys +// - have mqtt return the correct status & command payloads when it is disabled +// (yet? needs reworked configuration section or making functions read settings directly) + void haSetup() { #if WEB_SUPPORT wsRegister() @@ -835,11 +918,17 @@ void haSetup() { #endif mqttRegister(homeassistant::mqttCallback); - espurnaRegisterReload([]() { - if (mqttConnected()) { - homeassistant::publishDiscovery(); - } +#if TERMINAL_SUPPORT + terminalRegisterCommand(F("HA.SEND"), [](const terminal::CommandContext& ctx) { + using namespace homeassistant::internal; + state = State::Pending; + homeassistant::publishDiscovery(); + terminalOK(ctx); }); +#endif + + espurnaRegisterReload(homeassistant::configure); + homeassistant::configure(); } #endif // HOMEASSISTANT_SUPPORT diff --git a/code/espurna/mqtt.cpp b/code/espurna/mqtt.cpp index 613b5811..57b7ec32 100644 --- a/code/espurna/mqtt.cpp +++ b/code/espurna/mqtt.cpp @@ -92,6 +92,20 @@ String _mqtt_server; uint16_t _mqtt_port; String _mqtt_clientid; +#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT + +struct MqttPidCallback { + uint16_t pid; + mqtt_pid_callback_f run; +}; + +using MqttPidCallbacks = std::forward_list; + +MqttPidCallbacks _mqtt_publish_callbacks; +MqttPidCallbacks _mqtt_subscribe_callbacks; + +#endif + std::forward_list _mqtt_heartbeat_callbacks; heartbeat::Mode _mqtt_heartbeat_mode; heartbeat::Seconds _mqtt_heartbeat_interval; @@ -648,7 +662,6 @@ bool _mqttHeartbeat(heartbeat::Mask mask) { } void _mqttOnConnect() { - _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN; _mqtt_last_connection = millis(); @@ -656,35 +669,59 @@ void _mqttOnConnect() { systemHeartbeat(_mqttHeartbeat, _mqtt_heartbeat_mode, _mqtt_heartbeat_interval); - DEBUG_MSG_P(PSTR("[MQTT] Connected!\n")); - - // Clean subscriptions - mqttUnsubscribeRaw("#"); - // Notify all subscribers about the connection for (auto& callback : _mqtt_callbacks) { callback(MQTT_CONNECT_EVENT, nullptr, nullptr); } + DEBUG_MSG_P(PSTR("[MQTT] Connected!\n")); } void _mqttOnDisconnect() { +#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT + _mqtt_publish_callbacks.clear(); + _mqtt_subscribe_callbacks.clear(); +#endif - // Reset reconnection delay _mqtt_last_connection = millis(); _mqtt_state = AsyncClientState::Disconnected; systemStopHeartbeat(_mqttHeartbeat); - DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n")); - // Notify all subscribers about the disconnect for (auto& callback : _mqtt_callbacks) { callback(MQTT_DISCONNECT_EVENT, nullptr, nullptr); } + DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n")); } +#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT + +// Run the associated callback when message PID is acknowledged by the broker + +void _mqttPidCallback(MqttPidCallbacks& callbacks, uint16_t pid) { + if (callbacks.empty()) { + return; + } + + auto end = callbacks.end(); + auto prev = callbacks.before_begin(); + auto it = callbacks.begin(); + + while (it != end) { + if ((*it).pid == pid) { + (*it).run(); + it = callbacks.erase_after(prev); + } else { + prev = it; + ++it; + } + } +} + +#endif + // Force-skip everything received in a short window right after connecting to avoid syncronization issues. bool _mqttMaybeSkipRetained(char* topic) { @@ -764,23 +801,24 @@ void _mqttOnMessage(char* topic, char* payload, unsigned int len) { @return String object with the magnitude part. */ String mqttMagnitude(const char* topic) { + String output; String pattern = _mqtt_topic + _mqtt_setter; int position = pattern.indexOf("#"); - if (position == -1) return String(); - String start = pattern.substring(0, position); - String end = pattern.substring(position + 1); - - String magnitude = String(topic); - if (magnitude.startsWith(start) && magnitude.endsWith(end)) { - magnitude.replace(start, ""); - magnitude.replace(end, ""); - } else { - magnitude = String(); - } - return magnitude; + if (position >= 0) { + String start = pattern.substring(0, position); + String end = pattern.substring(position + 1); + + String magnitude(topic); + if (magnitude.startsWith(start) && magnitude.endsWith(end)) { + magnitude.replace(start, ""); + magnitude.replace(end, ""); + output = std::move(magnitude); + } + } + return output; } /** @@ -791,10 +829,17 @@ String mqttMagnitude(const char* topic) { or a state topic (false). @return String full MQTT topic. */ -String mqttTopic(const char * magnitude, bool is_set) { - String output = _mqtt_topic; +String mqttTopic(const char* magnitude, bool is_set) { + String output; + output.reserve(strlen(magnitude) + + _mqtt_topic.length() + + _mqtt_setter.length() + + _mqtt_getter.length()); + + output += _mqtt_topic; output.replace("#", magnitude); output += is_set ? _mqtt_setter : _mqtt_getter; + return output; } @@ -807,23 +852,26 @@ String mqttTopic(const char * magnitude, bool is_set) { or a state topic (false). @return String full MQTT topic. */ -String mqttTopic(const char * magnitude, unsigned int index, bool is_set) { - char buffer[strlen(magnitude)+5]; - snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), magnitude, index); - return mqttTopic(buffer, is_set); +String mqttTopic(const char* magnitude, unsigned int index, bool is_set) { + String output; + output.reserve(strlen(magnitude) + (sizeof(decltype(index)) * 4)); + output += magnitude; + output += '/'; + output += index; + return mqttTopic(output.c_str(), is_set); } // ----------------------------------------------------------------------------- -bool mqttSendRaw(const char * topic, const char * message, bool retain) { +uint16_t mqttSendRaw(const char * topic, const char * message, bool retain, int qos) { constexpr size_t MessageLogMax { 128ul }; if (_mqtt.connected()) { const unsigned int packetId { #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT - _mqtt.publish(topic, _mqtt_qos, retain, message) + _mqtt.publish(topic, qos, retain, message) #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT - _mqtt.publish(topic, message, retain, _mqtt_qos) + _mqtt.publish(topic, message, retain, qos) #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT _mqtt.publish(topic, message, retain) #endif @@ -836,50 +884,50 @@ bool mqttSendRaw(const char * topic, const char * message, bool retain) { DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %u)\n"), topic, message, packetId); } - return (packetId > 0); + return packetId; } return false; } +uint16_t mqttSendRaw(const char * topic, const char * message, bool retain) { + return mqttSendRaw(topic, message, retain, _mqtt_qos); +} -bool mqttSendRaw(const char * topic, const char * message) { - return mqttSendRaw(topic, message, _mqtt_retain); +uint16_t mqttSendRaw(const char * topic, const char * message) { + return mqttSendRaw(topic, message, _mqtt_retain, _mqtt_qos); } -void mqttSend(const char * topic, const char * message, bool force, bool retain) { - // TODO: refactor JSON mode to trigger WS-like status payloads instead sending single topic+message? - // (i.e. instead of {"relay/0": "1", ...} have {"relays": ["1"], ...}) - // Heartbeat handles periodic status dumps for everything, mqttSend alternative simply notifies the module to send it's status data +bool mqttSend(const char * topic, const char * message, bool force, bool retain) { if (!force && _mqtt_use_json) { mqttEnqueue(topic, message); _mqtt_json_payload_flush.once_ms(MQTT_USE_JSON_DELAY, mqttFlush); - return; + return true; } - mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain); + return mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain) > 0; } -void mqttSend(const char * topic, const char * message, bool force) { - mqttSend(topic, message, force, _mqtt_retain); +bool mqttSend(const char * topic, const char * message, bool force) { + return mqttSend(topic, message, force, _mqtt_retain); } -void mqttSend(const char * topic, const char * message) { - mqttSend(topic, message, false); +bool mqttSend(const char * topic, const char * message) { + return mqttSend(topic, message, false); } -void mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) { +bool mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) { char buffer[strlen(topic)+5]; snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index); - mqttSend(buffer, message, force, retain); + return mqttSend(buffer, message, force, retain); } -void mqttSend(const char * topic, unsigned int index, const char * message, bool force) { - mqttSend(topic, index, message, force, _mqtt_retain); +bool mqttSend(const char * topic, unsigned int index, const char * message, bool force) { + return mqttSend(topic, index, message, force, _mqtt_retain); } -void mqttSend(const char * topic, unsigned int index, const char * message) { - mqttSend(topic, index, message, false); +bool mqttSend(const char * topic, unsigned int index, const char * message) { + return mqttSend(topic, index, message, false); } // ----------------------------------------------------------------------------- @@ -949,36 +997,38 @@ void mqttEnqueue(const char* topic, const char* message) { // ----------------------------------------------------------------------------- -void mqttSubscribeRaw(const char * topic) { +// Only async client returns resulting PID, sync libraries return either success (1) or failure (0) + +uint16_t mqttSubscribeRaw(const char* topic, int qos) { + uint16_t pid { 0u }; if (_mqtt.connected() && (strlen(topic) > 0)) { - #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT - unsigned int packetId = _mqtt.subscribe(topic, _mqtt_qos); - DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId); - #else // Arduino-MQTT or PubSubClient - _mqtt.subscribe(topic, _mqtt_qos); - DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic); - #endif + pid = _mqtt.subscribe(topic, qos); + DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, pid); } + + return pid; } -void mqttSubscribe(const char * topic) { - mqttSubscribeRaw(mqttTopic(topic, true).c_str()); +uint16_t mqttSubscribeRaw(const char* topic) { + return mqttSubscribeRaw(topic, _mqtt_qos); } -void mqttUnsubscribeRaw(const char * topic) { +bool mqttSubscribe(const char * topic) { + return mqttSubscribeRaw(mqttTopic(topic, true).c_str(), _mqtt_qos); +} + +uint16_t mqttUnsubscribeRaw(const char * topic) { + uint16_t pid { 0u }; if (_mqtt.connected() && (strlen(topic) > 0)) { - #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT - unsigned int packetId = _mqtt.unsubscribe(topic); - DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s (PID %d)\n"), topic, packetId); - #else // Arduino-MQTT or PubSubClient - _mqtt.unsubscribe(topic); - DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s\n"), topic); - #endif + pid = _mqtt.unsubscribe(topic); + DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing from %s (PID %d)\n"), topic, pid); } + + return pid; } -void mqttUnsubscribe(const char * topic) { - mqttUnsubscribeRaw(mqttTopic(topic, true).c_str()); +bool mqttUnsubscribe(const char * topic) { + return mqttUnsubscribeRaw(mqttTopic(topic, true).c_str()); } // ----------------------------------------------------------------------------- @@ -1006,10 +1056,39 @@ bool mqttForward() { return _mqtt_forward; } +/** + Register a persistent lifecycle callback + + @param standalone function pointer +*/ void mqttRegister(mqtt_callback_f callback) { _mqtt_callbacks.push_front(callback); } +#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT + +/** + Register a temporary publish callback + + @param callable object +*/ +void mqttOnPublish(uint16_t pid, mqtt_pid_callback_f callback) { + auto callable = MqttPidCallback { pid, callback }; + _mqtt_publish_callbacks.push_front(std::move(callable)); +} + +/** + Register a temporary subscribe callback + + @param callable object +*/ +void mqttOnSubscribe(uint16_t pid, mqtt_pid_callback_f callback) { + auto callable = MqttPidCallback { pid, callback }; + _mqtt_subscribe_callbacks.push_front(std::move(callable)); +} + +#endif + void mqttSetBroker(IPAddress ip, uint16_t port) { setSetting("mqttServer", ip.toString()); _mqtt_server = ip.toString(); @@ -1026,6 +1105,8 @@ void mqttSetBrokerIfNone(IPAddress ip, uint16_t port) { } } +// TODO: these strings are only updated after running the configuration routine and when MQTT is *enabled* + const String& mqttPayloadOnline() { return _mqtt_payload_online; } @@ -1047,13 +1128,12 @@ void mqttSendStatus() { // ----------------------------------------------------------------------------- void _mqttConnect() { - - // Do not connect if disabled - if (!_mqtt_enabled) return; - // Do not connect if already connected or still trying to connect if (_mqtt.connected() || (_mqtt_state != AsyncClientState::Disconnected)) return; + // Do not connect if disabled or no WiFi + if (!_mqtt_enabled || (WiFi.status() != WL_CONNECTED)) return; + // Check reconnect interval if (millis() - _mqtt_last_connection < _mqtt_reconnect_delay) return; @@ -1097,31 +1177,19 @@ void _mqttConnect() { } void mqttLoop() { - - if (WiFi.status() != WL_CONNECTED) return; - - #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT - - _mqttConnect(); - - #else // MQTT_LIBRARY != MQTT_LIBRARY_ASYNCMQTTCLIENT - - if (_mqtt.connected()) { - - _mqtt.loop(); - - } else { - - if (_mqtt_state != AsyncClientState::Disconnected) { - _mqttOnDisconnect(); - } - - _mqttConnect(); - +#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT + _mqttConnect(); +#else + if (_mqtt.connected()) { + _mqtt.loop(); + } else { + if (_mqtt_state != AsyncClientState::Disconnected) { + _mqttOnDisconnect(); } - #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT - + _mqttConnect(); + } +#endif } void mqttHeartbeat(heartbeat::Callback callback) { @@ -1154,11 +1222,12 @@ void mqttSetup() { _mqttOnConnect(); }); - _mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) { - DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %u\n"), packetId); + _mqtt.onSubscribe([](uint16_t pid, int) { + _mqttPidCallback(_mqtt_subscribe_callbacks, pid); }); - _mqtt.onPublish([](uint16_t packetId) { - DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %u\n"), packetId); + + _mqtt.onPublish([](uint16_t pid) { + _mqttPidCallback(_mqtt_publish_callbacks, pid); }); _mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) { @@ -1208,7 +1277,7 @@ void mqttSetup() { #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT - _mqtt.onMessageAdvanced([](MQTTClient *client, char topic[], char payload[], int length) { + _mqtt.onMessageAdvanced([](MQTTClient* , char topic[], char payload[], int length) { _mqttOnMessage(topic, payload, length); }); diff --git a/code/espurna/mqtt.h b/code/espurna/mqtt.h index f8e5c919..7132fb4f 100644 --- a/code/espurna/mqtt.h +++ b/code/espurna/mqtt.h @@ -56,25 +56,37 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot #define MQTT_TOPIC_CMD "cmd" using mqtt_callback_f = std::function; +using mqtt_pid_callback_f = std::function; void mqttHeartbeat(heartbeat::Callback); void mqttRegister(mqtt_callback_f callback); +void mqttOnPublish(uint16_t pid, mqtt_pid_callback_f); +void mqttOnSubscribe(uint16_t pid, mqtt_pid_callback_f); + String mqttTopic(const char * magnitude, bool is_set); String mqttTopic(const char * magnitude, unsigned int index, bool is_set); String mqttMagnitude(const char* topic); -bool mqttSendRaw(const char * topic, const char * message, bool retain); -bool mqttSendRaw(const char * topic, const char * message); +uint16_t mqttSendRaw(const char * topic, const char * message, bool retain, int qos); +uint16_t mqttSendRaw(const char * topic, const char * message, bool retain); +uint16_t mqttSendRaw(const char * topic, const char * message); + +uint16_t mqttSubscribeRaw(const char * topic, int qos); +uint16_t mqttSubscribeRaw(const char * topic); +bool mqttSubscribe(const char * topic); + +uint16_t mqttUnsubscribeRaw(const char * topic); +bool mqttUnsubscribe(const char * topic); -void mqttSend(const char * topic, const char * message, bool force, bool retain); -void mqttSend(const char * topic, const char * message, bool force); -void mqttSend(const char * topic, const char * message); +bool mqttSend(const char * topic, const char * message, bool force, bool retain); +bool mqttSend(const char * topic, const char * message, bool force); +bool mqttSend(const char * topic, const char * message); -void mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain); -void mqttSend(const char * topic, unsigned int index, const char * message, bool force); -void mqttSend(const char * topic, unsigned int index, const char * message); +bool mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain); +bool mqttSend(const char * topic, unsigned int index, const char * message, bool force); +bool mqttSend(const char * topic, unsigned int index, const char * message); void mqttSendStatus(); void mqttFlush(); @@ -88,12 +100,6 @@ const char* mqttPayloadStatus(bool status); void mqttSetBroker(IPAddress ip, uint16_t port); void mqttSetBrokerIfNone(IPAddress ip, uint16_t port); -void mqttSubscribeRaw(const char * topic); -void mqttSubscribe(const char * topic); - -void mqttUnsubscribeRaw(const char * topic); -void mqttUnsubscribe(const char * topic); - void mqttEnabled(bool status); bool mqttEnabled();