From 8c68a72c8fc502259cc9de6ff5fbf88ce5ec6b51 Mon Sep 17 00:00:00 2001 From: Maxim Prokhorov Date: Wed, 27 Mar 2024 21:24:48 +0300 Subject: [PATCH] ha: wait for sensors and longer retries do not publish until sensor are actually ready to be read some refactoring of internal state to make ptr objects earlier --- code/espurna/homeassistant.cpp | 590 +++++++++++++++++++++++---------- code/espurna/sensor.cpp | 8 + code/espurna/sensor.h | 2 + 3 files changed, 434 insertions(+), 166 deletions(-) diff --git a/code/espurna/homeassistant.cpp b/code/espurna/homeassistant.cpp index 6a897f3e..f22347dc 100644 --- a/code/espurna/homeassistant.cpp +++ b/code/espurna/homeassistant.cpp @@ -179,12 +179,15 @@ struct ConfigStrings { String prefix; }; -ConfigStrings make_config_strings() { - return ConfigStrings{ - .name = normalize_ascii(systemHostname(), false), - .identifier = normalize_ascii(systemIdentifier(), true), - .prefix = settings::prefix(), - }; +using ConfigStringsPtr = std::unique_ptr; + +ConfigStringsPtr make_config_strings() { + return ConfigStringsPtr( + new ConfigStrings{ + .name = normalize_ascii(systemHostname(), false), + .identifier = normalize_ascii(systemIdentifier(), true), + .prefix = settings::prefix(), + }); } // 'build-time' strings, always the same for current build @@ -194,17 +197,17 @@ struct BuildStrings { String device; }; -BuildStrings make_build_strings() { - BuildStrings out; +using BuildStringsPtr = std::unique_ptr; +BuildStringsPtr make_build_strings() { const auto app = buildApp(); - out.version = String(app.version); - const auto hardware = buildHardware(); - out.manufacturer = String(hardware.manufacturer); - out.device = String(hardware.device); - - return out; + return BuildStringsPtr( + new BuildStrings{ + .version = app.version.toString(), + .manufacturer = hardware.manufacturer.toString(), + .device = hardware.device.toString(), + }); } class Device { @@ -229,9 +232,9 @@ public: Device(Device&&) = delete; Device& operator=(Device&&) = delete; - Device(ConfigStrings config, BuildStrings build) : - _config(std::make_unique(std::move(config))), - _build(std::make_unique(std::move(build))), + Device(ConfigStringsPtr config, BuildStringsPtr build) : + _config(std::move(config)), + _build(std::move(build)), _buffer(std::make_unique()), _root(_buffer->createObject()) { @@ -262,10 +265,7 @@ public: } private: - using ConfigStringsPtr = std::unique_ptr; ConfigStringsPtr _config; - - using BuildStringsPtr = std::unique_ptr; BuildStringsPtr _build; BufferPtr _buffer; @@ -339,8 +339,8 @@ String quote(String&& value) { || value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false") || value.equalsIgnoreCase("on") - || value.equalsIgnoreCase("off") - ) { + || value.equalsIgnoreCase("off")) + { String result; result.reserve(value.length() + 2); result += '"'; @@ -363,41 +363,34 @@ String quote(String&& value) { class Discovery { public: - virtual ~Discovery() { - } + virtual ~Discovery(); virtual bool ok() const = 0; + virtual const String& topic() = 0; virtual const String& message() = 0; + + virtual bool prepare(); + virtual bool ready() const; virtual bool next() = 0; }; -#if RELAY_SUPPORT +Discovery::~Discovery() = default; -struct RelayContext { - String availability; - String payload_available; - String payload_not_available; - String payload_on; - String payload_off; -}; +bool Discovery::prepare() { + return true; +} -RelayContext makeRelayContext() { - return { - mqttTopic(MQTT_TOPIC_STATUS), - quote(mqttPayloadStatus(true)), - quote(mqttPayloadStatus(false)), - quote(relayPayload(PayloadStatus::On).toString()), - quote(relayPayload(PayloadStatus::Off).toString()) - }; +bool Discovery::ready() const { + return true; } +#if RELAY_SUPPORT + class RelayDiscovery : public Discovery { public: explicit RelayDiscovery(Context& ctx) : - _ctx(ctx), - _relay(makeRelayContext()), - _relays(relayCount()) + _ctx(ctx) {} JsonObject& root() { @@ -408,15 +401,21 @@ public: return *_root; } + bool ready() const override { + return _ready; + } + bool ok() const override { - return (_relays > 0) - && (_index < _relays); + return _ready + && (_count > 0) + && (_index < _count); } const String& uniqueId() { if (!_unique_id.length()) { - _unique_id = _ctx.identifier() + '_' + F("relay") + '_' + _index; + _unique_id = _ctx.identifier() + '_' + F("relay") + '_' + String(_index, 10); } + return _unique_id; } @@ -427,6 +426,7 @@ public: _topic += uniqueId(); _topic += F("/config"); } + return _topic; } @@ -434,25 +434,37 @@ public: if (!_message.length()) { auto& json = root(); json[F("dev")] = _ctx.device(); - json[F("avty_t")] = _relay.availability.c_str(); - json[F("pl_avail")] = _relay.payload_available.c_str(); - json[F("pl_not_avail")] = _relay.payload_not_available.c_str(); - json[F("pl_on")] = _relay.payload_on.c_str(); - json[F("pl_off")] = _relay.payload_off.c_str(); + json[F("avty_t")] = _info->availability.c_str(); + json[F("pl_avail")] = _info->payload_available.c_str(); + json[F("pl_not_avail")] = _info->payload_not_available.c_str(); + json[F("pl_on")] = _info->payload_on.c_str(); + json[F("pl_off")] = _info->payload_off.c_str(); json[F("uniq_id")] = uniqueId(); - json[F("name")] = _ctx.name() + ' ' + _index; + json[F("name")] = _ctx.name() + ' ' + String(_index, 10); json[F("stat_t")] = mqttTopic(MQTT_TOPIC_RELAY, _index); json[F("cmd_t")] = mqttTopicSetter(MQTT_TOPIC_RELAY, _index); json.printTo(_message); } + return _message; } + bool prepare() override { + if (!_ready) { + _count = relayCount(); + _index = 0; + _info = _makeInfo(); + _ready = true; + } + + return _ready; + } + bool next() override { - if (_index < _relays) { + if (_index < _count) { auto current = _index; ++_index; - if ((_index > current) && (_index < _relays)) { + if ((_index > current) && (_index < _count)) { _unique_id = ""; _topic = ""; _message = ""; @@ -464,18 +476,42 @@ public: } private: + struct Info { + String availability; + String payload_available; + String payload_not_available; + String payload_on; + String payload_off; + }; + + using InfoPtr = std::unique_ptr; + InfoPtr _makeInfo(); + Context& _ctx; JsonObject* _root { nullptr }; - RelayContext _relay; - unsigned char _index { 0u }; - unsigned char _relays { 0u }; + InfoPtr _info; + size_t _index; + size_t _count; + + bool _ready { false }; String _unique_id; String _topic; String _message; }; +RelayDiscovery::InfoPtr RelayDiscovery::_makeInfo() { + return InfoPtr( + new Info{ + .availability = mqttTopic(MQTT_TOPIC_STATUS), + .payload_available = quote(mqttPayloadStatus(true)), + .payload_not_available = quote(mqttPayloadStatus(false)), + .payload_on = quote(relayPayload(PayloadStatus::On).toString()), + .payload_off = quote(relayPayload(PayloadStatus::Off).toString()), + }); +} + #endif // Example payload: @@ -512,8 +548,21 @@ public: return *_root; } + bool ready() const override { + return _ready; + } + + bool prepare() override { + if (!_ready) { + _count = lightChannels(); + _ready = true; + } + + return _ready; + } + bool ok() const override { - return (lightChannels() > 0); + return _ready && (_count > 0); } bool next() override { @@ -611,6 +660,9 @@ private: Context& _ctx; JsonObject* _root { nullptr }; + bool _ready { false }; + size_t _count; + String _unique_id; String _topic; String _message; @@ -759,13 +811,8 @@ void receiveLightJson(StringView payload) { class SensorDiscovery : public Discovery { public: explicit SensorDiscovery(Context& ctx) : - _ctx(ctx), - _magnitudes(magnitudeCount()) - { - if (_magnitudes > 0) { - _info = magnitudeInfo(_index); - } - } + _ctx(ctx) + {} JsonObject& root() { if (!_root) { @@ -775,9 +822,14 @@ public: return *_root; } + bool ready() const override { + return _ready; + } + bool ok() const override { - return (_magnitudes > 0) - && (_index < _magnitudes); + return _ready + && (_count > 0) + && (_index < _count); } const String& topic() override { @@ -815,8 +867,8 @@ public: return _name; } - unsigned char localId() const { - return _info.index; + String localId() const { + return String(_info.index, 10); } const String& uniqueId() { @@ -827,11 +879,29 @@ public: return _unique_id; } + bool prepare() override { + if (!_ready) { + _ready = sensorReady(); + if (!_ready) { + return false; + } + + _count = magnitudeCount(); + _index = 0; + + if (_count > 0) { + _info = magnitudeInfo(_index); + } + } + + return _ready; + } + bool next() override { - if (_index < _magnitudes) { + if (_index < _count) { auto current = _index; ++_index; - if ((_index > current) && (_index < _magnitudes)) { + if ((_index > current) && (_index < _count)) { _info = magnitudeInfo(_index); _unique_id = ""; _name = ""; @@ -848,9 +918,10 @@ private: Context& _ctx; JsonObject* _root { nullptr }; - unsigned char _magnitudes { 0u }; - unsigned char _index { 0u }; sensor::Info _info; + size_t _count; + size_t _index; + bool _ready { false }; String _unique_id; String _name; @@ -870,20 +941,164 @@ Context make_context() { return Context(make_device_ptr(), 2048); } -// Reworked discovery class. Try to send and wait for MQTT QoS 1 publish ACK to continue. +// use 5 retries and set a specific duration for each attempt +using Durations = std::array; + +#if __cplusplus >= 201703L +#define __CONSTEXPR constexpr +#else +#define __CONSTEXPR +#endif + +struct Wait { + using Value = typename Durations::value_type; + + __CONSTEXPR Wait(const Durations& base) : + _begin(std::begin(base)), + _end(std::end(base)), + _it(_begin) + {} + + void reset() { + _it = _begin; + } + + Value value() const { + return *_it; + } + + void change_next() { + _it = next_it(_it); + } + + bool try_next() { + auto current = _it; + change_next(); + return current != _it; + } + + Value next() { + change_next(); + return value(); + } + + __CONSTEXPR Value first() const { + return *_begin; + } + + bool is_first() const { + return _it == _begin; + } + + __CONSTEXPR Value last() const { + return *_end; + } + + bool is_last() const { + return _it == _end; + } + + __CONSTEXPR size_t count() const { + return _end - _begin; + } + +private: + using Iterator = Durations::const_iterator; + + Iterator next_it(Iterator value) { + const auto next = value + 1; + if (next < _end) { + return next; + } + + return value; + } + + Iterator _begin; + Iterator _end; + Iterator _it; +}; + +#undef __CONSTEXPR + +// intervals between send attempts, usually long enough to push data to the network stack +static constexpr Durations ShortDurations{{ + duration::Milliseconds{ 100 }, + duration::Milliseconds{ 500 }, + duration::Milliseconds{ 1000 }, + duration::Milliseconds{ 2500 }, + duration::Milliseconds{ 5000 }, +}}; + +// longer intervals between initialization attempts, give enough time for external things +static constexpr Durations LongDurations{{ + duration::Seconds{ 5 }, + duration::Seconds{ 10 }, + duration::Seconds{ 15 }, + duration::Seconds{ 30 }, + duration::Seconds{ 60 }, +}}; + +struct Result { + using Duration = duration::Milliseconds; + + enum class Value { + Error, + Ok, + Retry, + }; + + Result(Duration wait, Value value) : + _wait(wait), + _value(value) + {} + + explicit Result(Duration wait) : + Result(wait, Value::Ok) + {} + + Result() : + Result(ShortDurations.front()) + {} + + bool ok() const { + return _value == Value::Ok; + } + + bool retry() const { + return _value == Value::Retry; + } + + explicit operator bool() const { + return ok(); + } + + Duration wait() const { + return _wait; + } + +private: + Duration _wait; + Value _value; +}; + +Result next_retry(Wait& wait) { + const auto retry = wait.is_last() + ? Result::Value::Error + : Result::Value::Retry; + + const auto value = wait.value(); + wait.try_next(); + + return Result(value, retry); +} + // Topic and message are generated on demand and most of JSON payload is cached for re-use to save RAM. class DiscoveryTask { public: using Entity = std::unique_ptr; using Entities = std::forward_list; - static constexpr auto WaitRestart = duration::Seconds{ 30 }; - - static constexpr auto WaitShort = duration::Milliseconds{ 100 }; - static constexpr auto WaitLong = duration::Seconds{ 1 }; - - static constexpr int Retries { 5 }; - DiscoveryTask() = delete; DiscoveryTask(const DiscoveryTask&) = delete; @@ -893,8 +1108,8 @@ public: DiscoveryTask& operator=(DiscoveryTask&&) = delete; DiscoveryTask(Context ctx, State state) : - _state(state), - _ctx(std::move(ctx)) + _ctx(std::move(ctx)), + _state(state) {} void add(Entity&& entity) { @@ -906,12 +1121,8 @@ public: _entities.push_front(std::make_unique(_ctx)); } - bool retry() { - if (_retry < 0) { - return false; - } - - return (--_retry > 0); + Result retry_send() { + return next_retry(_wait_short); } Context& context() { @@ -919,75 +1130,108 @@ public: } bool done() const { - return (_retry < 0) || _entities.empty(); + return _entities.empty(); } - bool ok() const { - if (!done()) { - for (auto& entity : _entities) { - if (!entity->ok()) { - return false; - } - } + State state() const { + return _state; + } - return true; - } + bool ok() const; - return false; + template + Result try_send_one(T&& action); + + Result prepare_all(); + +private: + Result next_send() { + _wait_short.reset(); + return Result(_wait_short.value()); } - template - bool send(T&& action) { - while (!_entities.empty()) { - auto& entity = _entities.front(); + Result stop_sending() { + return Result( + _wait_short.first(), + Result::Value::Error); + } + + Context _ctx; + + State _state; + Entities _entities; + + Wait _wait_short { ShortDurations }; + Wait _wait_long { LongDurations }; +}; + +bool DiscoveryTask::ok() const { + if (!done()) { + for (auto& entity : _entities) { if (!entity->ok()) { - _entities.pop_front(); - _ctx.reset(); - continue; + return false; } + } - const auto* topic = entity->topic().c_str(); - const auto* msg = (State::Enabled == _state) - ? entity->message().c_str() - : ""; + return true; + } - if (action(topic, msg)) { - if (!entity->next()) { - _retry = Retries; - _entities.pop_front(); - _ctx.reset(); - } - return true; - } + return false; +} - return false; +Result DiscoveryTask::prepare_all() { + bool prepared { true }; + for (auto& entity : _entities) { + if (!entity->prepare()) { + prepared = false; + break; } - - return false; } - State state() const { - return _state; + if (!prepared) { + return next_retry(_wait_long); } -private: - int _retry { Retries }; - State _state; + return Result(); +} - Entities _entities; - Context _ctx; -}; +template +Result DiscoveryTask::try_send_one(T&& action) { + auto it = _entities.begin(); -constexpr duration::Seconds DiscoveryTask::WaitRestart; + while (it != _entities.end()) { + if (!(*it)->ok()) { + it = _entities.erase_after( + _entities.before_begin()); + _ctx.reset(); + continue; + } -constexpr duration::Milliseconds DiscoveryTask::WaitShort; -constexpr duration::Seconds DiscoveryTask::WaitLong; + const auto* topic = (*it)->topic().c_str(); + const auto* msg = (State::Enabled == _state) + ? (*it)->message().c_str() + : ""; + + if (action(topic, msg)) { + if (!(*it)->next()) { + it = _entities.erase_after( + _entities.before_begin()); + _ctx.reset(); + } + + return next_send(); + } + + return retry_send(); + } + + return stop_sending(); +} using DiscoveryPtr = std::shared_ptr; using FlagPtr = std::shared_ptr; DiscoveryPtr makeDiscovery(State); -void restartDiscoveryForState(State); namespace internal { @@ -1008,54 +1252,73 @@ void schedule(duration::Milliseconds wait, DiscoveryPtr ptr, FlagPtr flag_ptr) { }); } +void stop() { + DEBUG_MSG_P(PSTR("[HA] Stopping discovery\n")); + internal::task.stop(); +} + void send(DiscoveryPtr discovery, FlagPtr flag_ptr) { if (!mqttConnected() || discovery->done()) { - DEBUG_MSG_P(PSTR("[HA] Stopping discovery\n")); - internal::task.stop(); + stop(); + return; + } + + auto ready = discovery->prepare_all(); + if (!ready) { + if (ready.retry()) { + DEBUG_MSG_P(PSTR("[HA] Discovery not ready, retrying in %zu (ms)\n"), + ready.wait().count()); + schedule(ready.wait(), discovery, flag_ptr); + } else { + stop(); + } + return; } auto& flag = *flag_ptr; if (!flag) { - if (discovery->retry()) { - schedule(DiscoveryTask::WaitShort, discovery, flag_ptr); + const auto next_send = discovery->retry_send(); + if (next_send.retry()) { + schedule(next_send.wait(), discovery, flag_ptr); } else { - restartDiscoveryForState(discovery->state()); + stop(); } return; } uint16_t pid { 0u }; - const auto res = discovery->send([&](const char* topic, const char* message) { - pid = ::mqttSendRaw(topic, message, internal::retain, 1); - return pid > 0; - }); + const auto sent = discovery->try_send_one( + [&](const char* topic, const char* message) { + pid = ::mqttSendRaw(topic, message, internal::retain, 1); + return pid > 0; + }); #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT - // - async fails when disconneted and when it's buffers are filled, which should be resolved after $LATENCY - // and the time it takes for the lwip to process it. future versions use queue, but could still fail when low on RAM - // - lwmqtt will fail when disconnected (already checked above) and *will* disconnect in case publish fails. - // ::publish() will wait for the puback, so we don't have to do it ourselves. not tested. - // - pubsub will fail when it can't buffer the payload *or* the underlying WiFiClient calls fail. also not tested. + // Receive acknowledgement from the broker before continuing. + // Usually a good idea in general, to avoid filling network buffers too quickly. + // + // Not needed with LWMQTT, as it is already handled and wrapped in Result + // Not supported by PubSubClient - if (res) { + if (sent) { flag = false; - mqttOnPublish(pid, [flag_ptr]() { - (*flag_ptr) = true; - }); + mqttOnPublish( + pid, + [flag_ptr]() { + (*flag_ptr) = true; + }); } #endif - const auto wait = res - ? DiscoveryTask::WaitShort - : DiscoveryTask::WaitLong; - - if (res || discovery->retry()) { - schedule(wait, discovery, flag_ptr); + if (sent.ok() || sent.retry()) { + schedule(sent.wait(), discovery, flag_ptr); return; } - restartDiscoveryForState(discovery->state()); + if (!sent) { + stop(); + } } } // namespace internal @@ -1078,17 +1341,12 @@ DiscoveryPtr makeDiscovery(State state) { } void scheduleDiscovery(duration::Milliseconds duration, DiscoveryPtr discovery) { - DEBUG_MSG_P(PSTR("[HA] Discovery scheduled in %zu (ms)\n"), duration.count()); + DEBUG_MSG_P(PSTR("[HA] Starting discovery\n")); internal::schedule(duration, discovery, std::make_shared(true)); } void scheduleDiscovery(DiscoveryPtr discovery) { - scheduleDiscovery(DiscoveryTask::WaitShort, discovery); -} - -void restartDiscoveryForState(State state) { - DEBUG_MSG_P(PSTR("[HA] Too many retries, restarting discovery\n")); - scheduleDiscovery(DiscoveryTask::WaitRestart, makeDiscovery(state)); + scheduleDiscovery(ShortDurations.front(), discovery); } void publishDiscoveryForState(State state) { diff --git a/code/espurna/sensor.cpp b/code/espurna/sensor.cpp index 8129cee2..a2fdb9cf 100644 --- a/code/espurna/sensor.cpp +++ b/code/espurna/sensor.cpp @@ -4313,6 +4313,10 @@ void configure_base() { energy::every(sensor::settings::saveEvery()); } +bool ready() { + return State::Reading == internal::state; +} + void configure() { configure_base(); configure_magnitudes(); @@ -4463,6 +4467,10 @@ espurna::sensor::Info magnitudeInfo(unsigned char index) { }; } +bool sensorReady() { + return espurna::sensor::ready(); +} + espurna::StringView sensorList() { return espurna::sensor::List; } diff --git a/code/espurna/sensor.h b/code/espurna/sensor.h index 5ab5d1c7..e9946e06 100644 --- a/code/espurna/sensor.h +++ b/code/espurna/sensor.h @@ -289,5 +289,7 @@ espurna::sensor::Value magnitudeReportValue(unsigned char index); using SensorWebSocketMagnitudesCallback = void(*)(JsonArray&, size_t); void sensorWebSocketMagnitudes(JsonObject& root, espurna::StringView prefix, SensorWebSocketMagnitudesCallback); +bool sensorReady(); + espurna::StringView sensorList(); void sensorSetup();