Fork of the espurna firmware for `mhsw` switches
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1004 lines
29 KiB

8 years ago
8 years ago
7 years ago
6 years ago
7 years ago
7 years ago
7 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. /*
  2. MQTT MODULE
  3. Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
  4. Updated secure client support by Niek van der Maas < mail at niekvandermaas dot nl>
  5. */
  6. #if MQTT_SUPPORT
  7. #include <EEPROM_Rotate.h>
  8. #include <ESP8266WiFi.h>
  9. #include <ESP8266mDNS.h>
  10. #include <ArduinoJson.h>
  11. #include <vector>
  12. #include <utility>
  13. #include <Ticker.h>
  14. #include <TimeLib.h>
  15. #include "libs/SecureClientHelpers.h"
  16. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  17. AsyncMqttClient _mqtt;
  18. #else // MQTT_LIBRARY_ARDUINOMQTT / MQTT_LIBRARY_PUBSUBCLIENT
  19. WiFiClient _mqtt_client;
  20. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  21. std::unique_ptr<SecureClient> _mqtt_client_secure = nullptr;
  22. #if MQTT_SECURE_CLIENT_INCLUDE_CA
  23. #include "static/mqtt_client_trusted_root_ca.h" // Assumes this header file defines a _mqtt_client_trusted_root_ca[] PROGMEM = "...PEM data..."
  24. #else
  25. #include "static/letsencrypt_isrgroot_pem.h" // Default to LetsEncrypt X3 certificate
  26. #define _mqtt_client_trusted_root_ca _ssl_letsencrypt_isrg_x3_ca
  27. #endif // MQTT_SECURE_CLIENT_INCLUDE_CA
  28. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  29. #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  30. #ifdef MQTT_MAX_PACKET_SIZE
  31. MQTTClient _mqtt(MQTT_MAX_PACKET_SIZE);
  32. #else
  33. MQTTClient _mqtt;
  34. #endif
  35. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  36. PubSubClient _mqtt;
  37. #endif
  38. #endif // MQTT_LIBRARY == MQTT_ASYNCMQTTCLIENT
  39. bool _mqtt_enabled = MQTT_ENABLED;
  40. bool _mqtt_use_json = false;
  41. unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  42. unsigned long _mqtt_last_connection = 0;
  43. bool _mqtt_connected = false;
  44. bool _mqtt_connecting = false;
  45. unsigned char _mqtt_qos = MQTT_QOS;
  46. bool _mqtt_retain = MQTT_RETAIN;
  47. unsigned long _mqtt_keepalive = MQTT_KEEPALIVE;
  48. String _mqtt_topic;
  49. String _mqtt_topic_json;
  50. String _mqtt_setter;
  51. String _mqtt_getter;
  52. bool _mqtt_forward;
  53. String _mqtt_user;
  54. String _mqtt_pass;
  55. String _mqtt_will;
  56. String _mqtt_server;
  57. uint16_t _mqtt_port;
  58. String _mqtt_clientid;
  59. std::vector<mqtt_callback_f> _mqtt_callbacks;
  60. struct mqtt_message_t {
  61. static const unsigned char END = 255;
  62. unsigned char parent = END;
  63. char * topic;
  64. char * message = NULL;
  65. };
  66. std::vector<mqtt_message_t> _mqtt_queue;
  67. Ticker _mqtt_flush_ticker;
  68. // -----------------------------------------------------------------------------
  69. // Private
  70. // -----------------------------------------------------------------------------
  71. #if SECURE_CLIENT == SECURE_CLIENT_AXTLS
  72. SecureClientConfig _mqtt_sc_config {
  73. "MQTT",
  74. []() -> String {
  75. return _mqtt_server;
  76. },
  77. []() -> int {
  78. return getSetting("mqttScCheck", MQTT_SECURE_CLIENT_CHECK).toInt();
  79. },
  80. []() -> String {
  81. return getSetting("mqttfp", MQTT_SSL_FINGERPRINT);
  82. },
  83. true
  84. };
  85. #endif
  86. #if SECURE_CLIENT == SECURE_CLIENT_BEARSSL
  87. SecureClientConfig _mqtt_sc_config {
  88. "MQTT",
  89. []() -> int {
  90. return getSetting("mqttScCheck", MQTT_SECURE_CLIENT_CHECK).toInt();
  91. },
  92. []() -> PGM_P {
  93. return _mqtt_client_trusted_root_ca;
  94. },
  95. []() -> String {
  96. return getSetting("mqttfp", MQTT_SSL_FINGERPRINT);
  97. },
  98. []() -> uint16_t {
  99. return getSetting("mqttScMFLN", MQTT_SECURE_CLIENT_MFLN).toInt();
  100. },
  101. true
  102. };
  103. #endif
  104. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  105. void _mqttSetupAsyncClient(bool secure = false) {
  106. _mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
  107. _mqtt.setClientId(_mqtt_clientid.c_str());
  108. _mqtt.setKeepAlive(_mqtt_keepalive);
  109. _mqtt.setCleanSession(false);
  110. _mqtt.setWill(_mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, MQTT_STATUS_OFFLINE);
  111. if (_mqtt_user.length() && _mqtt_pass.length()) {
  112. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
  113. _mqtt.setCredentials(_mqtt_user.c_str(), _mqtt_pass.c_str());
  114. }
  115. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  116. if (secure) {
  117. DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
  118. _mqtt.setSecure(secure);
  119. }
  120. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  121. _mqtt.connect();
  122. }
  123. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  124. #if (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  125. bool _mqttSetupSyncClient(bool secure = false) {
  126. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  127. if (secure) {
  128. if (!_mqtt_client_secure) _mqtt_client_secure = std::make_unique<SecureClient>(_mqtt_sc_config);
  129. return _mqtt_client_secure->beforeConnected();
  130. }
  131. #endif
  132. return true;
  133. }
  134. bool _mqttConnectSyncClient(bool secure = false) {
  135. bool result = false;
  136. #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  137. _mqtt.begin(_mqtt_server.c_str(), _mqtt_port, (secure ? _mqtt_client_secure->get() : _mqtt_client));
  138. _mqtt.setWill(_mqtt_will.c_str(), MQTT_STATUS_OFFLINE, _mqtt_qos, _mqtt_retain);
  139. result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str());
  140. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  141. _mqtt.setClient(secure ? _mqtt_client_secure->get() : _mqtt_client);
  142. _mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
  143. if (_mqtt_user.length() && _mqtt_pass.length()) {
  144. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
  145. result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, MQTT_STATUS_OFFLINE);
  146. } else {
  147. result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, MQTT_STATUS_OFFLINE);
  148. }
  149. #endif
  150. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  151. if (result && secure) {
  152. result = _mqtt_client_secure->afterConnected();
  153. }
  154. #endif
  155. return result;
  156. }
  157. #endif // (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  158. void _mqttConnect() {
  159. // Do not connect if disabled
  160. if (!_mqtt_enabled) return;
  161. // Do not connect if already connected or still trying to connect
  162. if (_mqtt.connected() || _mqtt_connecting) return;
  163. // Check reconnect interval
  164. if (millis() - _mqtt_last_connection < _mqtt_reconnect_delay) return;
  165. // Increase the reconnect delay
  166. _mqtt_reconnect_delay += MQTT_RECONNECT_DELAY_STEP;
  167. if (_mqtt_reconnect_delay > MQTT_RECONNECT_DELAY_MAX) {
  168. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MAX;
  169. }
  170. #if MDNS_CLIENT_SUPPORT
  171. _mqtt_server = mdnsResolve(_mqtt_server);
  172. #endif
  173. DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%u\n"), _mqtt_server.c_str(), _mqtt_port);
  174. DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid.c_str());
  175. DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), _mqtt_qos);
  176. DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), _mqtt_retain ? 1 : 0);
  177. DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %ds\n"), _mqtt_keepalive);
  178. DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will.c_str());
  179. _mqtt_connecting = true;
  180. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  181. const bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
  182. #else
  183. const bool secure = false;
  184. #endif
  185. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  186. _mqttSetupAsyncClient(secure);
  187. #elif (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  188. if (_mqttSetupSyncClient(secure) && _mqttConnectSyncClient(secure)) {
  189. _mqttOnConnect();
  190. } else {
  191. DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n"));
  192. _mqttOnDisconnect();
  193. }
  194. #else
  195. #error "please check that MQTT_LIBRARY is valid"
  196. #endif
  197. }
  198. void _mqttPlaceholders(String& text) {
  199. text.replace("{hostname}", getSetting("hostname"));
  200. text.replace("{magnitude}", "#");
  201. String mac = WiFi.macAddress();
  202. mac.replace(":", "");
  203. text.replace("{mac}", mac);
  204. }
  205. template<typename T>
  206. void _mqttApplySetting(T& current, T& updated) {
  207. if (current != updated) {
  208. current = std::move(updated);
  209. mqttDisconnect();
  210. }
  211. }
  212. template<typename T>
  213. void _mqttApplySetting(T& current, const T& updated) {
  214. if (current != updated) {
  215. current = updated;
  216. mqttDisconnect();
  217. }
  218. }
  219. template<typename T>
  220. void _mqttApplyTopic(T& current, const char* magnitude) {
  221. String updated = mqttTopic(magnitude, false);
  222. if (current != updated) {
  223. mqttFlush();
  224. current = std::move(updated);
  225. }
  226. }
  227. void _mqttConfigure() {
  228. // Enable only when server is set
  229. {
  230. String server = getSetting("mqttServer", MQTT_SERVER);
  231. uint16_t port = getSetting("mqttPort", MQTT_PORT).toInt();
  232. bool enabled = false;
  233. if (server.length()) {
  234. enabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1;
  235. }
  236. _mqttApplySetting(_mqtt_server, server);
  237. _mqttApplySetting(_mqtt_enabled, enabled);
  238. _mqttApplySetting(_mqtt_port, port);
  239. if (!enabled) return;
  240. }
  241. // Get base topic and apply placeholders
  242. {
  243. String topic = getSetting("mqttTopic", MQTT_TOPIC);
  244. if (topic.endsWith("/")) topic.remove(topic.length()-1);
  245. // Replace things inside curly braces (like {hostname}, {mac} etc.)
  246. _mqttPlaceholders(topic);
  247. if (topic.indexOf("#") == -1) topic.concat("/#");
  248. _mqttApplySetting(_mqtt_topic, topic);
  249. _mqttApplyTopic(_mqtt_will, MQTT_TOPIC_STATUS);
  250. }
  251. // Getter and setter
  252. {
  253. String setter = getSetting("mqttSetter", MQTT_SETTER);
  254. String getter = getSetting("mqttGetter", MQTT_GETTER);
  255. bool forward = !setter.equals(getter) && RELAY_REPORT_STATUS;
  256. _mqttApplySetting(_mqtt_setter, setter);
  257. _mqttApplySetting(_mqtt_getter, getter);
  258. _mqttApplySetting(_mqtt_forward, forward);
  259. }
  260. // MQTT options
  261. {
  262. String user = getSetting("mqttUser", MQTT_USER);
  263. _mqttPlaceholders(user);
  264. String pass = getSetting("mqttPassword", MQTT_PASS);
  265. unsigned char qos = getSetting("mqttQoS", MQTT_QOS).toInt();
  266. bool retain = getSetting("mqttRetain", MQTT_RETAIN).toInt() == 1;
  267. unsigned long keepalive = getSetting("mqttKeep", MQTT_KEEPALIVE).toInt();
  268. String id = getSetting("mqttClientID", getIdentifier());
  269. _mqttPlaceholders(id);
  270. _mqttApplySetting(_mqtt_user, user);
  271. _mqttApplySetting(_mqtt_pass, pass);
  272. _mqttApplySetting(_mqtt_qos, qos);
  273. _mqttApplySetting(_mqtt_retain, retain);
  274. _mqttApplySetting(_mqtt_keepalive, keepalive);
  275. _mqttApplySetting(_mqtt_clientid, id);
  276. }
  277. // MQTT JSON
  278. {
  279. _mqttApplySetting(_mqtt_use_json, getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1);
  280. _mqttApplyTopic(_mqtt_topic_json, MQTT_TOPIC_JSON);
  281. }
  282. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  283. }
  284. void _mqttBackwards() {
  285. String mqttTopic = getSetting("mqttTopic", MQTT_TOPIC);
  286. if (mqttTopic.indexOf("{identifier}") > 0) {
  287. mqttTopic.replace("{identifier}", "{hostname}");
  288. setSetting("mqttTopic", mqttTopic);
  289. }
  290. }
  291. void _mqttInfo() {
  292. DEBUG_MSG_P(PSTR(
  293. "[MQTT] "
  294. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  295. "AsyncMqttClient"
  296. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  297. "Arduino-MQTT"
  298. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  299. "PubSubClient"
  300. #endif
  301. ", SSL "
  302. #if SECURE_CLIENT != SEURE_CLIENT_NONE
  303. "ENABLED"
  304. #else
  305. "DISABLED"
  306. #endif
  307. ", Autoconnect "
  308. #if MQTT_AUTOCONNECT
  309. "ENABLED"
  310. #else
  311. "DISABLED"
  312. #endif
  313. "\n"
  314. ));
  315. DEBUG_MSG_P(PSTR("[MQTT] Client %s, %s\n"),
  316. _mqtt_enabled ? "ENABLED" : "DISABLED",
  317. _mqtt.connected() ? "CONNECTED" : "DISCONNECTED"
  318. );
  319. DEBUG_MSG_P(PSTR("[MQTT] Retry %s (Now %u, Last %u, Delay %u, Step %u)\n"),
  320. _mqtt_connecting ? "CONNECTING" : "WAITING",
  321. millis(),
  322. _mqtt_last_connection,
  323. _mqtt_reconnect_delay,
  324. MQTT_RECONNECT_DELAY_STEP
  325. );
  326. }
  327. // -----------------------------------------------------------------------------
  328. // WEB
  329. // -----------------------------------------------------------------------------
  330. #if WEB_SUPPORT
  331. bool _mqttWebSocketOnKeyCheck(const char * key, JsonVariant& value) {
  332. return (strncmp(key, "mqtt", 3) == 0);
  333. }
  334. void _mqttWebSocketOnVisible(JsonObject& root) {
  335. root["mqttVisible"] = 1;
  336. #if ASYNC_TCP_SSL_ENABLED
  337. root["mqttsslVisible"] = 1;
  338. #endif
  339. }
  340. void _mqttWebSocketOnData(JsonObject& root) {
  341. root["mqttStatus"] = mqttConnected();
  342. }
  343. void _mqttWebSocketOnConnected(JsonObject& root) {
  344. root["mqttEnabled"] = mqttEnabled();
  345. root["mqttServer"] = getSetting("mqttServer", MQTT_SERVER);
  346. root["mqttPort"] = getSetting("mqttPort", MQTT_PORT);
  347. root["mqttUser"] = getSetting("mqttUser", MQTT_USER);
  348. root["mqttClientID"] = getSetting("mqttClientID");
  349. root["mqttPassword"] = getSetting("mqttPassword", MQTT_PASS);
  350. root["mqttKeep"] = _mqtt_keepalive;
  351. root["mqttRetain"] = _mqtt_retain;
  352. root["mqttQoS"] = _mqtt_qos;
  353. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  354. root["mqttUseSSL"] = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
  355. root["mqttFP"] = getSetting("mqttFP", MQTT_SSL_FINGERPRINT);
  356. #endif
  357. root["mqttTopic"] = getSetting("mqttTopic", MQTT_TOPIC);
  358. root["mqttUseJson"] = getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1;
  359. }
  360. #endif
  361. // -----------------------------------------------------------------------------
  362. // SETTINGS
  363. // -----------------------------------------------------------------------------
  364. #if TERMINAL_SUPPORT
  365. void _mqttInitCommands() {
  366. terminalRegisterCommand(F("MQTT.RESET"), [](Embedis* e) {
  367. _mqttConfigure();
  368. mqttDisconnect();
  369. terminalOK();
  370. });
  371. terminalRegisterCommand(F("MQTT.INFO"), [](Embedis* e) {
  372. _mqttInfo();
  373. terminalOK();
  374. });
  375. }
  376. #endif // TERMINAL_SUPPORT
  377. // -----------------------------------------------------------------------------
  378. // MQTT Callbacks
  379. // -----------------------------------------------------------------------------
  380. void _mqttCallback(unsigned int type, const char * topic, const char * payload) {
  381. if (type == MQTT_CONNECT_EVENT) {
  382. // Subscribe to internal action topics
  383. mqttSubscribe(MQTT_TOPIC_ACTION);
  384. // Flag system to send heartbeat
  385. systemSendHeartbeat();
  386. }
  387. if (type == MQTT_MESSAGE_EVENT) {
  388. // Match topic
  389. String t = mqttMagnitude((char *) topic);
  390. // Actions
  391. if (t.equals(MQTT_TOPIC_ACTION)) {
  392. if (strcmp(payload, MQTT_ACTION_RESET) == 0) {
  393. deferredReset(100, CUSTOM_RESET_MQTT);
  394. }
  395. }
  396. }
  397. }
  398. void _mqttOnConnect() {
  399. DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
  400. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  401. _mqtt_last_connection = millis();
  402. _mqtt_connecting = false;
  403. _mqtt_connected = true;
  404. // Clean subscriptions
  405. mqttUnsubscribeRaw("#");
  406. // Send connect event to subscribers
  407. for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
  408. (_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL);
  409. }
  410. }
  411. void _mqttOnDisconnect() {
  412. // Reset reconnection delay
  413. _mqtt_last_connection = millis();
  414. _mqtt_connecting = false;
  415. _mqtt_connected = false;
  416. DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
  417. // Send disconnect event to subscribers
  418. for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
  419. (_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL);
  420. }
  421. }
  422. void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
  423. if (len == 0) return;
  424. char message[len + 1];
  425. strlcpy(message, (char *) payload, len + 1);
  426. #if MQTT_SKIP_RETAINED
  427. if (millis() - _mqtt_last_connection < MQTT_SKIP_TIME) {
  428. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s - SKIPPED\n"), topic, message);
  429. return;
  430. }
  431. #endif
  432. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
  433. // Send message event to subscribers
  434. for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
  435. (_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic, message);
  436. }
  437. }
  438. // -----------------------------------------------------------------------------
  439. // Public API
  440. // -----------------------------------------------------------------------------
  441. /**
  442. Returns the magnitude part of a topic
  443. @param topic the full MQTT topic
  444. @return String object with the magnitude part.
  445. */
  446. String mqttMagnitude(char * topic) {
  447. String pattern = _mqtt_topic + _mqtt_setter;
  448. int position = pattern.indexOf("#");
  449. if (position == -1) return String();
  450. String start = pattern.substring(0, position);
  451. String end = pattern.substring(position + 1);
  452. String magnitude = String(topic);
  453. if (magnitude.startsWith(start) && magnitude.endsWith(end)) {
  454. magnitude.replace(start, "");
  455. magnitude.replace(end, "");
  456. } else {
  457. magnitude = String();
  458. }
  459. return magnitude;
  460. }
  461. /**
  462. Returns a full MQTT topic from the magnitude
  463. @param magnitude the magnitude part of the topic.
  464. @param is_set whether to build a command topic (true)
  465. or a state topic (false).
  466. @return String full MQTT topic.
  467. */
  468. String mqttTopic(const char * magnitude, bool is_set) {
  469. String output = _mqtt_topic;
  470. output.replace("#", magnitude);
  471. output += is_set ? _mqtt_setter : _mqtt_getter;
  472. return output;
  473. }
  474. /**
  475. Returns a full MQTT topic from the magnitude
  476. @param magnitude the magnitude part of the topic.
  477. @param index index of the magnitude when more than one such magnitudes.
  478. @param is_set whether to build a command topic (true)
  479. or a state topic (false).
  480. @return String full MQTT topic.
  481. */
  482. String mqttTopic(const char * magnitude, unsigned int index, bool is_set) {
  483. char buffer[strlen(magnitude)+5];
  484. snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), magnitude, index);
  485. return mqttTopic(buffer, is_set);
  486. }
  487. // -----------------------------------------------------------------------------
  488. void mqttSendRaw(const char * topic, const char * message, bool retain) {
  489. if (_mqtt.connected()) {
  490. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  491. unsigned int packetId = _mqtt.publish(topic, _mqtt_qos, retain, message);
  492. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %d)\n"), topic, message, packetId);
  493. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  494. _mqtt.publish(topic, message, retain, _mqtt_qos);
  495. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s\n"), topic, message);
  496. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  497. _mqtt.publish(topic, message, retain);
  498. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s\n"), topic, message);
  499. #endif
  500. }
  501. }
  502. void mqttSendRaw(const char * topic, const char * message) {
  503. mqttSendRaw (topic, message, _mqtt_retain);
  504. }
  505. void mqttSend(const char * topic, const char * message, bool force, bool retain) {
  506. bool useJson = force ? false : _mqtt_use_json;
  507. // Equeue message
  508. if (useJson) {
  509. // Enqueue new message
  510. mqttEnqueue(topic, message);
  511. // Reset flush timer
  512. _mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, mqttFlush);
  513. // Send it right away
  514. } else {
  515. mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain);
  516. }
  517. }
  518. void mqttSend(const char * topic, const char * message, bool force) {
  519. mqttSend(topic, message, force, _mqtt_retain);
  520. }
  521. void mqttSend(const char * topic, const char * message) {
  522. mqttSend(topic, message, false);
  523. }
  524. void mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) {
  525. char buffer[strlen(topic)+5];
  526. snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
  527. mqttSend(buffer, message, force, retain);
  528. }
  529. void mqttSend(const char * topic, unsigned int index, const char * message, bool force) {
  530. mqttSend(topic, index, message, force, _mqtt_retain);
  531. }
  532. void mqttSend(const char * topic, unsigned int index, const char * message) {
  533. mqttSend(topic, index, message, false);
  534. }
  535. // -----------------------------------------------------------------------------
  536. unsigned char _mqttBuildTree(JsonObject& root, char parent) {
  537. unsigned char count = 0;
  538. // Add enqueued messages
  539. for (unsigned char i=0; i<_mqtt_queue.size(); i++) {
  540. mqtt_message_t element = _mqtt_queue[i];
  541. if (element.parent == parent) {
  542. ++count;
  543. JsonObject& elements = root.createNestedObject(element.topic);
  544. unsigned char num = _mqttBuildTree(elements, i);
  545. if (0 == num) {
  546. if (isNumber(element.message)) {
  547. double value = atof(element.message);
  548. if (value == int(value)) {
  549. root.set(element.topic, int(value));
  550. } else {
  551. root.set(element.topic, value);
  552. }
  553. } else {
  554. root.set(element.topic, element.message);
  555. }
  556. }
  557. }
  558. }
  559. return count;
  560. }
  561. void mqttFlush() {
  562. if (!_mqtt.connected()) return;
  563. if (_mqtt_queue.size() == 0) return;
  564. // Build tree recursively
  565. DynamicJsonBuffer jsonBuffer(1024);
  566. JsonObject& root = jsonBuffer.createObject();
  567. _mqttBuildTree(root, mqtt_message_t::END);
  568. // Add extra propeties
  569. #if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME
  570. if (ntpSynced()) root[MQTT_TOPIC_TIME] = ntpDateTime();
  571. #endif
  572. #if MQTT_ENQUEUE_MAC
  573. root[MQTT_TOPIC_MAC] = WiFi.macAddress();
  574. #endif
  575. #if MQTT_ENQUEUE_HOSTNAME
  576. root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname");
  577. #endif
  578. #if MQTT_ENQUEUE_IP
  579. root[MQTT_TOPIC_IP] = getIP();
  580. #endif
  581. #if MQTT_ENQUEUE_MESSAGE_ID
  582. root[MQTT_TOPIC_MESSAGE_ID] = (Rtcmem->mqtt)++;
  583. #endif
  584. // Send
  585. String output;
  586. root.printTo(output);
  587. jsonBuffer.clear();
  588. mqttSendRaw(_mqtt_topic_json.c_str(), output.c_str(), false);
  589. // Clear queue
  590. for (unsigned char i = 0; i < _mqtt_queue.size(); i++) {
  591. mqtt_message_t element = _mqtt_queue[i];
  592. free(element.topic);
  593. if (element.message) {
  594. free(element.message);
  595. }
  596. }
  597. _mqtt_queue.clear();
  598. }
  599. int8_t mqttEnqueue(const char * topic, const char * message, unsigned char parent) {
  600. // Queue is not meant to send message "offline"
  601. // We must prevent the queue does not get full while offline
  602. if (!_mqtt.connected()) return -1;
  603. // Force flusing the queue if the MQTT_QUEUE_MAX_SIZE has been reached
  604. if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) mqttFlush();
  605. int8_t index = _mqtt_queue.size();
  606. // Enqueue new message
  607. mqtt_message_t element;
  608. element.parent = parent;
  609. element.topic = strdup(topic);
  610. if (NULL != message) {
  611. element.message = strdup(message);
  612. }
  613. _mqtt_queue.push_back(element);
  614. return index;
  615. }
  616. int8_t mqttEnqueue(const char * topic, const char * message) {
  617. return mqttEnqueue(topic, message, mqtt_message_t::END);
  618. }
  619. // -----------------------------------------------------------------------------
  620. void mqttSubscribeRaw(const char * topic) {
  621. if (_mqtt.connected() && (strlen(topic) > 0)) {
  622. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  623. unsigned int packetId = _mqtt.subscribe(topic, _mqtt_qos);
  624. DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId);
  625. #else // Arduino-MQTT or PubSubClient
  626. _mqtt.subscribe(topic, _mqtt_qos);
  627. DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic);
  628. #endif
  629. }
  630. }
  631. void mqttSubscribe(const char * topic) {
  632. mqttSubscribeRaw(mqttTopic(topic, true).c_str());
  633. }
  634. void mqttUnsubscribeRaw(const char * topic) {
  635. if (_mqtt.connected() && (strlen(topic) > 0)) {
  636. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  637. unsigned int packetId = _mqtt.unsubscribe(topic);
  638. DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s (PID %d)\n"), topic, packetId);
  639. #else // Arduino-MQTT or PubSubClient
  640. _mqtt.unsubscribe(topic);
  641. DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s\n"), topic);
  642. #endif
  643. }
  644. }
  645. void mqttUnsubscribe(const char * topic) {
  646. mqttUnsubscribeRaw(mqttTopic(topic, true).c_str());
  647. }
  648. // -----------------------------------------------------------------------------
  649. void mqttEnabled(bool status) {
  650. _mqtt_enabled = status;
  651. }
  652. bool mqttEnabled() {
  653. return _mqtt_enabled;
  654. }
  655. bool mqttConnected() {
  656. return _mqtt.connected();
  657. }
  658. void mqttDisconnect() {
  659. if (_mqtt.connected()) {
  660. DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n"));
  661. _mqtt.disconnect();
  662. }
  663. }
  664. bool mqttForward() {
  665. return _mqtt_forward;
  666. }
  667. void mqttRegister(mqtt_callback_f callback) {
  668. _mqtt_callbacks.push_back(callback);
  669. }
  670. void mqttSetBroker(IPAddress ip, unsigned int port) {
  671. setSetting("mqttServer", ip.toString());
  672. _mqtt_server = ip.toString();
  673. setSetting("mqttPort", port);
  674. _mqtt_port = port;
  675. mqttEnabled(MQTT_AUTOCONNECT);
  676. }
  677. void mqttSetBrokerIfNone(IPAddress ip, unsigned int port) {
  678. if (getSetting("mqttServer", MQTT_SERVER).length() == 0) mqttSetBroker(ip, port);
  679. }
  680. // -----------------------------------------------------------------------------
  681. // Initialization
  682. // -----------------------------------------------------------------------------
  683. void mqttSetup() {
  684. _mqttBackwards();
  685. _mqttInfo();
  686. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  687. // XXX: should not place this in config, addServerFingerprint does not check for duplicates
  688. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  689. {
  690. if (_mqtt_sc_config.on_fingerprint) {
  691. const String fingerprint = _mqtt_sc_config.on_fingerprint();
  692. uint8_t buffer[20] = {0};
  693. if (sslFingerPrintArray(fingerprint.c_str(), buffer)) {
  694. _mqtt.addServerFingerprint(buffer);
  695. }
  696. }
  697. }
  698. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  699. _mqtt.onConnect([](bool sessionPresent) {
  700. _mqttOnConnect();
  701. });
  702. _mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {
  703. if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) {
  704. DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n"));
  705. }
  706. if (reason == AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED) {
  707. DEBUG_MSG_P(PSTR("[MQTT] Identifier Rejected\n"));
  708. }
  709. if (reason == AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE) {
  710. DEBUG_MSG_P(PSTR("[MQTT] Server unavailable\n"));
  711. }
  712. if (reason == AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS) {
  713. DEBUG_MSG_P(PSTR("[MQTT] Malformed credentials\n"));
  714. }
  715. if (reason == AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED) {
  716. DEBUG_MSG_P(PSTR("[MQTT] Not authorized\n"));
  717. }
  718. #if SECURE_CLIENT == SECURE_CLIENT_AXTLS
  719. if (reason == AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT) {
  720. DEBUG_MSG_P(PSTR("[MQTT] Bad fingerprint\n"));
  721. }
  722. #endif
  723. _mqttOnDisconnect();
  724. });
  725. _mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
  726. _mqttOnMessage(topic, payload, len);
  727. });
  728. _mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) {
  729. DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %d\n"), packetId);
  730. });
  731. _mqtt.onPublish([](uint16_t packetId) {
  732. DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %d\n"), packetId);
  733. });
  734. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  735. _mqtt.onMessageAdvanced([](MQTTClient *client, char topic[], char payload[], int length) {
  736. _mqttOnMessage(topic, payload, length);
  737. });
  738. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  739. _mqtt.setCallback([](char* topic, byte* payload, unsigned int length) {
  740. _mqttOnMessage(topic, (char *) payload, length);
  741. });
  742. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  743. _mqttConfigure();
  744. mqttRegister(_mqttCallback);
  745. #if WEB_SUPPORT
  746. wsRegister()
  747. .onVisible(_mqttWebSocketOnVisible)
  748. .onData(_mqttWebSocketOnData)
  749. .onConnected(_mqttWebSocketOnConnected)
  750. .onKeyCheck(_mqttWebSocketOnKeyCheck);
  751. mqttRegister([](unsigned int type, const char*, const char*) {
  752. if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) wsPost(_mqttWebSocketOnData);
  753. });
  754. #endif
  755. #if TERMINAL_SUPPORT
  756. _mqttInitCommands();
  757. #endif
  758. // Main callbacks
  759. espurnaRegisterLoop(mqttLoop);
  760. espurnaRegisterReload(_mqttConfigure);
  761. }
  762. void mqttLoop() {
  763. if (WiFi.status() != WL_CONNECTED) return;
  764. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  765. _mqttConnect();
  766. #else // MQTT_LIBRARY != MQTT_LIBRARY_ASYNCMQTTCLIENT
  767. if (_mqtt.connected()) {
  768. _mqtt.loop();
  769. } else {
  770. if (_mqtt_connected) {
  771. _mqttOnDisconnect();
  772. }
  773. _mqttConnect();
  774. }
  775. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  776. }
  777. #else
  778. bool mqttForward() {
  779. return false;
  780. }
  781. #endif // MQTT_SUPPORT