diff --git a/code/espurna/mqtt.ino b/code/espurna/mqtt.ino index d5590487..012862cf 100644 --- a/code/espurna/mqtt.ino +++ b/code/espurna/mqtt.ino @@ -12,37 +12,35 @@ Copyright (C) 2016-2017 by Xose PĂ©rez #include #include -const char *mqtt_user = 0; -const char *mqtt_pass = 0; - #if MQTT_USE_ASYNC // Using AsyncMqttClient #include -AsyncMqttClient mqtt; +AsyncMqttClient _mqtt; #else // Using PubSubClient #include -PubSubClient mqtt; -bool _mqttConnected = false; +PubSubClient _mqtt; +bool _mqtt_connected = false; -WiFiClient _mqttClient; +WiFiClient _mqtt_client; #if ASYNC_TCP_SSL_ENABLED -WiFiClientSecure _mqttClientSecure; +WiFiClientSecure _mqtt_client_secure; #endif // ASYNC_TCP_SSL_ENABLED #endif // MQTT_USE_ASYNC -bool _mqttEnabled = MQTT_ENABLED; -String _mqttTopic; -String _mqttSetter; -String _mqttGetter; -bool _mqttForward; -char *_mqttUser = 0; -char *_mqttPass = 0; -char *_mqttWill; +bool _mqtt_enabled = MQTT_ENABLED; +unsigned char _mqtt_connection_tries = 0; +String _mqtt_topic; +String _mqtt_setter; +String _mqtt_getter; +bool _mqtt_forward; +char *_mqtt_user = 0; +char *_mqtt_pass = 0; +char *_mqtt_will; #if MQTT_SKIP_RETAINED -unsigned long _mqttConnectedAt = 0; +unsigned long _mqtt_connected_at = 0; #endif std::vector _mqtt_callbacks; @@ -52,51 +50,51 @@ typedef struct { char * message; } mqtt_message_t; std::vector _mqtt_queue; -Ticker _mqttFlushTicker; +Ticker _mqtt_flush_ticker; // ----------------------------------------------------------------------------- // Public API // ----------------------------------------------------------------------------- bool mqttConnected() { - return mqtt.connected(); + return _mqtt.connected(); } void mqttDisconnect() { - if (mqtt.connected()) { + if (_mqtt.connected()) { DEBUG_MSG_P("[MQTT] Disconnecting\n"); - mqtt.disconnect(); + _mqtt.disconnect(); } } bool mqttForward() { - return _mqttForward; + return _mqtt_forward; } String mqttSubtopic(char * topic) { String response; String t = String(topic); - if (t.startsWith(_mqttTopic) && t.endsWith(_mqttSetter)) { - response = t.substring(_mqttTopic.length(), t.length() - _mqttSetter.length()); + if (t.startsWith(_mqtt_topic) && t.endsWith(_mqtt_setter)) { + response = t.substring(_mqtt_topic.length(), t.length() - _mqtt_setter.length()); } return response; } void mqttSendRaw(const char * topic, const char * message) { - if (mqtt.connected()) { + if (_mqtt.connected()) { #if MQTT_USE_ASYNC - unsigned int packetId = mqtt.publish(topic, MQTT_QOS, MQTT_RETAIN, message); + unsigned int packetId = _mqtt.publish(topic, MQTT_QOS, MQTT_RETAIN, message); DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %d)\n"), topic, message, packetId); #else - mqtt.publish(topic, message, MQTT_RETAIN); + _mqtt.publish(topic, message, MQTT_RETAIN); DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s\n"), topic, message); #endif } } String getTopic(const char * topic, bool set) { - String output = _mqttTopic + String(topic); - if (set) output += _mqttSetter; + String output = _mqtt_topic + String(topic); + if (set) output += _mqtt_setter; return output; } @@ -124,7 +122,7 @@ void _mqttFlush() { String output; root.printTo(output); - String path = _mqttTopic + String(MQTT_TOPIC_JSON); + String path = _mqtt_topic + String(MQTT_TOPIC_JSON); mqttSendRaw(path.c_str(), output.c_str()); for (unsigned char i = 0; i < _mqtt_queue.size(); i++) { @@ -143,9 +141,9 @@ void mqttSend(const char * topic, const char * message, bool force) { element.topic = strdup(topic); element.message = strdup(message); _mqtt_queue.push_back(element); - _mqttFlushTicker.once_ms(MQTT_USE_JSON_DELAY, _mqttFlush); + _mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, _mqttFlush); } else { - String path = _mqttTopic + String(topic) + _mqttGetter; + String path = _mqtt_topic + String(topic) + _mqtt_getter; mqttSendRaw(path.c_str(), message); } } @@ -165,19 +163,19 @@ void mqttSend(const char * topic, unsigned int index, const char * message) { } void mqttSubscribeRaw(const char * topic) { - if (mqtt.connected() && (strlen(topic) > 0)) { + if (_mqtt.connected() && (strlen(topic) > 0)) { #if MQTT_USE_ASYNC - unsigned int packetId = mqtt.subscribe(topic, MQTT_QOS); + unsigned int packetId = _mqtt.subscribe(topic, MQTT_QOS); DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId); #else - mqtt.subscribe(topic, MQTT_QOS); + _mqtt.subscribe(topic, MQTT_QOS); DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic); #endif } } void mqttSubscribe(const char * topic) { - String path = _mqttTopic + String(topic) + _mqttSetter; + String path = _mqtt_topic + String(topic) + _mqtt_setter; mqttSubscribeRaw(path.c_str()); } @@ -220,7 +218,7 @@ void _mqttOnConnect() { DEBUG_MSG_P(PSTR("[MQTT] Connected!\n")); #if MQTT_SKIP_RETAINED - _mqttConnectedAt = millis(); + _mqtt_connected_at = millis(); #endif // Send first Heartbeat @@ -252,7 +250,7 @@ void _mqttOnMessage(char* topic, char* payload, unsigned int len) { strlcpy(message, (char *) payload, len + 1); #if MQTT_SKIP_RETAINED - if (millis() - _mqttConnectedAt < MQTT_SKIP_TIME) { + if (millis() - _mqtt_connected_at < MQTT_SKIP_TIME) { DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s - SKIPPED\n"), topic, message); return; } @@ -308,67 +306,66 @@ bool mqttFormatFP(const char * fingerprint, char * destination) { #endif void mqttEnabled(bool status) { - _mqttEnabled = status; + _mqtt_enabled = status; setSetting("mqttEnabled", status ? 1 : 0); } bool mqttEnabled() { - return _mqttEnabled; + return _mqtt_enabled; } void mqttConnect() { - if (_mqttEnabled & !mqtt.connected()) { + if (_mqtt_enabled & !_mqtt.connected()) { // Disable MQTT after MQTT_MAX_TRIES attemps in a row #if MQTT_MAX_TRIES > 0 - static unsigned int tries = 0; static unsigned long last_try = millis(); if (millis() - last_try < MQTT_TRY_INTERVAL) { - if (++tries > MQTT_MAX_TRIES) { + if (++_mqtt_connection_tries > MQTT_MAX_TRIES) { DEBUG_MSG_P(PSTR("[MQTT] MQTT_MAX_TRIES met, disabling MQTT\n")); mqttEnabled(false); - tries = 0; + _mqtt_connection_tries = 0; return; } } else { - tries = 0; + _mqtt_connection_tries = 0; } last_try = millis(); #endif - if (_mqttUser) free(_mqttUser); - if (_mqttPass) free(_mqttPass); + if (_mqtt_user) free(_mqtt_user); + if (_mqtt_pass) free(_mqtt_pass); char * host = strdup(getSetting("mqttServer", MQTT_SERVER).c_str()); if (strlen(host) == 0) return; unsigned int port = getSetting("mqttPort", MQTT_PORT).toInt(); - _mqttUser = strdup(getSetting("mqttUser").c_str()); - _mqttPass = strdup(getSetting("mqttPassword").c_str()); - if (_mqttWill) free(_mqttWill); - _mqttWill = strdup((_mqttTopic + MQTT_TOPIC_STATUS).c_str()); + _mqtt_user = strdup(getSetting("mqttUser").c_str()); + _mqtt_pass = strdup(getSetting("mqttPassword").c_str()); + if (_mqtt_will) free(_mqtt_will); + _mqtt_will = strdup((_mqtt_topic + MQTT_TOPIC_STATUS).c_str()); DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%d\n"), host, port); #if MQTT_USE_ASYNC - mqtt.setServer(host, port); - mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false); - mqtt.setWill(_mqttWill, MQTT_QOS, MQTT_RETAIN, "0"); - if ((strlen(_mqttUser) > 0) && (strlen(_mqttPass) > 0)) { - DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqttUser); - mqtt.setCredentials(_mqttUser, _mqttPass); + _mqtt.setServer(host, port); + _mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false); + _mqtt.setWill(_mqtt_will, MQTT_QOS, MQTT_RETAIN, "0"); + if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) { + DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user); + _mqtt.setCredentials(_mqtt_user, _mqtt_pass); } #if ASYNC_TCP_SSL_ENABLED bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1; - mqtt.setSecure(secure); + _mqtt.setSecure(secure); if (secure) { DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n")); unsigned char fp[20] = {0}; if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) { - mqtt.addServerFingerprint(fp); + _mqtt.addServerFingerprint(fp); } else { DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n")); } @@ -376,7 +373,7 @@ void mqttConnect() { #endif // ASYNC_TCP_SSL_ENABLED - mqtt.connect(); + _mqtt.connect(); #else // not MQTT_USE_ASYNC @@ -387,11 +384,11 @@ void mqttConnect() { bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1; if (secure) { DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n")); - if (_mqttClientSecure.connect(host, port)) { + if (_mqtt_client_secure.connect(host, port)) { char fp[60] = {0}; if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) { - if (_mqttClientSecure.verify(fp, host)) { - mqtt.setClient(_mqttClientSecure); + if (_mqtt_client_secure.verify(fp, host)) { + _mqtt.setClient(_mqtt_client_secure); } else { DEBUG_MSG_P(PSTR("[MQTT] Invalid fingerprint\n")); response = false; @@ -406,31 +403,31 @@ void mqttConnect() { } } else { - mqtt.setClient(_mqttClient); + _mqtt.setClient(_mqtt_client); } #else // not ASYNC_TCP_SSL_ENABLED - mqtt.setClient(_mqttClient); + _mqtt.setClient(_mqtt_client); #endif // ASYNC_TCP_SSL_ENABLED if (response) { - mqtt.setServer(host, port); + _mqtt.setServer(host, port); - if ((strlen(_mqttUser) > 0) && (strlen(_mqttPass) > 0)) { - DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqttUser); - response = mqtt.connect(getIdentifier().c_str(), _mqttUser, _mqttPass, _mqttWill, MQTT_QOS, MQTT_RETAIN, "0"); + if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) { + DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user); + response = _mqtt.connect(getIdentifier().c_str(), _mqtt_user, _mqtt_pass, _mqtt_will, MQTT_QOS, MQTT_RETAIN, "0"); } else { - response = mqtt.connect(getIdentifier().c_str(), _mqttWill, MQTT_QOS, MQTT_RETAIN, "0"); + response = _mqtt.connect(getIdentifier().c_str(), _mqtt_will, MQTT_QOS, MQTT_RETAIN, "0"); } } if (response) { _mqttOnConnect(); - _mqttConnected = true; + _mqtt_connected = true; } else { DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n")); } @@ -446,20 +443,21 @@ void mqttConnect() { void mqttConfigure() { // Replace identifier - _mqttTopic = getSetting("mqttTopic", MQTT_TOPIC); - _mqttTopic.replace("{identifier}", getSetting("hostname")); - if (!_mqttTopic.endsWith("/")) _mqttTopic = _mqttTopic + "/"; + _mqtt_topic = getSetting("mqttTopic", MQTT_TOPIC); + _mqtt_topic.replace("{identifier}", getSetting("hostname")); + if (!_mqtt_topic.endsWith("/")) _mqtt_topic = _mqtt_topic + "/"; // Getters and setters - _mqttSetter = getSetting("mqttSetter", MQTT_USE_SETTER); - _mqttGetter = getSetting("mqttGetter", MQTT_USE_GETTER); - _mqttForward = !_mqttGetter.equals(_mqttSetter); + _mqtt_setter = getSetting("mqttSetter", MQTT_USE_SETTER); + _mqtt_getter = getSetting("mqttGetter", MQTT_USE_GETTER); + _mqtt_forward = !_mqtt_getter.equals(_mqtt_setter); // Enable + _mqtt_connection_tries = 0; if (getSetting("mqttServer", MQTT_SERVER).length() == 0) { mqttEnabled(false); } else { - _mqttEnabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1; + _mqtt_enabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1; } } @@ -491,10 +489,10 @@ void mqttSetup() { DEBUG_MSG_P(PSTR("[MQTT] Using ASYNC MQTT library\n")); - mqtt.onConnect([](bool sessionPresent) { + _mqtt.onConnect([](bool sessionPresent) { _mqttOnConnect(); }); - mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) { + _mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) { if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) { DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n")); } @@ -517,13 +515,13 @@ void mqttSetup() { #endif _mqttOnDisconnect(); }); - mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { + _mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { _mqttOnMessage(topic, payload, len); }); - mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) { + _mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) { DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %d\n"), packetId); }); - mqtt.onPublish([](uint16_t packetId) { + _mqtt.onPublish([](uint16_t packetId) { DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %d\n"), packetId); }); @@ -531,7 +529,7 @@ void mqttSetup() { DEBUG_MSG_P(PSTR("[MQTT] Using SYNC MQTT library\n")); - mqtt.setCallback([](char* topic, byte* payload, unsigned int length) { + _mqtt.setCallback([](char* topic, byte* payload, unsigned int length) { _mqttOnMessage(topic, (char *) payload, length); }); @@ -546,9 +544,9 @@ void mqttLoop() { #if MQTT_USE_ASYNC - if (!_mqttEnabled) return; + if (!_mqtt_enabled) return; if (WiFi.status() != WL_CONNECTED) return; - if (mqtt.connected) return; + if (_mqtt.connected) return; static unsigned long last = 0; if (millis() - last > MQTT_RECONNECT_DELAY) { @@ -560,18 +558,18 @@ void mqttLoop() { if (WiFi.status() != WL_CONNECTED) return; - if (mqtt.connected()) { + if (_mqtt.connected()) { - mqtt.loop(); + _mqtt.loop(); } else { - if (_mqttConnected) { + if (_mqtt_connected) { _mqttOnDisconnect(); - _mqttConnected = false; + _mqtt_connected = false; } - if (_mqttEnabled) { + if (_mqtt_enabled) { static unsigned long last = 0; if (millis() - last > MQTT_RECONNECT_DELAY) { last = millis();