Browse Source

ha: wait for sensors and longer retries

do not publish until sensor are actually ready to be read
some refactoring of internal state to make ptr objects earlier
test/dev
Maxim Prokhorov 1 month ago
parent
commit
8c68a72c8f
3 changed files with 434 additions and 166 deletions
  1. +424
    -166
      code/espurna/homeassistant.cpp
  2. +8
    -0
      code/espurna/sensor.cpp
  3. +2
    -0
      code/espurna/sensor.h

+ 424
- 166
code/espurna/homeassistant.cpp View File

@ -179,12 +179,15 @@ struct ConfigStrings {
String prefix;
};
ConfigStrings make_config_strings() {
return ConfigStrings{
.name = normalize_ascii(systemHostname(), false),
.identifier = normalize_ascii(systemIdentifier(), true),
.prefix = settings::prefix(),
};
using ConfigStringsPtr = std::unique_ptr<ConfigStrings>;
ConfigStringsPtr make_config_strings() {
return ConfigStringsPtr(
new ConfigStrings{
.name = normalize_ascii(systemHostname(), false),
.identifier = normalize_ascii(systemIdentifier(), true),
.prefix = settings::prefix(),
});
}
// 'build-time' strings, always the same for current build
@ -194,17 +197,17 @@ struct BuildStrings {
String device;
};
BuildStrings make_build_strings() {
BuildStrings out;
using BuildStringsPtr = std::unique_ptr<BuildStrings>;
BuildStringsPtr make_build_strings() {
const auto app = buildApp();
out.version = String(app.version);
const auto hardware = buildHardware();
out.manufacturer = String(hardware.manufacturer);
out.device = String(hardware.device);
return out;
return BuildStringsPtr(
new BuildStrings{
.version = app.version.toString(),
.manufacturer = hardware.manufacturer.toString(),
.device = hardware.device.toString(),
});
}
class Device {
@ -229,9 +232,9 @@ public:
Device(Device&&) = delete;
Device& operator=(Device&&) = delete;
Device(ConfigStrings config, BuildStrings build) :
_config(std::make_unique<ConfigStrings>(std::move(config))),
_build(std::make_unique<BuildStrings>(std::move(build))),
Device(ConfigStringsPtr config, BuildStringsPtr build) :
_config(std::move(config)),
_build(std::move(build)),
_buffer(std::make_unique<Buffer>()),
_root(_buffer->createObject())
{
@ -262,10 +265,7 @@ public:
}
private:
using ConfigStringsPtr = std::unique_ptr<ConfigStrings>;
ConfigStringsPtr _config;
using BuildStringsPtr = std::unique_ptr<BuildStrings>;
BuildStringsPtr _build;
BufferPtr _buffer;
@ -339,8 +339,8 @@ String quote(String&& value) {
|| value.equalsIgnoreCase("true")
|| value.equalsIgnoreCase("false")
|| value.equalsIgnoreCase("on")
|| value.equalsIgnoreCase("off")
) {
|| value.equalsIgnoreCase("off"))
{
String result;
result.reserve(value.length() + 2);
result += '"';
@ -363,41 +363,34 @@ String quote(String&& value) {
class Discovery {
public:
virtual ~Discovery() {
}
virtual ~Discovery();
virtual bool ok() const = 0;
virtual const String& topic() = 0;
virtual const String& message() = 0;
virtual bool prepare();
virtual bool ready() const;
virtual bool next() = 0;
};
#if RELAY_SUPPORT
Discovery::~Discovery() = default;
struct RelayContext {
String availability;
String payload_available;
String payload_not_available;
String payload_on;
String payload_off;
};
bool Discovery::prepare() {
return true;
}
RelayContext makeRelayContext() {
return {
mqttTopic(MQTT_TOPIC_STATUS),
quote(mqttPayloadStatus(true)),
quote(mqttPayloadStatus(false)),
quote(relayPayload(PayloadStatus::On).toString()),
quote(relayPayload(PayloadStatus::Off).toString())
};
bool Discovery::ready() const {
return true;
}
#if RELAY_SUPPORT
class RelayDiscovery : public Discovery {
public:
explicit RelayDiscovery(Context& ctx) :
_ctx(ctx),
_relay(makeRelayContext()),
_relays(relayCount())
_ctx(ctx)
{}
JsonObject& root() {
@ -408,15 +401,21 @@ public:
return *_root;
}
bool ready() const override {
return _ready;
}
bool ok() const override {
return (_relays > 0)
&& (_index < _relays);
return _ready
&& (_count > 0)
&& (_index < _count);
}
const String& uniqueId() {
if (!_unique_id.length()) {
_unique_id = _ctx.identifier() + '_' + F("relay") + '_' + _index;
_unique_id = _ctx.identifier() + '_' + F("relay") + '_' + String(_index, 10);
}
return _unique_id;
}
@ -427,6 +426,7 @@ public:
_topic += uniqueId();
_topic += F("/config");
}
return _topic;
}
@ -434,25 +434,37 @@ public:
if (!_message.length()) {
auto& json = root();
json[F("dev")] = _ctx.device();
json[F("avty_t")] = _relay.availability.c_str();
json[F("pl_avail")] = _relay.payload_available.c_str();
json[F("pl_not_avail")] = _relay.payload_not_available.c_str();
json[F("pl_on")] = _relay.payload_on.c_str();
json[F("pl_off")] = _relay.payload_off.c_str();
json[F("avty_t")] = _info->availability.c_str();
json[F("pl_avail")] = _info->payload_available.c_str();
json[F("pl_not_avail")] = _info->payload_not_available.c_str();
json[F("pl_on")] = _info->payload_on.c_str();
json[F("pl_off")] = _info->payload_off.c_str();
json[F("uniq_id")] = uniqueId();
json[F("name")] = _ctx.name() + ' ' + _index;
json[F("name")] = _ctx.name() + ' ' + String(_index, 10);
json[F("stat_t")] = mqttTopic(MQTT_TOPIC_RELAY, _index);
json[F("cmd_t")] = mqttTopicSetter(MQTT_TOPIC_RELAY, _index);
json.printTo(_message);
}
return _message;
}
bool prepare() override {
if (!_ready) {
_count = relayCount();
_index = 0;
_info = _makeInfo();
_ready = true;
}
return _ready;
}
bool next() override {
if (_index < _relays) {
if (_index < _count) {
auto current = _index;
++_index;
if ((_index > current) && (_index < _relays)) {
if ((_index > current) && (_index < _count)) {
_unique_id = "";
_topic = "";
_message = "";
@ -464,18 +476,42 @@ public:
}
private:
struct Info {
String availability;
String payload_available;
String payload_not_available;
String payload_on;
String payload_off;
};
using InfoPtr = std::unique_ptr<Info>;
InfoPtr _makeInfo();
Context& _ctx;
JsonObject* _root { nullptr };
RelayContext _relay;
unsigned char _index { 0u };
unsigned char _relays { 0u };
InfoPtr _info;
size_t _index;
size_t _count;
bool _ready { false };
String _unique_id;
String _topic;
String _message;
};
RelayDiscovery::InfoPtr RelayDiscovery::_makeInfo() {
return InfoPtr(
new Info{
.availability = mqttTopic(MQTT_TOPIC_STATUS),
.payload_available = quote(mqttPayloadStatus(true)),
.payload_not_available = quote(mqttPayloadStatus(false)),
.payload_on = quote(relayPayload(PayloadStatus::On).toString()),
.payload_off = quote(relayPayload(PayloadStatus::Off).toString()),
});
}
#endif
// Example payload:
@ -512,8 +548,21 @@ public:
return *_root;
}
bool ready() const override {
return _ready;
}
bool prepare() override {
if (!_ready) {
_count = lightChannels();
_ready = true;
}
return _ready;
}
bool ok() const override {
return (lightChannels() > 0);
return _ready && (_count > 0);
}
bool next() override {
@ -611,6 +660,9 @@ private:
Context& _ctx;
JsonObject* _root { nullptr };
bool _ready { false };
size_t _count;
String _unique_id;
String _topic;
String _message;
@ -759,13 +811,8 @@ void receiveLightJson(StringView payload) {
class SensorDiscovery : public Discovery {
public:
explicit SensorDiscovery(Context& ctx) :
_ctx(ctx),
_magnitudes(magnitudeCount())
{
if (_magnitudes > 0) {
_info = magnitudeInfo(_index);
}
}
_ctx(ctx)
{}
JsonObject& root() {
if (!_root) {
@ -775,9 +822,14 @@ public:
return *_root;
}
bool ready() const override {
return _ready;
}
bool ok() const override {
return (_magnitudes > 0)
&& (_index < _magnitudes);
return _ready
&& (_count > 0)
&& (_index < _count);
}
const String& topic() override {
@ -815,8 +867,8 @@ public:
return _name;
}
unsigned char localId() const {
return _info.index;
String localId() const {
return String(_info.index, 10);
}
const String& uniqueId() {
@ -827,11 +879,29 @@ public:
return _unique_id;
}
bool prepare() override {
if (!_ready) {
_ready = sensorReady();
if (!_ready) {
return false;
}
_count = magnitudeCount();
_index = 0;
if (_count > 0) {
_info = magnitudeInfo(_index);
}
}
return _ready;
}
bool next() override {
if (_index < _magnitudes) {
if (_index < _count) {
auto current = _index;
++_index;
if ((_index > current) && (_index < _magnitudes)) {
if ((_index > current) && (_index < _count)) {
_info = magnitudeInfo(_index);
_unique_id = "";
_name = "";
@ -848,9 +918,10 @@ private:
Context& _ctx;
JsonObject* _root { nullptr };
unsigned char _magnitudes { 0u };
unsigned char _index { 0u };
sensor::Info _info;
size_t _count;
size_t _index;
bool _ready { false };
String _unique_id;
String _name;
@ -870,20 +941,164 @@ Context make_context() {
return Context(make_device_ptr(), 2048);
}
// Reworked discovery class. Try to send and wait for MQTT QoS 1 publish ACK to continue.
// use 5 retries and set a specific duration for each attempt
using Durations = std::array<duration::Milliseconds, 5>;
#if __cplusplus >= 201703L
#define __CONSTEXPR constexpr
#else
#define __CONSTEXPR
#endif
struct Wait {
using Value = typename Durations::value_type;
__CONSTEXPR Wait(const Durations& base) :
_begin(std::begin(base)),
_end(std::end(base)),
_it(_begin)
{}
void reset() {
_it = _begin;
}
Value value() const {
return *_it;
}
void change_next() {
_it = next_it(_it);
}
bool try_next() {
auto current = _it;
change_next();
return current != _it;
}
Value next() {
change_next();
return value();
}
__CONSTEXPR Value first() const {
return *_begin;
}
bool is_first() const {
return _it == _begin;
}
__CONSTEXPR Value last() const {
return *_end;
}
bool is_last() const {
return _it == _end;
}
__CONSTEXPR size_t count() const {
return _end - _begin;
}
private:
using Iterator = Durations::const_iterator;
Iterator next_it(Iterator value) {
const auto next = value + 1;
if (next < _end) {
return next;
}
return value;
}
Iterator _begin;
Iterator _end;
Iterator _it;
};
#undef __CONSTEXPR
// intervals between send attempts, usually long enough to push data to the network stack
static constexpr Durations ShortDurations{{
duration::Milliseconds{ 100 },
duration::Milliseconds{ 500 },
duration::Milliseconds{ 1000 },
duration::Milliseconds{ 2500 },
duration::Milliseconds{ 5000 },
}};
// longer intervals between initialization attempts, give enough time for external things
static constexpr Durations LongDurations{{
duration::Seconds{ 5 },
duration::Seconds{ 10 },
duration::Seconds{ 15 },
duration::Seconds{ 30 },
duration::Seconds{ 60 },
}};
struct Result {
using Duration = duration::Milliseconds;
enum class Value {
Error,
Ok,
Retry,
};
Result(Duration wait, Value value) :
_wait(wait),
_value(value)
{}
explicit Result(Duration wait) :
Result(wait, Value::Ok)
{}
Result() :
Result(ShortDurations.front())
{}
bool ok() const {
return _value == Value::Ok;
}
bool retry() const {
return _value == Value::Retry;
}
explicit operator bool() const {
return ok();
}
Duration wait() const {
return _wait;
}
private:
Duration _wait;
Value _value;
};
Result next_retry(Wait& wait) {
const auto retry = wait.is_last()
? Result::Value::Error
: Result::Value::Retry;
const auto value = wait.value();
wait.try_next();
return Result(value, retry);
}
// Topic and message are generated on demand and most of JSON payload is cached for re-use to save RAM.
class DiscoveryTask {
public:
using Entity = std::unique_ptr<Discovery>;
using Entities = std::forward_list<Entity>;
static constexpr auto WaitRestart = duration::Seconds{ 30 };
static constexpr auto WaitShort = duration::Milliseconds{ 100 };
static constexpr auto WaitLong = duration::Seconds{ 1 };
static constexpr int Retries { 5 };
DiscoveryTask() = delete;
DiscoveryTask(const DiscoveryTask&) = delete;
@ -893,8 +1108,8 @@ public:
DiscoveryTask& operator=(DiscoveryTask&&) = delete;
DiscoveryTask(Context ctx, State state) :
_state(state),
_ctx(std::move(ctx))
_ctx(std::move(ctx)),
_state(state)
{}
void add(Entity&& entity) {
@ -906,12 +1121,8 @@ public:
_entities.push_front(std::make_unique<T>(_ctx));
}
bool retry() {
if (_retry < 0) {
return false;
}
return (--_retry > 0);
Result retry_send() {
return next_retry(_wait_short);
}
Context& context() {
@ -919,75 +1130,108 @@ public:
}
bool done() const {
return (_retry < 0) || _entities.empty();
return _entities.empty();
}
bool ok() const {
if (!done()) {
for (auto& entity : _entities) {
if (!entity->ok()) {
return false;
}
}
State state() const {
return _state;
}
return true;
}
bool ok() const;
return false;
template <typename T>
Result try_send_one(T&& action);
Result prepare_all();
private:
Result next_send() {
_wait_short.reset();
return Result(_wait_short.value());
}
template <typename T>
bool send(T&& action) {
while (!_entities.empty()) {
auto& entity = _entities.front();
Result stop_sending() {
return Result(
_wait_short.first(),
Result::Value::Error);
}
Context _ctx;
State _state;
Entities _entities;
Wait _wait_short { ShortDurations };
Wait _wait_long { LongDurations };
};
bool DiscoveryTask::ok() const {
if (!done()) {
for (auto& entity : _entities) {
if (!entity->ok()) {
_entities.pop_front();
_ctx.reset();
continue;
return false;
}
}
const auto* topic = entity->topic().c_str();
const auto* msg = (State::Enabled == _state)
? entity->message().c_str()
: "";
return true;
}
if (action(topic, msg)) {
if (!entity->next()) {
_retry = Retries;
_entities.pop_front();
_ctx.reset();
}
return true;
}
return false;
}
return false;
Result DiscoveryTask::prepare_all() {
bool prepared { true };
for (auto& entity : _entities) {
if (!entity->prepare()) {
prepared = false;
break;
}
return false;
}
State state() const {
return _state;
if (!prepared) {
return next_retry(_wait_long);
}
private:
int _retry { Retries };
State _state;
return Result();
}
Entities _entities;
Context _ctx;
};
template <typename T>
Result DiscoveryTask::try_send_one(T&& action) {
auto it = _entities.begin();
constexpr duration::Seconds DiscoveryTask::WaitRestart;
while (it != _entities.end()) {
if (!(*it)->ok()) {
it = _entities.erase_after(
_entities.before_begin());
_ctx.reset();
continue;
}
constexpr duration::Milliseconds DiscoveryTask::WaitShort;
constexpr duration::Seconds DiscoveryTask::WaitLong;
const auto* topic = (*it)->topic().c_str();
const auto* msg = (State::Enabled == _state)
? (*it)->message().c_str()
: "";
if (action(topic, msg)) {
if (!(*it)->next()) {
it = _entities.erase_after(
_entities.before_begin());
_ctx.reset();
}
return next_send();
}
return retry_send();
}
return stop_sending();
}
using DiscoveryPtr = std::shared_ptr<DiscoveryTask>;
using FlagPtr = std::shared_ptr<bool>;
DiscoveryPtr makeDiscovery(State);
void restartDiscoveryForState(State);
namespace internal {
@ -1008,54 +1252,73 @@ void schedule(duration::Milliseconds wait, DiscoveryPtr ptr, FlagPtr flag_ptr) {
});
}
void stop() {
DEBUG_MSG_P(PSTR("[HA] Stopping discovery\n"));
internal::task.stop();
}
void send(DiscoveryPtr discovery, FlagPtr flag_ptr) {
if (!mqttConnected() || discovery->done()) {
DEBUG_MSG_P(PSTR("[HA] Stopping discovery\n"));
internal::task.stop();
stop();
return;
}
auto ready = discovery->prepare_all();
if (!ready) {
if (ready.retry()) {
DEBUG_MSG_P(PSTR("[HA] Discovery not ready, retrying in %zu (ms)\n"),
ready.wait().count());
schedule(ready.wait(), discovery, flag_ptr);
} else {
stop();
}
return;
}
auto& flag = *flag_ptr;
if (!flag) {
if (discovery->retry()) {
schedule(DiscoveryTask::WaitShort, discovery, flag_ptr);
const auto next_send = discovery->retry_send();
if (next_send.retry()) {
schedule(next_send.wait(), discovery, flag_ptr);
} else {
restartDiscoveryForState(discovery->state());
stop();
}
return;
}
uint16_t pid { 0u };
const auto res = discovery->send([&](const char* topic, const char* message) {
pid = ::mqttSendRaw(topic, message, internal::retain, 1);
return pid > 0;
});
const auto sent = discovery->try_send_one(
[&](const char* topic, const char* message) {
pid = ::mqttSendRaw(topic, message, internal::retain, 1);
return pid > 0;
});
#if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
// - async fails when disconneted and when it's buffers are filled, which should be resolved after $LATENCY
// and the time it takes for the lwip to process it. future versions use queue, but could still fail when low on RAM
// - lwmqtt will fail when disconnected (already checked above) and *will* disconnect in case publish fails.
// ::publish() will wait for the puback, so we don't have to do it ourselves. not tested.
// - pubsub will fail when it can't buffer the payload *or* the underlying WiFiClient calls fail. also not tested.
// Receive acknowledgement from the broker before continuing.
// Usually a good idea in general, to avoid filling network buffers too quickly.
//
// Not needed with LWMQTT, as it is already handled and wrapped in Result
// Not supported by PubSubClient
if (res) {
if (sent) {
flag = false;
mqttOnPublish(pid, [flag_ptr]() {
(*flag_ptr) = true;
});
mqttOnPublish(
pid,
[flag_ptr]() {
(*flag_ptr) = true;
});
}
#endif
const auto wait = res
? DiscoveryTask::WaitShort
: DiscoveryTask::WaitLong;
if (res || discovery->retry()) {
schedule(wait, discovery, flag_ptr);
if (sent.ok() || sent.retry()) {
schedule(sent.wait(), discovery, flag_ptr);
return;
}
restartDiscoveryForState(discovery->state());
if (!sent) {
stop();
}
}
} // namespace internal
@ -1078,17 +1341,12 @@ DiscoveryPtr makeDiscovery(State state) {
}
void scheduleDiscovery(duration::Milliseconds duration, DiscoveryPtr discovery) {
DEBUG_MSG_P(PSTR("[HA] Discovery scheduled in %zu (ms)\n"), duration.count());
DEBUG_MSG_P(PSTR("[HA] Starting discovery\n"));
internal::schedule(duration, discovery, std::make_shared<bool>(true));
}
void scheduleDiscovery(DiscoveryPtr discovery) {
scheduleDiscovery(DiscoveryTask::WaitShort, discovery);
}
void restartDiscoveryForState(State state) {
DEBUG_MSG_P(PSTR("[HA] Too many retries, restarting discovery\n"));
scheduleDiscovery(DiscoveryTask::WaitRestart, makeDiscovery(state));
scheduleDiscovery(ShortDurations.front(), discovery);
}
void publishDiscoveryForState(State state) {


+ 8
- 0
code/espurna/sensor.cpp View File

@ -4313,6 +4313,10 @@ void configure_base() {
energy::every(sensor::settings::saveEvery());
}
bool ready() {
return State::Reading == internal::state;
}
void configure() {
configure_base();
configure_magnitudes();
@ -4463,6 +4467,10 @@ espurna::sensor::Info magnitudeInfo(unsigned char index) {
};
}
bool sensorReady() {
return espurna::sensor::ready();
}
espurna::StringView sensorList() {
return espurna::sensor::List;
}


+ 2
- 0
code/espurna/sensor.h View File

@ -289,5 +289,7 @@ espurna::sensor::Value magnitudeReportValue(unsigned char index);
using SensorWebSocketMagnitudesCallback = void(*)(JsonArray&, size_t);
void sensorWebSocketMagnitudes(JsonObject& root, espurna::StringView prefix, SensorWebSocketMagnitudesCallback);
bool sensorReady();
espurna::StringView sensorList();
void sensorSetup();

Loading…
Cancel
Save