Browse Source

rpn: rfbridge operators and mqtt fixes (#2302)

- cache received rfbridge codes in the internal list, allow to operate on it via the rpn operators
- add `<N> <proto> <code> rfb_match`, matching when we receive specified protocol + code string at least N times
- add `<proto> <code> <proto> <code> rfb_sequence`, checking if specified protocol + code pairs happen in sequence
- add `<TIME> <N> <proto> <code> rfb_match_wait` - similar to `rfb_match`, but waiting for at least `TIME` (ms) via oneshot runner
- add `<proto> <code> rfb_info`, pushes code's latest timestamp and it's counter on the stack
- add `<proto> <code> rfb_pop`, which removes the specified protocol + code from the internal cache
- fix MQTT skip setting making RPN variables absent on initial connection
- default to no skip when receiving MQTT
(small issue still stands with us having non-clean MQTT session, broker will persist variable subscriptions even after unsubscribe event)
mcspr-patch-1
Max Prokhorov 4 years ago
committed by GitHub
parent
commit
1f9479b943
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 246 additions and 22 deletions
  1. +1
    -5
      code/espurna/config/general.h
  2. +16
    -13
      code/espurna/mqtt.cpp
  3. +229
    -4
      code/espurna/rpnrules.cpp

+ 1
- 5
code/espurna/config/general.h View File

@ -1098,12 +1098,8 @@
#endif
#ifndef MQTT_SKIP_RETAINED
#define MQTT_SKIP_RETAINED 1 // Skip retained messages on connection
#endif
#ifndef MQTT_SKIP_TIME
#define MQTT_SKIP_TIME 1000 // Skip messages for 1 second anter connection
#define MQTT_SKIP_TIME 0 // Skip messages for N ms after connection. Disabled by default
#endif
#ifndef MQTT_USE_JSON


+ 16
- 13
code/espurna/mqtt.cpp View File

@ -70,12 +70,13 @@ Updated secure client support by Niek van der Maas < mail at niekvandermaas dot
#endif // MQTT_LIBRARY == MQTT_ASYNCMQTTCLIENT
bool _mqtt_enabled = MQTT_ENABLED;
bool _mqtt_use_json = false;
unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
unsigned long _mqtt_last_connection = 0;
AsyncClientState _mqtt_state = AsyncClientState::Disconnected;
bool _mqtt_retain_skipped = false;
bool _mqtt_skip_messages = false;
unsigned long _mqtt_skip_time = MQTT_SKIP_TIME;
unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
bool _mqtt_enabled = MQTT_ENABLED;
bool _mqtt_use_json = false;
bool _mqtt_retain = MQTT_RETAIN;
int _mqtt_qos = MQTT_QOS;
int _mqtt_keepalive = MQTT_KEEPALIVE;
@ -343,6 +344,9 @@ void _mqttConfigure() {
_mqttApplyTopic(_mqtt_topic_json, MQTT_TOPIC_JSON);
}
// Skip messages in a small window right after the connection
_mqtt_skip_time = getSetting("mqttSkipTime", MQTT_SKIP_TIME);
// Custom payload strings
settingsProcessConfig({
{_mqtt_payload_online, "mqttPayloadOnline", MQTT_STATUS_ONLINE},
@ -433,6 +437,7 @@ void _mqttInfo() {
);
}
}
}
// -----------------------------------------------------------------------------
@ -535,7 +540,6 @@ void _mqttOnConnect() {
_mqtt_last_connection = millis();
_mqtt_state = AsyncClientState::Connected;
_mqtt_retain_skipped = false;
DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
@ -554,7 +558,6 @@ void _mqttOnDisconnect() {
// Reset reconnection delay
_mqtt_last_connection = millis();
_mqtt_state = AsyncClientState::Disconnected;
_mqtt_retain_skipped = false;
DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
@ -568,14 +571,12 @@ void _mqttOnDisconnect() {
// Force-skip everything received in a short window right after connecting to avoid syncronization issues.
bool _mqttMaybeSkipRetained(char* topic) {
#if MQTT_SKIP_RETAINED
if (!_mqtt_retain_skipped && (millis() - _mqtt_last_connection < MQTT_SKIP_TIME)) {
DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic);
return true;
}
#endif
if (_mqtt_skip_messages && (millis() - _mqtt_last_connection < _mqtt_skip_time)) {
DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic);
return true;
}
_mqtt_retain_skipped = true;
_mqtt_skip_messages = false;
return false;
}
@ -1006,6 +1007,8 @@ void _mqttConnect() {
_mqtt_state = AsyncClientState::Connecting;
_mqtt_skip_messages = (_mqtt_skip_time > 0);
#if SECURE_CLIENT != SECURE_CLIENT_NONE
const bool secure = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED);
#else


+ 229
- 4
code/espurna/rpnrules.cpp View File

@ -16,10 +16,13 @@ Copyright (C) 2019 by Xose Pérez <xose dot perez at gmail dot com>
#include "relay.h"
#include "rpc.h"
#include "sensor.h"
#include "rfbridge.h"
#include "terminal.h"
#include "ws.h"
#include <list>
#include <vector>
// -----------------------------------------------------------------------------
// Custom commands
// -----------------------------------------------------------------------------
@ -236,6 +239,214 @@ rpn_error _rpnRelayStatus(rpn_context & ctxt, bool force) {
#endif // RELAY_SUPPORT
#if RFB_SUPPORT
struct rpn_rfbridge_code {
unsigned char protocol;
String raw;
size_t count;
decltype(millis()) last;
};
// TODO: in theory, we could do with forward_list. however, this would require a more complicated removal process,
// as we would no longer know the previous element and would need to track 2 elements at a time
static std::list<rpn_rfbridge_code> _rfb_codes;
static uint32_t _rfb_code_repeat_window;
static uint32_t _rfb_code_stale_delay;
static uint32_t _rfb_code_match_window;
struct rpn_rfbridge_match {
unsigned char protocol;
String raw;
};
rpn_error _rpnRfbSequence(rpn_context& ctxt) {
auto raw_second = rpn_stack_pop(ctxt);
auto proto_second = rpn_stack_pop(ctxt);
auto raw_first = rpn_stack_pop(ctxt);
auto proto_first = rpn_stack_pop(ctxt);
// find 2 codes in the same order and save pointers
rpn_rfbridge_match match[2] {
{static_cast<unsigned char>(proto_first.toUint()), raw_first.toString()},
{static_cast<unsigned char>(proto_second.toUint()), raw_second.toString()}
};
rpn_rfbridge_code* refs[2] {nullptr, nullptr};
for (auto& recent : _rfb_codes) {
if ((refs[0] != nullptr) && (refs[1] != nullptr)) {
break;
}
for (int index = 0; index < 2; ++index) {
if ((refs[index] == nullptr)
&& (match[index].protocol == recent.protocol)
&& (match[index].raw == recent.raw)) {
refs[index] = &recent;
}
}
}
if ((refs[0] == nullptr) || (refs[1] == nullptr)) {
return rpn_operator_error::CannotContinue;
}
// purge codes to avoid matching again on the next rules run
if ((millis() - refs[0]->last) > (millis() - refs[1]->last)) {
_rfb_codes.remove_if([&refs](rpn_rfbridge_code& code) {
return (refs[0] == &code) || (refs[1] == &code);
});
return rpn_operator_error::Ok;
}
return rpn_operator_error::CannotContinue;
}
decltype(_rfb_codes)::iterator _rpnRfbFindCode(unsigned char protocol, const String& match) {
return std::find_if(_rfb_codes.begin(), _rfb_codes.end(), [protocol, &match](const rpn_rfbridge_code& code) {
return (code.protocol == protocol) && (code.raw == match);
});
}
rpn_error _rpnRfbPop(rpn_context& ctxt) {
auto code = rpn_stack_pop(ctxt);
auto proto = rpn_stack_pop(ctxt);
auto result = _rpnRfbFindCode(proto.toUint(), code.toString());
if (result == _rfb_codes.end()) {
return rpn_operator_error::CannotContinue;
}
_rfb_codes.erase(result);
return rpn_operator_error::Ok;
}
rpn_error _rpnRfbInfo(rpn_context& ctxt) {
auto code = rpn_stack_pop(ctxt);
auto proto = rpn_stack_pop(ctxt);
auto result = _rpnRfbFindCode(proto.toUint(), code.toString());
if (result == _rfb_codes.end()) {
return rpn_operator_error::CannotContinue;
}
rpn_stack_push(ctxt, rpn_value(
static_cast<rpn_uint>((*result).count)));
rpn_stack_push(ctxt, rpn_value(
static_cast<rpn_uint>((*result).last)));
return rpn_operator_error::Ok;
}
rpn_error _rpnRfbWaitMatch(rpn_context& ctxt) {
auto code = rpn_stack_pop(ctxt);
auto proto = rpn_stack_pop(ctxt);
auto count = rpn_stack_pop(ctxt);
auto time = rpn_stack_pop(ctxt);
auto result = _rpnRfbFindCode(proto.toUint(), code.toString());
if (result == _rfb_codes.end()) {
return rpn_operator_error::CannotContinue;
}
if ((*result).count < count.toUint()) {
return rpn_operator_error::CannotContinue;
}
// purge code to avoid matching again on the next rules run
if (rpn_operator_error::Ok == _rpnRunnerHandler(ctxt, RpnRunner::Policy::OneShot, time.toUint())) {
_rfb_codes.erase(result);
return rpn_operator_error::Ok;
}
return rpn_operator_error::CannotContinue;
}
rpn_error _rpnRfbMatcher(rpn_context& ctxt) {
auto code = rpn_stack_pop(ctxt);
auto proto = rpn_stack_pop(ctxt);
auto count = rpn_stack_pop(ctxt);
auto result = _rpnRfbFindCode(proto.toUint(), code.toString());
if (result == _rfb_codes.end()) {
return rpn_operator_error::CannotContinue;
}
// only process recent codes, ignore when rule is processing outside of this small window
if (millis() - (*result).last >= _rfb_code_match_window) {
return rpn_operator_error::CannotContinue;
}
// purge code to avoid matching again on the next rules run
if ((*result).count == count.toUint()) {
_rfb_codes.erase(result);
return rpn_operator_error::Ok;
}
return rpn_operator_error::CannotContinue;
}
void _rpnBrokerRfbridgeCallback(unsigned char protocol, const char* raw_code) {
// remove really old codes that we have not seen in a while to avoid memory exhaustion
auto ts = millis();
auto old = std::remove_if(_rfb_codes.begin(), _rfb_codes.end(), [ts](rpn_rfbridge_code& code) {
return (ts - code.last) >= _rfb_code_stale_delay;
});
if (old != _rfb_codes.end()) {
_rfb_codes.erase(old, _rfb_codes.end());
}
auto result = _rpnRfbFindCode(protocol, raw_code);
if (result != _rfb_codes.end()) {
// we also need to reset the counter at a certain point to allow next batch of repeats to go through
if (millis() - (*result).last >= _rfb_code_repeat_window) {
(*result).count = 0;
}
(*result).last = millis();
(*result).count += 1u;
} else {
_rfb_codes.push_back({protocol, raw_code, 1u, millis()});
}
_rpn_run = true;
}
void _rpnRfbSetup() {
// - Repeat window is an arbitrary time, just about 3-4 more times it takes for
// a code to be sent again when holding a generic remote button
// Code counter is reset to 0 when outside of the window.
// - Stale delay allows broker callback to remove really old codes.
// (TODO: can this happen in loop() cb instead?)
_rfb_code_repeat_window = getSetting("rfbRepeatWindow", 2000ul);
_rfb_code_match_window = getSetting("rfbMatchWindow", 2000ul);
_rfb_code_stale_delay = getSetting("rfbStaleDelay", 10000ul);
#if TERMINAL_SUPPORT
terminalRegisterCommand(F("RFB.CODES"), [](const terminal::CommandContext& ctx) {
for (auto& code : _rfb_codes) {
char buffer[128] = {0};
snprintf_P(buffer, sizeof(buffer),
PSTR("\"%s\" proto=%u count=%u last=%u"),
code.protocol,
code.raw.c_str(),
code.count,
code.last
);
ctx.output.println(buffer);
}
});
#endif
// Main bulk of the processing goes on in here
RfbridgeBroker::Register(_rpnBrokerRfbridgeCallback);
}
#endif // RFB_SUPPORT
void _rpnShowStack(Print& print) {
print.println(F("Stack:"));
@ -369,6 +580,14 @@ void _rpnInit() {
#endif
#if RFB_SUPPORT
rpn_operator_set(_rpn_ctxt, "rfb_pop", 2, _rpnRfbPop);
rpn_operator_set(_rpn_ctxt, "rfb_info", 2, _rpnRfbInfo);
rpn_operator_set(_rpn_ctxt, "rfb_sequence", 4, _rpnRfbSequence);
rpn_operator_set(_rpn_ctxt, "rfb_match", 3, _rpnRfbMatcher);
rpn_operator_set(_rpn_ctxt, "rfb_match_wait", 4, _rpnRfbWaitMatch);
#endif
#if MQTT_SUPPORT
rpn_operator_set(_rpn_ctxt, "mqtt_send", 2, [](rpn_context & ctxt) -> rpn_error {
rpn_value message;
@ -384,10 +603,12 @@ void _rpnInit() {
#endif
// Some debugging. Dump stack contents
rpn_operator_set(_rpn_ctxt, "showstack", 0, [](rpn_context & ctxt) -> rpn_error {
_rpnShowStack(terminalDefaultStream());
return 0;
});
#if TERMINAL_SUPPORT
rpn_operator_set(_rpn_ctxt, "showstack", 0, [](rpn_context & ctxt) -> rpn_error {
_rpnShowStack(terminalDefaultStream());
return 0;
});
#endif
// And, simple string logging
#if DEBUG_SUPPORT
@ -591,6 +812,10 @@ void rpnSetup() {
StatusBroker::Register(_rpnBrokerStatus);
#if RFB_SUPPORT
_rpnRfbSetup();
#endif
#if SENSOR_SUPPORT
SensorReadBroker::Register(_rpnBrokerCallback);
#endif


Loading…
Cancel
Save