Browse Source

relay: mqtt events and group topics refactoring

- fix relay-id check breaking group topics
- group -> pub and sub topics
- wildcard in subscription topic will be properly handled
- make sure on disconnect event only triggers when mqtt is changing
state from connected to disconnected, don't trigger every failed re-try
- replace receive-only mode with separate sub and pub topics
- some more build time settings (and some... questionable code to handle that)
dev
Maxim Prokhorov 3 years ago
parent
commit
dcc423ecaf
7 changed files with 609 additions and 248 deletions
  1. +100
    -0
      code/espurna/config/defaults.h
  2. +8
    -0
      code/espurna/config/general.h
  3. +8
    -3
      code/espurna/config/types.h
  4. +421
    -231
      code/espurna/relay.cpp
  5. +5
    -3
      code/espurna/relay.h
  6. +53
    -1
      code/espurna/relay_config.h
  7. +14
    -10
      code/html/index.html

+ 100
- 0
code/espurna/config/defaults.h View File

@ -887,6 +887,106 @@
#define RELAY8_PULSE_TIME RELAY_PULSE_TIME
#endif
#ifndef RELAY1_MQTT_TOPIC_SUB
#define RELAY1_MQTT_TOPIC_SUB ""
#endif
#ifndef RELAY2_MQTT_TOPIC_SUB
#define RELAY2_MQTT_TOPIC_SUB ""
#endif
#ifndef RELAY3_MQTT_TOPIC_SUB
#define RELAY3_MQTT_TOPIC_SUB ""
#endif
#ifndef RELAY4_MQTT_TOPIC_SUB
#define RELAY4_MQTT_TOPIC_SUB ""
#endif
#ifndef RELAY5_MQTT_TOPIC_SUB
#define RELAY5_MQTT_TOPIC_SUB ""
#endif
#ifndef RELAY6_MQTT_TOPIC_SUB
#define RELAY6_MQTT_TOPIC_SUB ""
#endif
#ifndef RELAY7_MQTT_TOPIC_SUB
#define RELAY7_MQTT_TOPIC_SUB ""
#endif
#ifndef RELAY8_MQTT_TOPIC_SUB
#define RELAY8_MQTT_TOPIC_SUB ""
#endif
#ifndef RELAY1_MQTT_TOPIC_PUB
#define RELAY1_MQTT_TOPIC_PUB ""
#endif
#ifndef RELAY2_MQTT_TOPIC_PUB
#define RELAY2_MQTT_TOPIC_PUB ""
#endif
#ifndef RELAY3_MQTT_TOPIC_PUB
#define RELAY3_MQTT_TOPIC_PUB ""
#endif
#ifndef RELAY4_MQTT_TOPIC_PUB
#define RELAY4_MQTT_TOPIC_PUB ""
#endif
#ifndef RELAY5_MQTT_TOPIC_PUB
#define RELAY5_MQTT_TOPIC_PUB ""
#endif
#ifndef RELAY6_MQTT_TOPIC_PUB
#define RELAY6_MQTT_TOPIC_PUB ""
#endif
#ifndef RELAY7_MQTT_TOPIC_PUB
#define RELAY7_MQTT_TOPIC_PUB ""
#endif
#ifndef RELAY8_MQTT_TOPIC_PUB
#define RELAY8_MQTT_TOPIC_PUB ""
#endif
#ifndef RELAY1_MQTT_TOPIC_MODE
#define RELAY1_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE
#endif
#ifndef RELAY2_MQTT_TOPIC_MODE
#define RELAY2_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE
#endif
#ifndef RELAY3_MQTT_TOPIC_MODE
#define RELAY3_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE
#endif
#ifndef RELAY4_MQTT_TOPIC_MODE
#define RELAY4_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE
#endif
#ifndef RELAY5_MQTT_TOPIC_MODE
#define RELAY5_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE
#endif
#ifndef RELAY6_MQTT_TOPIC_MODE
#define RELAY6_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE
#endif
#ifndef RELAY7_MQTT_TOPIC_MODE
#define RELAY7_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE
#endif
#ifndef RELAY8_MQTT_TOPIC_MODE
#define RELAY8_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_MODE
#endif
#ifndef RELAY1_MQTT_DISCONNECT_STATUS
#define RELAY1_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS
#endif
#ifndef RELAY2_MQTT_DISCONNECT_STATUS
#define RELAY2_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS
#endif
#ifndef RELAY3_MQTT_DISCONNECT_STATUS
#define RELAY3_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS
#endif
#ifndef RELAY4_MQTT_DISCONNECT_STATUS
#define RELAY4_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS
#endif
#ifndef RELAY5_MQTT_DISCONNECT_STATUS
#define RELAY5_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS
#endif
#ifndef RELAY6_MQTT_DISCONNECT_STATUS
#define RELAY6_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS
#endif
#ifndef RELAY7_MQTT_DISCONNECT_STATUS
#define RELAY7_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS
#endif
#ifndef RELAY8_MQTT_DISCONNECT_STATUS
#define RELAY8_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_STATUS
#endif
// -----------------------------------------------------------------------------
// LEDs
// -----------------------------------------------------------------------------


+ 8
- 0
code/espurna/config/general.h View File

@ -457,6 +457,14 @@
#define RELAY_MQTT_TOGGLE "2"
#endif
#ifndef RELAY_MQTT_TOPIC_MODE
#define RELAY_MQTT_TOPIC_MODE RELAY_MQTT_TOPIC_NORMAL
#endif
#ifndef RELAY_MQTT_DISCONNECT_STATUS
#define RELAY_MQTT_DISCONNECT_STATUS RELAY_MQTT_DISCONNECT_NONE
#endif
//------------------------------------------------------------------------------
// BUTTON
//------------------------------------------------------------------------------


+ 8
- 3
code/espurna/config/types.h View File

@ -122,9 +122,14 @@
#define RFB_PROVIDER_RCSWITCH 0
#define RFB_PROVIDER_EFM8BB1 1
#define RELAY_GROUP_SYNC_NORMAL 0
#define RELAY_GROUP_SYNC_INVERSE 1
#define RELAY_GROUP_SYNC_RECEIVEONLY 2
#define RELAY_MQTT_TOPIC_NORMAL RelayMqttTopicMode::Normal
#define RELAY_MQTT_TOPIC_INVERSE RelayMqttTopicMode::Inverse
#define RELAY_MQTT_TOPIC_RECEIVE_ONLY RelayMqttTopicMode::ReceiveOnly
#define RELAY_MQTT_DISCONNECT_NONE PayloadStatus::Unknown
#define RELAY_MQTT_DISCONNECT_ON PayloadStatus::On
#define RELAY_MQTT_DISCONNECT_OFF PayloadStatus::Off
#define RELAY_MQTT_DISCONNECT_TOGGLE PayloadStatus::Toggle
#define RELAY_LOCK_DISABLED RelayLock::None
#define RELAY_LOCK_NONE RelayLock::None


+ 421
- 231
code/espurna/relay.cpp View File

@ -148,6 +148,32 @@ const char* _relayLockToPayload(RelayLock lock) {
namespace settings {
namespace internal {
template <>
PayloadStatus convert(const String& value) {
auto status = static_cast<PayloadStatus>(value.toInt());
switch (status) {
case PayloadStatus::Off:
case PayloadStatus::On:
case PayloadStatus::Toggle:
case PayloadStatus::Unknown:
return status;
}
return PayloadStatus::Unknown;
}
template <>
RelayMqttTopicMode convert(const String& value) {
auto mode = static_cast<RelayMqttTopicMode>(value.toInt());
switch (mode) {
case RelayMqttTopicMode::Normal:
case RelayMqttTopicMode::Inverse:
return mode;
}
return RelayMqttTopicMode::Normal;
}
template <>
RelayPulse convert(const String& value) {
return _relayPayloadToTristate<RelayPulse>(value.c_str());
@ -267,6 +293,10 @@ RelayStatusCallback _relay_status_change { nullptr };
bool _relay_report_ws = false;
void _relayWsReport() {
_relay_report_ws = true;
}
#endif // WEB_SUPPORT
#if MQTT_SUPPORT || API_SUPPORT
@ -642,15 +672,28 @@ bool _relayHandlePulsePayload(unsigned char id, const String& payload) {
return _relayHandlePulsePayload(id, payload.c_str());
}
PayloadStatus _relayStatusInvert(PayloadStatus status) {
return (status == PayloadStatus::On) ? PayloadStatus::Off : status;
PayloadStatus _relayInvertStatus(PayloadStatus status) {
switch (status) {
case PayloadStatus::On:
return PayloadStatus::Off;
case PayloadStatus::Off:
return PayloadStatus::On;
case PayloadStatus::Toggle:
case PayloadStatus::Unknown:
break;
}
return PayloadStatus::Unknown;
}
PayloadStatus _relayStatusTyped(unsigned char id) {
if (id >= _relays.size()) return PayloadStatus::Off;
PayloadStatus _relayPayloadStatus(unsigned char id) {
if (id < _relays.size()) {
return _relays[id].current_status
? PayloadStatus::On
: PayloadStatus::Off;
}
const bool status = _relays[id].current_status;
return (status) ? PayloadStatus::On : PayloadStatus::Off;
return PayloadStatus::Unknown;
}
void _relayLockAll() {
@ -707,7 +750,7 @@ void _relaySyncUnlock() {
auto action = []() {
_relayUnlockAll();
#if WEB_SUPPORT
_relay_report_ws = true;
_relayWsReport();
#endif
};
@ -718,83 +761,6 @@ void _relaySyncUnlock() {
}
}
// -----------------------------------------------------------------------------
// RELAY PROVIDERS
// -----------------------------------------------------------------------------
/**
* Walks the relay vector processing only those relays
* that have to change to the requested mode
* @bool mode Requested mode
*/
void _relayProcess(bool mode) {
bool changed = false;
for (unsigned char id = 0; id < _relays.size(); id++) {
bool target = _relays[id].target_status;
// Only process the relays we have to change
if (target == _relays[id].current_status) continue;
// Only process the relays we have to change to the requested mode
if (target != mode) continue;
// Only process if the change delay has expired
if (_relays[id].change_delay && (millis() - _relays[id].change_start < _relays[id].change_delay)) continue;
// Purge existing delay in case of cancelation
_relays[id].change_delay = 0;
changed = true;
DEBUG_MSG_P(PSTR("[RELAY] #%d set to %s\n"), id, target ? "ON" : "OFF");
// Call the provider to perform the action
_relays[id].current_status = target;
_relays[id].provider->change(target);
if (_relay_status_change) {
_relay_status_change(id, target);
}
// Send to Broker
#if BROKER_SUPPORT
StatusBroker::Publish(MQTT_TOPIC_RELAY, id, target);
#endif
// Send MQTT
#if MQTT_SUPPORT
relayMQTT(id);
#endif
#if WEB_SUPPORT
_relay_report_ws = true;
#endif
if (!_relayRecursive) {
relayPulse(id);
// We will trigger a eeprom save only if
// we care about current relay status on boot
const auto boot_mode = getSetting({"relayBoot", id}, _relayBootMode(id));
const bool save_eeprom = ((RELAY_BOOT_SAME == boot_mode) || (RELAY_BOOT_TOGGLE == boot_mode));
_relay_save_timer.once_ms(RELAY_SAVE_DELAY, relaySave, save_eeprom);
}
_relays[id].report = false;
_relays[id].group_report = false;
}
// Whenever we are using sync modes and any relay had changed the state, check if we can unlock
const bool needs_unlock = ((_relay_sync_mode == RELAY_SYNC_NONE_OR_ONE) || (_relay_sync_mode == RELAY_SYNC_ONE));
if (_relay_sync_locked && needs_unlock && changed) {
_relaySyncUnlock();
}
}
// -----------------------------------------------------------------------------
// RELAY
// -----------------------------------------------------------------------------
@ -1080,29 +1046,57 @@ unsigned char relayCount() {
}
PayloadStatus relayParsePayload(const char * payload) {
#if MQTT_SUPPORT || API_SUPPORT
return rpcParsePayload(payload, [](const char* payload) {
if (_relay_rpc_payload_off.equals(payload)) return PayloadStatus::Off;
if (_relay_rpc_payload_on.equals(payload)) return PayloadStatus::On;
if (_relay_rpc_payload_toggle.equals(payload)) return PayloadStatus::Toggle;
return PayloadStatus::Unknown;
});
#else
return rpcParsePayload(payload);
#endif
#if MQTT_SUPPORT || API_SUPPORT
return rpcParsePayload(payload, [](const char* payload) {
if (_relay_rpc_payload_off.equals(payload)) {
return PayloadStatus::Off;
} else if (_relay_rpc_payload_on.equals(payload)) {
return PayloadStatus::On;
} else if (_relay_rpc_payload_toggle.equals(payload)) {
return PayloadStatus::Toggle;
}
return PayloadStatus::Unknown;
});
#else
return rpcParsePayload(payload);
#endif
}
void _relaySettingsMigrate(int version) {
if (!version || (version >= 5)) {
return;
}
if (version && (version < 5)) {
// just a rename
moveSetting("relayDelayInterlock", "relayIlkDelay");
// groups use a new set of keys
for (unsigned char index = 0; index < RelaysMax; ++index) {
auto group = getSetting({"mqttGroup", index});
if (!group.length()) {
break;
}
delSettingPrefix({
"relayGPIO",
"relayProvider",
"relayType",
});
delSetting("relays");
auto syncKey = SettingsKey("mqttGroupSync", index);
auto sync = getSetting(syncKey);
setSetting({"relayTopicSub", index}, group);
if (sync.length()) {
if (sync != "2") { // aka RECEIVE_ONLY
setSetting("relayTopicMode", sync);
setSetting("relayTopicPub", group);
}
}
}
delSettingPrefix({
"mqttGroup", // migrated to relayTopic
"mqttGroupSync", // migrated to relayTopic
"relayOnDisc", // replaced with relayMqttDisc
"relayGPIO", // avoid depending on migrate.ino
"relayProvider", // different type
"relayType", // different type
});
delSetting("relays"); // does not do anything
}
}
void _relayBoot(unsigned char index, const RelayMaskHelper& mask) {
@ -1187,7 +1181,7 @@ void _relayConfigure() {
_relay_flood_window = (1000 * getSetting("relayFloodTime", RELAY_FLOOD_WINDOW));
_relay_flood_changes = getSetting("relayFloodChanges", RELAY_FLOOD_CHANGES);
_relay_delay_interlock = getSetting("relayDelayInterlock", RELAY_DELAY_INTERLOCK);
_relay_delay_interlock = getSetting("relayIlkDelay", RELAY_DELAY_INTERLOCK);
_relay_sync_mode = getSetting("relaySync", RELAY_SYNC);
#if MQTT_SUPPORT || API_SUPPORT
@ -1224,47 +1218,62 @@ void _relayWebSocketUpdate(JsonObject& root) {
}
void _relayWebSocketSendRelays(JsonObject& root) {
if (!relayCount()) {
return;
}
JsonObject& config = root.createNestedObject("relayConfig");
config["size"] = relayCount();
config["start"] = 0;
const char* keys[] = {
"prov", "name", "boot", "pulse", "pulse_time"
};
JsonArray& schema = config.createNestedArray("schema");
schema.copyFrom(keys, sizeof(keys) / sizeof(*keys));
#if SCHEDULER_SUPPORT
schema.add("sch_last");
#endif
#if MQTT_SUPPORT
schema.add("group");
schema.add("group_sync");
schema.add("on_disc");
#endif
{
const char* schema_keys[] = {
"prov",
"name",
"boot",
#if SCHEDULER_SUPPORT
"sch_last",
#endif
#if MQTT_SUPPORT
"topic_pub",
"topic_sub",
"topic_mode",
"mqtt_disc",
#endif
"pulse",
"pulse_time"
};
JsonArray& relays = config.createNestedArray("relays");
JsonArray& schema = config.createNestedArray("schema");
schema.copyFrom(schema_keys, sizeof(schema_keys) / sizeof(*schema_keys));
}
for (unsigned char id = 0; id < relayCount(); ++id) {
JsonArray& relay = relays.createNestedArray();
relay.add(_relays[id].provider->id());
relay.add(getSetting({"relayName", id}));
relay.add(getSetting({"relayBoot", id}, _relayBootMode(id)));
{
JsonArray& relays = config.createNestedArray("relays");
relay.add(static_cast<uint8_t>(_relays[id].pulse));
relay.add(_relays[id].pulse_ms / 1000.0);
for (unsigned char id = 0; id < relayCount(); ++id) {
JsonArray& relay = relays.createNestedArray();
relay.add(_relays[id].provider->id());
relay.add(getSetting({"relayName", id}));
relay.add(getSetting({"relayBoot", id}, _relayBootMode(id)));
#if SCHEDULER_SUPPORT
#if SCHEDULER_SUPPORT
relay.add(getSetting({"relayLastSch", id}, SCHEDULER_RESTORE_LAST_SCHEDULE));
#endif
#endif
#if MQTT_SUPPORT
relay.add(getSetting({"mqttGroup", id}));
relay.add(getSetting({"mqttGroupSync", id}, 0));
relay.add(getSetting({"relayOnDisc", id}, 0));
#endif
#if MQTT_SUPPORT
relay.add(getSetting({"relayTopicSub", id}, _relayMqttTopicSub(id)));
relay.add(getSetting({"relayTopicPub", id}, _relayMqttTopicPub(id)));
relay.add(static_cast<int>(getSetting({"relayTopicMode", id},
_relayMqttTopicMode(id))));
relay.add(static_cast<int>(getSetting({"relayMqttDisc", id},
_relayMqttDisconnectionStatus(id))));
#endif
relay.add(static_cast<uint8_t>(_relays[id].pulse));
relay.add(_relays[id].pulse_ms / 1000.0);
}
}
}
@ -1274,19 +1283,14 @@ void _relayWebSocketOnVisible(JsonObject& root) {
if (relayCount() > 1) {
root["multirelayVisible"] = 1;
root["relaySync"] = getSetting("relaySync", RELAY_SYNC);
root["relayDelayInterlock"] = getSetting("relayDelayInterlock", RELAY_DELAY_INTERLOCK);
root["relayIlkDelay"] = getSetting("relayIlkDelay", RELAY_DELAY_INTERLOCK);
}
root["relayVisible"] = 1;
}
void _relayWebSocketOnConnected(JsonObject& root) {
if (relayCount() == 0) return;
// Per-relay configuration
_relayWebSocketSendRelays(root);
}
void _relayWebSocketOnAction(uint32_t client_id, const char * action, JsonObject& data) {
@ -1419,39 +1423,150 @@ const char* relayPayload(PayloadStatus status) {
#if MQTT_SUPPORT
void _relayMQTTGroup(unsigned char id) {
const String topic = getSetting({"mqttGroup", id});
if (!topic.length()) return;
struct RelayCustomTopic {
RelayCustomTopic() = delete;
RelayCustomTopic(const RelayCustomTopic&) = delete;
RelayCustomTopic(RelayCustomTopic&&) = delete;
const auto mode = getSetting({"mqttGroupSync", id}, RELAY_GROUP_SYNC_NORMAL);
if (mode == RELAY_GROUP_SYNC_RECEIVEONLY) return;
template <typename T>
RelayCustomTopic(unsigned char id, T&& topic, RelayMqttTopicMode mode) :
_id(id),
_topic(std::forward<T>(topic)),
_parts(_topic),
_mode(mode)
{}
auto status = _relayStatusTyped(id);
if (mode == RELAY_GROUP_SYNC_INVERSE) status = _relayStatusInvert(status);
mqttSendRaw(topic.c_str(), relayPayload(status));
}
unsigned char id() const {
return _id;
}
void relayMQTT(unsigned char id) {
const String& topic() const {
return _topic;
}
if (id >= _relays.size()) return;
const PathParts& parts() const {
return _parts;
}
// Send state topic
if (_relays[id].report) {
_relays[id].report = false;
mqttSend(MQTT_TOPIC_RELAY, id, relayPayload(_relayStatusTyped(id)));
const RelayMqttTopicMode mode() const {
return _mode;
}
// Check group topic
if (_relays[id].group_report) {
_relays[id].group_report = false;
_relayMQTTGroup(id);
bool match(const String& other) const {
PathParts parts(other);
return _parts.match(parts);
}
bool match(const PathParts& parts) const {
return _parts.match(parts);
}
private:
unsigned char _id;
String _topic;
PathParts _parts;
RelayMqttTopicMode _mode;
};
std::forward_list<RelayCustomTopic> _relay_custom_topics;
// TODO: it *will* handle the duplicates, but we waste memory storing them
// TODO: mqttSubscribe(...) also happens multiple times
//
// this is not really an intended use-case though, but it is techically possible...
void _relayMqttSubscribeCustomTopics() {
const size_t relays { relayCount() };
if (!relays) {
return;
}
struct CustomTopic {
String value;
RelayMqttTopicMode mode;
};
std::vector<CustomTopic> topics;
topics.reserve(relays);
for (unsigned char id = 0; id < relays; ++id) {
topics[id].value = _relayMqttTopicSub(id);
topics[id].mode = _relayMqttTopicMode(id);
}
settings::kv_store.foreach([&](settings::kvs_type::KeyValueResult&& kv) {
const char* const SubPrefix = "relayTopicSub";
const char* const ModePrefix = "relayTopicMode";
if ((kv.key.length <= strlen(SubPrefix))
&& (kv.key.length <= strlen(ModePrefix))) {
return;
}
if (!kv.value.length) {
return;
}
const auto key = kv.key.read();
unsigned char id;
if (key.startsWith(SubPrefix)) {
if (_relayTryParseId(key.c_str() + strlen(SubPrefix), id)) {
topics[id].value = std::move(kv.value.read());
}
} else if (key.startsWith(ModePrefix)) {
using namespace settings::internal;
if (_relayTryParseId(key.c_str() + strlen(ModePrefix), id)) {
topics[id].mode = convert<RelayMqttTopicMode>(kv.value.read());
}
}
});
_relay_custom_topics.clear();
for (unsigned char id = 0; id < relays; ++id) {
auto& topic = topics[id];
if (!topic.value.length()) {
continue;
}
mqttSubscribeRaw(topic.value.c_str());
_relay_custom_topics.emplace_front(id, std::move(topic.value), topic.mode);
}
}
void _relayMqttPublishCustomTopic(unsigned char id) {
const String topic = getSetting({"relayTopicPub", id}, _relayMqttTopicPub(id));
if (!topic.length()) {
return;
}
auto status = _relayPayloadStatus(id);
auto mode = getSetting({"relayTopicMode", id}, _relayMqttTopicMode(id));
if (mode == RelayMqttTopicMode::Inverse) {
status = _relayInvertStatus(status);
}
mqttSendRaw(topic.c_str(), relayPayload(status));
}
void relayMQTT() {
void _relayMqttReport(unsigned char id) {
if (id < _relays.size()) {
if (_relays[id].report) {
_relays[id].report = false;
mqttSend(MQTT_TOPIC_RELAY, id, relayPayload(_relayPayloadStatus(id)));
}
if (_relays[id].group_report) {
_relays[id].group_report = false;
_relayMqttPublishCustomTopic(id);
}
}
}
void _relayMqttReportAll() {
for (unsigned int id=0; id < _relays.size(); id++) {
mqttSend(MQTT_TOPIC_RELAY, id, relayPayload(_relayStatusTyped(id)));
mqttSend(MQTT_TOPIC_RELAY, id, relayPayload(_relayPayloadStatus(id)));
}
}
@ -1474,20 +1589,56 @@ void relayStatusWrap(unsigned char id, PayloadStatus value, bool is_group_topic)
case PayloadStatus::Unknown:
default:
_relays[id].report = true;
relayMQTT(id);
_relayMqttReport(id);
break;
}
}
bool _relayMqttHeartbeat(heartbeat::Mask mask) {
if (mask & heartbeat::Report::Relay)
relayMQTT();
_relayMqttReportAll();
return mqttConnected();
}
void _relayMqttHandleCustomTopic(const String& topic, const char* payload) {
PathParts received(topic);
for (auto& topic : _relay_custom_topics) {
if (topic.match(received)) {
auto status = relayParsePayload(payload);
if (topic.mode() == RelayMqttTopicMode::Inverse) {
status = _relayInvertStatus(status);
}
const auto id = topic.id();
_relayHandleStatus(id, status);
_relays[id].group_report = false;
}
}
}
void _relayMqttHandleDisconnect() {
settings::kv_store.foreach([](settings::kvs_type::KeyValueResult&& kv) {
const char* const prefix = "relayMqttDisc";
if (kv.key.length <= strlen(prefix)) {
return;
}
const auto key = kv.key.read();
if (key.startsWith(prefix)) {
unsigned char id;
if (_relayTryParseId(key.c_str() + strlen(prefix), id)) {
const auto value = kv.value.read();
_relayHandleStatus(id, relayParsePayload(value.c_str()));
}
}
});
}
void relayMQTTCallback(unsigned int type, const char * topic, const char * payload) {
static bool connected { false };
if (!relayCount()) {
return;
}
@ -1503,82 +1654,46 @@ void relayMQTTCallback(unsigned int type, const char * topic, const char * paylo
snprintf_P(pulse_topic, sizeof(pulse_topic), PSTR("%s/+"), MQTT_TOPIC_PULSE);
mqttSubscribe(pulse_topic);
// Subscribe to group topics
for (unsigned char i=0; i < _relays.size(); i++) {
const auto t = getSetting({"mqttGroup", i});
if (t.length() > 0) mqttSubscribeRaw(t.c_str());
}
_relayMqttSubscribeCustomTopics();
connected = true;
return;
}
if (type == MQTT_MESSAGE_EVENT) {
String t = mqttMagnitude((char *) topic);
unsigned char id;
if (!_relayTryParseIdFromPath(t.c_str(), id)) {
return;
}
if (t.startsWith(MQTT_TOPIC_PULSE)) {
_relayHandlePulsePayload(id, payload);
_relays[id].report = mqttForward();
return;
}
if (t.startsWith(MQTT_TOPIC_RELAY)) {
_relayHandlePayload(id, payload);
_relays[id].report = mqttForward();
return;
}
// TODO: cache group topics instead of reading settings each time?
// TODO: this is another kvs::foreach case, since we slow down MQTT when settings grow
for (unsigned char i=0; i < _relays.size(); i++) {
const String t = getSetting({"mqttGroup", i});
if (!t.length()) break;
if (t == topic) {
auto value = relayParsePayload(payload);
if (value == PayloadStatus::Unknown) return;
if ((value == PayloadStatus::On) || (value == PayloadStatus::Off)) {
if (getSetting({"mqttGroupSync", i}, RELAY_GROUP_SYNC_NORMAL) == RELAY_GROUP_SYNC_INVERSE) {
value = _relayStatusInvert(value);
}
}
auto is_relay = t.startsWith(MQTT_TOPIC_RELAY);
auto is_pulse = t.startsWith(MQTT_TOPIC_PULSE);
if (is_relay || is_pulse) {
unsigned char id;
if (!_relayTryParseIdFromPath(t.c_str(), id)) {
return;
}
DEBUG_MSG_P(PSTR("[RELAY] Matched group topic for relayID %d\n"), i);
_relayHandleStatus(i, value);
_relays[i].group_report = false;
if (is_relay) {
_relayHandlePayload(id, payload);
_relays[id].report = mqttForward();
return;
}
if (is_pulse) {
_relayHandlePulsePayload(id, payload);
_relays[id].report = mqttForward();
return;
}
}
_relayMqttHandleCustomTopic(topic, payload);
return;
}
// TODO: safeguard against network issues. this one has good intentions, but we may end up
// switching relays back and forth when connection is unstable but reconnects very fast after the failure
if (type == MQTT_DISCONNECT_EVENT) {
for (unsigned char i=0; i < _relays.size(); i++) {
const auto reaction = getSetting({"relayOnDisc", i}, 0);
bool status;
switch (reaction) {
case 1:
status = false;
break;
case 2:
status = true;
break;
default:
return;
}
DEBUG_MSG_P(PSTR("[RELAY] Turn %s relay #%u due to MQTT disconnection\n"), status ? "ON" : "OFF", i);
relayStatus(i, status);
if (connected) {
connected = false;
_relayMqttHandleDisconnect();
}
return;
}
}
@ -1666,6 +1781,81 @@ void _relayInitCommands() {
#endif // TERMINAL_SUPPORT
//------------------------------------------------------------------------------
void _relayReport(unsigned char id [[gnu::unused]], bool status [[gnu::unused]]) {
#if BROKER_SUPPORT
StatusBroker::Publish(MQTT_TOPIC_RELAY, id, status);
#endif
#if MQTT_SUPPORT
_relayMqttReport(id);
#endif
#if WEB_SUPPORT
_relayWsReport();
#endif
}
/**
* Walks the relay vector processing only those relays
* that have to change to the requested mode
* @bool mode Requested mode
*/
void _relayProcess(bool mode) {
bool changed = false;
for (unsigned char id = 0; id < _relays.size(); id++) {
bool target = _relays[id].target_status;
// Only process the relays we have to change
if (target == _relays[id].current_status) continue;
// Only process the relays we have to change to the requested mode
if (target != mode) continue;
// Only process if the change delay has expired
if (_relays[id].change_delay && (millis() - _relays[id].change_start < _relays[id].change_delay)) continue;
// Purge existing delay in case of cancelation
_relays[id].change_delay = 0;
changed = true;
DEBUG_MSG_P(PSTR("[RELAY] #%d set to %s\n"), id, target ? "ON" : "OFF");
// Call the provider to perform the action
_relays[id].current_status = target;
_relays[id].provider->change(target);
if (_relay_status_change) {
_relay_status_change(id, target);
}
_relayReport(id, target);
if (!_relayRecursive) {
relayPulse(id);
// We will trigger a eeprom save only if
// we care about current relay status on boot
const auto boot_mode = getSetting({"relayBoot", id}, _relayBootMode(id));
const bool save_eeprom = ((RELAY_BOOT_SAME == boot_mode) || (RELAY_BOOT_TOGGLE == boot_mode));
_relay_save_timer.once_ms(RELAY_SAVE_DELAY, relaySave, save_eeprom);
}
_relays[id].report = false;
_relays[id].group_report = false;
}
// Whenever we are using sync modes and any relay had changed the state, check if we can unlock
const bool needs_unlock = ((_relay_sync_mode == RELAY_SYNC_NONE_OR_ONE) || (_relay_sync_mode == RELAY_SYNC_ONE));
if (_relay_sync_locked && needs_unlock && changed) {
_relaySyncUnlock();
}
}
//------------------------------------------------------------------------------
// Setup
//------------------------------------------------------------------------------
@ -1673,12 +1863,12 @@ void _relayInitCommands() {
void _relayLoop() {
_relayProcess(false);
_relayProcess(true);
#if WEB_SUPPORT
if (_relay_report_ws) {
wsPost(_relayWebSocketUpdate);
_relay_report_ws = false;
}
#endif
#if WEB_SUPPORT
if (_relay_report_ws) {
wsPost(_relayWebSocketUpdate);
_relay_report_ws = false;
}
#endif
}
// Dummy relays for virtual light switches (hardware-less), Sonoff Dual, Sonoff RF Bridge and Tuya


+ 5
- 3
code/espurna/relay.h View File

@ -32,6 +32,11 @@ enum class RelayType : int {
LatchedInverse
};
enum class RelayMqttTopicMode : int {
Normal,
Inverse
};
enum class RelayProvider: int {
None,
Dummy,
@ -85,9 +90,6 @@ const String& relayPayloadToggle();
const char* relayPayload(PayloadStatus status);
void relayMQTT(unsigned char id);
void relayMQTT();
void relayPulse(unsigned char id);
void relaySync(unsigned char id);
void relaySave(bool persist);


+ 53
- 1
code/espurna/relay_config.h View File

@ -134,6 +134,58 @@ constexpr RelayProvider _relayProvider(unsigned char index) {
(index == 4) ? (RELAY5_PROVIDER) :
(index == 5) ? (RELAY6_PROVIDER) :
(index == 6) ? (RELAY7_PROVIDER) :
(index == 7) ? (RELAY8_PROVIDER) : RelayProvider::None
(index == 7) ? (RELAY8_PROVIDER) : RELAY_PROVIDER_NONE
);
}
constexpr RelayMqttTopicMode _relayMqttTopicMode(unsigned char index) {
return (
(index == 0) ? (RELAY1_MQTT_TOPIC_MODE) :
(index == 1) ? (RELAY2_MQTT_TOPIC_MODE) :
(index == 2) ? (RELAY3_MQTT_TOPIC_MODE) :
(index == 3) ? (RELAY4_MQTT_TOPIC_MODE) :
(index == 4) ? (RELAY5_MQTT_TOPIC_MODE) :
(index == 5) ? (RELAY6_MQTT_TOPIC_MODE) :
(index == 6) ? (RELAY7_MQTT_TOPIC_MODE) :
(index == 7) ? (RELAY8_MQTT_TOPIC_MODE) : RELAY_MQTT_TOPIC_MODE
);
}
constexpr const char* _relayMqttTopicSub(unsigned char index) {
return (
(index == 0) ? (RELAY1_MQTT_TOPIC_SUB) :
(index == 1) ? (RELAY2_MQTT_TOPIC_SUB) :
(index == 2) ? (RELAY3_MQTT_TOPIC_SUB) :
(index == 3) ? (RELAY4_MQTT_TOPIC_SUB) :
(index == 4) ? (RELAY5_MQTT_TOPIC_SUB) :
(index == 5) ? (RELAY6_MQTT_TOPIC_SUB) :
(index == 6) ? (RELAY7_MQTT_TOPIC_SUB) :
(index == 7) ? (RELAY8_MQTT_TOPIC_SUB) : ""
);
}
constexpr const char* _relayMqttTopicPub(unsigned char index) {
return (
(index == 0) ? (RELAY1_MQTT_TOPIC_PUB) :
(index == 1) ? (RELAY2_MQTT_TOPIC_PUB) :
(index == 2) ? (RELAY3_MQTT_TOPIC_PUB) :
(index == 3) ? (RELAY4_MQTT_TOPIC_PUB) :
(index == 4) ? (RELAY5_MQTT_TOPIC_PUB) :
(index == 5) ? (RELAY6_MQTT_TOPIC_PUB) :
(index == 6) ? (RELAY7_MQTT_TOPIC_PUB) :
(index == 7) ? (RELAY8_MQTT_TOPIC_PUB) : ""
);
}
constexpr PayloadStatus _relayMqttDisconnectionStatus(unsigned char index) {
return (
(index == 0) ? (RELAY1_MQTT_DISCONNECT_STATUS) :
(index == 1) ? (RELAY2_MQTT_DISCONNECT_STATUS) :
(index == 2) ? (RELAY3_MQTT_DISCONNECT_STATUS) :
(index == 3) ? (RELAY4_MQTT_DISCONNECT_STATUS) :
(index == 4) ? (RELAY5_MQTT_DISCONNECT_STATUS) :
(index == 5) ? (RELAY6_MQTT_DISCONNECT_STATUS) :
(index == 6) ? (RELAY7_MQTT_DISCONNECT_STATUS) :
(index == 7) ? (RELAY8_MQTT_DISCONNECT_STATUS) : RELAY_MQTT_DISCONNECT_NONE
);
}

+ 14
- 10
code/html/index.html View File

@ -2138,23 +2138,27 @@
<div><input name="relayLastSch" data-settings-real-name="relayLastSch" type="checkbox" /></div>
</div>
<div class="pure-g module module-mqtt">
<div class="pure-u-1 pure-u-lg-1-4"><label>MQTT group</label></div>
<div class="pure-u-1 pure-u-lg-3-4"><input name="mqttGroup" class="pure-u-1" tabindex="0" data="0" action="reconnect" /></div>
<div class="pure-u-1 pure-u-lg-1-4"><label>MQTT topic subscription</label></div>
<div class="pure-u-1 pure-u-lg-3-4"><input name="relayTopicSub" class="pure-u-1" action="reconnect" /></div>
</div>
<div class="pure-g module module-mqtt">
<div class="pure-u-1 pure-u-lg-1-4"><label>MQTT group sync</label></div>
<select class="pure-u-1 pure-u-lg-3-4" name="mqttGroupSync">
<option value="0">Same</option>
<div class="pure-u-1 pure-u-lg-1-4"><label>MQTT topic publish</label></div>
<div class="pure-u-1 pure-u-lg-3-4"><input name="relayTopicPub" class="pure-u-1" action="reconnect" /></div>
</div>
<div class="pure-g module module-mqtt">
<div class="pure-u-1 pure-u-lg-1-4"><label>MQTT topic mode</label></div>
<select class="pure-u-1 pure-u-lg-3-4" name="relayTopicMode">
<option value="0">Normal</option>
<option value="1">Inverse</option>
<option value="2">Receive Only</option>
</select>
</div>
<div class="pure-g module module-mqtt">
<div class="pure-u-1 pure-u-lg-1-4"><label>On MQTT disconnect</label></div>
<select class="pure-u-1 pure-u-lg-3-4" name="relayOnDisc">
<option value="0">Don't change</option>
<option value="1">Turn the switch OFF</option>
<option value="2">Turn the switch ON</option>
<select class="pure-u-1 pure-u-lg-3-4" name="relayMqttDisc">
<option value="255">Do nothing</option>
<option value="0">Turn OFF</option>
<option value="1">Turn ON</option>
<option value="2">Toggle</option>
</select>
</div>
</div>


Loading…
Cancel
Save