Browse Source

Safer settings reload and MQTT change detection (#1701)

* MQTT: config change detection
* Reload settings when config json is uploaded
* Apply only new settings
* Finish config early when not enabled
* Reuse existing buffers from getSetting String using std::move
master
Max Prokhorov 5 years ago
committed by GitHub
parent
commit
334b499024
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 135 additions and 99 deletions
  1. +11
    -0
      code/espurna/espurna.ino
  2. +124
    -87
      code/espurna/mqtt.ino
  3. +0
    -12
      code/espurna/ws.ino

+ 11
- 0
code/espurna/espurna.ino View File

@ -25,6 +25,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
std::vector<void (*)()> _loop_callbacks; std::vector<void (*)()> _loop_callbacks;
std::vector<void (*)()> _reload_callbacks; std::vector<void (*)()> _reload_callbacks;
bool _reload_config = false;
unsigned long _loop_delay = 0; unsigned long _loop_delay = 0;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -40,6 +41,10 @@ void espurnaRegisterReload(void (*callback)()) {
} }
void espurnaReload() { void espurnaReload() {
_reload_config = true;
}
void _espurnaReload() {
for (unsigned char i = 0; i < _reload_callbacks.size(); i++) { for (unsigned char i = 0; i < _reload_callbacks.size(); i++) {
(_reload_callbacks[i])(); (_reload_callbacks[i])();
} }
@ -228,6 +233,12 @@ void setup() {
void loop() { void loop() {
// Reload config before running any callbacks
if (_reload_config) {
_espurnaReload();
_reload_config = false;
}
// Call registered loop callbacks // Call registered loop callbacks
for (unsigned char i = 0; i < _loop_callbacks.size(); i++) { for (unsigned char i = 0; i < _loop_callbacks.size(); i++) {
(_loop_callbacks[i])(); (_loop_callbacks[i])();


+ 124
- 87
code/espurna/mqtt.ino View File

@ -13,6 +13,7 @@ Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
#include <ESP8266mDNS.h> #include <ESP8266mDNS.h>
#include <ArduinoJson.h> #include <ArduinoJson.h>
#include <vector> #include <vector>
#include <utility>
#include <Ticker.h> #include <Ticker.h>
#if MQTT_USE_ASYNC // Using AsyncMqttClient #if MQTT_USE_ASYNC // Using AsyncMqttClient
@ -46,10 +47,12 @@ String _mqtt_topic_json;
String _mqtt_setter; String _mqtt_setter;
String _mqtt_getter; String _mqtt_getter;
bool _mqtt_forward; bool _mqtt_forward;
char *_mqtt_user = 0;
char *_mqtt_pass = 0;
char *_mqtt_will;
char *_mqtt_clientid;
String _mqtt_user;
String _mqtt_pass;
String _mqtt_will;
String _mqtt_server;
uint16_t _mqtt_port;
String _mqtt_clientid;
std::vector<mqtt_callback_f> _mqtt_callbacks; std::vector<mqtt_callback_f> _mqtt_callbacks;
@ -82,41 +85,23 @@ void _mqttConnect() {
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MAX; _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MAX;
} }
String h = getSetting("mqttServer", MQTT_SERVER);
#if MDNS_CLIENT_SUPPORT #if MDNS_CLIENT_SUPPORT
h = mdnsResolve(h);
_mqtt_server = mdnsResolve(_mqtt_server);
#endif #endif
char * host = strdup(h.c_str());
unsigned int port = getSetting("mqttPort", MQTT_PORT).toInt();
if (_mqtt_user) free(_mqtt_user);
if (_mqtt_pass) free(_mqtt_pass);
if (_mqtt_will) free(_mqtt_will);
if (_mqtt_clientid) free(_mqtt_clientid);
String user = getSetting("mqttUser", MQTT_USER);
_mqttPlaceholders(&user);
_mqtt_user = strdup(user.c_str());
_mqtt_pass = strdup(getSetting("mqttPassword", MQTT_PASS).c_str());
_mqtt_will = strdup(mqttTopic(MQTT_TOPIC_STATUS, false).c_str());
String clientid = getSetting("mqttClientID", getIdentifier());
_mqttPlaceholders(&clientid);
_mqtt_clientid = strdup(clientid.c_str());
DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%d\n"), host, port);
DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%u\n"), _mqtt_server.c_str(), _mqtt_port);
#if MQTT_USE_ASYNC #if MQTT_USE_ASYNC
_mqtt_connecting = true; _mqtt_connecting = true;
_mqtt.setServer(host, port);
_mqtt.setClientId(_mqtt_clientid);
_mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
_mqtt.setClientId(_mqtt_clientid.c_str());
_mqtt.setKeepAlive(_mqtt_keepalive); _mqtt.setKeepAlive(_mqtt_keepalive);
_mqtt.setCleanSession(false); _mqtt.setCleanSession(false);
_mqtt.setWill(_mqtt_will, _mqtt_qos, _mqtt_retain, "0");
if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user);
_mqtt.setCredentials(_mqtt_user, _mqtt_pass);
_mqtt.setWill(_mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, "0");
if (_mqtt_user.length() && _mqtt_pass.length()) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
_mqtt.setCredentials(_mqtt_user.c_str(), _mqtt_pass.c_str());
} }
#if ASYNC_TCP_SSL_ENABLED #if ASYNC_TCP_SSL_ENABLED
@ -135,11 +120,11 @@ void _mqttConnect() {
#endif // ASYNC_TCP_SSL_ENABLED #endif // ASYNC_TCP_SSL_ENABLED
DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid);
DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid.c_str());
DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), _mqtt_qos); DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), _mqtt_qos);
DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), _mqtt_retain ? 1 : 0); DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), _mqtt_retain ? 1 : 0);
DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %ds\n"), _mqtt_keepalive); DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %ds\n"), _mqtt_keepalive);
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will);
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will.c_str());
_mqtt.connect(); _mqtt.connect();
@ -152,10 +137,10 @@ void _mqttConnect() {
bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1; bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
if (secure) { if (secure) {
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n")); DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
if (_mqtt_client_secure.connect(host, port)) {
if (_mqtt_client_secure.connect(_mqtt_server.c_str(), _mqtt_port)) {
char fp[60] = {0}; char fp[60] = {0};
if (sslFingerPrintChar(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) { if (sslFingerPrintChar(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
if (_mqtt_client_secure.verify(fp, host)) {
if (_mqtt_client_secure.verify(fp, _mqtt_server.c_str())) {
_mqtt.setClient(_mqtt_client_secure); _mqtt.setClient(_mqtt_client_secure);
} else { } else {
DEBUG_MSG_P(PSTR("[MQTT] Invalid fingerprint\n")); DEBUG_MSG_P(PSTR("[MQTT] Invalid fingerprint\n"));
@ -184,20 +169,20 @@ void _mqttConnect() {
if (response) { if (response) {
_mqtt.setServer(host, port);
_mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user);
response = _mqtt.connect(_mqtt_clientid, _mqtt_user, _mqtt_pass, _mqtt_will, _mqtt_qos, _mqtt_retain, "0");
if (_mqtt_user.length() && _mqtt_pass.length()) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
response = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, "0");
} else { } else {
response = _mqtt.connect(_mqtt_clientid, _mqtt_will, _mqtt_qos, _mqtt_retain, "0");
response = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, "0");
} }
DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid);
DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid.c_str());
DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), _mqtt_qos); DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), _mqtt_qos);
DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), _mqtt_retain ? 1 : 0); DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), _mqtt_retain ? 1 : 0);
DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %ds\n"), _mqtt_keepalive); DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %ds\n"), _mqtt_keepalive);
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will);
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will.c_str());
} }
@ -210,50 +195,114 @@ void _mqttConnect() {
#endif // MQTT_USE_ASYNC #endif // MQTT_USE_ASYNC
free(host);
} }
void _mqttPlaceholders(String *text) {
text->replace("{hostname}", getSetting("hostname"));
text->replace("{magnitude}", "#");
void _mqttPlaceholders(String& text) {
text.replace("{hostname}", getSetting("hostname"));
text.replace("{magnitude}", "#");
String mac = WiFi.macAddress(); String mac = WiFi.macAddress();
mac.replace(":", ""); mac.replace(":", "");
text->replace("{mac}", mac);
text.replace("{mac}", mac);
} }
template<typename T>
void _mqttApplySetting(T& current, T& updated) {
if (current != updated) {
current = std::move(updated);
mqttDisconnect();
}
}
template<typename T>
void _mqttApplySetting(T& current, const T& updated) {
if (current != updated) {
current = updated;
mqttDisconnect();
}
}
template<typename T>
void _mqttApplyTopic(T& current, const char* magnitude) {
String updated = mqttTopic(magnitude, false);
if (current != updated) {
mqttFlush();
current = std::move(updated);
}
}
void _mqttConfigure() { void _mqttConfigure() {
// Get base topic
_mqtt_topic = getSetting("mqttTopic", MQTT_TOPIC);
if (_mqtt_topic.endsWith("/")) _mqtt_topic.remove(_mqtt_topic.length()-1);
// Enable only when server is set
{
String server = getSetting("mqttServer", MQTT_SERVER);
uint16_t port = getSetting("mqttPort", MQTT_PORT).toInt();
bool enabled = false;
if (server.length()) {
enabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1;
}
_mqttApplySetting(_mqtt_server, server);
_mqttApplySetting(_mqtt_enabled, enabled);
_mqttApplySetting(_mqtt_port, port);
if (!enabled) return;
}
// Get base topic and apply placeholders
{
String topic = getSetting("mqttTopic", MQTT_TOPIC);
if (topic.endsWith("/")) topic.remove(_mqtt_topic.length()-1);
// Placeholders
_mqttPlaceholders(&_mqtt_topic);
if (_mqtt_topic.indexOf("#") == -1) _mqtt_topic = _mqtt_topic + "/#";
// Replace things inside curly braces (like {hostname}, {mac} etc.)
_mqttPlaceholders(topic);
// Getters and setters
_mqtt_setter = getSetting("mqttSetter", MQTT_SETTER);
_mqtt_getter = getSetting("mqttGetter", MQTT_GETTER);
_mqtt_forward = !_mqtt_getter.equals(_mqtt_setter) && RELAY_REPORT_STATUS;
if (topic.indexOf("#") == -1) topic.concat("/#");
_mqttApplySetting(_mqtt_topic, topic);
_mqttApplyTopic(_mqtt_will, MQTT_TOPIC_STATUS);
}
// Getter and setter
{
String setter = getSetting("mqttSetter", MQTT_SETTER);
String getter = getSetting("mqttGetter", MQTT_GETTER);
bool forward = !setter.equals(getter) && RELAY_REPORT_STATUS;
_mqttApplySetting(_mqtt_setter, setter);
_mqttApplySetting(_mqtt_getter, getter);
_mqttApplySetting(_mqtt_forward, forward);
}
// MQTT options // MQTT options
_mqtt_qos = getSetting("mqttQoS", MQTT_QOS).toInt();
_mqtt_retain = getSetting("mqttRetain", MQTT_RETAIN).toInt() == 1;
_mqtt_keepalive = getSetting("mqttKeep", MQTT_KEEPALIVE).toInt();
if (getSetting("mqttClientID").length() == 0) delSetting("mqttClientID");
// Enable
if (getSetting("mqttServer", MQTT_SERVER).length() == 0) {
mqttEnabled(false);
} else {
_mqtt_enabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1;
{
String user = getSetting("mqttUser", MQTT_USER);
_mqttPlaceholders(user);
String pass = getSetting("mqttPassword", MQTT_PASS);
unsigned char qos = getSetting("mqttQoS", MQTT_QOS).toInt();
bool retain = getSetting("mqttRetain", MQTT_RETAIN).toInt() == 1;
unsigned long keepalive = getSetting("mqttKeep", MQTT_KEEPALIVE).toInt();
String id = getSetting("mqttClientID", getIdentifier());
_mqttPlaceholders(id);
_mqttApplySetting(_mqtt_user, user);
_mqttApplySetting(_mqtt_pass, pass);
_mqttApplySetting(_mqtt_qos, qos);
_mqttApplySetting(_mqtt_retain, retain);
_mqttApplySetting(_mqtt_keepalive, keepalive);
_mqttApplySetting(_mqtt_clientid, id);
}
// MQTT JSON
{
_mqttApplySetting(_mqtt_use_json, getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1);
_mqttApplyTopic(_mqtt_topic_json, MQTT_TOPIC_JSON);
} }
_mqtt_use_json = (getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1);
mqttQueueTopic(MQTT_TOPIC_JSON);
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN; _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
@ -491,9 +540,6 @@ void mqttSend(const char * topic, const char * message, bool force, bool retain)
// Equeue message // Equeue message
if (useJson) { if (useJson) {
// Set default queue topic
mqttQueueTopic(MQTT_TOPIC_JSON);
// Enqueue new message // Enqueue new message
mqttEnqueue(topic, message); mqttEnqueue(topic, message);
@ -608,14 +654,6 @@ void mqttFlush() {
} }
void mqttQueueTopic(const char * topic) {
String t = mqttTopic(topic, false);
if (!t.equals(_mqtt_topic_json)) {
mqttFlush();
_mqtt_topic_json = t;
}
}
int8_t mqttEnqueue(const char * topic, const char * message, unsigned char parent) { int8_t mqttEnqueue(const char * topic, const char * message, unsigned char parent) {
// Queue is not meant to send message "offline" // Queue is not meant to send message "offline"
@ -709,7 +747,11 @@ void mqttRegister(mqtt_callback_f callback) {
void mqttSetBroker(IPAddress ip, unsigned int port) { void mqttSetBroker(IPAddress ip, unsigned int port) {
setSetting("mqttServer", ip.toString()); setSetting("mqttServer", ip.toString());
_mqtt_server = ip.toString();
setSetting("mqttPort", port); setSetting("mqttPort", port);
_mqtt_port = port;
mqttEnabled(MQTT_AUTOCONNECT); mqttEnabled(MQTT_AUTOCONNECT);
} }
@ -717,11 +759,6 @@ void mqttSetBrokerIfNone(IPAddress ip, unsigned int port) {
if (getSetting("mqttServer", MQTT_SERVER).length() == 0) mqttSetBroker(ip, port); if (getSetting("mqttServer", MQTT_SERVER).length() == 0) mqttSetBroker(ip, port);
} }
void mqttReset() {
_mqttConfigure();
mqttDisconnect();
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Initialization // Initialization
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------


+ 0
- 12
code/espurna/ws.ino View File

@ -212,9 +212,6 @@ void _wsParse(AsyncWebSocketClient *client, uint8_t * payload, size_t length) {
String adminPass; String adminPass;
bool save = false; bool save = false;
#if MQTT_SUPPORT
bool changedMQTT = false;
#endif
for (auto kv: config) { for (auto kv: config) {
@ -263,9 +260,6 @@ void _wsParse(AsyncWebSocketClient *client, uint8_t * payload, size_t length) {
// Update flags if value has changed // Update flags if value has changed
if (changed) { if (changed) {
save = true; save = true;
#if MQTT_SUPPORT
if (key.startsWith("mqtt")) changedMQTT = true;
#endif
} }
} }
@ -276,12 +270,6 @@ void _wsParse(AsyncWebSocketClient *client, uint8_t * payload, size_t length) {
// Callbacks // Callbacks
espurnaReload(); espurnaReload();
// This should got to callback as well
// but first change management has to be in place
#if MQTT_SUPPORT
if (changedMQTT) mqttReset();
#endif
// Persist settings // Persist settings
saveSettings(); saveSettings();


Loading…
Cancel
Save