From a716633c1ba366177a2ef06bcc54e1a9e236fb21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xose=20P=C3=A9rez?= Date: Sat, 13 Jan 2018 15:34:42 +0100 Subject: [PATCH] Refactor MQTT module --- code/espurna/homeassitant.ino | 20 +- code/espurna/mqtt.ino | 509 +++++++++++++++++----------------- code/espurna/ws.ino | 5 +- 3 files changed, 272 insertions(+), 262 deletions(-) diff --git a/code/espurna/homeassitant.ino b/code/espurna/homeassitant.ino index e14d61a3..b440d075 100644 --- a/code/espurna/homeassitant.ino +++ b/code/espurna/homeassitant.ino @@ -41,11 +41,11 @@ void _haSend() { root["platform"] = "mqtt"; if (relayCount()) { - root["state_topic"] = getTopic(MQTT_TOPIC_RELAY, 0, false); - root["command_topic"] = getTopic(MQTT_TOPIC_RELAY, 0, true); + root["state_topic"] = mqttGetTopic(MQTT_TOPIC_RELAY, 0, false); + root["command_topic"] = mqttGetTopic(MQTT_TOPIC_RELAY, 0, true); root["payload_on"] = String("1"); root["payload_off"] = String("0"); - root["availability_topic"] = getTopic(MQTT_TOPIC_STATUS, false); + root["availability_topic"] = mqttGetTopic(MQTT_TOPIC_STATUS, false); root["payload_available"] = String("1"); root["payload_not_available"] = String("0"); } @@ -53,16 +53,16 @@ void _haSend() { #if LIGHT_PROVIDER != LIGHT_PROVIDER_NONE if (lightHasColor()) { - root["brightness_state_topic"] = getTopic(MQTT_TOPIC_BRIGHTNESS, false); - root["brightness_command_topic"] = getTopic(MQTT_TOPIC_BRIGHTNESS, true); - root["rgb_state_topic"] = getTopic(MQTT_TOPIC_COLOR_RGB, false); - root["rgb_command_topic"] = getTopic(MQTT_TOPIC_COLOR_RGB, true); - root["color_temp_command_topic"] = getTopic(MQTT_TOPIC_MIRED, true); + root["brightness_state_topic"] = mqttGetTopic(MQTT_TOPIC_BRIGHTNESS, false); + root["brightness_command_topic"] = mqttGetTopic(MQTT_TOPIC_BRIGHTNESS, true); + root["rgb_state_topic"] = mqttGetTopic(MQTT_TOPIC_COLOR_RGB, false); + root["rgb_command_topic"] = mqttGetTopic(MQTT_TOPIC_COLOR_RGB, true); + root["color_temp_command_topic"] = mqttGetTopic(MQTT_TOPIC_MIRED, true); } if (lightChannels() > 3) { - root["white_value_state_topic"] = getTopic(MQTT_TOPIC_CHANNEL, 3, false); - root["white_value_command_topic"] = getTopic(MQTT_TOPIC_CHANNEL, 3, true); + root["white_value_state_topic"] = mqttGetTopic(MQTT_TOPIC_CHANNEL, 3, false); + root["white_value_command_topic"] = mqttGetTopic(MQTT_TOPIC_CHANNEL, 3, true); } #endif // LIGHT_PROVIDER != LIGHT_PROVIDER_NONE diff --git a/code/espurna/mqtt.ino b/code/espurna/mqtt.ino index 98bc7d07..1bb7dd72 100644 --- a/code/espurna/mqtt.ino +++ b/code/espurna/mqtt.ino @@ -60,57 +60,9 @@ std::vector _mqtt_queue; Ticker _mqtt_flush_ticker; // ----------------------------------------------------------------------------- -// Public API +// Private // ----------------------------------------------------------------------------- -bool mqttConnected() { - return _mqtt.connected(); -} - -void mqttDisconnect() { - if (_mqtt.connected()) { - DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n")); - _mqtt.disconnect(); - } -} - -bool mqttForward() { - return _mqtt_forward; -} - -String mqttSubtopic(char * topic) { - String response; - String t = String(topic); - if (t.startsWith(_mqtt_topic) && t.endsWith(_mqtt_setter)) { - response = t.substring(_mqtt_topic.length(), t.length() - _mqtt_setter.length()); - } - return response; -} - -void mqttSendRaw(const char * topic, const char * message) { - if (_mqtt.connected()) { - #if MQTT_USE_ASYNC - unsigned int packetId = _mqtt.publish(topic, _mqtt_qos, _mqtt_retain, message); - DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %d)\n"), topic, message, packetId); - #else - _mqtt.publish(topic, message, _mqtt_retain); - DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s\n"), topic, message); - #endif - } -} - -String getTopic(const char * topic, bool set) { - String output = _mqtt_topic + String(topic); - if (set) output += _mqtt_setter; - return output; -} - -String getTopic(const char * topic, unsigned int index, bool set) { - char buffer[strlen(topic)+5]; - snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index); - return getTopic(buffer, set); -} - void _mqttFlush() { if (_mqtt_queue.size() == 0) return; @@ -141,190 +93,7 @@ void _mqttFlush() { } -void mqttSend(const char * topic, const char * message, bool force) { - bool useJson = force ? false : _mqtt_use_json; - if (useJson) { - - if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) _mqttFlush(); - - mqtt_message_t element; - element.topic = strdup(topic); - element.message = strdup(message); - _mqtt_queue.push_back(element); - _mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, _mqttFlush); - - } else { - String path = _mqtt_topic + String(topic) + _mqtt_getter; - mqttSendRaw(path.c_str(), message); - } -} - -void mqttSend(const char * topic, const char * message) { - mqttSend(topic, message, false); -} - -void mqttSend(const char * topic, unsigned int index, const char * message, bool force) { - char buffer[strlen(topic)+5]; - snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index); - mqttSend(buffer, message, force); -} - -void mqttSend(const char * topic, unsigned int index, const char * message) { - mqttSend(topic, index, message, false); -} - -void mqttSubscribeRaw(const char * topic) { - if (_mqtt.connected() && (strlen(topic) > 0)) { - #if MQTT_USE_ASYNC - unsigned int packetId = _mqtt.subscribe(topic, _mqtt_qos); - DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId); - #else - _mqtt.subscribe(topic, _mqtt_qos); - DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic); - #endif - } -} - -void mqttSubscribe(const char * topic) { - String path = _mqtt_topic + String(topic) + _mqtt_setter; - mqttSubscribeRaw(path.c_str()); -} - -void mqttUnsubscribeRaw(const char * topic) { - if (_mqtt.connected() && (strlen(topic) > 0)) { - #if MQTT_USE_ASYNC - unsigned int packetId = _mqtt.unsubscribe(topic); - DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s (PID %d)\n"), topic, packetId); - #else - _mqtt.unsubscribe(topic); - DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s\n"), topic); - #endif - } -} - -void mqttRegister(mqtt_callback_f callback) { - _mqtt_callbacks.push_back(callback); -} - -// ----------------------------------------------------------------------------- -// Callbacks -// ----------------------------------------------------------------------------- - -#if WEB_SUPPORT - -void _mqttWebSocketOnSend(JsonObject& root) { - root["mqttVisible"] = 1; - root["mqttStatus"] = mqttConnected(); - root["mqttEnabled"] = mqttEnabled(); - root["mqttServer"] = getSetting("mqttServer", MQTT_SERVER); - root["mqttPort"] = getSetting("mqttPort", MQTT_PORT); - root["mqttUser"] = getSetting("mqttUser"); - root["mqttClientID"] = getSetting("mqttClientID"); - root["mqttPassword"] = getSetting("mqttPassword"); - root["mqttKeep"] = _mqtt_keepalive; - root["mqttRetain"] = _mqtt_retain; - root["mqttQoS"] = _mqtt_qos; - #if ASYNC_TCP_SSL_ENABLED - root["mqttsslVisible"] = 1; - root["mqttUseSSL"] = getSetting("mqttUseSSL", 0).toInt() == 1; - root["mqttFP"] = getSetting("mqttFP"); - #endif - root["mqttTopic"] = getSetting("mqttTopic", MQTT_TOPIC); - root["mqttUseJson"] = getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1; -} - -#endif - -void _mqttCallback(unsigned int type, const char * topic, const char * payload) { - - if (type == MQTT_CONNECT_EVENT) { - - // Subscribe to internal action topics - mqttSubscribe(MQTT_TOPIC_ACTION); - - // Send heartbeat messages - heartbeat(); - - } - - if (type == MQTT_MESSAGE_EVENT) { - - // Match topic - String t = mqttSubtopic((char *) topic); - - // Actions - if (t.equals(MQTT_TOPIC_ACTION)) { - if (strcmp(payload, MQTT_ACTION_RESET) == 0) { - deferredReset(100, CUSTOM_RESET_MQTT); - } - } - - } - -} - -void _mqttOnConnect() { - - DEBUG_MSG_P(PSTR("[MQTT] Connected!\n")); - _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN; - - #if MQTT_SKIP_RETAINED - _mqtt_connected_at = millis(); - #endif - - // Clean subscriptions - mqttUnsubscribeRaw("#"); - - // Send connect event to subscribers - for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) { - (_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL); - } - -} - -void _mqttOnDisconnect() { - - DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n")); - - // Send disconnect event to subscribers - for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) { - (_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL); - } - -} - -void _mqttOnMessage(char* topic, char* payload, unsigned int len) { - - if (len == 0) return; - - char message[len + 1]; - strlcpy(message, (char *) payload, len + 1); - - #if MQTT_SKIP_RETAINED - if (millis() - _mqtt_connected_at < MQTT_SKIP_TIME) { - DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s - SKIPPED\n"), topic, message); - return; - } - #endif - DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message); - - // Send message event to subscribers - for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) { - (_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic, message); - } - -} - -void mqttEnabled(bool status) { - _mqtt_enabled = status; - setSetting("mqttEnabled", status ? 1 : 0); -} - -bool mqttEnabled() { - return _mqtt_enabled; -} - -void mqttConnect() { +void _mqttConnect() { // Do not connect if disabled if (!_mqtt_enabled) return; @@ -467,7 +236,7 @@ void mqttConnect() { } -void mqttConfigure() { +void _mqttConfigure() { // Replace identifier _mqtt_topic = getSetting("mqttTopic", MQTT_TOPIC); @@ -497,18 +266,37 @@ void mqttConfigure() { } -void mqttSetBroker(IPAddress ip, unsigned int port) { - setSetting("mqttServer", ip.toString()); - setSetting("mqttPort", port); - mqttEnabled(MQTT_AUTOCONNECT); -} +// ----------------------------------------------------------------------------- +// WEB +// ----------------------------------------------------------------------------- -void mqttSetBrokerIfNone(IPAddress ip, unsigned int port) { - if (!hasSetting("mqttServer")) mqttSetBroker(ip, port); +#if WEB_SUPPORT + +void _mqttWebSocketOnSend(JsonObject& root) { + root["mqttVisible"] = 1; + root["mqttStatus"] = mqttConnected(); + root["mqttEnabled"] = mqttEnabled(); + root["mqttServer"] = getSetting("mqttServer", MQTT_SERVER); + root["mqttPort"] = getSetting("mqttPort", MQTT_PORT); + root["mqttUser"] = getSetting("mqttUser"); + root["mqttClientID"] = getSetting("mqttClientID"); + root["mqttPassword"] = getSetting("mqttPassword"); + root["mqttKeep"] = _mqtt_keepalive; + root["mqttRetain"] = _mqtt_retain; + root["mqttQoS"] = _mqtt_qos; + #if ASYNC_TCP_SSL_ENABLED + root["mqttsslVisible"] = 1; + root["mqttUseSSL"] = getSetting("mqttUseSSL", 0).toInt() == 1; + root["mqttFP"] = getSetting("mqttFP"); + #endif + root["mqttTopic"] = getSetting("mqttTopic", MQTT_TOPIC); + root["mqttUseJson"] = getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1; } +#endif + // ----------------------------------------------------------------------------- -// Comands +// SETTINGS // ----------------------------------------------------------------------------- #if TERMINAL_SUPPORT @@ -516,7 +304,7 @@ void mqttSetBrokerIfNone(IPAddress ip, unsigned int port) { void _mqttInitCommands() { settingsRegisterCommand(F("MQTT.RESET"), [](Embedis* e) { - mqttConfigure(); + _mqttConfigure(); mqttDisconnect(); DEBUG_MSG_P(PSTR("+OK\n")); }); @@ -526,7 +314,232 @@ void _mqttInitCommands() { #endif // TERMINAL_SUPPORT // ----------------------------------------------------------------------------- -// Setup +// MQTT Callbacks +// ----------------------------------------------------------------------------- + +void _mqttCallback(unsigned int type, const char * topic, const char * payload) { + + if (type == MQTT_CONNECT_EVENT) { + + // Subscribe to internal action topics + mqttSubscribe(MQTT_TOPIC_ACTION); + + // Send heartbeat messages + heartbeat(); + + } + + if (type == MQTT_MESSAGE_EVENT) { + + // Match topic + String t = mqttSubtopic((char *) topic); + + // Actions + if (t.equals(MQTT_TOPIC_ACTION)) { + if (strcmp(payload, MQTT_ACTION_RESET) == 0) { + deferredReset(100, CUSTOM_RESET_MQTT); + } + } + + } + +} + +void _mqttOnConnect() { + + DEBUG_MSG_P(PSTR("[MQTT] Connected!\n")); + _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN; + + #if MQTT_SKIP_RETAINED + _mqtt_connected_at = millis(); + #endif + + // Clean subscriptions + mqttUnsubscribeRaw("#"); + + // Send connect event to subscribers + for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) { + (_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL); + } + +} + +void _mqttOnDisconnect() { + + DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n")); + + // Send disconnect event to subscribers + for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) { + (_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL); + } + +} + +void _mqttOnMessage(char* topic, char* payload, unsigned int len) { + + if (len == 0) return; + + char message[len + 1]; + strlcpy(message, (char *) payload, len + 1); + + #if MQTT_SKIP_RETAINED + if (millis() - _mqtt_connected_at < MQTT_SKIP_TIME) { + DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s - SKIPPED\n"), topic, message); + return; + } + #endif + DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message); + + // Send message event to subscribers + for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) { + (_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic, message); + } + +} + +// ----------------------------------------------------------------------------- +// Public API +// ----------------------------------------------------------------------------- + +void mqttEnabled(bool status) { + _mqtt_enabled = status; + setSetting("mqttEnabled", status ? 1 : 0); +} + +bool mqttEnabled() { + return _mqtt_enabled; +} + +bool mqttConnected() { + return _mqtt.connected(); +} + +void mqttDisconnect() { + if (_mqtt.connected()) { + DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n")); + _mqtt.disconnect(); + } +} + +bool mqttForward() { + return _mqtt_forward; +} + +String mqttSubtopic(char * topic) { + String response; + String t = String(topic); + if (t.startsWith(_mqtt_topic) && t.endsWith(_mqtt_setter)) { + response = t.substring(_mqtt_topic.length(), t.length() - _mqtt_setter.length()); + } + return response; +} + +String mqttGetTopic(const char * topic, bool set) { + String output = _mqtt_topic + String(topic); + if (set) output += _mqtt_setter; + return output; +} + +String mqttGetTopic(const char * topic, unsigned int index, bool set) { + char buffer[strlen(topic)+5]; + snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index); + return mqttGetTopic(buffer, set); +} + +void mqttSendRaw(const char * topic, const char * message) { + if (_mqtt.connected()) { + #if MQTT_USE_ASYNC + unsigned int packetId = _mqtt.publish(topic, _mqtt_qos, _mqtt_retain, message); + DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %d)\n"), topic, message, packetId); + #else + _mqtt.publish(topic, message, _mqtt_retain); + DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s\n"), topic, message); + #endif + } +} + +void mqttSend(const char * topic, const char * message, bool force) { + bool useJson = force ? false : _mqtt_use_json; + if (useJson) { + + if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) _mqttFlush(); + + mqtt_message_t element; + element.topic = strdup(topic); + element.message = strdup(message); + _mqtt_queue.push_back(element); + _mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, _mqttFlush); + + } else { + String path = _mqtt_topic + String(topic) + _mqtt_getter; + mqttSendRaw(path.c_str(), message); + } +} + +void mqttSend(const char * topic, const char * message) { + mqttSend(topic, message, false); +} + +void mqttSend(const char * topic, unsigned int index, const char * message, bool force) { + char buffer[strlen(topic)+5]; + snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index); + mqttSend(buffer, message, force); +} + +void mqttSend(const char * topic, unsigned int index, const char * message) { + mqttSend(topic, index, message, false); +} + +void mqttSubscribeRaw(const char * topic) { + if (_mqtt.connected() && (strlen(topic) > 0)) { + #if MQTT_USE_ASYNC + unsigned int packetId = _mqtt.subscribe(topic, _mqtt_qos); + DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId); + #else + _mqtt.subscribe(topic, _mqtt_qos); + DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic); + #endif + } +} + +void mqttSubscribe(const char * topic) { + String path = _mqtt_topic + String(topic) + _mqtt_setter; + mqttSubscribeRaw(path.c_str()); +} + +void mqttUnsubscribeRaw(const char * topic) { + if (_mqtt.connected() && (strlen(topic) > 0)) { + #if MQTT_USE_ASYNC + unsigned int packetId = _mqtt.unsubscribe(topic); + DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s (PID %d)\n"), topic, packetId); + #else + _mqtt.unsubscribe(topic); + DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s\n"), topic); + #endif + } +} + +void mqttRegister(mqtt_callback_f callback) { + _mqtt_callbacks.push_back(callback); +} + +void mqttSetBroker(IPAddress ip, unsigned int port) { + setSetting("mqttServer", ip.toString()); + setSetting("mqttPort", port); + mqttEnabled(MQTT_AUTOCONNECT); +} + +void mqttSetBrokerIfNone(IPAddress ip, unsigned int port) { + if (!hasSetting("mqttServer")) mqttSetBroker(ip, port); +} + +void mqttReset() { + _mqttConfigure(); + mqttDisconnect(); +} + +// ----------------------------------------------------------------------------- +// Initialization // ----------------------------------------------------------------------------- void mqttSetup() { @@ -583,12 +596,12 @@ void mqttSetup() { #endif // MQTT_USE_ASYNC - mqttConfigure(); + _mqttConfigure(); mqttRegister(_mqttCallback); #if WEB_SUPPORT wsOnSendRegister(_mqttWebSocketOnSend); - wsOnAfterParseRegister(mqttConfigure); + wsOnAfterParseRegister(_mqttConfigure); #endif #if TERMINAL_SUPPORT @@ -603,7 +616,7 @@ void mqttLoop() { #if MQTT_USE_ASYNC - mqttConnect(); + _mqttConnect(); #else // not MQTT_USE_ASYNC @@ -618,7 +631,7 @@ void mqttLoop() { _mqtt_connected = false; } - mqttConnect(); + _mqttConnect(); } diff --git a/code/espurna/ws.ino b/code/espurna/ws.ino index 0989c4b7..c4f2e71f 100644 --- a/code/espurna/ws.ino +++ b/code/espurna/ws.ino @@ -182,10 +182,7 @@ void _wsParse(AsyncWebSocketClient *client, uint8_t * payload, size_t length) { // This should got to callback as well // but first change management has to be in place #if MQTT_SUPPORT - if (changedMQTT) { - mqttConfigure(); - mqttDisconnect(); - } + if (changedMQTT) mqttReset(); #endif // Persist settings