Fork of the espurna firmware for `mhsw` switches
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1004 lines
32 KiB

8 years ago
8 years ago
7 years ago
6 years ago
7 years ago
6 years ago
6 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. /*
  2. MQTT MODULE
  3. Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
  4. */
  5. #if MQTT_SUPPORT
  6. #include <EEPROM_Rotate.h>
  7. #include <ESP8266WiFi.h>
  8. #include <ESP8266mDNS.h>
  9. #include <ArduinoJson.h>
  10. #include <vector>
  11. #include <utility>
  12. #include <Ticker.h>
  13. #include <TimeLib.h>
  14. #if MQTT_LIBRARY == MQTT_ASYNC // AsyncMqttClient
  15. #include <AsyncMqttClient.h>
  16. AsyncMqttClient _mqtt;
  17. #else // Arduino-MQTT or PubSubClient
  18. WiFiClient _mqtt_client;
  19. bool _mqtt_connected = false;
  20. #include "WiFiClientSecure.h"
  21. #if SECURE_CLIENT == SECURE_CLIENT_AXTLS
  22. using namespace axTLS;
  23. #elif SECURE_CLIENT == SECURE_CLIENT_BEARSSL
  24. using namespace BearSSL;
  25. BearSSL::X509List *_ca_list = nullptr;
  26. #endif
  27. WiFiClientSecure _mqtt_client_secure;
  28. #if MQTT_LIBRARY == MQTT_ARDUINO // Using Arduino-MQTT
  29. #include <MQTTClient.h>
  30. #ifdef MQTT_MAX_PACKET_SIZE
  31. MQTTClient _mqtt(MQTT_MAX_PACKET_SIZE);
  32. #else
  33. MQTTClient _mqtt();
  34. #endif
  35. #else // Using PubSubClient
  36. #include <PubSubClient.h>
  37. PubSubClient _mqtt;
  38. #endif
  39. #endif
  40. bool _mqtt_enabled = MQTT_ENABLED;
  41. bool _mqtt_use_json = false;
  42. unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  43. unsigned long _mqtt_last_connection = 0;
  44. bool _mqtt_connecting = false;
  45. unsigned char _mqtt_qos = MQTT_QOS;
  46. bool _mqtt_retain = MQTT_RETAIN;
  47. unsigned long _mqtt_keepalive = MQTT_KEEPALIVE;
  48. String _mqtt_topic;
  49. String _mqtt_topic_json;
  50. String _mqtt_setter;
  51. String _mqtt_getter;
  52. bool _mqtt_forward;
  53. String _mqtt_user;
  54. String _mqtt_pass;
  55. String _mqtt_will;
  56. String _mqtt_server;
  57. uint16_t _mqtt_port;
  58. String _mqtt_clientid;
  59. std::vector<mqtt_callback_f> _mqtt_callbacks;
  60. struct mqtt_message_t {
  61. static const unsigned char END = 255;
  62. unsigned char parent = END;
  63. char * topic;
  64. char * message = NULL;
  65. };
  66. std::vector<mqtt_message_t> _mqtt_queue;
  67. Ticker _mqtt_flush_ticker;
  68. // -----------------------------------------------------------------------------
  69. // Private
  70. // -----------------------------------------------------------------------------
  71. void _mqttConnect() {
  72. // Do not connect if disabled
  73. if (!_mqtt_enabled) return;
  74. // Do not connect if already connected or still trying to connect
  75. if (_mqtt.connected() || _mqtt_connecting) return;
  76. // Check reconnect interval
  77. if (millis() - _mqtt_last_connection < _mqtt_reconnect_delay) return;
  78. // Increase the reconnect delay
  79. _mqtt_reconnect_delay += MQTT_RECONNECT_DELAY_STEP;
  80. if (_mqtt_reconnect_delay > MQTT_RECONNECT_DELAY_MAX) {
  81. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MAX;
  82. }
  83. #if MDNS_CLIENT_SUPPORT
  84. _mqtt_server = mdnsResolve(_mqtt_server);
  85. #endif
  86. DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%u\n"), _mqtt_server.c_str(), _mqtt_port);
  87. DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid.c_str());
  88. DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), _mqtt_qos);
  89. DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), _mqtt_retain ? 1 : 0);
  90. DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %ds\n"), _mqtt_keepalive);
  91. DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will.c_str());
  92. _mqtt_connecting = true;
  93. #if MQTT_LIBRARY == MQTT_ASYNC
  94. _mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
  95. _mqtt.setClientId(_mqtt_clientid.c_str());
  96. _mqtt.setKeepAlive(_mqtt_keepalive);
  97. _mqtt.setCleanSession(false);
  98. _mqtt.setWill(_mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, MQTT_STATUS_OFFLINE);
  99. if (_mqtt_user.length() && _mqtt_pass.length()) {
  100. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
  101. _mqtt.setCredentials(_mqtt_user.c_str(), _mqtt_pass.c_str());
  102. }
  103. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  104. bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
  105. _mqtt.setSecure(secure);
  106. if (secure) {
  107. DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
  108. int check = getSetting("mqttScCheck", MQTT_SECURE_CLIENT_CHECK).toInt();
  109. if (check == SECURE_CLIENT_CHECK_FINGERPRINT) {
  110. DEBUG_MSG_P(PSTR("[MQTT] Using fingerprint verification, trying to connect\n"));
  111. unsigned char fp[20] = {0};
  112. if (sslFingerPrintArray(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
  113. _mqtt.addServerFingerprint(fp);
  114. } else {
  115. DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint, cannot connect\n"));
  116. _mqtt_last_connection = millis();
  117. return;
  118. }
  119. } else if (check == SECURE_CLIENT_CHECK_CA) {
  120. DEBUG_MSG_P(PSTR("[MQTT] CA verification is not supported with AsyncMqttClient, cannot connect\n"));
  121. _mqtt_last_connection = millis();
  122. return;
  123. } else {
  124. DEBUG_MSG_P(PSTR("[MQTT] !!! SSL connection will not be validated !!!\n"));
  125. }
  126. }
  127. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  128. _mqtt.connect();
  129. #else // Using PubSubClient or Arduino-MQTT
  130. bool verified = true; // Connection verified
  131. bool secure = false; // Whether to use SSL
  132. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  133. secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
  134. if (secure) {
  135. DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
  136. // Use MFLN if configured and on BearSSL
  137. #if SECURE_CLIENT == SECURE_CLIENT_BEARSSL
  138. uint16_t requested_mfln = getSetting("mqttScMFLN", MQTT_SECURE_CLIENT_MFLN).toInt();
  139. if (requested_mfln) {
  140. bool supported = _mqtt_client_secure.probeMaxFragmentLength(_mqtt_server.c_str(), _mqtt_port, requested_mfln);
  141. DEBUG_MSG_P(PSTR("[MQTT] MFLN buffer size %u supported: %s\n"), requested_mfln, supported ? "YES" : "NO");
  142. if (supported) {
  143. _mqtt_client_secure.setBufferSizes(requested_mfln, requested_mfln);
  144. }
  145. }
  146. #endif
  147. // Default verification: CA in case of BearSSL, fingerprint otherwise
  148. int check = getSetting("mqttScCheck", MQTT_SECURE_CLIENT_CHECK).toInt();
  149. if (check == SECURE_CLIENT_CHECK_FINGERPRINT) {
  150. DEBUG_MSG_P(PSTR("[MQTT] Using fingerprint verification, trying to connect\n"));
  151. char fp[60] = {0};
  152. if (sslFingerPrintChar(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
  153. // BearSSL needs to have the fingerprint set prior to connecting
  154. #if SECURE_CLIENT == SECURE_CLIENT_BEARSSL
  155. _mqtt_client_secure.setFingerprint(fp); // Always returns true
  156. #endif
  157. if (_mqtt_client_secure.connect(_mqtt_server.c_str(), _mqtt_port)) {
  158. // AxTLS does the fingerprint check *after* connecting
  159. #if SECURE_CLIENT == SECURE_CLIENT_AXTLS
  160. if (!_mqtt_client_secure.verify(fp, _mqtt_server.c_str())) {
  161. DEBUG_MSG_P(PSTR("[MQTT] Fingerprint did not match\n"));
  162. verified = false;
  163. }
  164. #endif
  165. } else {
  166. DEBUG_MSG_P(PSTR("[MQTT] Client connection failed, incorrect fingerprint?\n"));
  167. verified = false;
  168. }
  169. } else {
  170. DEBUG_MSG_P(PSTR("[MQTT] Invalid fingerprint, cannot connect\n"));
  171. verified = false;
  172. }
  173. } else if (check == SECURE_CLIENT_CHECK_CA) {
  174. #if SECURE_CLIENT == SECURE_CLIENT_BEARSSL
  175. #ifndef _mqtt_client_ca
  176. DEBUG_MSG_P(PSTR("[MQTT] No CA certificate defined, cannot connect\n"));
  177. verified = false;
  178. #else
  179. DEBUG_MSG_P(PSTR("[MQTT] Using CA verification, trying to connect\n"));
  180. // We need to allocate using new in order to keep the list in memory
  181. _ca_list = new BearSSL::X509List(_mqtt_client_ca);
  182. // If NTP is not synced yet, the connect() call may fail.
  183. // This is not an issue, MQTT will reconnect after MQTT_RECONNECT_DELAY_MIN
  184. #if NTP_SUPPORT
  185. _mqtt_client_secure.setX509Time(ntpLocal2UTC(now()));
  186. #else
  187. _mqtt_client_secure.setX509Time(now());
  188. #endif
  189. _mqtt_client_secure.setTrustAnchors(_ca_list);
  190. if (!_mqtt_client_secure.connect(_mqtt_server.c_str(), _mqtt_port)) {
  191. DEBUG_MSG_P(PSTR("[MQTT] CA verification failed - possible reasons are an incorrect certificate or unsynced clock\n"));
  192. verified = false;
  193. }
  194. #endif // defined _mqtt_client_ca
  195. #else
  196. DEBUG_MSG_P(PSTR("[MQTT] CA verification is not supported with AxTLS client, cannot connect\n"));
  197. verified = false;
  198. #endif
  199. } else {
  200. DEBUG_MSG_P(PSTR("[MQTT] !!! SSL connection will not be validated !!!\n"));
  201. #if SECURE_CLIENT == SECURE_CLIENT_BEARSSL
  202. _mqtt_client_secure.setInsecure();
  203. #endif
  204. }
  205. }
  206. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  207. if (verified) {
  208. #if MQTT_LIBRARY == MQTT_ARDUINO // Arduino-MQTT
  209. _mqtt.begin(_mqtt_server.c_str(), _mqtt_port, (secure ? _mqtt_client_secure : _mqtt_client));
  210. _mqtt.setWill(_mqtt_will.c_str(), MQTT_STATUS_OFFLINE, _mqtt_qos, _mqtt_retain);
  211. verified = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str());
  212. #else // PubSubClient
  213. _mqtt.setClient(secure ? _mqtt_client_secure : _mqtt_client);
  214. _mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
  215. if (_mqtt_user.length() && _mqtt_pass.length()) {
  216. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
  217. verified = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, MQTT_STATUS_OFFLINE);
  218. } else {
  219. verified = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, MQTT_STATUS_OFFLINE);
  220. }
  221. #endif
  222. }
  223. if (verified) {
  224. _mqttOnConnect();
  225. } else {
  226. DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n"));
  227. mqttDisconnect(); // Clean up
  228. _mqtt_last_connection = millis();
  229. }
  230. #endif // MQTT_LIBRARY == MQTT_ASYNC
  231. }
  232. void _mqttPlaceholders(String& text) {
  233. text.replace("{hostname}", getSetting("hostname"));
  234. text.replace("{magnitude}", "#");
  235. String mac = WiFi.macAddress();
  236. mac.replace(":", "");
  237. text.replace("{mac}", mac);
  238. }
  239. template<typename T>
  240. void _mqttApplySetting(T& current, T& updated) {
  241. if (current != updated) {
  242. current = std::move(updated);
  243. mqttDisconnect();
  244. }
  245. }
  246. template<typename T>
  247. void _mqttApplySetting(T& current, const T& updated) {
  248. if (current != updated) {
  249. current = updated;
  250. mqttDisconnect();
  251. }
  252. }
  253. template<typename T>
  254. void _mqttApplyTopic(T& current, const char* magnitude) {
  255. String updated = mqttTopic(magnitude, false);
  256. if (current != updated) {
  257. mqttFlush();
  258. current = std::move(updated);
  259. }
  260. }
  261. void _mqttConfigure() {
  262. // Enable only when server is set
  263. {
  264. String server = getSetting("mqttServer", MQTT_SERVER);
  265. uint16_t port = getSetting("mqttPort", MQTT_PORT).toInt();
  266. bool enabled = false;
  267. if (server.length()) {
  268. enabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1;
  269. }
  270. _mqttApplySetting(_mqtt_server, server);
  271. _mqttApplySetting(_mqtt_enabled, enabled);
  272. _mqttApplySetting(_mqtt_port, port);
  273. if (!enabled) return;
  274. }
  275. // Get base topic and apply placeholders
  276. {
  277. String topic = getSetting("mqttTopic", MQTT_TOPIC);
  278. if (topic.endsWith("/")) topic.remove(topic.length()-1);
  279. // Replace things inside curly braces (like {hostname}, {mac} etc.)
  280. _mqttPlaceholders(topic);
  281. if (topic.indexOf("#") == -1) topic.concat("/#");
  282. _mqttApplySetting(_mqtt_topic, topic);
  283. _mqttApplyTopic(_mqtt_will, MQTT_TOPIC_STATUS);
  284. }
  285. // Getter and setter
  286. {
  287. String setter = getSetting("mqttSetter", MQTT_SETTER);
  288. String getter = getSetting("mqttGetter", MQTT_GETTER);
  289. bool forward = !setter.equals(getter) && RELAY_REPORT_STATUS;
  290. _mqttApplySetting(_mqtt_setter, setter);
  291. _mqttApplySetting(_mqtt_getter, getter);
  292. _mqttApplySetting(_mqtt_forward, forward);
  293. }
  294. // MQTT options
  295. {
  296. String user = getSetting("mqttUser", MQTT_USER);
  297. _mqttPlaceholders(user);
  298. String pass = getSetting("mqttPassword", MQTT_PASS);
  299. unsigned char qos = getSetting("mqttQoS", MQTT_QOS).toInt();
  300. bool retain = getSetting("mqttRetain", MQTT_RETAIN).toInt() == 1;
  301. unsigned long keepalive = getSetting("mqttKeep", MQTT_KEEPALIVE).toInt();
  302. String id = getSetting("mqttClientID", getIdentifier());
  303. _mqttPlaceholders(id);
  304. _mqttApplySetting(_mqtt_user, user);
  305. _mqttApplySetting(_mqtt_pass, pass);
  306. _mqttApplySetting(_mqtt_qos, qos);
  307. _mqttApplySetting(_mqtt_retain, retain);
  308. _mqttApplySetting(_mqtt_keepalive, keepalive);
  309. _mqttApplySetting(_mqtt_clientid, id);
  310. }
  311. // MQTT JSON
  312. {
  313. _mqttApplySetting(_mqtt_use_json, getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1);
  314. _mqttApplyTopic(_mqtt_topic_json, MQTT_TOPIC_JSON);
  315. }
  316. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  317. }
  318. void _mqttBackwards() {
  319. String mqttTopic = getSetting("mqttTopic", MQTT_TOPIC);
  320. if (mqttTopic.indexOf("{identifier}") > 0) {
  321. mqttTopic.replace("{identifier}", "{hostname}");
  322. setSetting("mqttTopic", mqttTopic);
  323. }
  324. }
  325. void _mqttInfo() {
  326. DEBUG_MSG_P(PSTR("[MQTT] Library %s, SSL %s, Autoconnect %s\n"),
  327. (MQTT_LIBRARY == MQTT_ASYNC ? "AsyncMqttClient" : (MQTT_LIBRARY == MQTT_ARDUINO ? "Arduino-MQTT" : "PubSubClient")),
  328. SECURE_CLIENT == SECURE_CLIENT_NONE ? "DISABLED" : "ENABLED",
  329. MQTT_AUTOCONNECT ? "ENABLED" : "DISABLED"
  330. );
  331. DEBUG_MSG_P(PSTR("[MQTT] Client %s, %s\n"),
  332. _mqtt_enabled ? "ENABLED" : "DISABLED",
  333. _mqtt.connected() ? "CONNECTED" : "DISCONNECTED"
  334. );
  335. DEBUG_MSG_P(PSTR("[MQTT] Retry %s (Now %u, Last %u, Delay %u, Step %u)\n"),
  336. _mqtt_connecting ? "CONNECTING" : "WAITING",
  337. millis(),
  338. _mqtt_last_connection,
  339. _mqtt_reconnect_delay,
  340. MQTT_RECONNECT_DELAY_STEP
  341. );
  342. }
  343. // -----------------------------------------------------------------------------
  344. // WEB
  345. // -----------------------------------------------------------------------------
  346. #if WEB_SUPPORT
  347. bool _mqttWebSocketOnKeyCheck(const char * key, JsonVariant& value) {
  348. return (strncmp(key, "mqtt", 3) == 0);
  349. }
  350. void _mqttWebSocketOnVisible(JsonObject& root) {
  351. root["mqttVisible"] = 1;
  352. #if ASYNC_TCP_SSL_ENABLED
  353. root["mqttsslVisible"] = 1;
  354. #endif
  355. }
  356. void _mqttWebSocketOnData(JsonObject& root) {
  357. root["mqttStatus"] = mqttConnected();
  358. }
  359. void _mqttWebSocketOnConnected(JsonObject& root) {
  360. root["mqttEnabled"] = mqttEnabled();
  361. root["mqttServer"] = getSetting("mqttServer", MQTT_SERVER);
  362. root["mqttPort"] = getSetting("mqttPort", MQTT_PORT);
  363. root["mqttUser"] = getSetting("mqttUser", MQTT_USER);
  364. root["mqttClientID"] = getSetting("mqttClientID");
  365. root["mqttPassword"] = getSetting("mqttPassword", MQTT_PASS);
  366. root["mqttKeep"] = _mqtt_keepalive;
  367. root["mqttRetain"] = _mqtt_retain;
  368. root["mqttQoS"] = _mqtt_qos;
  369. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  370. root["mqttUseSSL"] = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
  371. root["mqttFP"] = getSetting("mqttFP", MQTT_SSL_FINGERPRINT);
  372. #endif
  373. root["mqttTopic"] = getSetting("mqttTopic", MQTT_TOPIC);
  374. root["mqttUseJson"] = getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1;
  375. }
  376. #endif
  377. // -----------------------------------------------------------------------------
  378. // SETTINGS
  379. // -----------------------------------------------------------------------------
  380. #if TERMINAL_SUPPORT
  381. void _mqttInitCommands() {
  382. terminalRegisterCommand(F("MQTT.RESET"), [](Embedis* e) {
  383. _mqttConfigure();
  384. mqttDisconnect();
  385. terminalOK();
  386. });
  387. terminalRegisterCommand(F("MQTT.INFO"), [](Embedis* e) {
  388. _mqttInfo();
  389. terminalOK();
  390. });
  391. }
  392. #endif // TERMINAL_SUPPORT
  393. // -----------------------------------------------------------------------------
  394. // MQTT Callbacks
  395. // -----------------------------------------------------------------------------
  396. void _mqttCallback(unsigned int type, const char * topic, const char * payload) {
  397. if (type == MQTT_CONNECT_EVENT) {
  398. // Subscribe to internal action topics
  399. mqttSubscribe(MQTT_TOPIC_ACTION);
  400. // Flag system to send heartbeat
  401. systemSendHeartbeat();
  402. }
  403. if (type == MQTT_MESSAGE_EVENT) {
  404. // Match topic
  405. String t = mqttMagnitude((char *) topic);
  406. // Actions
  407. if (t.equals(MQTT_TOPIC_ACTION)) {
  408. if (strcmp(payload, MQTT_ACTION_RESET) == 0) {
  409. deferredReset(100, CUSTOM_RESET_MQTT);
  410. }
  411. }
  412. }
  413. }
  414. void _mqttOnConnect() {
  415. DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
  416. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  417. _mqtt_last_connection = millis();
  418. _mqtt_connecting = false;
  419. // Clean subscriptions
  420. mqttUnsubscribeRaw("#");
  421. // Send connect event to subscribers
  422. for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
  423. (_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL);
  424. }
  425. }
  426. void _mqttOnDisconnect() {
  427. // Reset reconnection delay
  428. _mqtt_last_connection = millis();
  429. _mqtt_connecting = false;
  430. DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
  431. // Send disconnect event to subscribers
  432. for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
  433. (_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL);
  434. }
  435. }
  436. void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
  437. if (len == 0) return;
  438. char message[len + 1];
  439. strlcpy(message, (char *) payload, len + 1);
  440. #if MQTT_SKIP_RETAINED
  441. if (millis() - _mqtt_last_connection < MQTT_SKIP_TIME) {
  442. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s - SKIPPED\n"), topic, message);
  443. return;
  444. }
  445. #endif
  446. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
  447. // Send message event to subscribers
  448. for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
  449. (_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic, message);
  450. }
  451. }
  452. // -----------------------------------------------------------------------------
  453. // Public API
  454. // -----------------------------------------------------------------------------
  455. /**
  456. Returns the magnitude part of a topic
  457. @param topic the full MQTT topic
  458. @return String object with the magnitude part.
  459. */
  460. String mqttMagnitude(char * topic) {
  461. String pattern = _mqtt_topic + _mqtt_setter;
  462. int position = pattern.indexOf("#");
  463. if (position == -1) return String();
  464. String start = pattern.substring(0, position);
  465. String end = pattern.substring(position + 1);
  466. String magnitude = String(topic);
  467. if (magnitude.startsWith(start) && magnitude.endsWith(end)) {
  468. magnitude.replace(start, "");
  469. magnitude.replace(end, "");
  470. } else {
  471. magnitude = String();
  472. }
  473. return magnitude;
  474. }
  475. /**
  476. Returns a full MQTT topic from the magnitude
  477. @param magnitude the magnitude part of the topic.
  478. @param is_set whether to build a command topic (true)
  479. or a state topic (false).
  480. @return String full MQTT topic.
  481. */
  482. String mqttTopic(const char * magnitude, bool is_set) {
  483. String output = _mqtt_topic;
  484. output.replace("#", magnitude);
  485. output += is_set ? _mqtt_setter : _mqtt_getter;
  486. return output;
  487. }
  488. /**
  489. Returns a full MQTT topic from the magnitude
  490. @param magnitude the magnitude part of the topic.
  491. @param index index of the magnitude when more than one such magnitudes.
  492. @param is_set whether to build a command topic (true)
  493. or a state topic (false).
  494. @return String full MQTT topic.
  495. */
  496. String mqttTopic(const char * magnitude, unsigned int index, bool is_set) {
  497. char buffer[strlen(magnitude)+5];
  498. snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), magnitude, index);
  499. return mqttTopic(buffer, is_set);
  500. }
  501. // -----------------------------------------------------------------------------
  502. void mqttSendRaw(const char * topic, const char * message, bool retain) {
  503. if (_mqtt.connected()) {
  504. #if MQTT_LIBRARY == MQTT_ASYNC // AsyncMqttClient
  505. unsigned int packetId = _mqtt.publish(topic, _mqtt_qos, retain, message);
  506. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %d)\n"), topic, message, packetId);
  507. #elif MQTT_LIBRARY == MQTT_ARDUINO // Arduino-MQTT
  508. _mqtt.publish(topic, message, retain, _mqtt_qos);
  509. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s\n"), topic, message);
  510. #else // PubSubClient
  511. _mqtt.publish(topic, message, retain);
  512. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s\n"), topic, message);
  513. #endif
  514. }
  515. }
  516. void mqttSendRaw(const char * topic, const char * message) {
  517. mqttSendRaw (topic, message, _mqtt_retain);
  518. }
  519. void mqttSend(const char * topic, const char * message, bool force, bool retain) {
  520. bool useJson = force ? false : _mqtt_use_json;
  521. // Equeue message
  522. if (useJson) {
  523. // Enqueue new message
  524. mqttEnqueue(topic, message);
  525. // Reset flush timer
  526. _mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, mqttFlush);
  527. // Send it right away
  528. } else {
  529. mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain);
  530. }
  531. }
  532. void mqttSend(const char * topic, const char * message, bool force) {
  533. mqttSend(topic, message, force, _mqtt_retain);
  534. }
  535. void mqttSend(const char * topic, const char * message) {
  536. mqttSend(topic, message, false);
  537. }
  538. void mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) {
  539. char buffer[strlen(topic)+5];
  540. snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
  541. mqttSend(buffer, message, force, retain);
  542. }
  543. void mqttSend(const char * topic, unsigned int index, const char * message, bool force) {
  544. mqttSend(topic, index, message, force, _mqtt_retain);
  545. }
  546. void mqttSend(const char * topic, unsigned int index, const char * message) {
  547. mqttSend(topic, index, message, false);
  548. }
  549. // -----------------------------------------------------------------------------
  550. unsigned char _mqttBuildTree(JsonObject& root, char parent) {
  551. unsigned char count = 0;
  552. // Add enqueued messages
  553. for (unsigned char i=0; i<_mqtt_queue.size(); i++) {
  554. mqtt_message_t element = _mqtt_queue[i];
  555. if (element.parent == parent) {
  556. ++count;
  557. JsonObject& elements = root.createNestedObject(element.topic);
  558. unsigned char num = _mqttBuildTree(elements, i);
  559. if (0 == num) {
  560. if (isNumber(element.message)) {
  561. double value = atof(element.message);
  562. if (value == int(value)) {
  563. root.set(element.topic, int(value));
  564. } else {
  565. root.set(element.topic, value);
  566. }
  567. } else {
  568. root.set(element.topic, element.message);
  569. }
  570. }
  571. }
  572. }
  573. return count;
  574. }
  575. void mqttFlush() {
  576. if (!_mqtt.connected()) return;
  577. if (_mqtt_queue.size() == 0) return;
  578. // Build tree recursively
  579. DynamicJsonBuffer jsonBuffer(1024);
  580. JsonObject& root = jsonBuffer.createObject();
  581. _mqttBuildTree(root, mqtt_message_t::END);
  582. // Add extra propeties
  583. #if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME
  584. if (ntpSynced()) root[MQTT_TOPIC_TIME] = ntpDateTime();
  585. #endif
  586. #if MQTT_ENQUEUE_MAC
  587. root[MQTT_TOPIC_MAC] = WiFi.macAddress();
  588. #endif
  589. #if MQTT_ENQUEUE_HOSTNAME
  590. root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname");
  591. #endif
  592. #if MQTT_ENQUEUE_IP
  593. root[MQTT_TOPIC_IP] = getIP();
  594. #endif
  595. #if MQTT_ENQUEUE_MESSAGE_ID
  596. root[MQTT_TOPIC_MESSAGE_ID] = (Rtcmem->mqtt)++;
  597. #endif
  598. // Send
  599. String output;
  600. root.printTo(output);
  601. jsonBuffer.clear();
  602. mqttSendRaw(_mqtt_topic_json.c_str(), output.c_str(), false);
  603. // Clear queue
  604. for (unsigned char i = 0; i < _mqtt_queue.size(); i++) {
  605. mqtt_message_t element = _mqtt_queue[i];
  606. free(element.topic);
  607. if (element.message) {
  608. free(element.message);
  609. }
  610. }
  611. _mqtt_queue.clear();
  612. }
  613. int8_t mqttEnqueue(const char * topic, const char * message, unsigned char parent) {
  614. // Queue is not meant to send message "offline"
  615. // We must prevent the queue does not get full while offline
  616. if (!_mqtt.connected()) return -1;
  617. // Force flusing the queue if the MQTT_QUEUE_MAX_SIZE has been reached
  618. if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) mqttFlush();
  619. int8_t index = _mqtt_queue.size();
  620. // Enqueue new message
  621. mqtt_message_t element;
  622. element.parent = parent;
  623. element.topic = strdup(topic);
  624. if (NULL != message) {
  625. element.message = strdup(message);
  626. }
  627. _mqtt_queue.push_back(element);
  628. return index;
  629. }
  630. int8_t mqttEnqueue(const char * topic, const char * message) {
  631. return mqttEnqueue(topic, message, mqtt_message_t::END);
  632. }
  633. // -----------------------------------------------------------------------------
  634. void mqttSubscribeRaw(const char * topic) {
  635. if (_mqtt.connected() && (strlen(topic) > 0)) {
  636. #if MQTT_LIBRARY == MQTT_ASYNC // AsyncMqttClient
  637. unsigned int packetId = _mqtt.subscribe(topic, _mqtt_qos);
  638. DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId);
  639. #else // Arduino-MQTT or PubSubClient
  640. _mqtt.subscribe(topic, _mqtt_qos);
  641. DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic);
  642. #endif
  643. }
  644. }
  645. void mqttSubscribe(const char * topic) {
  646. mqttSubscribeRaw(mqttTopic(topic, true).c_str());
  647. }
  648. void mqttUnsubscribeRaw(const char * topic) {
  649. if (_mqtt.connected() && (strlen(topic) > 0)) {
  650. #if MQTT_LIBRARY == MQTT_ASYNC // AsyncMqttClient
  651. unsigned int packetId = _mqtt.unsubscribe(topic);
  652. DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s (PID %d)\n"), topic, packetId);
  653. #else // Arduino-MQTT or PubSubClient
  654. _mqtt.unsubscribe(topic);
  655. DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s\n"), topic);
  656. #endif
  657. }
  658. }
  659. void mqttUnsubscribe(const char * topic) {
  660. mqttUnsubscribeRaw(mqttTopic(topic, true).c_str());
  661. }
  662. // -----------------------------------------------------------------------------
  663. void mqttEnabled(bool status) {
  664. _mqtt_enabled = status;
  665. }
  666. bool mqttEnabled() {
  667. return _mqtt_enabled;
  668. }
  669. bool mqttConnected() {
  670. return _mqtt.connected();
  671. }
  672. void mqttDisconnect() {
  673. if (_mqtt.connected()) {
  674. DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n"));
  675. _mqtt.disconnect();
  676. }
  677. #if SECURE_CLIENT == SECURE_CLIENT_BEARSSL
  678. delete _ca_list;
  679. #endif
  680. }
  681. bool mqttForward() {
  682. return _mqtt_forward;
  683. }
  684. void mqttRegister(mqtt_callback_f callback) {
  685. _mqtt_callbacks.push_back(callback);
  686. }
  687. void mqttSetBroker(IPAddress ip, unsigned int port) {
  688. setSetting("mqttServer", ip.toString());
  689. _mqtt_server = ip.toString();
  690. setSetting("mqttPort", port);
  691. _mqtt_port = port;
  692. mqttEnabled(MQTT_AUTOCONNECT);
  693. }
  694. void mqttSetBrokerIfNone(IPAddress ip, unsigned int port) {
  695. if (getSetting("mqttServer", MQTT_SERVER).length() == 0) mqttSetBroker(ip, port);
  696. }
  697. // -----------------------------------------------------------------------------
  698. // Initialization
  699. // -----------------------------------------------------------------------------
  700. void mqttSetup() {
  701. _mqttBackwards();
  702. _mqttInfo();
  703. #if MQTT_LIBRARY == MQTT_ASYNC // AsyncMqttClient
  704. _mqtt.onConnect([](bool sessionPresent) {
  705. _mqttOnConnect();
  706. });
  707. _mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {
  708. if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) {
  709. DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n"));
  710. }
  711. if (reason == AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED) {
  712. DEBUG_MSG_P(PSTR("[MQTT] Identifier Rejected\n"));
  713. }
  714. if (reason == AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE) {
  715. DEBUG_MSG_P(PSTR("[MQTT] Server unavailable\n"));
  716. }
  717. if (reason == AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS) {
  718. DEBUG_MSG_P(PSTR("[MQTT] Malformed credentials\n"));
  719. }
  720. if (reason == AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED) {
  721. DEBUG_MSG_P(PSTR("[MQTT] Not authorized\n"));
  722. }
  723. #if SECURE_CLIENT == SECURE_CLIENT_AXTLS
  724. if (reason == AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT) {
  725. DEBUG_MSG_P(PSTR("[MQTT] Bad fingerprint\n"));
  726. }
  727. #endif
  728. _mqttOnDisconnect();
  729. });
  730. _mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
  731. _mqttOnMessage(topic, payload, len);
  732. });
  733. _mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) {
  734. DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %d\n"), packetId);
  735. });
  736. _mqtt.onPublish([](uint16_t packetId) {
  737. DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %d\n"), packetId);
  738. });
  739. #elif MQTT_LIBRARY == MQTT_ARDUINO // Arduino-MQTT
  740. _mqtt.onMessageAdvanced([](MQTTClient *client, char topic[], char payload[], int length) {
  741. _mqttOnMessage(topic, payload, length);
  742. });
  743. #else // PubSubClient
  744. _mqtt.setCallback([](char* topic, byte* payload, unsigned int length) {
  745. _mqttOnMessage(topic, (char *) payload, length);
  746. });
  747. #endif // MQTT_LIBRARY == MQTT_ASYNC
  748. _mqttConfigure();
  749. mqttRegister(_mqttCallback);
  750. #if WEB_SUPPORT
  751. wsRegister()
  752. .onVisible(_mqttWebSocketOnVisible)
  753. .onData(_mqttWebSocketOnData)
  754. .onConnected(_mqttWebSocketOnConnected)
  755. .onKeyCheck(_mqttWebSocketOnKeyCheck);
  756. mqttRegister([](unsigned int type, const char*, const char*) {
  757. if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) wsPost(_mqttWebSocketOnData);
  758. });
  759. #endif
  760. #if TERMINAL_SUPPORT
  761. _mqttInitCommands();
  762. #endif
  763. // Main callbacks
  764. espurnaRegisterLoop(mqttLoop);
  765. espurnaRegisterReload(_mqttConfigure);
  766. }
  767. void mqttLoop() {
  768. if (WiFi.status() != WL_CONNECTED) return;
  769. #if MQTT_LIBRARY == MQTT_ASYNC
  770. _mqttConnect();
  771. #else // MQTT_LIBRARY != MQTT_ASYNC
  772. if (_mqtt.connected()) {
  773. _mqtt.loop();
  774. } else {
  775. if (_mqtt_connected) {
  776. _mqttOnDisconnect();
  777. _mqtt_connected = false;
  778. }
  779. _mqttConnect();
  780. }
  781. #endif // MQTT_LIBRARY == MQTT_ASYNC
  782. }
  783. #else
  784. bool mqttForward() {
  785. return false;
  786. }
  787. #endif // MQTT_SUPPORT