diff --git a/code/src/config/general.h b/code/src/config/general.h index 8c497c9c..250ef440 100644 --- a/code/src/config/general.h +++ b/code/src/config/general.h @@ -63,6 +63,10 @@ #define MQTT_FSVERSION_TOPIC "/fsversion" #define MQTT_HEARTBEAT_TOPIC "/heartbeat" +#define MQTT_CONNECT_EVENT 0 +#define MQTT_DISCONNECT_EVENT 1 +#define MQTT_MESSAGE_EVENT 2 + // ----------------------------------------------------------------------------- // NTP // ----------------------------------------------------------------------------- diff --git a/code/src/mqtt.ino b/code/src/mqtt.ino index 4f20122f..f1507512 100644 --- a/code/src/mqtt.ino +++ b/code/src/mqtt.ino @@ -9,11 +9,12 @@ Copyright (C) 2016 by Xose PĂ©rez #include #include +#include AsyncMqttClient mqtt; String mqttTopic; -bool isCallbackMessage = false; +std::vector _mqtt_callbacks; // ----------------------------------------------------------------------------- // MQTT @@ -33,22 +34,26 @@ void buildTopics() { mqttTopic.replace("{identifier}", getSetting("hostname")); } -void mqttSend(char * topic, char * message) { +void mqttSend(const char * topic, const char * message) { if (!mqtt.connected()) return; - if (isCallbackMessage) return; String path = mqttTopic + String(topic); DEBUG_MSG("[MQTT] Sending %s %s\n", (char *) path.c_str(), message); mqtt.publish(path.c_str(), MQTT_QOS, MQTT_RETAIN, message); } -void _mqttOnConnect(bool sessionPresent) { +void mqttSubscribe(const char * topic) { + String path = mqttTopic + String(topic); + DEBUG_MSG("[MQTT] Subscribing to %s\n", (char *) path.c_str()); + mqtt.subscribe(path.c_str(), MQTT_QOS); +} - char buffer[50]; +void mqttRegister(void (*callback)(unsigned int, const char *, const char *)) { + _mqtt_callbacks.push_back(callback); +} - DEBUG_MSG("[MQTT] Connected!\n"); +void _mqttOnConnect(bool sessionPresent) { - // Send status via webSocket - wsSend((char *) "{\"mqttStatus\": true}"); + DEBUG_MSG("[MQTT] Connected!\n"); // Build MQTT topics buildTopics(); @@ -56,57 +61,37 @@ void _mqttOnConnect(bool sessionPresent) { // Say hello and report our IP and VERSION mqttSend((char *) MQTT_IP_TOPIC, (char *) getIP().c_str()); mqttSend((char *) MQTT_VERSION_TOPIC, (char *) APP_VERSION); + char buffer[50]; getFSVersion(buffer); mqttSend((char *) MQTT_FSVERSION_TOPIC, buffer); - // Publish current relay status - relayMQTT(); - - // Subscribe to relay topics - sprintf(buffer, "%s/relay/#", mqttTopic.c_str()); - DEBUG_MSG("[MQTT] Subscribing to %s\n", buffer); - mqtt.subscribe(buffer, MQTT_QOS); + // Send connect event to subscribers + for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) { + (*_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL); + } } void _mqttOnDisconnect(AsyncMqttClientDisconnectReason reason) { - // Send status via webSocket - wsSend((char *) "{\"mqttStatus\": false}"); + DEBUG_MSG("[MQTT] Disconnected!\n"); + + // Send disconnect event to subscribers + for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) { + (*_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL); + } } void _mqttOnMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { - static bool isFirstMessage = true; - DEBUG_MSG("[MQTT] Received %s %c\n", topic, payload[0]); - // If relayMode is not SAME avoid responding to a retained message - if (isFirstMessage) { - isFirstMessage = false; - byte relayMode = getSetting("relayMode", String(RELAY_MODE)).toInt(); - if (relayMode != RELAY_MODE_SAME) return; - } - - // Get relay ID - unsigned int relayID = topic[strlen(topic)-1] - '0'; - if (relayID >= relayCount()) relayID = 0; - - // Action to perform - if ((char)payload[0] == '0') { - isCallbackMessage = true; - relayStatus(relayID, false); - } - if ((char)payload[0] == '1') { - isCallbackMessage = true; - relayStatus(relayID, true); + // Send message event to subscribers + // Topic is set to the specific part each one might be checking + for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) { + (*_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic + mqttTopic.length(), payload); } - if ((char)payload[0] == '2') { - relayToggle(relayID); - } - - isCallbackMessage = false; } diff --git a/code/src/relay.ino b/code/src/relay.ino index ae54048f..f6fdb61b 100644 --- a/code/src/relay.ino +++ b/code/src/relay.ino @@ -50,28 +50,6 @@ void relayWS() { wsSend((char *) output.c_str()); } -void relaySave() { - unsigned char bit = 1; - unsigned char mask = 0; - for (unsigned int i=0; i < _relays.size(); i++) { - if (relayStatus(i)) mask += bit; - bit += bit; - } - EEPROM.write(0, mask); - EEPROM.commit(); -} - -void relayRetrieve() { - recursive = true; - unsigned char bit = 1; - unsigned char mask = EEPROM.read(0); - for (unsigned int i=0; i < _relays.size(); i++) { - relayStatus(i, ((mask & bit) == bit)); - bit += bit; - } - recursive = false; -} - bool relayStatus(unsigned char id) { #ifdef SONOFF_DUAL return ((dualRelayStatus & (1 << id)) > 0); @@ -80,6 +58,42 @@ bool relayStatus(unsigned char id) { #endif } +bool relayStatus(unsigned char id, bool status, bool report = true) { + + bool changed = false; + + if (relayStatus(id) != status) { + + DEBUG_MSG("[RELAY] %d => %s\n", id, status ? "ON" : "OFF"); + changed = true; + + #ifdef SONOFF_DUAL + + dualRelayStatus ^= (1 << id); + Serial.flush(); + Serial.write(0xA0); + Serial.write(0x04); + Serial.write(dualRelayStatus); + Serial.write(0xA1); + Serial.flush(); + + #else + digitalWrite(_relays[id], status); + #endif + + if (!recursive) { + relaySync(id); + relaySave(); + } + + } + + if (report) relayMQTT(id); + if (!recursive) relayWS(); + return changed; + +} + void relaySync(unsigned char id) { if (_relays.size() > 1) { @@ -117,47 +131,74 @@ void relaySync(unsigned char id) { } -bool relayStatus(unsigned char id, bool status) { +void relaySave() { + unsigned char bit = 1; + unsigned char mask = 0; + for (unsigned int i=0; i < _relays.size(); i++) { + if (relayStatus(i)) mask += bit; + bit += bit; + } + EEPROM.write(0, mask); + EEPROM.commit(); +} + +void relayRetrieve() { + recursive = true; + unsigned char bit = 1; + unsigned char mask = EEPROM.read(0); + for (unsigned int i=0; i < _relays.size(); i++) { + relayStatus(i, ((mask & bit) == bit)); + bit += bit; + } + recursive = false; +} + +void relayToggle(unsigned char id) { + relayStatus(id, !relayStatus(id)); +} - bool changed = false; +unsigned char relayCount() { + return _relays.size(); +} - if (relayStatus(id) != status) { +void relayMQTTCallback(unsigned int type, const char * topic, const char * payload) { - DEBUG_MSG("[RELAY] %d => %s\n", id, status ? "ON" : "OFF"); - changed = true; + static bool isFirstMessage = true; - #ifdef SONOFF_DUAL + if (type == MQTT_CONNECT_EVENT) { + relayMQTT(); + mqttSubscribe("/relay/#"); + } - dualRelayStatus ^= (1 << id); - Serial.flush(); - Serial.write(0xA0); - Serial.write(0x04); - Serial.write(dualRelayStatus); - Serial.write(0xA1); - Serial.flush(); + if (type == MQTT_MESSAGE_EVENT) { - #else - digitalWrite(_relays[id], status); - #endif + // Match topic + if (memcmp("/relay/", topic, 7) != 0) return; - if (!recursive) { - relaySync(id); - relaySave(); + // If relayMode is not SAME avoid responding to a retained message + if (isFirstMessage) { + isFirstMessage = false; + byte relayMode = getSetting("relayMode", String(RELAY_MODE)).toInt(); + if (relayMode != RELAY_MODE_SAME) return; } - } + // Get relay ID + unsigned int relayID = topic[strlen(topic)-1] - '0'; + if (relayID >= relayCount()) relayID = 0; - relayMQTT(id); - if (!recursive) relayWS(); - return changed; -} + // Action to perform + if ((char)payload[0] == '0') { + relayStatus(relayID, false, false); + } + if ((char)payload[0] == '1') { + relayStatus(relayID, true, false); + } + if ((char)payload[0] == '2') { + relayToggle(relayID); + } -void relayToggle(unsigned char id) { - relayStatus(id, !relayStatus(id)); -} + } -unsigned char relayCount() { - return _relays.size(); } void relaySetup() { @@ -196,4 +237,6 @@ void relaySetup() { if (relayMode == RELAY_MODE_SAME) relayRetrieve(); + mqttRegister(relayMQTTCallback); + } diff --git a/code/src/web.ino b/code/src/web.ino index d49546be..36a88046 100644 --- a/code/src/web.ino +++ b/code/src/web.ino @@ -39,6 +39,18 @@ bool wsSend(uint32_t client_id, char * payload) { ws.text(client_id, payload); } +void wsMQTTCallback(unsigned int type, const char * topic, const char * payload) { + + if (type == MQTT_CONNECT_EVENT) { + wsSend((char *) "{\"mqttStatus\": true}"); + } + + if (type == MQTT_DISCONNECT_EVENT) { + wsSend((char *) "{\"mqttStatus\": false}"); + } + +} + void _wsParse(uint32_t client_id, uint8_t * payload, size_t length) { // Parse JSON input @@ -448,8 +460,11 @@ ArRequestHandlerFunction _onRelayStatusWrapper(unsigned int relayID) { void webSetup() { - // Setup websocket plugin + // Setup websocket ws.onEvent(_wsEvent); + mqttRegister(wsMQTTCallback); + + // Setup webserver server.addHandler(&ws); // Serve home (basic authentication protection)