From 04ba42567847b8475a0b4445e77ba1e2ef78e2b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xose=20P=C3=A9rez?= Date: Tue, 21 Feb 2017 19:12:48 +0100 Subject: [PATCH] Using PubSubClient by default --- code/espurna/config/general.h | 2 + code/espurna/mqtt.ino | 103 +++++++++++++++++++++++++++------- code/platformio.ini | 1 + 3 files changed, 86 insertions(+), 20 deletions(-) diff --git a/code/espurna/config/general.h b/code/espurna/config/general.h index df79c99d..77da0758 100644 --- a/code/espurna/config/general.h +++ b/code/espurna/config/general.h @@ -119,6 +119,8 @@ // MQTT // ----------------------------------------------------------------------------- +#define MQTT_USE_ASYNC 0 + #define MQTT_SERVER "" #define MQTT_PORT 1883 #define MQTT_TOPIC "/test/switch/{identifier}" diff --git a/code/espurna/mqtt.ino b/code/espurna/mqtt.ino index 33a6ad2a..f4bc5c51 100644 --- a/code/espurna/mqtt.ino +++ b/code/espurna/mqtt.ino @@ -7,10 +7,17 @@ Copyright (C) 2016-2017 by Xose PĂ©rez */ #include -#include #include +#if MQTT_USE_ASYNC +#include AsyncMqttClient mqtt; +#else +#include +WiFiClient mqttWiFiClient; +PubSubClient mqtt(mqttWiFiClient); +bool _mqttConnected = false; +#endif String mqttTopic; std::vector _mqtt_callbacks; @@ -19,7 +26,7 @@ std::vector _mqtt_callbacks; #endif // ----------------------------------------------------------------------------- -// MQTT +// Public API // ----------------------------------------------------------------------------- bool mqttConnected() { @@ -44,7 +51,11 @@ char * mqttSubtopic(char * topic) { void mqttSendRaw(const char * topic, const char * message) { if (mqtt.connected()) { DEBUG_MSG("[MQTT] Sending %s => %s\n", topic, message); - mqtt.publish(topic, MQTT_QOS, MQTT_RETAIN, message); + #if MQTT_USE_ASYNC + mqtt.publish(topic, MQTT_QOS, MQTT_RETAIN, message); + #else + mqtt.publish(topic, message, MQTT_RETAIN); + #endif } } @@ -65,11 +76,15 @@ void mqttSubscribe(const char * topic) { mqttSubscribeRaw(path.c_str()); } +// ----------------------------------------------------------------------------- +// Callbacks +// ----------------------------------------------------------------------------- + void mqttRegister(void (*callback)(unsigned int, const char *, const char *)) { _mqtt_callbacks.push_back(callback); } -void _mqttOnConnect(bool sessionPresent) { +void _mqttOnConnect() { DEBUG_MSG("[MQTT] Connected!\n"); @@ -94,7 +109,7 @@ void _mqttOnConnect(bool sessionPresent) { } -void _mqttOnDisconnect(AsyncMqttClientDisconnectReason reason) { +void _mqttOnDisconnect() { DEBUG_MSG("[MQTT] Disconnected!\n"); @@ -105,10 +120,10 @@ void _mqttOnDisconnect(AsyncMqttClientDisconnectReason reason) { } -void _mqttOnMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { +void _mqttOnMessage(char* topic, char* payload, unsigned int len) { - char message[len+1]; - strlcpy(message, payload, len+1); + char message[len + 1]; + strlcpy(message, (char *) payload, len + 1); DEBUG_MSG("[MQTT] Received %s => %s", topic, message); #if MQTT_SKIP_RETAINED @@ -122,7 +137,7 @@ void _mqttOnMessage(char* topic, char* payload, AsyncMqttClientMessageProperties // Check system topics char * p = mqttSubtopic(topic); if (strcmp(p, MQTT_ACTION_TOPIC) == 0) { - if (strcmp(payload, MQTT_ACTION_RESET) == 0) { + if (strcmp(message, MQTT_ACTION_RESET) == 0) { ESP.reset(); } } @@ -167,14 +182,38 @@ void mqttConnect() { DEBUG_MSG("[MQTT] Connecting to broker at %s", host); mqtt.setServer(host, port); - mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false); - mqtt.setWill((mqttTopic + MQTT_HEARTBEAT_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0"); - if ((strlen(user) > 0) && (strlen(pass) > 0)) { - DEBUG_MSG(" as user '%s'.", user); - mqtt.setCredentials(user, pass); - } - DEBUG_MSG("\n"); - mqtt.connect(); + + #if MQTT_USE_ASYNC + + mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false); + mqtt.setWill((mqttTopic + MQTT_HEARTBEAT_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0"); + if ((strlen(user) > 0) && (strlen(pass) > 0)) { + DEBUG_MSG(" as user '%s'.", user); + mqtt.setCredentials(user, pass); + } + DEBUG_MSG("\n"); + mqtt.connect(); + + #else + + bool response; + + if ((strlen(user) > 0) && (strlen(pass) > 0)) { + DEBUG_MSG(" as user '%s'\n", user); + response = mqtt.connect(getIdentifier().c_str(), user, pass, (mqttTopic + MQTT_HEARTBEAT_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0"); + } else { + DEBUG_MSG("\n"); + response = mqtt.connect(getIdentifier().c_str(), (mqttTopic + MQTT_HEARTBEAT_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0"); + } + + if (response) { + _mqttOnConnect(); + _mqttConnected = true; + } else { + DEBUG_MSG("[MQTT] Connection failed\n"); + } + + #endif free(host); free(user); @@ -185,9 +224,21 @@ void mqttConnect() { } void mqttSetup() { - mqtt.onConnect(_mqttOnConnect); - mqtt.onDisconnect(_mqttOnDisconnect); - mqtt.onMessage(_mqttOnMessage); + #if MQTT_USE_ASYNC + mqtt.onConnect([](bool sessionPresent) { + _mqttOnConnect(); + }); + mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) { + _mqttOnDisconnect(); + }); + mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { + _mqttOnMessage(topic, payload, len); + }); + #else + mqtt.setCallback([](char* topic, byte* payload, unsigned int length) { + _mqttOnMessage(topic, (char *) payload, length); + }); + #endif buildTopics(); } @@ -199,12 +250,24 @@ void mqttLoop() { if (!mqtt.connected()) { + #if not MQTT_USE_ASYNC + if (_mqttConnected) { + _mqttOnDisconnect(); + _mqttConnected = false; + } + #endif + unsigned long currPeriod = millis() / MQTT_RECONNECT_DELAY; if (currPeriod != lastPeriod) { lastPeriod = currPeriod; mqttConnect(); } + #if not MQTT_USE_ASYNC + } else { + mqtt.loop(); + #endif + } } diff --git a/code/platformio.ini b/code/platformio.ini index d89e3ff3..96976503 100644 --- a/code/platformio.ini +++ b/code/platformio.ini @@ -12,6 +12,7 @@ lib_deps = https://github.com/me-no-dev/ESPAsyncTCP#36b6b5a https://github.com/me-no-dev/ESPAsyncWebServer#bab5457 https://github.com/marvinroger/async-mqtt-client#f1b4576 + PubSubClient Embedis NtpClientLib OneWire