From dcc423ecaf556082ea7d358b886167f6ad179a21 Mon Sep 17 00:00:00 2001 From: Maxim Prokhorov Date: Tue, 2 Mar 2021 18:42:30 +0300 Subject: [PATCH] relay: mqtt events and group topics refactoring - fix relay-id check breaking group topics - group -> pub and sub topics - wildcard in subscription topic will be properly handled - make sure on disconnect event only triggers when mqtt is changing state from connected to disconnected, don't trigger every failed re-try - replace receive-only mode with separate sub and pub topics - some more build time settings (and some... questionable code to handle that) --- code/espurna/config/defaults.h | 100 +++++ code/espurna/config/general.h | 8 + code/espurna/config/types.h | 11 +- code/espurna/relay.cpp | 652 +++++++++++++++++++++------------ code/espurna/relay.h | 8 +- code/espurna/relay_config.h | 54 ++- code/html/index.html | 24 +- 7 files changed, 609 insertions(+), 248 deletions(-) diff --git a/code/espurna/config/defaults.h b/code/espurna/config/defaults.h index 55154290..d3464f68 100644 --- a/code/espurna/config/defaults.h +++ b/code/espurna/config/defaults.h @@ -887,6 +887,106 @@ #define RELAY8_PULSE_TIME RELAY_PULSE_TIME #endif +#ifndef RELAY1_MQTT_TOPIC_SUB +#define RELAY1_MQTT_TOPIC_SUB "" +#endif +#ifndef RELAY2_MQTT_TOPIC_SUB +#define RELAY2_MQTT_TOPIC_SUB "" +#endif +#ifndef RELAY3_MQTT_TOPIC_SUB +#define RELAY3_MQTT_TOPIC_SUB "" +#endif +#ifndef RELAY4_MQTT_TOPIC_SUB +#define RELAY4_MQTT_TOPIC_SUB "" +#endif +#ifndef RELAY5_MQTT_TOPIC_SUB +#define RELAY5_MQTT_TOPIC_SUB "" +#endif +#ifndef RELAY6_MQTT_TOPIC_SUB +#define RELAY6_MQTT_TOPIC_SUB "" +#endif +#ifndef RELAY7_MQTT_TOPIC_SUB +#define RELAY7_MQTT_TOPIC_SUB "" +#endif +#ifndef RELAY8_MQTT_TOPIC_SUB +#define RELAY8_MQTT_TOPIC_SUB "" +#endif + +#ifndef RELAY1_MQTT_TOPIC_PUB +#define RELAY1_MQTT_TOPIC_PUB "" +#endif +#ifndef RELAY2_MQTT_TOPIC_PUB +#define RELAY2_MQTT_TOPIC_PUB "" +#endif +#ifndef RELAY3_MQTT_TOPIC_PUB +#define RELAY3_MQTT_TOPIC_PUB "" +#endif +#ifndef RELAY4_MQTT_TOPIC_PUB +#define RELAY4_MQTT_TOPIC_PUB "" +#endif +#ifndef RELAY5_MQTT_TOPIC_PUB +#define RELAY5_MQTT_TOPIC_PUB "" +#endif +#ifndef RELAY6_MQTT_TOPIC_PUB +#define RELAY6_MQTT_TOPIC_PUB "" +#endif +#ifndef RELAY7_MQTT_TOPIC_PUB +#define RELAY7_MQTT_TOPIC_PUB "" +#endif +#ifndef RELAY8_MQTT_TOPIC_PUB +#define RELAY8_MQTT_TOPIC_PUB "" +#endif + +#ifndef RELAY1_MQTT_TOPIC_MODE +#define RELAY1_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE +#endif +#ifndef RELAY2_MQTT_TOPIC_MODE +#define RELAY2_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE +#endif +#ifndef RELAY3_MQTT_TOPIC_MODE +#define RELAY3_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE +#endif +#ifndef RELAY4_MQTT_TOPIC_MODE +#define RELAY4_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE +#endif +#ifndef RELAY5_MQTT_TOPIC_MODE +#define RELAY5_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE +#endif +#ifndef RELAY6_MQTT_TOPIC_MODE +#define RELAY6_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE +#endif +#ifndef RELAY7_MQTT_TOPIC_MODE +#define RELAY7_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE +#endif +#ifndef RELAY8_MQTT_TOPIC_MODE +#define RELAY8_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE +#endif + +#ifndef RELAY1_MQTT_DISCONNECT_STATUS +#define RELAY1_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS +#endif +#ifndef RELAY2_MQTT_DISCONNECT_STATUS +#define RELAY2_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS +#endif +#ifndef RELAY3_MQTT_DISCONNECT_STATUS +#define RELAY3_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS +#endif +#ifndef RELAY4_MQTT_DISCONNECT_STATUS +#define RELAY4_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS +#endif +#ifndef RELAY5_MQTT_DISCONNECT_STATUS +#define RELAY5_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS +#endif +#ifndef RELAY6_MQTT_DISCONNECT_STATUS +#define RELAY6_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS +#endif +#ifndef RELAY7_MQTT_DISCONNECT_STATUS +#define RELAY7_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS +#endif +#ifndef RELAY8_MQTT_DISCONNECT_STATUS +#define RELAY8_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS +#endif + // ----------------------------------------------------------------------------- // LEDs // ----------------------------------------------------------------------------- diff --git a/code/espurna/config/general.h b/code/espurna/config/general.h index 5cd6ba0c..29d85a5a 100644 --- a/code/espurna/config/general.h +++ b/code/espurna/config/general.h @@ -457,6 +457,14 @@ #define RELAY_MQTT_TOGGLE "2" #endif +#ifndef RELAY_MQTT_TOPIC_MODE +#define RELAY_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_NORMAL +#endif + +#ifndef RELAY_MQTT_DISCONNECT_STATUS +#define RELAY_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_NONE +#endif + //------------------------------------------------------------------------------ // BUTTON //------------------------------------------------------------------------------ diff --git a/code/espurna/config/types.h b/code/espurna/config/types.h index a14e2560..d2a5e4bd 100644 --- a/code/espurna/config/types.h +++ b/code/espurna/config/types.h @@ -122,9 +122,14 @@ #define RFB_PROVIDER_RCSWITCH 0 #define RFB_PROVIDER_EFM8BB1 1 -#define RELAY_GROUP_SYNC_NORMAL 0 -#define RELAY_GROUP_SYNC_INVERSE 1 -#define RELAY_GROUP_SYNC_RECEIVEONLY 2 +#define RELAY_MQTT_TOPIC_NORMAL RelayMqttTopicMode::Normal +#define RELAY_MQTT_TOPIC_INVERSE RelayMqttTopicMode::Inverse +#define RELAY_MQTT_TOPIC_RECEIVE_ONLY RelayMqttTopicMode::ReceiveOnly + +#define RELAY_MQTT_DISCONNECT_NONE PayloadStatus::Unknown +#define RELAY_MQTT_DISCONNECT_ON PayloadStatus::On +#define RELAY_MQTT_DISCONNECT_OFF PayloadStatus::Off +#define RELAY_MQTT_DISCONNECT_TOGGLE PayloadStatus::Toggle #define RELAY_LOCK_DISABLED RelayLock::None #define RELAY_LOCK_NONE RelayLock::None diff --git a/code/espurna/relay.cpp b/code/espurna/relay.cpp index ddc0fba5..abca2633 100644 --- a/code/espurna/relay.cpp +++ b/code/espurna/relay.cpp @@ -148,6 +148,32 @@ const char* _relayLockToPayload(RelayLock lock) { namespace settings { namespace internal { +template <> +PayloadStatus convert(const String& value) { + auto status = static_cast(value.toInt()); + switch (status) { + case PayloadStatus::Off: + case PayloadStatus::On: + case PayloadStatus::Toggle: + case PayloadStatus::Unknown: + return status; + } + + return PayloadStatus::Unknown; +} + +template <> +RelayMqttTopicMode convert(const String& value) { + auto mode = static_cast(value.toInt()); + switch (mode) { + case RelayMqttTopicMode::Normal: + case RelayMqttTopicMode::Inverse: + return mode; + } + + return RelayMqttTopicMode::Normal; +} + template <> RelayPulse convert(const String& value) { return _relayPayloadToTristate(value.c_str()); @@ -267,6 +293,10 @@ RelayStatusCallback _relay_status_change { nullptr }; bool _relay_report_ws = false; +void _relayWsReport() { + _relay_report_ws = true; +} + #endif // WEB_SUPPORT #if MQTT_SUPPORT || API_SUPPORT @@ -642,15 +672,28 @@ bool _relayHandlePulsePayload(unsigned char id, const String& payload) { return _relayHandlePulsePayload(id, payload.c_str()); } -PayloadStatus _relayStatusInvert(PayloadStatus status) { - return (status == PayloadStatus::On) ? PayloadStatus::Off : status; +PayloadStatus _relayInvertStatus(PayloadStatus status) { + switch (status) { + case PayloadStatus::On: + return PayloadStatus::Off; + case PayloadStatus::Off: + return PayloadStatus::On; + case PayloadStatus::Toggle: + case PayloadStatus::Unknown: + break; + } + + return PayloadStatus::Unknown; } -PayloadStatus _relayStatusTyped(unsigned char id) { - if (id >= _relays.size()) return PayloadStatus::Off; +PayloadStatus _relayPayloadStatus(unsigned char id) { + if (id < _relays.size()) { + return _relays[id].current_status + ? PayloadStatus::On + : PayloadStatus::Off; + } - const bool status = _relays[id].current_status; - return (status) ? PayloadStatus::On : PayloadStatus::Off; + return PayloadStatus::Unknown; } void _relayLockAll() { @@ -707,7 +750,7 @@ void _relaySyncUnlock() { auto action = []() { _relayUnlockAll(); #if WEB_SUPPORT - _relay_report_ws = true; + _relayWsReport(); #endif }; @@ -718,83 +761,6 @@ void _relaySyncUnlock() { } } -// ----------------------------------------------------------------------------- -// RELAY PROVIDERS -// ----------------------------------------------------------------------------- - -/** - * Walks the relay vector processing only those relays - * that have to change to the requested mode - * @bool mode Requested mode - */ -void _relayProcess(bool mode) { - - bool changed = false; - - for (unsigned char id = 0; id < _relays.size(); id++) { - - bool target = _relays[id].target_status; - - // Only process the relays we have to change - if (target == _relays[id].current_status) continue; - - // Only process the relays we have to change to the requested mode - if (target != mode) continue; - - // Only process if the change delay has expired - if (_relays[id].change_delay && (millis() - _relays[id].change_start < _relays[id].change_delay)) continue; - - // Purge existing delay in case of cancelation - _relays[id].change_delay = 0; - changed = true; - - DEBUG_MSG_P(PSTR("[RELAY] #%d set to %s\n"), id, target ? "ON" : "OFF"); - - // Call the provider to perform the action - _relays[id].current_status = target; - _relays[id].provider->change(target); - if (_relay_status_change) { - _relay_status_change(id, target); - } - - // Send to Broker - #if BROKER_SUPPORT - StatusBroker::Publish(MQTT_TOPIC_RELAY, id, target); - #endif - - // Send MQTT - #if MQTT_SUPPORT - relayMQTT(id); - #endif - - #if WEB_SUPPORT - _relay_report_ws = true; - #endif - - if (!_relayRecursive) { - - relayPulse(id); - - // We will trigger a eeprom save only if - // we care about current relay status on boot - const auto boot_mode = getSetting({"relayBoot", id}, _relayBootMode(id)); - const bool save_eeprom = ((RELAY_BOOT_SAME == boot_mode) || (RELAY_BOOT_TOGGLE == boot_mode)); - _relay_save_timer.once_ms(RELAY_SAVE_DELAY, relaySave, save_eeprom); - - } - - _relays[id].report = false; - _relays[id].group_report = false; - - } - - // Whenever we are using sync modes and any relay had changed the state, check if we can unlock - const bool needs_unlock = ((_relay_sync_mode == RELAY_SYNC_NONE_OR_ONE) || (_relay_sync_mode == RELAY_SYNC_ONE)); - if (_relay_sync_locked && needs_unlock && changed) { - _relaySyncUnlock(); - } -} - // ----------------------------------------------------------------------------- // RELAY // ----------------------------------------------------------------------------- @@ -1080,29 +1046,57 @@ unsigned char relayCount() { } PayloadStatus relayParsePayload(const char * payload) { - #if MQTT_SUPPORT || API_SUPPORT - return rpcParsePayload(payload, [](const char* payload) { - if (_relay_rpc_payload_off.equals(payload)) return PayloadStatus::Off; - if (_relay_rpc_payload_on.equals(payload)) return PayloadStatus::On; - if (_relay_rpc_payload_toggle.equals(payload)) return PayloadStatus::Toggle; - return PayloadStatus::Unknown; - }); - #else - return rpcParsePayload(payload); - #endif +#if MQTT_SUPPORT || API_SUPPORT + return rpcParsePayload(payload, [](const char* payload) { + if (_relay_rpc_payload_off.equals(payload)) { + return PayloadStatus::Off; + } else if (_relay_rpc_payload_on.equals(payload)) { + return PayloadStatus::On; + } else if (_relay_rpc_payload_toggle.equals(payload)) { + return PayloadStatus::Toggle; + } + + return PayloadStatus::Unknown; + }); +#else + return rpcParsePayload(payload); +#endif } void _relaySettingsMigrate(int version) { - if (!version || (version >= 5)) { - return; - } + if (version && (version < 5)) { + // just a rename + moveSetting("relayDelayInterlock", "relayIlkDelay"); + + // groups use a new set of keys + for (unsigned char index = 0; index < RelaysMax; ++index) { + auto group = getSetting({"mqttGroup", index}); + if (!group.length()) { + break; + } - delSettingPrefix({ - "relayGPIO", - "relayProvider", - "relayType", - }); - delSetting("relays"); + auto syncKey = SettingsKey("mqttGroupSync", index); + auto sync = getSetting(syncKey); + + setSetting({"relayTopicSub", index}, group); + if (sync.length()) { + if (sync != "2") { // aka RECEIVE_ONLY + setSetting("relayTopicMode", sync); + setSetting("relayTopicPub", group); + } + } + } + + delSettingPrefix({ + "mqttGroup", // migrated to relayTopic + "mqttGroupSync", // migrated to relayTopic + "relayOnDisc", // replaced with relayMqttDisc + "relayGPIO", // avoid depending on migrate.ino + "relayProvider", // different type + "relayType", // different type + }); + delSetting("relays"); // does not do anything + } } void _relayBoot(unsigned char index, const RelayMaskHelper& mask) { @@ -1187,7 +1181,7 @@ void _relayConfigure() { _relay_flood_window = (1000 * getSetting("relayFloodTime", RELAY_FLOOD_WINDOW)); _relay_flood_changes = getSetting("relayFloodChanges", RELAY_FLOOD_CHANGES); - _relay_delay_interlock = getSetting("relayDelayInterlock", RELAY_DELAY_INTERLOCK); + _relay_delay_interlock = getSetting("relayIlkDelay", RELAY_DELAY_INTERLOCK); _relay_sync_mode = getSetting("relaySync", RELAY_SYNC); #if MQTT_SUPPORT || API_SUPPORT @@ -1224,47 +1218,62 @@ void _relayWebSocketUpdate(JsonObject& root) { } void _relayWebSocketSendRelays(JsonObject& root) { + if (!relayCount()) { + return; + } + JsonObject& config = root.createNestedObject("relayConfig"); config["size"] = relayCount(); config["start"] = 0; - const char* keys[] = { - "prov", "name", "boot", "pulse", "pulse_time" - }; - JsonArray& schema = config.createNestedArray("schema"); - schema.copyFrom(keys, sizeof(keys) / sizeof(*keys)); - - #if SCHEDULER_SUPPORT - schema.add("sch_last"); - #endif - - #if MQTT_SUPPORT - schema.add("group"); - schema.add("group_sync"); - schema.add("on_disc"); - #endif + { + const char* schema_keys[] = { + "prov", + "name", + "boot", +#if SCHEDULER_SUPPORT + "sch_last", +#endif +#if MQTT_SUPPORT + "topic_pub", + "topic_sub", + "topic_mode", + "mqtt_disc", +#endif + "pulse", + "pulse_time" + }; - JsonArray& relays = config.createNestedArray("relays"); + JsonArray& schema = config.createNestedArray("schema"); + schema.copyFrom(schema_keys, sizeof(schema_keys) / sizeof(*schema_keys)); + } - for (unsigned char id = 0; id < relayCount(); ++id) { - JsonArray& relay = relays.createNestedArray(); - relay.add(_relays[id].provider->id()); - relay.add(getSetting({"relayName", id})); - relay.add(getSetting({"relayBoot", id}, _relayBootMode(id))); + { + JsonArray& relays = config.createNestedArray("relays"); - relay.add(static_cast(_relays[id].pulse)); - relay.add(_relays[id].pulse_ms / 1000.0); + for (unsigned char id = 0; id < relayCount(); ++id) { + JsonArray& relay = relays.createNestedArray(); + relay.add(_relays[id].provider->id()); + relay.add(getSetting({"relayName", id})); + relay.add(getSetting({"relayBoot", id}, _relayBootMode(id))); - #if SCHEDULER_SUPPORT +#if SCHEDULER_SUPPORT relay.add(getSetting({"relayLastSch", id}, SCHEDULER_RESTORE_LAST_SCHEDULE)); - #endif +#endif - #if MQTT_SUPPORT - relay.add(getSetting({"mqttGroup", id})); - relay.add(getSetting({"mqttGroupSync", id}, 0)); - relay.add(getSetting({"relayOnDisc", id}, 0)); - #endif +#if MQTT_SUPPORT + relay.add(getSetting({"relayTopicSub", id}, _relayMqttTopicSub(id))); + relay.add(getSetting({"relayTopicPub", id}, _relayMqttTopicPub(id))); + relay.add(static_cast(getSetting({"relayTopicMode", id}, + _relayMqttTopicMode(id)))); + relay.add(static_cast(getSetting({"relayMqttDisc", id}, + _relayMqttDisconnectionStatus(id)))); +#endif + + relay.add(static_cast(_relays[id].pulse)); + relay.add(_relays[id].pulse_ms / 1000.0); + } } } @@ -1274,19 +1283,14 @@ void _relayWebSocketOnVisible(JsonObject& root) { if (relayCount() > 1) { root["multirelayVisible"] = 1; root["relaySync"] = getSetting("relaySync", RELAY_SYNC); - root["relayDelayInterlock"] = getSetting("relayDelayInterlock", RELAY_DELAY_INTERLOCK); + root["relayIlkDelay"] = getSetting("relayIlkDelay", RELAY_DELAY_INTERLOCK); } root["relayVisible"] = 1; } void _relayWebSocketOnConnected(JsonObject& root) { - - if (relayCount() == 0) return; - - // Per-relay configuration _relayWebSocketSendRelays(root); - } void _relayWebSocketOnAction(uint32_t client_id, const char * action, JsonObject& data) { @@ -1419,39 +1423,150 @@ const char* relayPayload(PayloadStatus status) { #if MQTT_SUPPORT -void _relayMQTTGroup(unsigned char id) { - const String topic = getSetting({"mqttGroup", id}); - if (!topic.length()) return; +struct RelayCustomTopic { + RelayCustomTopic() = delete; + RelayCustomTopic(const RelayCustomTopic&) = delete; + RelayCustomTopic(RelayCustomTopic&&) = delete; - const auto mode = getSetting({"mqttGroupSync", id}, RELAY_GROUP_SYNC_NORMAL); - if (mode == RELAY_GROUP_SYNC_RECEIVEONLY) return; + template + RelayCustomTopic(unsigned char id, T&& topic, RelayMqttTopicMode mode) : + _id(id), + _topic(std::forward(topic)), + _parts(_topic), + _mode(mode) + {} - auto status = _relayStatusTyped(id); - if (mode == RELAY_GROUP_SYNC_INVERSE) status = _relayStatusInvert(status); - mqttSendRaw(topic.c_str(), relayPayload(status)); -} + unsigned char id() const { + return _id; + } -void relayMQTT(unsigned char id) { + const String& topic() const { + return _topic; + } - if (id >= _relays.size()) return; + const PathParts& parts() const { + return _parts; + } - // Send state topic - if (_relays[id].report) { - _relays[id].report = false; - mqttSend(MQTT_TOPIC_RELAY, id, relayPayload(_relayStatusTyped(id))); + const RelayMqttTopicMode mode() const { + return _mode; } - // Check group topic - if (_relays[id].group_report) { - _relays[id].group_report = false; - _relayMQTTGroup(id); + bool match(const String& other) const { + PathParts parts(other); + return _parts.match(parts); + } + + bool match(const PathParts& parts) const { + return _parts.match(parts); + } + +private: + unsigned char _id; + String _topic; + PathParts _parts; + RelayMqttTopicMode _mode; +}; + +std::forward_list _relay_custom_topics; + +// TODO: it *will* handle the duplicates, but we waste memory storing them +// TODO: mqttSubscribe(...) also happens multiple times +// +// this is not really an intended use-case though, but it is techically possible... + +void _relayMqttSubscribeCustomTopics() { + const size_t relays { relayCount() }; + if (!relays) { + return; } + struct CustomTopic { + String value; + RelayMqttTopicMode mode; + }; + + std::vector topics; + topics.reserve(relays); + + for (unsigned char id = 0; id < relays; ++id) { + topics[id].value = _relayMqttTopicSub(id); + topics[id].mode = _relayMqttTopicMode(id); + } + + settings::kv_store.foreach([&](settings::kvs_type::KeyValueResult&& kv) { + const char* const SubPrefix = "relayTopicSub"; + const char* const ModePrefix = "relayTopicMode"; + + if ((kv.key.length <= strlen(SubPrefix)) + && (kv.key.length <= strlen(ModePrefix))) { + return; + } + + if (!kv.value.length) { + return; + } + + const auto key = kv.key.read(); + unsigned char id; + + if (key.startsWith(SubPrefix)) { + if (_relayTryParseId(key.c_str() + strlen(SubPrefix), id)) { + topics[id].value = std::move(kv.value.read()); + } + } else if (key.startsWith(ModePrefix)) { + using namespace settings::internal; + if (_relayTryParseId(key.c_str() + strlen(ModePrefix), id)) { + topics[id].mode = convert(kv.value.read()); + } + } + }); + + _relay_custom_topics.clear(); + for (unsigned char id = 0; id < relays; ++id) { + auto& topic = topics[id]; + if (!topic.value.length()) { + continue; + } + + mqttSubscribeRaw(topic.value.c_str()); + _relay_custom_topics.emplace_front(id, std::move(topic.value), topic.mode); + } +} + +void _relayMqttPublishCustomTopic(unsigned char id) { + const String topic = getSetting({"relayTopicPub", id}, _relayMqttTopicPub(id)); + if (!topic.length()) { + return; + } + + auto status = _relayPayloadStatus(id); + + auto mode = getSetting({"relayTopicMode", id}, _relayMqttTopicMode(id)); + if (mode == RelayMqttTopicMode::Inverse) { + status = _relayInvertStatus(status); + } + + mqttSendRaw(topic.c_str(), relayPayload(status)); } -void relayMQTT() { +void _relayMqttReport(unsigned char id) { + if (id < _relays.size()) { + if (_relays[id].report) { + _relays[id].report = false; + mqttSend(MQTT_TOPIC_RELAY, id, relayPayload(_relayPayloadStatus(id))); + } + + if (_relays[id].group_report) { + _relays[id].group_report = false; + _relayMqttPublishCustomTopic(id); + } + } +} + +void _relayMqttReportAll() { for (unsigned int id=0; id < _relays.size(); id++) { - mqttSend(MQTT_TOPIC_RELAY, id, relayPayload(_relayStatusTyped(id))); + mqttSend(MQTT_TOPIC_RELAY, id, relayPayload(_relayPayloadStatus(id))); } } @@ -1474,20 +1589,56 @@ void relayStatusWrap(unsigned char id, PayloadStatus value, bool is_group_topic) case PayloadStatus::Unknown: default: _relays[id].report = true; - relayMQTT(id); + _relayMqttReport(id); break; } } bool _relayMqttHeartbeat(heartbeat::Mask mask) { if (mask & heartbeat::Report::Relay) - relayMQTT(); + _relayMqttReportAll(); return mqttConnected(); } +void _relayMqttHandleCustomTopic(const String& topic, const char* payload) { + PathParts received(topic); + for (auto& topic : _relay_custom_topics) { + if (topic.match(received)) { + auto status = relayParsePayload(payload); + if (topic.mode() == RelayMqttTopicMode::Inverse) { + status = _relayInvertStatus(status); + } + + const auto id = topic.id(); + _relayHandleStatus(id, status); + _relays[id].group_report = false; + } + } +} + +void _relayMqttHandleDisconnect() { + settings::kv_store.foreach([](settings::kvs_type::KeyValueResult&& kv) { + const char* const prefix = "relayMqttDisc"; + if (kv.key.length <= strlen(prefix)) { + return; + } + + const auto key = kv.key.read(); + if (key.startsWith(prefix)) { + unsigned char id; + if (_relayTryParseId(key.c_str() + strlen(prefix), id)) { + const auto value = kv.value.read(); + _relayHandleStatus(id, relayParsePayload(value.c_str())); + } + } + }); +} + void relayMQTTCallback(unsigned int type, const char * topic, const char * payload) { + static bool connected { false }; + if (!relayCount()) { return; } @@ -1503,82 +1654,46 @@ void relayMQTTCallback(unsigned int type, const char * topic, const char * paylo snprintf_P(pulse_topic, sizeof(pulse_topic), PSTR("%s/+"), MQTT_TOPIC_PULSE); mqttSubscribe(pulse_topic); - // Subscribe to group topics - for (unsigned char i=0; i < _relays.size(); i++) { - const auto t = getSetting({"mqttGroup", i}); - if (t.length() > 0) mqttSubscribeRaw(t.c_str()); - } + _relayMqttSubscribeCustomTopics(); + + connected = true; + return; } if (type == MQTT_MESSAGE_EVENT) { - String t = mqttMagnitude((char *) topic); - unsigned char id; - if (!_relayTryParseIdFromPath(t.c_str(), id)) { - return; - } - - if (t.startsWith(MQTT_TOPIC_PULSE)) { - _relayHandlePulsePayload(id, payload); - _relays[id].report = mqttForward(); - return; - } - - if (t.startsWith(MQTT_TOPIC_RELAY)) { - _relayHandlePayload(id, payload); - _relays[id].report = mqttForward(); - return; - } - - // TODO: cache group topics instead of reading settings each time? - // TODO: this is another kvs::foreach case, since we slow down MQTT when settings grow - for (unsigned char i=0; i < _relays.size(); i++) { - - const String t = getSetting({"mqttGroup", i}); - if (!t.length()) break; - - if (t == topic) { - auto value = relayParsePayload(payload); - if (value == PayloadStatus::Unknown) return; - - if ((value == PayloadStatus::On) || (value == PayloadStatus::Off)) { - if (getSetting({"mqttGroupSync", i}, RELAY_GROUP_SYNC_NORMAL) == RELAY_GROUP_SYNC_INVERSE) { - value = _relayStatusInvert(value); - } - } + auto is_relay = t.startsWith(MQTT_TOPIC_RELAY); + auto is_pulse = t.startsWith(MQTT_TOPIC_PULSE); + if (is_relay || is_pulse) { + unsigned char id; + if (!_relayTryParseIdFromPath(t.c_str(), id)) { + return; + } - DEBUG_MSG_P(PSTR("[RELAY] Matched group topic for relayID %d\n"), i); - _relayHandleStatus(i, value); - _relays[i].group_report = false; + if (is_relay) { + _relayHandlePayload(id, payload); + _relays[id].report = mqttForward(); + return; + } + if (is_pulse) { + _relayHandlePulsePayload(id, payload); + _relays[id].report = mqttForward(); + return; } } + _relayMqttHandleCustomTopic(topic, payload); + return; } - // TODO: safeguard against network issues. this one has good intentions, but we may end up - // switching relays back and forth when connection is unstable but reconnects very fast after the failure - if (type == MQTT_DISCONNECT_EVENT) { - for (unsigned char i=0; i < _relays.size(); i++) { - const auto reaction = getSetting({"relayOnDisc", i}, 0); - - bool status; - switch (reaction) { - case 1: - status = false; - break; - case 2: - status = true; - break; - default: - return; - } - - DEBUG_MSG_P(PSTR("[RELAY] Turn %s relay #%u due to MQTT disconnection\n"), status ? "ON" : "OFF", i); - relayStatus(i, status); + if (connected) { + connected = false; + _relayMqttHandleDisconnect(); } + return; } } @@ -1666,6 +1781,81 @@ void _relayInitCommands() { #endif // TERMINAL_SUPPORT +//------------------------------------------------------------------------------ + +void _relayReport(unsigned char id [[gnu::unused]], bool status [[gnu::unused]]) { +#if BROKER_SUPPORT + StatusBroker::Publish(MQTT_TOPIC_RELAY, id, status); +#endif +#if MQTT_SUPPORT + _relayMqttReport(id); +#endif +#if WEB_SUPPORT + _relayWsReport(); +#endif +} + +/** + * Walks the relay vector processing only those relays + * that have to change to the requested mode + * @bool mode Requested mode + */ +void _relayProcess(bool mode) { + + bool changed = false; + + for (unsigned char id = 0; id < _relays.size(); id++) { + + bool target = _relays[id].target_status; + + // Only process the relays we have to change + if (target == _relays[id].current_status) continue; + + // Only process the relays we have to change to the requested mode + if (target != mode) continue; + + // Only process if the change delay has expired + if (_relays[id].change_delay && (millis() - _relays[id].change_start < _relays[id].change_delay)) continue; + + // Purge existing delay in case of cancelation + _relays[id].change_delay = 0; + changed = true; + + DEBUG_MSG_P(PSTR("[RELAY] #%d set to %s\n"), id, target ? "ON" : "OFF"); + + // Call the provider to perform the action + _relays[id].current_status = target; + _relays[id].provider->change(target); + if (_relay_status_change) { + _relay_status_change(id, target); + } + + _relayReport(id, target); + + if (!_relayRecursive) { + + relayPulse(id); + + // We will trigger a eeprom save only if + // we care about current relay status on boot + const auto boot_mode = getSetting({"relayBoot", id}, _relayBootMode(id)); + const bool save_eeprom = ((RELAY_BOOT_SAME == boot_mode) || (RELAY_BOOT_TOGGLE == boot_mode)); + _relay_save_timer.once_ms(RELAY_SAVE_DELAY, relaySave, save_eeprom); + + } + + _relays[id].report = false; + _relays[id].group_report = false; + + } + + // Whenever we are using sync modes and any relay had changed the state, check if we can unlock + const bool needs_unlock = ((_relay_sync_mode == RELAY_SYNC_NONE_OR_ONE) || (_relay_sync_mode == RELAY_SYNC_ONE)); + if (_relay_sync_locked && needs_unlock && changed) { + _relaySyncUnlock(); + } +} + //------------------------------------------------------------------------------ // Setup //------------------------------------------------------------------------------ @@ -1673,12 +1863,12 @@ void _relayInitCommands() { void _relayLoop() { _relayProcess(false); _relayProcess(true); - #if WEB_SUPPORT - if (_relay_report_ws) { - wsPost(_relayWebSocketUpdate); - _relay_report_ws = false; - } - #endif +#if WEB_SUPPORT + if (_relay_report_ws) { + wsPost(_relayWebSocketUpdate); + _relay_report_ws = false; + } +#endif } // Dummy relays for virtual light switches (hardware-less), Sonoff Dual, Sonoff RF Bridge and Tuya diff --git a/code/espurna/relay.h b/code/espurna/relay.h index 8d88005f..2847435e 100644 --- a/code/espurna/relay.h +++ b/code/espurna/relay.h @@ -32,6 +32,11 @@ enum class RelayType : int { LatchedInverse }; +enum class RelayMqttTopicMode : int { + Normal, + Inverse +}; + enum class RelayProvider: int { None, Dummy, @@ -85,9 +90,6 @@ const String& relayPayloadToggle(); const char* relayPayload(PayloadStatus status); -void relayMQTT(unsigned char id); -void relayMQTT(); - void relayPulse(unsigned char id); void relaySync(unsigned char id); void relaySave(bool persist); diff --git a/code/espurna/relay_config.h b/code/espurna/relay_config.h index 7d6ed61c..36aaa3f1 100644 --- a/code/espurna/relay_config.h +++ b/code/espurna/relay_config.h @@ -134,6 +134,58 @@ constexpr RelayProvider _relayProvider(unsigned char index) { (index == 4) ? (RELAY5_PROVIDER) : (index == 5) ? (RELAY6_PROVIDER) : (index == 6) ? (RELAY7_PROVIDER) : - (index == 7) ? (RELAY8_PROVIDER) : RelayProvider::None + (index == 7) ? (RELAY8_PROVIDER) : RELAY_PROVIDER_NONE + ); +} + +constexpr RelayMqttTopicMode _relayMqttTopicMode(unsigned char index) { + return ( + (index == 0) ? (RELAY1_MQTT_TOPIC_MODE) : + (index == 1) ? (RELAY2_MQTT_TOPIC_MODE) : + (index == 2) ? (RELAY3_MQTT_TOPIC_MODE) : + (index == 3) ? (RELAY4_MQTT_TOPIC_MODE) : + (index == 4) ? (RELAY5_MQTT_TOPIC_MODE) : + (index == 5) ? (RELAY6_MQTT_TOPIC_MODE) : + (index == 6) ? (RELAY7_MQTT_TOPIC_MODE) : + (index == 7) ? (RELAY8_MQTT_TOPIC_MODE) : RELAY_MQTT_TOPIC_MODE + ); +} + +constexpr const char* _relayMqttTopicSub(unsigned char index) { + return ( + (index == 0) ? (RELAY1_MQTT_TOPIC_SUB) : + (index == 1) ? (RELAY2_MQTT_TOPIC_SUB) : + (index == 2) ? (RELAY3_MQTT_TOPIC_SUB) : + (index == 3) ? (RELAY4_MQTT_TOPIC_SUB) : + (index == 4) ? (RELAY5_MQTT_TOPIC_SUB) : + (index == 5) ? (RELAY6_MQTT_TOPIC_SUB) : + (index == 6) ? (RELAY7_MQTT_TOPIC_SUB) : + (index == 7) ? (RELAY8_MQTT_TOPIC_SUB) : "" + ); +} + +constexpr const char* _relayMqttTopicPub(unsigned char index) { + return ( + (index == 0) ? (RELAY1_MQTT_TOPIC_PUB) : + (index == 1) ? (RELAY2_MQTT_TOPIC_PUB) : + (index == 2) ? (RELAY3_MQTT_TOPIC_PUB) : + (index == 3) ? (RELAY4_MQTT_TOPIC_PUB) : + (index == 4) ? (RELAY5_MQTT_TOPIC_PUB) : + (index == 5) ? (RELAY6_MQTT_TOPIC_PUB) : + (index == 6) ? (RELAY7_MQTT_TOPIC_PUB) : + (index == 7) ? (RELAY8_MQTT_TOPIC_PUB) : "" + ); +} + +constexpr PayloadStatus _relayMqttDisconnectionStatus(unsigned char index) { + return ( + (index == 0) ? (RELAY1_MQTT_DISCONNECT_STATUS) : + (index == 1) ? (RELAY2_MQTT_DISCONNECT_STATUS) : + (index == 2) ? (RELAY3_MQTT_DISCONNECT_STATUS) : + (index == 3) ? (RELAY4_MQTT_DISCONNECT_STATUS) : + (index == 4) ? (RELAY5_MQTT_DISCONNECT_STATUS) : + (index == 5) ? (RELAY6_MQTT_DISCONNECT_STATUS) : + (index == 6) ? (RELAY7_MQTT_DISCONNECT_STATUS) : + (index == 7) ? (RELAY8_MQTT_DISCONNECT_STATUS) : RELAY_MQTT_DISCONNECT_NONE ); } diff --git a/code/html/index.html b/code/html/index.html index 4573d0f8..5f7f78d6 100644 --- a/code/html/index.html +++ b/code/html/index.html @@ -2138,23 +2138,27 @@
-
-
+
+
-
-
+ +
+
+
- + + + +