|
|
- /*
-
- MQTT MODULE
-
- Copyright (C) 2016-2017 by Xose Pérez <xose dot perez at gmail dot com>
-
- */
-
- #include <ESP8266WiFi.h>
- #include <vector>
-
- #if MQTT_USE_ASYNC
- #include <AsyncMqttClient.h>
- AsyncMqttClient mqtt;
- #else
- #include <PubSubClient.h>
- WiFiClient mqttWiFiClient;
- PubSubClient mqtt(mqttWiFiClient);
- bool _mqttConnected = false;
- #endif
-
- String mqttTopic;
- bool _mqttForward;
- std::vector<void (*)(unsigned int, const char *, const char *)> _mqtt_callbacks;
- #if MQTT_SKIP_RETAINED
- unsigned long mqttConnectedAt = 0;
- #endif
-
- // -----------------------------------------------------------------------------
- // Public API
- // -----------------------------------------------------------------------------
-
- bool mqttConnected() {
- return mqtt.connected();
- }
-
- void mqttDisconnect() {
- mqtt.disconnect();
- }
-
- void buildTopics() {
- // Replace identifier
- mqttTopic = getSetting("mqttTopic", MQTT_TOPIC);
- mqttTopic.replace("{identifier}", getSetting("hostname"));
- }
-
- bool mqttForward() {
- return _mqttForward;
- }
-
- String mqttSubtopic(char * topic) {
-
- String response;
-
- String t = String(topic);
- String mqttSetter = getSetting("mqttSetter", MQTT_USE_SETTER);
-
- if (t.startsWith(mqttTopic) && t.endsWith(mqttSetter)) {
- response = t.substring(mqttTopic.length(), t.length() - mqttSetter.length());
- }
-
- return response;
-
- }
-
- void mqttSendRaw(const char * topic, const char * message) {
- if (mqtt.connected()) {
- DEBUG_MSG("[MQTT] Sending %s => %s\n", topic, message);
- #if MQTT_USE_ASYNC
- mqtt.publish(topic, MQTT_QOS, MQTT_RETAIN, message);
- #else
- mqtt.publish(topic, message, MQTT_RETAIN);
- #endif
- }
- }
-
- void mqttSend(const char * topic, const char * message) {
- String mqttGetter = getSetting("mqttGetter", MQTT_USE_GETTER);
- String path = mqttTopic + String(topic) + mqttGetter;
- mqttSendRaw(path.c_str(), message);
- }
-
- void mqttSend(const char * topic, unsigned int index, const char * message) {
- String mqttGetter = getSetting("mqttGetter", MQTT_USE_GETTER);
- String path = mqttTopic + String(topic) + String ("/") + String(index) + mqttGetter;;
- mqttSendRaw(path.c_str(), message);
- }
-
- void mqttSubscribeRaw(const char * topic) {
- if (mqtt.connected() && (strlen(topic) > 0)) {
- DEBUG_MSG("[MQTT] Subscribing to %s\n", topic);
- mqtt.subscribe(topic, MQTT_QOS);
- }
- }
-
- void mqttSubscribe(const char * topic) {
- String mqttSetter = getSetting("mqttSetter", MQTT_USE_SETTER);
- String path = mqttTopic + String(topic) + mqttSetter;
- mqttSubscribeRaw(path.c_str());
- }
-
- // -----------------------------------------------------------------------------
- // Callbacks
- // -----------------------------------------------------------------------------
-
- void mqttRegister(void (*callback)(unsigned int, const char *, const char *)) {
- _mqtt_callbacks.push_back(callback);
- }
-
- void _mqttOnConnect() {
-
- DEBUG_MSG("[MQTT] Connected!\n");
-
- #if MQTT_SKIP_RETAINED
- mqttConnectedAt = millis();
- #endif
-
- // Build MQTT topics
- buildTopics();
-
- // Say hello and report our IP and VERSION
- mqttSend(MQTT_IP_TOPIC, getIP().c_str());
- mqttSend(MQTT_VERSION_TOPIC, APP_VERSION);
- mqttSend(MQTT_STATUS_TOPIC, "1");
-
- // Subscribe to system topics
- mqttSubscribe(MQTT_ACTION_TOPIC);
-
- // Send connect event to subscribers
- for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
- (*_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL);
- }
-
- }
-
- void _mqttOnDisconnect() {
-
- DEBUG_MSG("[MQTT] Disconnected!\n");
-
- // Send disconnect event to subscribers
- for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
- (*_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL);
- }
-
- }
-
- void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
-
- char message[len + 1];
- strlcpy(message, (char *) payload, len + 1);
-
- DEBUG_MSG("[MQTT] Received %s => %s", topic, message);
- #if MQTT_SKIP_RETAINED
- if (millis() - mqttConnectedAt < MQTT_SKIP_TIME) {
- DEBUG_MSG(" - SKIPPED\n");
- return;
- }
- #endif
- DEBUG_MSG("\n");
-
- // Check system topics
- String t = mqttSubtopic((char *) topic);
- if (t.equals(MQTT_ACTION_TOPIC)) {
- if (strcmp(message, MQTT_ACTION_RESET) == 0) {
- ESP.restart();
- }
- }
-
- // Send message event to subscribers
- for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
- (*_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic, message);
- }
-
- }
-
- void mqttConnect() {
-
- if (!mqtt.connected()) {
-
- if (getSetting("mqttServer", MQTT_SERVER).length() == 0) return;
-
- // Last option: reconnect to wifi after MQTT_MAX_TRIES attemps in a row
- #if MQTT_MAX_TRIES > 0
- static unsigned int tries = 0;
- static unsigned long last_try = millis();
- if (millis() - last_try < MQTT_TRY_INTERVAL) {
- if (++tries > MQTT_MAX_TRIES) {
- DEBUG_MSG("[MQTT] MQTT_MAX_TRIES met, disconnecting from WiFi\n");
- wifiDisconnect();
- tries = 0;
- return;
- }
- } else {
- tries = 0;
- }
- last_try = millis();
- #endif
-
- mqtt.disconnect();
-
- char * host = strdup(getSetting("mqttServer", MQTT_SERVER).c_str());
- unsigned int port = getSetting("mqttPort", MQTT_PORT).toInt();
- char * user = strdup(getSetting("mqttUser").c_str());
- char * pass = strdup(getSetting("mqttPassword").c_str());
-
- DEBUG_MSG("[MQTT] Connecting to broker at %s", host);
- mqtt.setServer(host, port);
-
- #if MQTT_USE_ASYNC
-
- mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false);
- mqtt.setWill((mqttTopic + MQTT_HEARTBEAT_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0");
- if ((strlen(user) > 0) && (strlen(pass) > 0)) {
- DEBUG_MSG(" as user '%s'.", user);
- mqtt.setCredentials(user, pass);
- }
- DEBUG_MSG("\n");
- mqtt.connect();
-
- #else
-
- bool response;
-
- if ((strlen(user) > 0) && (strlen(pass) > 0)) {
- DEBUG_MSG(" as user '%s'\n", user);
- response = mqtt.connect(getIdentifier().c_str(), user, pass, (mqttTopic + MQTT_STATUS_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0");
- } else {
- DEBUG_MSG("\n");
- response = mqtt.connect(getIdentifier().c_str(), (mqttTopic + MQTT_STATUS_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0");
- }
-
- if (response) {
- _mqttOnConnect();
- _mqttConnected = true;
- } else {
- DEBUG_MSG("[MQTT] Connection failed\n");
- }
-
- #endif
-
- free(host);
- free(user);
- free(pass);
-
- String mqttSetter = getSetting("mqttSetter", MQTT_USE_SETTER);
- String mqttGetter = getSetting("mqttGetter", MQTT_USE_GETTER);
- bool _mqttForward = !mqttGetter.equals(mqttSetter);
-
- }
-
- }
-
- void mqttSetup() {
- #if MQTT_USE_ASYNC
- mqtt.onConnect([](bool sessionPresent) {
- _mqttOnConnect();
- });
- mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {
- _mqttOnDisconnect();
- });
- mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
- _mqttOnMessage(topic, payload, len);
- });
- #else
- mqtt.setCallback([](char* topic, byte* payload, unsigned int length) {
- _mqttOnMessage(topic, (char *) payload, length);
- });
- #endif
- buildTopics();
- }
-
- void mqttLoop() {
-
- static unsigned long lastPeriod = 0;
-
- if (WiFi.status() == WL_CONNECTED) {
-
- if (!mqtt.connected()) {
-
- #if not MQTT_USE_ASYNC
- if (_mqttConnected) {
- _mqttOnDisconnect();
- _mqttConnected = false;
- }
- #endif
-
- unsigned long currPeriod = millis() / MQTT_RECONNECT_DELAY;
- if (currPeriod != lastPeriod) {
- lastPeriod = currPeriod;
- mqttConnect();
- }
-
- #if not MQTT_USE_ASYNC
- } else {
- mqtt.loop();
- #endif
-
- }
-
- }
-
- }
|