Fork of the espurna firmware for `mhsw` switches
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

276 lines
7.3 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. /*
  2. MQTT MODULE
  3. Copyright (C) 2016-2017 by Xose Pérez <xose dot perez at gmail dot com>
  4. */
  5. #include <ESP8266WiFi.h>
  6. #include <vector>
  7. #if MQTT_USE_ASYNC // build-time switch: asynchronous backend (AsyncMqttClient)
  8. #include <AsyncMqttClient.h>
  9. AsyncMqttClient mqtt; // MQTT client instance used by every function in this module
  10. #else // synchronous backend (PubSubClient on top of a WiFiClient)
  11. #include <PubSubClient.h>
  12. WiFiClient mqttWiFiClient; // network client handed to the PubSubClient instance below
  13. PubSubClient mqtt(mqttWiFiClient); // MQTT client instance used by every function in this module
  14. bool _mqttConnected = false; // set by mqttConnect() on success; mqttLoop() uses it to fire _mqttOnDisconnect() once
  15. #endif
  16. String mqttTopic; // topic prefix produced by buildTopics(); prepended by mqttSend()/mqttSubscribe()
  17. std::vector<void (*)(unsigned int, const char *, const char *)> _mqtt_callbacks; // listeners added through mqttRegister(): (event, topic, message)
  18. #if MQTT_SKIP_RETAINED
  19. unsigned long mqttConnectedAt = 0; // millis() captured in _mqttOnConnect(); messages arriving within MQTT_SKIP_TIME of it are skipped
  20. #endif
  21. // -----------------------------------------------------------------------------
  22. // Public API
  23. // -----------------------------------------------------------------------------
  24. bool mqttConnected() {
  25. return mqtt.connected();
  26. }
  27. void mqttDisconnect() {
  28. mqtt.disconnect();
  29. }
  30. void buildTopics() {
  31. // Replace identifier
  32. mqttTopic = getSetting("mqttTopic", MQTT_TOPIC);
  33. mqttTopic.replace("{identifier}", getSetting("hostname"));
  34. }
  35. char * mqttSubtopic(char * topic) {
  36. int pos = min(mqttTopic.length(), strlen(topic));
  37. return topic + pos;
  38. }
  39. void mqttSendRaw(const char * topic, const char * message) {
  40. if (mqtt.connected()) {
  41. DEBUG_MSG("[MQTT] Sending %s => %s\n", topic, message);
  42. #if MQTT_USE_ASYNC
  43. mqtt.publish(topic, MQTT_QOS, MQTT_RETAIN, message);
  44. #else
  45. mqtt.publish(topic, message, MQTT_RETAIN);
  46. #endif
  47. }
  48. }
  49. void mqttSend(const char * topic, const char * message) {
  50. String path = mqttTopic + String(topic);
  51. mqttSendRaw(path.c_str(), message);
  52. }
  53. void mqttSubscribeRaw(const char * topic) {
  54. if (mqtt.connected()) {
  55. DEBUG_MSG("[MQTT] Subscribing to %s\n", topic);
  56. mqtt.subscribe(topic, MQTT_QOS);
  57. }
  58. }
  59. void mqttSubscribe(const char * topic) {
  60. String path = mqttTopic + String(topic);
  61. mqttSubscribeRaw(path.c_str());
  62. }
  63. // -----------------------------------------------------------------------------
  64. // Callbacks
  65. // -----------------------------------------------------------------------------
  66. void mqttRegister(void (*callback)(unsigned int, const char *, const char *)) {
  67. _mqtt_callbacks.push_back(callback);
  68. }
  69. void _mqttOnConnect() {
  70. DEBUG_MSG("[MQTT] Connected!\n");
  71. #if MQTT_SKIP_RETAINED
  72. mqttConnectedAt = millis();
  73. #endif
  74. // Build MQTT topics
  75. buildTopics();
  76. // Say hello and report our IP and VERSION
  77. mqttSend(MQTT_IP_TOPIC, getIP().c_str());
  78. mqttSend(MQTT_VERSION_TOPIC, APP_VERSION);
  79. mqttSend(MQTT_STATUS_TOPIC, "1");
  80. // Subscribe to system topics
  81. mqttSubscribe(MQTT_ACTION_TOPIC);
  82. // Send connect event to subscribers
  83. for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
  84. (*_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL);
  85. }
  86. }
  87. void _mqttOnDisconnect() {
  88. DEBUG_MSG("[MQTT] Disconnected!\n");
  89. // Send disconnect event to subscribers
  90. for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
  91. (*_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL);
  92. }
  93. }
  94. void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
  95. char message[len + 1];
  96. strlcpy(message, (char *) payload, len + 1);
  97. DEBUG_MSG("[MQTT] Received %s => %s", topic, message);
  98. #if MQTT_SKIP_RETAINED
  99. if (millis() - mqttConnectedAt < MQTT_SKIP_TIME) {
  100. DEBUG_MSG(" - SKIPPED\n");
  101. return;
  102. }
  103. #endif
  104. DEBUG_MSG("\n");
  105. // Check system topics
  106. char * p = mqttSubtopic(topic);
  107. if (strcmp(p, MQTT_ACTION_TOPIC) == 0) {
  108. if (strcmp(message, MQTT_ACTION_RESET) == 0) {
  109. ESP.restart();
  110. }
  111. }
  112. // Send message event to subscribers
  113. // Topic is set to the specific part each one might be checking
  114. for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
  115. (*_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic, message);
  116. }
  117. }
  118. void mqttConnect() {
  119. if (!mqtt.connected()) {
  120. if (getSetting("mqttServer", MQTT_SERVER).length() == 0) return;
  121. // Last option: reconnect to wifi after MQTT_MAX_TRIES attemps in a row
  122. #if MQTT_MAX_TRIES > 0
  123. static unsigned int tries = 0;
  124. static unsigned long last_try = millis();
  125. if (millis() - last_try < MQTT_TRY_INTERVAL) {
  126. if (++tries > MQTT_MAX_TRIES) {
  127. DEBUG_MSG("[MQTT] MQTT_MAX_TRIES met, disconnecting from WiFi\n");
  128. wifiDisconnect();
  129. tries = 0;
  130. return;
  131. }
  132. } else {
  133. tries = 0;
  134. }
  135. last_try = millis();
  136. #endif
  137. mqtt.disconnect();
  138. char * host = strdup(getSetting("mqttServer", MQTT_SERVER).c_str());
  139. unsigned int port = getSetting("mqttPort", MQTT_PORT).toInt();
  140. char * user = strdup(getSetting("mqttUser").c_str());
  141. char * pass = strdup(getSetting("mqttPassword").c_str());
  142. DEBUG_MSG("[MQTT] Connecting to broker at %s", host);
  143. mqtt.setServer(host, port);
  144. #if MQTT_USE_ASYNC
  145. mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false);
  146. mqtt.setWill((mqttTopic + MQTT_HEARTBEAT_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0");
  147. if ((strlen(user) > 0) && (strlen(pass) > 0)) {
  148. DEBUG_MSG(" as user '%s'.", user);
  149. mqtt.setCredentials(user, pass);
  150. }
  151. DEBUG_MSG("\n");
  152. mqtt.connect();
  153. #else
  154. bool response;
  155. if ((strlen(user) > 0) && (strlen(pass) > 0)) {
  156. DEBUG_MSG(" as user '%s'\n", user);
  157. response = mqtt.connect(getIdentifier().c_str(), user, pass, (mqttTopic + MQTT_STATUS_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0");
  158. } else {
  159. DEBUG_MSG("\n");
  160. response = mqtt.connect(getIdentifier().c_str(), (mqttTopic + MQTT_STATUS_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0");
  161. }
  162. if (response) {
  163. _mqttOnConnect();
  164. _mqttConnected = true;
  165. } else {
  166. DEBUG_MSG("[MQTT] Connection failed\n");
  167. }
  168. #endif
  169. free(host);
  170. free(user);
  171. free(pass);
  172. }
  173. }
  174. void mqttSetup() {
  175. #if MQTT_USE_ASYNC
  176. mqtt.onConnect([](bool sessionPresent) {
  177. _mqttOnConnect();
  178. });
  179. mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {
  180. _mqttOnDisconnect();
  181. });
  182. mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
  183. _mqttOnMessage(topic, payload, len);
  184. });
  185. #else
  186. mqtt.setCallback([](char* topic, byte* payload, unsigned int length) {
  187. _mqttOnMessage(topic, (char *) payload, length);
  188. });
  189. #endif
  190. buildTopics();
  191. }
  192. void mqttLoop() {
  193. static unsigned long lastPeriod = 0;
  194. if (WiFi.status() == WL_CONNECTED) {
  195. if (!mqtt.connected()) {
  196. #if not MQTT_USE_ASYNC
  197. if (_mqttConnected) {
  198. _mqttOnDisconnect();
  199. _mqttConnected = false;
  200. }
  201. #endif
  202. unsigned long currPeriod = millis() / MQTT_RECONNECT_DELAY;
  203. if (currPeriod != lastPeriod) {
  204. lastPeriod = currPeriod;
  205. mqttConnect();
  206. }
  207. #if not MQTT_USE_ASYNC
  208. } else {
  209. mqtt.loop();
  210. #endif
  211. }
  212. }
  213. }