Fork of the espurna firmware for `mhsw` switches
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1165 lines
34 KiB

7 years ago
6 years ago
6 years ago
6 years ago
  1. /*
  2. MQTT MODULE
  3. Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
  4. Updated secure client support by Niek van der Maas < mail at niekvandermaas dot nl>
  5. */
  6. #include "mqtt.h"
  7. #if MQTT_SUPPORT
  8. #include <vector>
  9. #include <utility>
  10. #include <Ticker.h>
  11. #include "system.h"
  12. #include "mdns.h"
  13. #include "mqtt.h"
  14. #include "ntp.h"
  15. #include "rpc.h"
  16. #include "rtcmem.h"
  17. #include "ws.h"
  18. #include "libs/AsyncClientHelpers.h"
  19. #include "libs/SecureClientHelpers.h"
  20. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  21. AsyncMqttClient _mqtt;
  22. #else // MQTT_LIBRARY_ARDUINOMQTT / MQTT_LIBRARY_PUBSUBCLIENT
  23. WiFiClient _mqtt_client;
  24. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  25. std::unique_ptr<SecureClient> _mqtt_client_secure = nullptr;
  26. #if MQTT_SECURE_CLIENT_INCLUDE_CA
  27. #include "static/mqtt_client_trusted_root_ca.h" // Assumes this header file defines a _mqtt_client_trusted_root_ca[] PROGMEM = "...PEM data..."
  28. #else
  29. #include "static/letsencrypt_isrgroot_pem.h" // Default to LetsEncrypt X3 certificate
  30. #define _mqtt_client_trusted_root_ca _ssl_letsencrypt_isrg_x3_ca
  31. #endif // MQTT_SECURE_CLIENT_INCLUDE_CA
  32. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  33. #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  34. MQTTClient _mqtt(MQTT_BUFFER_MAX_SIZE);
  35. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  36. PubSubClient _mqtt;
  37. #endif
  38. #endif // MQTT_LIBRARY == MQTT_ASYNCMQTTCLIENT
  39. bool _mqtt_enabled = MQTT_ENABLED;
  40. bool _mqtt_use_json = false;
  41. unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  42. unsigned long _mqtt_last_connection = 0;
  43. AsyncClientState _mqtt_state = AsyncClientState::Disconnected;
  44. bool _mqtt_retain_skipped = false;
  45. bool _mqtt_retain = MQTT_RETAIN;
  46. int _mqtt_qos = MQTT_QOS;
  47. int _mqtt_keepalive = MQTT_KEEPALIVE;
  48. String _mqtt_topic;
  49. String _mqtt_topic_json;
  50. String _mqtt_setter;
  51. String _mqtt_getter;
  52. bool _mqtt_forward;
  53. String _mqtt_user;
  54. String _mqtt_pass;
  55. String _mqtt_will;
  56. String _mqtt_server;
  57. uint16_t _mqtt_port;
  58. String _mqtt_clientid;
  59. String _mqtt_payload_online;
  60. String _mqtt_payload_offline;
  61. std::vector<mqtt_callback_f> _mqtt_callbacks;
  62. struct mqtt_message_t {
  63. static const unsigned char END = 255;
  64. unsigned char parent = END;
  65. char * topic;
  66. char * message = NULL;
  67. };
  68. std::vector<mqtt_message_t> _mqtt_queue;
  69. Ticker _mqtt_flush_ticker;
  70. // -----------------------------------------------------------------------------
  71. // Private
  72. // -----------------------------------------------------------------------------
  73. #if SECURE_CLIENT == SECURE_CLIENT_AXTLS
  74. SecureClientConfig _mqtt_sc_config {
  75. "MQTT",
  76. []() -> String {
  77. return _mqtt_server;
  78. },
  79. []() -> int {
  80. return getSetting("mqttScCheck", MQTT_SECURE_CLIENT_CHECK);
  81. },
  82. []() -> String {
  83. return getSetting("mqttFP", MQTT_SSL_FINGERPRINT);
  84. },
  85. true
  86. };
  87. #endif
  88. #if SECURE_CLIENT == SECURE_CLIENT_BEARSSL
  89. SecureClientConfig _mqtt_sc_config {
  90. "MQTT",
  91. []() -> int {
  92. return getSetting("mqttScCheck", MQTT_SECURE_CLIENT_CHECK);
  93. },
  94. []() -> PGM_P {
  95. return _mqtt_client_trusted_root_ca;
  96. },
  97. []() -> String {
  98. return getSetting("mqttFP", MQTT_SSL_FINGERPRINT);
  99. },
  100. []() -> uint16_t {
  101. return getSetting("mqttScMFLN", MQTT_SECURE_CLIENT_MFLN);
  102. },
  103. true
  104. };
  105. #endif
  106. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  107. void _mqttSetupAsyncClient(bool secure = false) {
  108. _mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
  109. _mqtt.setClientId(_mqtt_clientid.c_str());
  110. _mqtt.setKeepAlive(_mqtt_keepalive);
  111. _mqtt.setCleanSession(false);
  112. _mqtt.setWill(_mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, _mqtt_payload_offline.c_str());
  113. if (_mqtt_user.length() && _mqtt_pass.length()) {
  114. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
  115. _mqtt.setCredentials(_mqtt_user.c_str(), _mqtt_pass.c_str());
  116. }
  117. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  118. if (secure) {
  119. DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
  120. _mqtt.setSecure(secure);
  121. }
  122. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  123. _mqtt.connect();
  124. }
  125. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  126. #if (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  127. WiFiClient& _mqttGetClient(bool secure) {
  128. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  129. return (secure ? _mqtt_client_secure->get() : _mqtt_client);
  130. #else
  131. return _mqtt_client;
  132. #endif
  133. }
  134. bool _mqttSetupSyncClient(bool secure = false) {
  135. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  136. if (secure) {
  137. if (!_mqtt_client_secure) _mqtt_client_secure = std::make_unique<SecureClient>(_mqtt_sc_config);
  138. return _mqtt_client_secure->beforeConnected();
  139. }
  140. #endif
  141. return true;
  142. }
  143. bool _mqttConnectSyncClient(bool secure = false) {
  144. bool result = false;
  145. #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  146. _mqtt.begin(_mqtt_server.c_str(), _mqtt_port, _mqttGetClient(secure));
  147. _mqtt.setWill(_mqtt_will.c_str(), _mqtt_payload_offline.c_str(), _mqtt_retain, _mqtt_qos);
  148. _mqtt.setKeepAlive(_mqtt_keepalive);
  149. result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str());
  150. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  151. _mqtt.setClient(_mqttGetClient(secure));
  152. _mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
  153. if (_mqtt_user.length() && _mqtt_pass.length()) {
  154. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
  155. result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, _mqtt_payload_offline.c_str());
  156. } else {
  157. result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, _mqtt_payload_offline.c_str());
  158. }
  159. #endif
  160. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  161. if (result && secure) {
  162. result = _mqtt_client_secure->afterConnected();
  163. }
  164. #endif
  165. return result;
  166. }
  167. #endif // (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  168. void _mqttPlaceholders(String& text) {
  169. text.replace("{hostname}", getSetting("hostname"));
  170. text.replace("{magnitude}", "#");
  171. String mac = WiFi.macAddress();
  172. mac.replace(":", "");
  173. text.replace("{mac}", mac);
  174. }
  175. template<typename T>
  176. void _mqttApplySetting(T& current, T& updated) {
  177. if (current != updated) {
  178. current = std::move(updated);
  179. mqttDisconnect();
  180. }
  181. }
  182. template<typename T>
  183. void _mqttApplySetting(T& current, const T& updated) {
  184. if (current != updated) {
  185. current = updated;
  186. mqttDisconnect();
  187. }
  188. }
  189. template<typename T>
  190. void _mqttApplyTopic(T& current, const char* magnitude) {
  191. String updated = mqttTopic(magnitude, false);
  192. if (current != updated) {
  193. mqttFlush();
  194. current = std::move(updated);
  195. }
  196. }
  197. void _mqttConfigure() {
  198. // Enable only when server is set
  199. {
  200. const String server = getSetting("mqttServer", MQTT_SERVER);
  201. const auto port = getSetting("mqttPort", static_cast<uint16_t>(MQTT_PORT));
  202. bool enabled = false;
  203. if (server.length()) {
  204. enabled = getSetting("mqttEnabled", 1 == MQTT_ENABLED);
  205. }
  206. _mqttApplySetting(_mqtt_server, server);
  207. _mqttApplySetting(_mqtt_enabled, enabled);
  208. _mqttApplySetting(_mqtt_port, port);
  209. if (!enabled) return;
  210. }
  211. // Get base topic and apply placeholders
  212. {
  213. String topic = getSetting("mqttTopic", MQTT_TOPIC);
  214. if (topic.endsWith("/")) topic.remove(topic.length()-1);
  215. // Replace things inside curly braces (like {hostname}, {mac} etc.)
  216. _mqttPlaceholders(topic);
  217. if (topic.indexOf("#") == -1) topic.concat("/#");
  218. _mqttApplySetting(_mqtt_topic, topic);
  219. }
  220. // Getter and setter
  221. {
  222. String setter = getSetting("mqttSetter", MQTT_SETTER);
  223. String getter = getSetting("mqttGetter", MQTT_GETTER);
  224. bool forward = !setter.equals(getter) && RELAY_REPORT_STATUS;
  225. _mqttApplySetting(_mqtt_setter, setter);
  226. _mqttApplySetting(_mqtt_getter, getter);
  227. _mqttApplySetting(_mqtt_forward, forward);
  228. }
  229. // MQTT options
  230. {
  231. String user = getSetting("mqttUser", MQTT_USER);
  232. _mqttPlaceholders(user);
  233. String pass = getSetting("mqttPassword", MQTT_PASS);
  234. const auto qos = getSetting("mqttQoS", MQTT_QOS);
  235. const bool retain = getSetting("mqttRetain", 1 == MQTT_RETAIN);
  236. // Note: MQTT spec defines this as 2 bytes
  237. const auto keepalive = constrain(
  238. getSetting("mqttKeep", MQTT_KEEPALIVE),
  239. 0, std::numeric_limits<uint16_t>::max()
  240. );
  241. String id = getSetting("mqttClientID", getIdentifier());
  242. _mqttPlaceholders(id);
  243. _mqttApplySetting(_mqtt_user, user);
  244. _mqttApplySetting(_mqtt_pass, pass);
  245. _mqttApplySetting(_mqtt_qos, qos);
  246. _mqttApplySetting(_mqtt_retain, retain);
  247. _mqttApplySetting(_mqtt_keepalive, keepalive);
  248. _mqttApplySetting(_mqtt_clientid, id);
  249. }
  250. // MQTT WILL
  251. {
  252. _mqttApplyTopic(_mqtt_will, MQTT_TOPIC_STATUS);
  253. }
  254. // MQTT JSON
  255. {
  256. _mqttApplySetting(_mqtt_use_json, getSetting("mqttUseJson", 1 == MQTT_USE_JSON));
  257. _mqttApplyTopic(_mqtt_topic_json, MQTT_TOPIC_JSON);
  258. }
  259. // Custom payload strings
  260. settingsProcessConfig({
  261. {_mqtt_payload_online, "mqttPayloadOnline", MQTT_STATUS_ONLINE},
  262. {_mqtt_payload_offline, "mqttPayloadOffline", MQTT_STATUS_OFFLINE}
  263. });
  264. // Reset reconnect delay to reconnect sooner
  265. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  266. }
  267. void _mqttBackwards() {
  268. String mqttTopic = getSetting("mqttTopic", MQTT_TOPIC);
  269. if (mqttTopic.indexOf("{identifier}") > 0) {
  270. mqttTopic.replace("{identifier}", "{hostname}");
  271. setSetting("mqttTopic", mqttTopic);
  272. }
  273. }
  274. void _mqttInfo() {
  275. // Build information
  276. {
  277. #define __MQTT_INFO_STR(X) #X
  278. #define _MQTT_INFO_STR(X) __MQTT_INFO_STR(X)
  279. DEBUG_MSG_P(PSTR(
  280. "[MQTT] "
  281. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  282. "AsyncMqttClient"
  283. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  284. "Arduino-MQTT"
  285. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  286. "PubSubClient"
  287. #endif
  288. ", SSL "
  289. #if SECURE_CLIENT != SEURE_CLIENT_NONE
  290. "ENABLED"
  291. #else
  292. "DISABLED"
  293. #endif
  294. ", Autoconnect "
  295. #if MQTT_AUTOCONNECT
  296. "ENABLED"
  297. #else
  298. "DISABLED"
  299. #endif
  300. ", Buffer size " _MQTT_INFO_STR(MQTT_BUFFER_MAX_SIZE) " bytes"
  301. "\n"
  302. ));
  303. #undef _MQTT_INFO_STR
  304. #undef __MQTT_INFO_STR
  305. }
  306. // Notify about the general state of the client
  307. {
  308. const __FlashStringHelper* enabled = _mqtt_enabled
  309. ? F("ENABLED")
  310. : F("DISABLED");
  311. const __FlashStringHelper* state = nullptr;
  312. switch (_mqtt_state) {
  313. case AsyncClientState::Connecting:
  314. state = F("CONNECTING");
  315. break;
  316. case AsyncClientState::Connected:
  317. state = F("CONNECTED");
  318. break;
  319. case AsyncClientState::Disconnected:
  320. state = F("DISCONNECTED");
  321. break;
  322. case AsyncClientState::Disconnecting:
  323. state = F("DISCONNECTING");
  324. break;
  325. default:
  326. state = F("WAITING");
  327. break;
  328. }
  329. DEBUG_MSG_P(PSTR("[MQTT] Client %s, %s\n"),
  330. String(enabled).c_str(),
  331. String(state).c_str()
  332. );
  333. if (_mqtt_enabled && (_mqtt_state != AsyncClientState::Connected)) {
  334. DEBUG_MSG_P(PSTR("[MQTT] Retrying, Last %u with Delay %u (Step %u)\n"),
  335. _mqtt_last_connection,
  336. _mqtt_reconnect_delay,
  337. MQTT_RECONNECT_DELAY_STEP
  338. );
  339. }
  340. }
  341. }
  342. // -----------------------------------------------------------------------------
  343. // WEB
  344. // -----------------------------------------------------------------------------
  345. #if WEB_SUPPORT
  346. bool _mqttWebSocketOnKeyCheck(const char * key, JsonVariant& value) {
  347. return (strncmp(key, "mqtt", 3) == 0);
  348. }
  349. void _mqttWebSocketOnVisible(JsonObject& root) {
  350. root["mqttVisible"] = 1;
  351. #if ASYNC_TCP_SSL_ENABLED
  352. root["mqttsslVisible"] = 1;
  353. #endif
  354. }
  355. void _mqttWebSocketOnData(JsonObject& root) {
  356. root["mqttStatus"] = mqttConnected();
  357. }
  358. void _mqttWebSocketOnConnected(JsonObject& root) {
  359. root["mqttEnabled"] = mqttEnabled();
  360. root["mqttServer"] = getSetting("mqttServer", MQTT_SERVER);
  361. root["mqttPort"] = getSetting("mqttPort", MQTT_PORT);
  362. root["mqttUser"] = getSetting("mqttUser", MQTT_USER);
  363. root["mqttClientID"] = getSetting("mqttClientID");
  364. root["mqttPassword"] = getSetting("mqttPassword", MQTT_PASS);
  365. root["mqttKeep"] = _mqtt_keepalive;
  366. root["mqttRetain"] = _mqtt_retain;
  367. root["mqttQoS"] = _mqtt_qos;
  368. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  369. root["mqttUseSSL"] = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED);
  370. root["mqttFP"] = getSetting("mqttFP", MQTT_SSL_FINGERPRINT);
  371. #endif
  372. root["mqttTopic"] = getSetting("mqttTopic", MQTT_TOPIC);
  373. root["mqttUseJson"] = getSetting("mqttUseJson", 1 == MQTT_USE_JSON);
  374. }
  375. #endif
  376. // -----------------------------------------------------------------------------
  377. // SETTINGS
  378. // -----------------------------------------------------------------------------
  379. #if TERMINAL_SUPPORT
  380. void _mqttInitCommands() {
  381. terminalRegisterCommand(F("MQTT.RESET"), [](Embedis* e) {
  382. _mqttConfigure();
  383. mqttDisconnect();
  384. terminalOK();
  385. });
  386. terminalRegisterCommand(F("MQTT.INFO"), [](Embedis* e) {
  387. _mqttInfo();
  388. terminalOK();
  389. });
  390. }
  391. #endif // TERMINAL_SUPPORT
  392. // -----------------------------------------------------------------------------
  393. // MQTT Callbacks
  394. // -----------------------------------------------------------------------------
  395. void _mqttCallback(unsigned int type, const char * topic, const char * payload) {
  396. if (type == MQTT_CONNECT_EVENT) {
  397. // Subscribe to internal action topics
  398. mqttSubscribe(MQTT_TOPIC_ACTION);
  399. // Flag system to send heartbeat
  400. systemSendHeartbeat();
  401. }
  402. if (type == MQTT_MESSAGE_EVENT) {
  403. // Match topic
  404. String t = mqttMagnitude((char *) topic);
  405. // Actions
  406. if (t.equals(MQTT_TOPIC_ACTION)) {
  407. rpcHandleAction(payload);
  408. }
  409. }
  410. }
  411. void _mqttOnConnect() {
  412. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  413. _mqtt_last_connection = millis();
  414. _mqtt_state = AsyncClientState::Connected;
  415. _mqtt_retain_skipped = false;
  416. DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
  417. // Clean subscriptions
  418. mqttUnsubscribeRaw("#");
  419. // Notify all subscribers about the connection
  420. for (auto& callback : _mqtt_callbacks) {
  421. callback(MQTT_CONNECT_EVENT, nullptr, nullptr);
  422. }
  423. }
  424. void _mqttOnDisconnect() {
  425. // Reset reconnection delay
  426. _mqtt_last_connection = millis();
  427. _mqtt_state = AsyncClientState::Disconnected;
  428. _mqtt_retain_skipped = false;
  429. DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
  430. // Notify all subscribers about the disconnect
  431. for (auto& callback : _mqtt_callbacks) {
  432. callback(MQTT_DISCONNECT_EVENT, nullptr, nullptr);
  433. }
  434. }
  435. // Force-skip everything received in a short window right after connecting to avoid syncronization issues.
  436. bool _mqttMaybeSkipRetained(char* topic) {
  437. #if MQTT_SKIP_RETAINED
  438. if (!_mqtt_retain_skipped && (millis() - _mqtt_last_connection < MQTT_SKIP_TIME)) {
  439. DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic);
  440. return true;
  441. }
  442. #endif
  443. _mqtt_retain_skipped = true;
  444. return false;
  445. }
  446. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  447. // MQTT Broker can sometimes send messages in bulk. Even when message size is less than MQTT_BUFFER_MAX_SIZE, we *could*
  448. // receive a message with `len != total`, this requiring buffering of the received data. Prepare a static memory to store the
  449. // data until `(len + index) == total`.
  450. // TODO: One pending issue is streaming arbitrary data (e.g. binary, for OTA). We always set '\0' and API consumer expects C-String.
  451. // In that case, there could be MQTT_MESSAGE_RAW_EVENT and this callback only trigger on small messages.
  452. // TODO: Current callback model does not allow to pass message length. Instead, implement a topic filter and record all subscriptions. That way we don't need to filter out events and could implement per-event callbacks.
  453. void _mqttOnMessageAsync(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
  454. if (!len || (len > MQTT_BUFFER_MAX_SIZE) || (total > MQTT_BUFFER_MAX_SIZE)) return;
  455. if (_mqttMaybeSkipRetained(topic)) return;
  456. static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0};
  457. memmove(message + index, (char *) payload, len);
  458. // Not done yet
  459. if (total != (len + index)) {
  460. DEBUG_MSG_P(PSTR("[MQTT] Buffered %s => %u / %u bytes\n"), topic, len, total);
  461. return;
  462. }
  463. message[len + index] = '\0';
  464. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
  465. // Call subscribers with the message buffer
  466. for (auto& callback : _mqtt_callbacks) {
  467. callback(MQTT_MESSAGE_EVENT, topic, message);
  468. }
  469. }
  470. #else
  471. // Sync client already implements buffering, but we still need to add '\0' because API consumer expects C-String :/
  472. // TODO: consider reworking this (and async counterpart), giving callback func length of the message.
  473. void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
  474. if (!len || (len > MQTT_BUFFER_MAX_SIZE)) return;
  475. if (_mqttMaybeSkipRetained(topic)) return;
  476. static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0};
  477. memmove(message, (char *) payload, len);
  478. message[len] = '\0';
  479. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
  480. // Call subscribers with the message buffer
  481. for (auto& callback : _mqtt_callbacks) {
  482. callback(MQTT_MESSAGE_EVENT, topic, message);
  483. }
  484. }
  485. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  486. // -----------------------------------------------------------------------------
  487. // Public API
  488. // -----------------------------------------------------------------------------
  489. /**
  490. Returns the magnitude part of a topic
  491. @param topic the full MQTT topic
  492. @return String object with the magnitude part.
  493. */
  494. String mqttMagnitude(char * topic) {
  495. String pattern = _mqtt_topic + _mqtt_setter;
  496. int position = pattern.indexOf("#");
  497. if (position == -1) return String();
  498. String start = pattern.substring(0, position);
  499. String end = pattern.substring(position + 1);
  500. String magnitude = String(topic);
  501. if (magnitude.startsWith(start) && magnitude.endsWith(end)) {
  502. magnitude.replace(start, "");
  503. magnitude.replace(end, "");
  504. } else {
  505. magnitude = String();
  506. }
  507. return magnitude;
  508. }
  509. /**
  510. Returns a full MQTT topic from the magnitude
  511. @param magnitude the magnitude part of the topic.
  512. @param is_set whether to build a command topic (true)
  513. or a state topic (false).
  514. @return String full MQTT topic.
  515. */
  516. String mqttTopic(const char * magnitude, bool is_set) {
  517. String output = _mqtt_topic;
  518. output.replace("#", magnitude);
  519. output += is_set ? _mqtt_setter : _mqtt_getter;
  520. return output;
  521. }
  522. /**
  523. Returns a full MQTT topic from the magnitude
  524. @param magnitude the magnitude part of the topic.
  525. @param index index of the magnitude when more than one such magnitudes.
  526. @param is_set whether to build a command topic (true)
  527. or a state topic (false).
  528. @return String full MQTT topic.
  529. */
  530. String mqttTopic(const char * magnitude, unsigned int index, bool is_set) {
  531. char buffer[strlen(magnitude)+5];
  532. snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), magnitude, index);
  533. return mqttTopic(buffer, is_set);
  534. }
  535. // -----------------------------------------------------------------------------
  536. bool mqttSendRaw(const char * topic, const char * message, bool retain) {
  537. if (!_mqtt.connected()) return false;
  538. const unsigned int packetId(
  539. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  540. _mqtt.publish(topic, _mqtt_qos, retain, message)
  541. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  542. _mqtt.publish(topic, message, retain, _mqtt_qos)
  543. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  544. _mqtt.publish(topic, message, retain)
  545. #endif
  546. );
  547. const size_t message_len = strlen(message);
  548. if (message_len > 128) {
  549. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => (%u bytes) (PID %u)\n"), topic, message_len, packetId);
  550. } else {
  551. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %u)\n"), topic, message, packetId);
  552. }
  553. return (packetId > 0);
  554. }
  555. bool mqttSendRaw(const char * topic, const char * message) {
  556. return mqttSendRaw (topic, message, _mqtt_retain);
  557. }
  558. void mqttSend(const char * topic, const char * message, bool force, bool retain) {
  559. bool useJson = force ? false : _mqtt_use_json;
  560. // Equeue message
  561. if (useJson) {
  562. // Enqueue new message
  563. mqttEnqueue(topic, message);
  564. // Reset flush timer
  565. _mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, mqttFlush);
  566. // Send it right away
  567. } else {
  568. mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain);
  569. }
  570. }
  571. void mqttSend(const char * topic, const char * message, bool force) {
  572. mqttSend(topic, message, force, _mqtt_retain);
  573. }
  574. void mqttSend(const char * topic, const char * message) {
  575. mqttSend(topic, message, false);
  576. }
  577. void mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) {
  578. char buffer[strlen(topic)+5];
  579. snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
  580. mqttSend(buffer, message, force, retain);
  581. }
  582. void mqttSend(const char * topic, unsigned int index, const char * message, bool force) {
  583. mqttSend(topic, index, message, force, _mqtt_retain);
  584. }
  585. void mqttSend(const char * topic, unsigned int index, const char * message) {
  586. mqttSend(topic, index, message, false);
  587. }
  588. // -----------------------------------------------------------------------------
  589. unsigned char _mqttBuildTree(JsonObject& root, char parent) {
  590. unsigned char count = 0;
  591. // Add enqueued messages
  592. for (unsigned char i=0; i<_mqtt_queue.size(); i++) {
  593. mqtt_message_t element = _mqtt_queue[i];
  594. if (element.parent == parent) {
  595. ++count;
  596. JsonObject& elements = root.createNestedObject(element.topic);
  597. unsigned char num = _mqttBuildTree(elements, i);
  598. if (0 == num) {
  599. if (isNumber(element.message)) {
  600. double value = atof(element.message);
  601. if (value == int(value)) {
  602. root.set(element.topic, int(value));
  603. } else {
  604. root.set(element.topic, value);
  605. }
  606. } else {
  607. root.set(element.topic, element.message);
  608. }
  609. }
  610. }
  611. }
  612. return count;
  613. }
  614. void mqttFlush() {
  615. if (!_mqtt.connected()) return;
  616. if (_mqtt_queue.size() == 0) return;
  617. // Build tree recursively
  618. DynamicJsonBuffer jsonBuffer(1024);
  619. JsonObject& root = jsonBuffer.createObject();
  620. _mqttBuildTree(root, mqtt_message_t::END);
  621. // Add extra propeties
  622. #if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME
  623. if (ntpSynced()) root[MQTT_TOPIC_TIME] = ntpDateTime();
  624. #endif
  625. #if MQTT_ENQUEUE_MAC
  626. root[MQTT_TOPIC_MAC] = WiFi.macAddress();
  627. #endif
  628. #if MQTT_ENQUEUE_HOSTNAME
  629. root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname");
  630. #endif
  631. #if MQTT_ENQUEUE_IP
  632. root[MQTT_TOPIC_IP] = getIP();
  633. #endif
  634. #if MQTT_ENQUEUE_MESSAGE_ID
  635. root[MQTT_TOPIC_MESSAGE_ID] = (Rtcmem->mqtt)++;
  636. #endif
  637. // Send
  638. String output;
  639. root.printTo(output);
  640. jsonBuffer.clear();
  641. mqttSendRaw(_mqtt_topic_json.c_str(), output.c_str(), false);
  642. // Clear queue
  643. for (unsigned char i = 0; i < _mqtt_queue.size(); i++) {
  644. mqtt_message_t element = _mqtt_queue[i];
  645. free(element.topic);
  646. if (element.message) {
  647. free(element.message);
  648. }
  649. }
  650. _mqtt_queue.clear();
  651. }
  652. int8_t mqttEnqueue(const char * topic, const char * message, unsigned char parent) {
  653. // Queue is not meant to send message "offline"
  654. // We must prevent the queue does not get full while offline
  655. if (!_mqtt.connected()) return -1;
  656. // Force flusing the queue if the MQTT_QUEUE_MAX_SIZE has been reached
  657. if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) mqttFlush();
  658. int8_t index = _mqtt_queue.size();
  659. // Enqueue new message
  660. mqtt_message_t element;
  661. element.parent = parent;
  662. element.topic = strdup(topic);
  663. if (NULL != message) {
  664. element.message = strdup(message);
  665. }
  666. _mqtt_queue.push_back(element);
  667. return index;
  668. }
  669. int8_t mqttEnqueue(const char * topic, const char * message) {
  670. return mqttEnqueue(topic, message, mqtt_message_t::END);
  671. }
  672. // -----------------------------------------------------------------------------
  673. void mqttSubscribeRaw(const char * topic) {
  674. if (_mqtt.connected() && (strlen(topic) > 0)) {
  675. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  676. unsigned int packetId = _mqtt.subscribe(topic, _mqtt_qos);
  677. DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId);
  678. #else // Arduino-MQTT or PubSubClient
  679. _mqtt.subscribe(topic, _mqtt_qos);
  680. DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic);
  681. #endif
  682. }
  683. }
  684. void mqttSubscribe(const char * topic) {
  685. mqttSubscribeRaw(mqttTopic(topic, true).c_str());
  686. }
  687. void mqttUnsubscribeRaw(const char * topic) {
  688. if (_mqtt.connected() && (strlen(topic) > 0)) {
  689. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  690. unsigned int packetId = _mqtt.unsubscribe(topic);
  691. DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s (PID %d)\n"), topic, packetId);
  692. #else // Arduino-MQTT or PubSubClient
  693. _mqtt.unsubscribe(topic);
  694. DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s\n"), topic);
  695. #endif
  696. }
  697. }
  698. void mqttUnsubscribe(const char * topic) {
  699. mqttUnsubscribeRaw(mqttTopic(topic, true).c_str());
  700. }
  701. // -----------------------------------------------------------------------------
  702. void mqttEnabled(bool status) {
  703. _mqtt_enabled = status;
  704. }
  705. bool mqttEnabled() {
  706. return _mqtt_enabled;
  707. }
  708. bool mqttConnected() {
  709. return _mqtt.connected();
  710. }
  711. void mqttDisconnect() {
  712. if (_mqtt.connected()) {
  713. DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n"));
  714. _mqtt.disconnect();
  715. }
  716. }
  717. bool mqttForward() {
  718. return _mqtt_forward;
  719. }
  720. void mqttRegister(mqtt_callback_f callback) {
  721. _mqtt_callbacks.push_back(callback);
  722. }
  723. void mqttSetBroker(IPAddress ip, uint16_t port) {
  724. setSetting("mqttServer", ip.toString());
  725. _mqtt_server = ip.toString();
  726. setSetting("mqttPort", port);
  727. _mqtt_port = port;
  728. mqttEnabled(1 == MQTT_AUTOCONNECT);
  729. }
  730. void mqttSetBrokerIfNone(IPAddress ip, uint16_t port) {
  731. if (getSetting("mqttServer", MQTT_SERVER).length() == 0) {
  732. mqttSetBroker(ip, port);
  733. }
  734. }
  735. const String& mqttPayloadOnline() {
  736. return _mqtt_payload_online;
  737. }
  738. const String& mqttPayloadOffline() {
  739. return _mqtt_payload_offline;
  740. }
  741. const char* mqttPayloadStatus(bool status) {
  742. return status ? _mqtt_payload_online.c_str() : _mqtt_payload_offline.c_str();
  743. }
  744. void mqttSendStatus() {
  745. mqttSend(MQTT_TOPIC_STATUS, _mqtt_payload_online.c_str(), true);
  746. }
  747. // -----------------------------------------------------------------------------
  748. // Initialization
  749. // -----------------------------------------------------------------------------
  750. void _mqttConnect() {
  751. // Do not connect if disabled
  752. if (!_mqtt_enabled) return;
  753. // Do not connect if already connected or still trying to connect
  754. if (_mqtt.connected() || (_mqtt_state != AsyncClientState::Disconnected)) return;
  755. // Check reconnect interval
  756. if (millis() - _mqtt_last_connection < _mqtt_reconnect_delay) return;
  757. // Increase the reconnect delay
  758. _mqtt_reconnect_delay += MQTT_RECONNECT_DELAY_STEP;
  759. if (_mqtt_reconnect_delay > MQTT_RECONNECT_DELAY_MAX) {
  760. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MAX;
  761. }
  762. #if MDNS_CLIENT_SUPPORT
  763. _mqtt_server = mdnsResolve(_mqtt_server);
  764. #endif
  765. DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%u\n"), _mqtt_server.c_str(), _mqtt_port);
  766. DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid.c_str());
  767. DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), _mqtt_qos);
  768. DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), _mqtt_retain ? 1 : 0);
  769. DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %ds\n"), _mqtt_keepalive);
  770. DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will.c_str());
  771. _mqtt_state = AsyncClientState::Connecting;
  772. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  773. const bool secure = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED);
  774. #else
  775. const bool secure = false;
  776. #endif
  777. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  778. _mqttSetupAsyncClient(secure);
  779. #elif (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  780. if (_mqttSetupSyncClient(secure) && _mqttConnectSyncClient(secure)) {
  781. _mqttOnConnect();
  782. } else {
  783. DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n"));
  784. _mqttOnDisconnect();
  785. }
  786. #else
  787. #error "please check that MQTT_LIBRARY is valid"
  788. #endif
  789. }
  790. void mqttLoop() {
  791. if (WiFi.status() != WL_CONNECTED) return;
  792. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  793. _mqttConnect();
  794. #else // MQTT_LIBRARY != MQTT_LIBRARY_ASYNCMQTTCLIENT
  795. if (_mqtt.connected()) {
  796. _mqtt.loop();
  797. } else {
  798. if (_mqtt_state != AsyncClientState::Disconnected) {
  799. _mqttOnDisconnect();
  800. }
  801. _mqttConnect();
  802. }
  803. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  804. }
  805. void mqttSetup() {
  806. _mqttBackwards();
  807. _mqttInfo();
  808. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  809. // XXX: should not place this in config, addServerFingerprint does not check for duplicates
  810. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  811. {
  812. if (_mqtt_sc_config.on_fingerprint) {
  813. const String fingerprint = _mqtt_sc_config.on_fingerprint();
  814. uint8_t buffer[20] = {0};
  815. if (sslFingerPrintArray(fingerprint.c_str(), buffer)) {
  816. _mqtt.addServerFingerprint(buffer);
  817. }
  818. }
  819. }
  820. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  821. _mqtt.onMessage(_mqttOnMessageAsync);
  822. _mqtt.onConnect([](bool) {
  823. _mqttOnConnect();
  824. });
  825. _mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) {
  826. DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %u\n"), packetId);
  827. });
  828. _mqtt.onPublish([](uint16_t packetId) {
  829. DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %u\n"), packetId);
  830. });
  831. _mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {
  832. switch (reason) {
  833. case AsyncMqttClientDisconnectReason::TCP_DISCONNECTED:
  834. DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n"));
  835. break;
  836. case AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED:
  837. DEBUG_MSG_P(PSTR("[MQTT] Identifier Rejected\n"));
  838. break;
  839. case AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE:
  840. DEBUG_MSG_P(PSTR("[MQTT] Server unavailable\n"));
  841. break;
  842. case AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS:
  843. DEBUG_MSG_P(PSTR("[MQTT] Malformed credentials\n"));
  844. break;
  845. case AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED:
  846. DEBUG_MSG_P(PSTR("[MQTT] Not authorized\n"));
  847. break;
  848. case AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT:
  849. #if ASYNC_TCP_SSL_ENABLED
  850. DEBUG_MSG_P(PSTR("[MQTT] Bad fingerprint\n"));
  851. #endif
  852. break;
  853. case AsyncMqttClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION:
  854. // This is never used by the AsyncMqttClient source
  855. #if 0
  856. DEBUG_MSG_P(PSTR("[MQTT] Unacceptable protocol version\n"));
  857. #endif
  858. break;
  859. case AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE:
  860. DEBUG_MSG_P(PSTR("[MQTT] Connect packet too big\n"));
  861. break;
  862. }
  863. _mqttOnDisconnect();
  864. });
  865. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  866. _mqtt.onMessageAdvanced([](MQTTClient *client, char topic[], char payload[], int length) {
  867. _mqttOnMessage(topic, payload, length);
  868. });
  869. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  870. _mqtt.setCallback([](char* topic, byte* payload, unsigned int length) {
  871. _mqttOnMessage(topic, (char *) payload, length);
  872. });
  873. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  874. _mqttConfigure();
  875. mqttRegister(_mqttCallback);
  876. #if WEB_SUPPORT
  877. wsRegister()
  878. .onVisible(_mqttWebSocketOnVisible)
  879. .onData(_mqttWebSocketOnData)
  880. .onConnected(_mqttWebSocketOnConnected)
  881. .onKeyCheck(_mqttWebSocketOnKeyCheck);
  882. mqttRegister([](unsigned int type, const char*, const char*) {
  883. if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) wsPost(_mqttWebSocketOnData);
  884. });
  885. #endif
  886. #if TERMINAL_SUPPORT
  887. _mqttInitCommands();
  888. #endif
  889. // Main callbacks
  890. espurnaRegisterLoop(mqttLoop);
  891. espurnaRegisterReload(_mqttConfigure);
  892. }
  893. #endif // MQTT_SUPPORT