Browse Source

mqtt: improve qos support an additional callbacks

- return PID when sending and subscribing, allow to subscribe
to a special event when receiving acknowledgement from the broker
(separate from normal lifecycle callback, since we allow a generic callable)
- rework HomeAssistant to publish with QoS 1 and wait until the broker responds.
lwmqtt sync library already did that, so this change only affects async library
- re-add `ha.send` command
- don't send HomeAssistant config when disabled, unless requested
- some more comments
mcspr-patch-1
Maxim Prokhorov 3 years ago
parent
commit
59269789dc
3 changed files with 355 additions and 191 deletions
  1. +165
    -76
      code/espurna/homeassistant.cpp
  2. +170
    -101
      code/espurna/mqtt.cpp
  3. +20
    -14
      code/espurna/mqtt.h

+ 165
- 76
code/espurna/homeassistant.cpp View File

@ -629,22 +629,20 @@ private:
#endif #endif
// Reworked discovery class. Continiously schedules itself until we have no more entities to send.
// Topic and message are generated on demand and most of JSON payload is cached for re-use.
// (both, to avoid manually generating JSON and to avoid possible UTF8 issues when concatenating char raw strings)
// Reworked discovery class. Try to send and wait for MQTT QoS 1 publish ACK to continue.
// Topic and message are generated on demand and most of JSON payload is cached for re-use to save RAM.
class DiscoveryTask { class DiscoveryTask {
public: public:
using Entity = std::unique_ptr<Discovery>; using Entity = std::unique_ptr<Discovery>;
using Entities = std::forward_list<Entity>; using Entities = std::forward_list<Entity>;
using Action = std::function<bool(const String&, const String&)>;
static constexpr int Retries { 5 }; static constexpr int Retries { 5 };
static constexpr unsigned long WaitShortMs { 100ul };
static constexpr unsigned long WaitLongMs { 1000ul };
DiscoveryTask(bool enabled, Action action) :
_enabled(enabled),
_action(action)
DiscoveryTask(bool enabled) :
_enabled(enabled)
{} {}
void add(Entity&& entity) { void add(Entity&& entity) {
@ -656,126 +654,205 @@ public:
_entities.push_front(std::make_unique<T>(_ctx)); _entities.push_front(std::make_unique<T>(_ctx));
} }
bool retry() {
if (_retry < 0) {
return false;
}
return (--_retry < 0);
}
Context& context() { Context& context() {
return _ctx; return _ctx;
} }
bool empty() const {
bool done() const {
return _entities.empty(); return _entities.empty();
} }
bool operator()() {
if (!mqttConnected() || _entities.empty()) {
return false;
}
auto& entity = _entities.front();
if (!entity->ok()) {
_entities.pop_front();
_ctx.reset();
return true;
bool ok() const {
if ((_retry > 0) && !_entities.empty()) {
auto& entity = _entities.front();
return entity->ok();
} }
const auto* topic = entity->topic().c_str();
const auto* msg = _enabled
? entity->message().c_str()
: "";
return false;
}
auto res = _action(topic, msg);
if (!res) {
if (--_retry < 0) {
DEBUG_MSG_P(PSTR("[HASS] Discovery failed after %d retries\n"), Retries);
return false;
template <typename T>
bool send(T&& action) {
while (!_entities.empty()) {
auto& entity = _entities.front();
if (!entity->ok()) {
_entities.pop_front();
_ctx.reset();
continue;
} }
DEBUG_MSG_P(PSTR("[HASS] Sending failed, retrying %d / %d\n"), (Retries - _retry), Retries);
return true;
}
_retry = Retries;
if (entity->next()) {
return true;
}
const auto* topic = entity->topic().c_str();
const auto* msg = _enabled
? entity->message().c_str()
: "";
if (action(topic, msg)) {
if (!entity->next()) {
_retry = Retries;
_entities.pop_front();
_ctx.reset();
}
return true;
}
_entities.pop_front();
if (!_entities.empty()) {
_ctx.reset();
return true;
return false;
} }
return false; return false;
} }
private: private:
bool _enabled { false }; bool _enabled { false };
int _retry { Retries }; int _retry { Retries };
Context _ctx { makeContext() }; Context _ctx { makeContext() };
Action _action;
Entities _entities; Entities _entities;
}; };
namespace internal { namespace internal {
constexpr unsigned long interval { 100ul };
using TaskPtr = std::shared_ptr<DiscoveryTask>;
using FlagPtr = std::shared_ptr<bool>;
bool retain { false }; bool retain { false };
bool enabled { false }; bool enabled { false };
bool sent { false };
enum class State {
Initial,
Pending,
Sent
};
State state { State::Initial };
Ticker timer; Ticker timer;
} // namespace internal
void send(TaskPtr ptr, FlagPtr flag_ptr);
void stop(bool done) {
timer.detach();
state = done ? State::Sent : State::Pending;
}
bool mqttSend(const String& topic, const String& message) {
return ::mqttSendRaw(topic.c_str(), message.c_str(), internal::retain) > 0;
void schedule(unsigned long wait, TaskPtr ptr, FlagPtr flag_ptr) {
internal::timer.once_ms_scheduled(wait, [ptr, flag_ptr]() {
send(ptr, flag_ptr);
});
} }
bool enabled() {
return internal::enabled;
void schedule(TaskPtr ptr, FlagPtr flag_ptr) {
schedule(DiscoveryTask::WaitShortMs, ptr, flag_ptr);
} }
void publishDiscovery() {
static bool busy { false };
if (busy) {
void schedule(TaskPtr ptr) {
schedule(DiscoveryTask::WaitShortMs, ptr, std::make_shared<bool>(true));
}
void send(TaskPtr ptr, FlagPtr flag_ptr) {
auto& task = *ptr;
if (!mqttConnected() || task.done()) {
stop(true);
return; return;
} }
if (internal::sent) {
auto& flag = *flag_ptr;
if (!flag) {
if (task.retry()) {
schedule(ptr, flag_ptr);
} else {
stop(false);
}
return; return;
} }
bool current = internal::enabled;
internal::enabled = getSetting("haEnabled", 1 == HOMEASSISTANT_ENABLED);
internal::retain = getSetting("haRetain", 1 == HOMEASSISTANT_RETAIN);
uint16_t pid { 0u };
auto res = task.send([&](const char* topic, const char* message) {
pid = ::mqttSendRaw(topic, message, internal::retain, 1);
return pid > 0;
});
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
// - async fails when disconneted and when it's buffers are filled, which should be resolved after $LATENCY
// and the time it takes for the lwip to process it. future versions use queue, but could still fail when low on RAM
// - lwmqtt will fail when disconnected (already checked above) and *will* disconnect in case publish fails. publish funciton will
// wait for the puback all by itself. not tested.
// - pubsub will fail when it can't buffer the payload *or* the underlying wificlient fails. also not tested.
if (res) {
flag = false;
mqttOnPublish(pid, [flag_ptr]() {
(*flag_ptr) = true;
});
}
#endif
auto wait = res
? DiscoveryTask::WaitShortMs
: DiscoveryTask::WaitLongMs;
if (res || task.retry()) {
schedule(wait, ptr, flag_ptr);
return;
}
if (task.done()) {
stop(true);
return;
}
}
} // namespace internal
void publishDiscovery() {
if (!mqttConnected() || internal::timer.active() || (internal::state != internal::State::Pending)) {
return;
}
if (current != internal::enabled) {
auto task = std::make_shared<DiscoveryTask>(internal::enabled, homeassistant::mqttSend);
auto task = std::make_shared<DiscoveryTask>(internal::enabled);
#if LIGHT_PROVIDER != LIGHT_PROVIDER_NONE #if LIGHT_PROVIDER != LIGHT_PROVIDER_NONE
task->add<LightDiscovery>();
task->add<LightDiscovery>();
#endif #endif
#if RELAY_SUPPORT #if RELAY_SUPPORT
task->add<RelayDiscovery>();
task->add<RelayDiscovery>();
#endif #endif
#if SENSOR_SUPPORT #if SENSOR_SUPPORT
task->add<SensorDiscovery>();
task->add<SensorDiscovery>();
#endif #endif
if (task->empty()) {
return;
}
// only happens when nothing is configured to do the add()
if (task->done()) {
return;
}
internal::timer.attach_ms(internal::interval, [task]() {
if (!(*task)()) {
internal::timer.detach();
internal::sent = true;
busy = false;
}
});
internal::schedule(task);
}
void configure() {
bool current = internal::enabled;
internal::enabled = getSetting("haEnabled", 1 == HOMEASSISTANT_ENABLED);
internal::retain = getSetting("haRetain", 1 == HOMEASSISTANT_RETAIN);
if (internal::enabled != current) {
internal::state = internal::State::Pending;
} }
homeassistant::publishDiscovery();
} }
void mqttCallback(unsigned int type, const char* topic, char* payload) { void mqttCallback(unsigned int type, const char* topic, char* payload) {
if (MQTT_DISCONNECT_EVENT == type) { if (MQTT_DISCONNECT_EVENT == type) {
internal::sent = false;
if (internal::state == internal::State::Sent) {
internal::state = internal::State::Pending;
}
internal::timer.detach();
return; return;
} }
@ -822,6 +899,12 @@ bool onKeyCheck(const char* key, JsonVariant& value) {
} // namespace web } // namespace web
} // namespace homeassistant } // namespace homeassistant
// This module does not implement .yaml generation, since we can't:
// - use unique_id in the device config
// - have abbreviated keys
// - have mqtt return the correct status & command payloads when it is disabled
// (yet? needs reworked configuration section or making functions read settings directly)
void haSetup() { void haSetup() {
#if WEB_SUPPORT #if WEB_SUPPORT
wsRegister() wsRegister()
@ -835,11 +918,17 @@ void haSetup() {
#endif #endif
mqttRegister(homeassistant::mqttCallback); mqttRegister(homeassistant::mqttCallback);
espurnaRegisterReload([]() {
if (mqttConnected()) {
homeassistant::publishDiscovery();
}
#if TERMINAL_SUPPORT
terminalRegisterCommand(F("HA.SEND"), [](const terminal::CommandContext& ctx) {
using namespace homeassistant::internal;
state = State::Pending;
homeassistant::publishDiscovery();
terminalOK(ctx);
}); });
#endif
espurnaRegisterReload(homeassistant::configure);
homeassistant::configure();
} }
#endif // HOMEASSISTANT_SUPPORT #endif // HOMEASSISTANT_SUPPORT

+ 170
- 101
code/espurna/mqtt.cpp View File

@ -92,6 +92,20 @@ String _mqtt_server;
uint16_t _mqtt_port; uint16_t _mqtt_port;
String _mqtt_clientid; String _mqtt_clientid;
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
struct MqttPidCallback {
uint16_t pid;
mqtt_pid_callback_f run;
};
using MqttPidCallbacks = std::forward_list<MqttPidCallback>;
MqttPidCallbacks _mqtt_publish_callbacks;
MqttPidCallbacks _mqtt_subscribe_callbacks;
#endif
std::forward_list<heartbeat::Callback> _mqtt_heartbeat_callbacks; std::forward_list<heartbeat::Callback> _mqtt_heartbeat_callbacks;
heartbeat::Mode _mqtt_heartbeat_mode; heartbeat::Mode _mqtt_heartbeat_mode;
heartbeat::Seconds _mqtt_heartbeat_interval; heartbeat::Seconds _mqtt_heartbeat_interval;
@ -648,7 +662,6 @@ bool _mqttHeartbeat(heartbeat::Mask mask) {
} }
void _mqttOnConnect() { void _mqttOnConnect() {
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN; _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
_mqtt_last_connection = millis(); _mqtt_last_connection = millis();
@ -656,35 +669,59 @@ void _mqttOnConnect() {
systemHeartbeat(_mqttHeartbeat, _mqtt_heartbeat_mode, _mqtt_heartbeat_interval); systemHeartbeat(_mqttHeartbeat, _mqtt_heartbeat_mode, _mqtt_heartbeat_interval);
DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
// Clean subscriptions
mqttUnsubscribeRaw("#");
// Notify all subscribers about the connection // Notify all subscribers about the connection
for (auto& callback : _mqtt_callbacks) { for (auto& callback : _mqtt_callbacks) {
callback(MQTT_CONNECT_EVENT, nullptr, nullptr); callback(MQTT_CONNECT_EVENT, nullptr, nullptr);
} }
DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
} }
void _mqttOnDisconnect() { void _mqttOnDisconnect() {
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
_mqtt_publish_callbacks.clear();
_mqtt_subscribe_callbacks.clear();
#endif
// Reset reconnection delay
_mqtt_last_connection = millis(); _mqtt_last_connection = millis();
_mqtt_state = AsyncClientState::Disconnected; _mqtt_state = AsyncClientState::Disconnected;
systemStopHeartbeat(_mqttHeartbeat); systemStopHeartbeat(_mqttHeartbeat);
DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
// Notify all subscribers about the disconnect // Notify all subscribers about the disconnect
for (auto& callback : _mqtt_callbacks) { for (auto& callback : _mqtt_callbacks) {
callback(MQTT_DISCONNECT_EVENT, nullptr, nullptr); callback(MQTT_DISCONNECT_EVENT, nullptr, nullptr);
} }
DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
} }
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
// Run the associated callback when message PID is acknowledged by the broker
void _mqttPidCallback(MqttPidCallbacks& callbacks, uint16_t pid) {
if (callbacks.empty()) {
return;
}
auto end = callbacks.end();
auto prev = callbacks.before_begin();
auto it = callbacks.begin();
while (it != end) {
if ((*it).pid == pid) {
(*it).run();
it = callbacks.erase_after(prev);
} else {
prev = it;
++it;
}
}
}
#endif
// Force-skip everything received in a short window right after connecting to avoid syncronization issues. // Force-skip everything received in a short window right after connecting to avoid syncronization issues.
bool _mqttMaybeSkipRetained(char* topic) { bool _mqttMaybeSkipRetained(char* topic) {
@ -764,23 +801,24 @@ void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
@return String object with the magnitude part. @return String object with the magnitude part.
*/ */
String mqttMagnitude(const char* topic) { String mqttMagnitude(const char* topic) {
String output;
String pattern = _mqtt_topic + _mqtt_setter; String pattern = _mqtt_topic + _mqtt_setter;
int position = pattern.indexOf("#"); int position = pattern.indexOf("#");
if (position == -1) return String();
String start = pattern.substring(0, position);
String end = pattern.substring(position + 1);
String magnitude = String(topic);
if (magnitude.startsWith(start) && magnitude.endsWith(end)) {
magnitude.replace(start, "");
magnitude.replace(end, "");
} else {
magnitude = String();
}
return magnitude;
if (position >= 0) {
String start = pattern.substring(0, position);
String end = pattern.substring(position + 1);
String magnitude(topic);
if (magnitude.startsWith(start) && magnitude.endsWith(end)) {
magnitude.replace(start, "");
magnitude.replace(end, "");
output = std::move(magnitude);
}
}
return output;
} }
/** /**
@ -791,10 +829,17 @@ String mqttMagnitude(const char* topic) {
or a state topic (false). or a state topic (false).
@return String full MQTT topic. @return String full MQTT topic.
*/ */
String mqttTopic(const char * magnitude, bool is_set) {
String output = _mqtt_topic;
String mqttTopic(const char* magnitude, bool is_set) {
String output;
output.reserve(strlen(magnitude)
+ _mqtt_topic.length()
+ _mqtt_setter.length()
+ _mqtt_getter.length());
output += _mqtt_topic;
output.replace("#", magnitude); output.replace("#", magnitude);
output += is_set ? _mqtt_setter : _mqtt_getter; output += is_set ? _mqtt_setter : _mqtt_getter;
return output; return output;
} }
@ -807,23 +852,26 @@ String mqttTopic(const char * magnitude, bool is_set) {
or a state topic (false). or a state topic (false).
@return String full MQTT topic. @return String full MQTT topic.
*/ */
String mqttTopic(const char * magnitude, unsigned int index, bool is_set) {
char buffer[strlen(magnitude)+5];
snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), magnitude, index);
return mqttTopic(buffer, is_set);
String mqttTopic(const char* magnitude, unsigned int index, bool is_set) {
String output;
output.reserve(strlen(magnitude) + (sizeof(decltype(index)) * 4));
output += magnitude;
output += '/';
output += index;
return mqttTopic(output.c_str(), is_set);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool mqttSendRaw(const char * topic, const char * message, bool retain) {
uint16_t mqttSendRaw(const char * topic, const char * message, bool retain, int qos) {
constexpr size_t MessageLogMax { 128ul }; constexpr size_t MessageLogMax { 128ul };
if (_mqtt.connected()) { if (_mqtt.connected()) {
const unsigned int packetId { const unsigned int packetId {
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
_mqtt.publish(topic, _mqtt_qos, retain, message)
_mqtt.publish(topic, qos, retain, message)
#elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
_mqtt.publish(topic, message, retain, _mqtt_qos)
_mqtt.publish(topic, message, retain, qos)
#elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
_mqtt.publish(topic, message, retain) _mqtt.publish(topic, message, retain)
#endif #endif
@ -836,50 +884,50 @@ bool mqttSendRaw(const char * topic, const char * message, bool retain) {
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %u)\n"), topic, message, packetId); DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %u)\n"), topic, message, packetId);
} }
return (packetId > 0);
return packetId;
} }
return false; return false;
} }
uint16_t mqttSendRaw(const char * topic, const char * message, bool retain) {
return mqttSendRaw(topic, message, retain, _mqtt_qos);
}
bool mqttSendRaw(const char * topic, const char * message) {
return mqttSendRaw(topic, message, _mqtt_retain);
uint16_t mqttSendRaw(const char * topic, const char * message) {
return mqttSendRaw(topic, message, _mqtt_retain, _mqtt_qos);
} }
void mqttSend(const char * topic, const char * message, bool force, bool retain) {
// TODO: refactor JSON mode to trigger WS-like status payloads instead sending single topic+message?
// (i.e. instead of {"relay/0": "1", ...} have {"relays": ["1"], ...})
// Heartbeat handles periodic status dumps for everything, mqttSend alternative simply notifies the module to send it's status data
bool mqttSend(const char * topic, const char * message, bool force, bool retain) {
if (!force && _mqtt_use_json) { if (!force && _mqtt_use_json) {
mqttEnqueue(topic, message); mqttEnqueue(topic, message);
_mqtt_json_payload_flush.once_ms(MQTT_USE_JSON_DELAY, mqttFlush); _mqtt_json_payload_flush.once_ms(MQTT_USE_JSON_DELAY, mqttFlush);
return;
return true;
} }
mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain);
return mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain) > 0;
} }
void mqttSend(const char * topic, const char * message, bool force) {
mqttSend(topic, message, force, _mqtt_retain);
bool mqttSend(const char * topic, const char * message, bool force) {
return mqttSend(topic, message, force, _mqtt_retain);
} }
void mqttSend(const char * topic, const char * message) {
mqttSend(topic, message, false);
bool mqttSend(const char * topic, const char * message) {
return mqttSend(topic, message, false);
} }
void mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) {
bool mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) {
char buffer[strlen(topic)+5]; char buffer[strlen(topic)+5];
snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index); snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
mqttSend(buffer, message, force, retain);
return mqttSend(buffer, message, force, retain);
} }
void mqttSend(const char * topic, unsigned int index, const char * message, bool force) {
mqttSend(topic, index, message, force, _mqtt_retain);
bool mqttSend(const char * topic, unsigned int index, const char * message, bool force) {
return mqttSend(topic, index, message, force, _mqtt_retain);
} }
void mqttSend(const char * topic, unsigned int index, const char * message) {
mqttSend(topic, index, message, false);
bool mqttSend(const char * topic, unsigned int index, const char * message) {
return mqttSend(topic, index, message, false);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -949,36 +997,38 @@ void mqttEnqueue(const char* topic, const char* message) {
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void mqttSubscribeRaw(const char * topic) {
// Only async client returns resulting PID, sync libraries return either success (1) or failure (0)
uint16_t mqttSubscribeRaw(const char* topic, int qos) {
uint16_t pid { 0u };
if (_mqtt.connected() && (strlen(topic) > 0)) { if (_mqtt.connected() && (strlen(topic) > 0)) {
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
unsigned int packetId = _mqtt.subscribe(topic, _mqtt_qos);
DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId);
#else // Arduino-MQTT or PubSubClient
_mqtt.subscribe(topic, _mqtt_qos);
DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic);
#endif
pid = _mqtt.subscribe(topic, qos);
DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, pid);
} }
return pid;
} }
void mqttSubscribe(const char * topic) {
mqttSubscribeRaw(mqttTopic(topic, true).c_str());
uint16_t mqttSubscribeRaw(const char* topic) {
return mqttSubscribeRaw(topic, _mqtt_qos);
} }
void mqttUnsubscribeRaw(const char * topic) {
bool mqttSubscribe(const char * topic) {
return mqttSubscribeRaw(mqttTopic(topic, true).c_str(), _mqtt_qos);
}
uint16_t mqttUnsubscribeRaw(const char * topic) {
uint16_t pid { 0u };
if (_mqtt.connected() && (strlen(topic) > 0)) { if (_mqtt.connected() && (strlen(topic) > 0)) {
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
unsigned int packetId = _mqtt.unsubscribe(topic);
DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s (PID %d)\n"), topic, packetId);
#else // Arduino-MQTT or PubSubClient
_mqtt.unsubscribe(topic);
DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s\n"), topic);
#endif
pid = _mqtt.unsubscribe(topic);
DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing from %s (PID %d)\n"), topic, pid);
} }
return pid;
} }
void mqttUnsubscribe(const char * topic) {
mqttUnsubscribeRaw(mqttTopic(topic, true).c_str());
bool mqttUnsubscribe(const char * topic) {
return mqttUnsubscribeRaw(mqttTopic(topic, true).c_str());
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -1006,10 +1056,39 @@ bool mqttForward() {
return _mqtt_forward; return _mqtt_forward;
} }
/**
Register a persistent lifecycle callback
@param standalone function pointer
*/
void mqttRegister(mqtt_callback_f callback) { void mqttRegister(mqtt_callback_f callback) {
_mqtt_callbacks.push_front(callback); _mqtt_callbacks.push_front(callback);
} }
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
/**
Register a temporary publish callback
@param callable object
*/
void mqttOnPublish(uint16_t pid, mqtt_pid_callback_f callback) {
auto callable = MqttPidCallback { pid, callback };
_mqtt_publish_callbacks.push_front(std::move(callable));
}
/**
Register a temporary subscribe callback
@param callable object
*/
void mqttOnSubscribe(uint16_t pid, mqtt_pid_callback_f callback) {
auto callable = MqttPidCallback { pid, callback };
_mqtt_subscribe_callbacks.push_front(std::move(callable));
}
#endif
void mqttSetBroker(IPAddress ip, uint16_t port) { void mqttSetBroker(IPAddress ip, uint16_t port) {
setSetting("mqttServer", ip.toString()); setSetting("mqttServer", ip.toString());
_mqtt_server = ip.toString(); _mqtt_server = ip.toString();
@ -1026,6 +1105,8 @@ void mqttSetBrokerIfNone(IPAddress ip, uint16_t port) {
} }
} }
// TODO: these strings are only updated after running the configuration routine and when MQTT is *enabled*
const String& mqttPayloadOnline() { const String& mqttPayloadOnline() {
return _mqtt_payload_online; return _mqtt_payload_online;
} }
@ -1047,13 +1128,12 @@ void mqttSendStatus() {
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void _mqttConnect() { void _mqttConnect() {
// Do not connect if disabled
if (!_mqtt_enabled) return;
// Do not connect if already connected or still trying to connect // Do not connect if already connected or still trying to connect
if (_mqtt.connected() || (_mqtt_state != AsyncClientState::Disconnected)) return; if (_mqtt.connected() || (_mqtt_state != AsyncClientState::Disconnected)) return;
// Do not connect if disabled or no WiFi
if (!_mqtt_enabled || (WiFi.status() != WL_CONNECTED)) return;
// Check reconnect interval // Check reconnect interval
if (millis() - _mqtt_last_connection < _mqtt_reconnect_delay) return; if (millis() - _mqtt_last_connection < _mqtt_reconnect_delay) return;
@ -1097,31 +1177,19 @@ void _mqttConnect() {
} }
void mqttLoop() { void mqttLoop() {
if (WiFi.status() != WL_CONNECTED) return;
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
_mqttConnect();
#else // MQTT_LIBRARY != MQTT_LIBRARY_ASYNCMQTTCLIENT
if (_mqtt.connected()) {
_mqtt.loop();
} else {
if (_mqtt_state != AsyncClientState::Disconnected) {
_mqttOnDisconnect();
}
_mqttConnect();
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
_mqttConnect();
#else
if (_mqtt.connected()) {
_mqtt.loop();
} else {
if (_mqtt_state != AsyncClientState::Disconnected) {
_mqttOnDisconnect();
} }
#endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
_mqttConnect();
}
#endif
} }
void mqttHeartbeat(heartbeat::Callback callback) { void mqttHeartbeat(heartbeat::Callback callback) {
@ -1154,11 +1222,12 @@ void mqttSetup() {
_mqttOnConnect(); _mqttOnConnect();
}); });
_mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) {
DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %u\n"), packetId);
_mqtt.onSubscribe([](uint16_t pid, int) {
_mqttPidCallback(_mqtt_subscribe_callbacks, pid);
}); });
_mqtt.onPublish([](uint16_t packetId) {
DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %u\n"), packetId);
_mqtt.onPublish([](uint16_t pid) {
_mqttPidCallback(_mqtt_publish_callbacks, pid);
}); });
_mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) { _mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {
@ -1208,7 +1277,7 @@ void mqttSetup() {
#elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
_mqtt.onMessageAdvanced([](MQTTClient *client, char topic[], char payload[], int length) {
_mqtt.onMessageAdvanced([](MQTTClient* , char topic[], char payload[], int length) {
_mqttOnMessage(topic, payload, length); _mqttOnMessage(topic, payload, length);
}); });


+ 20
- 14
code/espurna/mqtt.h View File

@ -56,25 +56,37 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot
#define MQTT_TOPIC_CMD "cmd" #define MQTT_TOPIC_CMD "cmd"
using mqtt_callback_f = std::function<void(unsigned int type, const char * topic, char * payload)>; using mqtt_callback_f = std::function<void(unsigned int type, const char * topic, char * payload)>;
using mqtt_pid_callback_f = std::function<void()>;
void mqttHeartbeat(heartbeat::Callback); void mqttHeartbeat(heartbeat::Callback);
void mqttRegister(mqtt_callback_f callback); void mqttRegister(mqtt_callback_f callback);
void mqttOnPublish(uint16_t pid, mqtt_pid_callback_f);
void mqttOnSubscribe(uint16_t pid, mqtt_pid_callback_f);
String mqttTopic(const char * magnitude, bool is_set); String mqttTopic(const char * magnitude, bool is_set);
String mqttTopic(const char * magnitude, unsigned int index, bool is_set); String mqttTopic(const char * magnitude, unsigned int index, bool is_set);
String mqttMagnitude(const char* topic); String mqttMagnitude(const char* topic);
bool mqttSendRaw(const char * topic, const char * message, bool retain);
bool mqttSendRaw(const char * topic, const char * message);
uint16_t mqttSendRaw(const char * topic, const char * message, bool retain, int qos);
uint16_t mqttSendRaw(const char * topic, const char * message, bool retain);
uint16_t mqttSendRaw(const char * topic, const char * message);
uint16_t mqttSubscribeRaw(const char * topic, int qos);
uint16_t mqttSubscribeRaw(const char * topic);
bool mqttSubscribe(const char * topic);
uint16_t mqttUnsubscribeRaw(const char * topic);
bool mqttUnsubscribe(const char * topic);
void mqttSend(const char * topic, const char * message, bool force, bool retain);
void mqttSend(const char * topic, const char * message, bool force);
void mqttSend(const char * topic, const char * message);
bool mqttSend(const char * topic, const char * message, bool force, bool retain);
bool mqttSend(const char * topic, const char * message, bool force);
bool mqttSend(const char * topic, const char * message);
void mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain);
void mqttSend(const char * topic, unsigned int index, const char * message, bool force);
void mqttSend(const char * topic, unsigned int index, const char * message);
bool mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain);
bool mqttSend(const char * topic, unsigned int index, const char * message, bool force);
bool mqttSend(const char * topic, unsigned int index, const char * message);
void mqttSendStatus(); void mqttSendStatus();
void mqttFlush(); void mqttFlush();
@ -88,12 +100,6 @@ const char* mqttPayloadStatus(bool status);
void mqttSetBroker(IPAddress ip, uint16_t port); void mqttSetBroker(IPAddress ip, uint16_t port);
void mqttSetBrokerIfNone(IPAddress ip, uint16_t port); void mqttSetBrokerIfNone(IPAddress ip, uint16_t port);
void mqttSubscribeRaw(const char * topic);
void mqttSubscribe(const char * topic);
void mqttUnsubscribeRaw(const char * topic);
void mqttUnsubscribe(const char * topic);
void mqttEnabled(bool status); void mqttEnabled(bool status);
bool mqttEnabled(); bool mqttEnabled();


Loading…
Cancel
Save