Fork of the espurna firmware for `mhsw` switches
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1250 lines
38 KiB

7 years ago
Terminal: change command-line parser (#2247) Change the underlying command line handling: - switch to a custom parser, inspired by redis / sds - update terminalRegisterCommand signature, pass only bare minimum - clean-up `help` & `commands`. update settings `set`, `get` and `del` - allow our custom test suite to run command-line tests - clean-up Stream IO to allow us to print large things into debug stream (for example, `eeprom.dump`) - send parsing errors to the debug log As a proof of concept, introduce `TERMINAL_MQTT_SUPPORT` and `TERMINAL_WEB_API_SUPPORT` - MQTT subscribes to the `<root>/cmd/set` and sends response to the `<root>/cmd`. We can't output too much, as we don't have any large-send API. - Web API listens to the `/api/cmd?apikey=...&line=...` (or PUT, params inside the body). This one is intended as a possible replacement of the `API_SUPPORT`. Internals introduce a 'task' around the AsyncWebServerRequest object that will simulate what WiFiClient does and push data into it continuously, switching between CONT and SYS. Both are experimental. We only accept a single command and not every command is updated to use Print `ctx.output` object. We are also somewhat limited by the Print / Stream overall, perhaps I am overestimating the usefulness of Arduino compatibility to such an extent :) Web API handler can also sometimes show only part of the result, whenever the command tries to yield() by itself waiting for something. Perhaps we would need to create a custom request handler for that specific use-case.
4 years ago
Terminal: change command-line parser (#2247) Change the underlying command line handling: - switch to a custom parser, inspired by redis / sds - update terminalRegisterCommand signature, pass only bare minimum - clean-up `help` & `commands`. update settings `set`, `get` and `del` - allow our custom test suite to run command-line tests - clean-up Stream IO to allow us to print large things into debug stream (for example, `eeprom.dump`) - send parsing errors to the debug log As a proof of concept, introduce `TERMINAL_MQTT_SUPPORT` and `TERMINAL_WEB_API_SUPPORT` - MQTT subscribes to the `<root>/cmd/set` and sends response to the `<root>/cmd`. We can't output too much, as we don't have any large-send API. - Web API listens to the `/api/cmd?apikey=...&line=...` (or PUT, params inside the body). This one is intended as a possible replacement of the `API_SUPPORT`. Internals introduce a 'task' around the AsyncWebServerRequest object that will simulate what WiFiClient does and push data into it continuously, switching between CONT and SYS. Both are experimental. We only accept a single command and not every command is updated to use Print `ctx.output` object. We are also somewhat limited by the Print / Stream overall, perhaps I am overestimating the usefulness of Arduino compatibility to such an extent :) Web API handler can also sometimes show only part of the result, whenever the command tries to yield() by itself waiting for something. Perhaps we would need to create a custom request handler for that specific use-case.
4 years ago
7 years ago
7 years ago
  1. /*
  2. MQTT MODULE
  3. Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
  4. Updated secure client support by Niek van der Maas < mail at niekvandermaas dot nl>
  5. */
  6. #include "mqtt.h"
  7. #if MQTT_SUPPORT
  8. #include <forward_list>
  9. #include <utility>
  10. #include <Ticker.h>
  11. #include "system.h"
  12. #include "mdns.h"
  13. #include "mqtt.h"
  14. #include "ntp.h"
  15. #include "rpc.h"
  16. #include "rtcmem.h"
  17. #include "ws.h"
  18. #include "libs/AsyncClientHelpers.h"
  19. #include "libs/SecureClientHelpers.h"
  20. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  21. #include <ESPAsyncTCP.h>
  22. #include <AsyncMqttClient.h>
  23. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  24. #include <MQTTClient.h>
  25. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  26. #include <PubSubClient.h>
  27. #endif
  28. // -----------------------------------------------------------------------------
  29. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  30. AsyncMqttClient _mqtt;
  31. #else // MQTT_LIBRARY_ARDUINOMQTT / MQTT_LIBRARY_PUBSUBCLIENT
  32. WiFiClient _mqtt_client;
  33. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  34. std::unique_ptr<SecureClient> _mqtt_client_secure = nullptr;
  35. #if MQTT_SECURE_CLIENT_INCLUDE_CA
  36. #include "static/mqtt_client_trusted_root_ca.h" // Assumes this header file defines a _mqtt_client_trusted_root_ca[] PROGMEM = "...PEM data..."
  37. #else
  38. #include "static/letsencrypt_isrgroot_pem.h" // Default to LetsEncrypt X3 certificate
  39. #define _mqtt_client_trusted_root_ca _ssl_letsencrypt_isrg_x3_ca
  40. #endif // MQTT_SECURE_CLIENT_INCLUDE_CA
  41. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  42. #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  43. MQTTClient _mqtt(MQTT_BUFFER_MAX_SIZE);
  44. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  45. PubSubClient _mqtt;
  46. #endif
  47. #endif // MQTT_LIBRARY == MQTT_ASYNCMQTTCLIENT
  48. unsigned long _mqtt_last_connection = 0;
  49. AsyncClientState _mqtt_state = AsyncClientState::Disconnected;
  50. bool _mqtt_skip_messages = false;
  51. unsigned long _mqtt_skip_time = MQTT_SKIP_TIME;
  52. unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  53. bool _mqtt_enabled = MQTT_ENABLED;
  54. bool _mqtt_use_json = false;
  55. bool _mqtt_retain = MQTT_RETAIN;
  56. int _mqtt_qos = MQTT_QOS;
  57. int _mqtt_keepalive = MQTT_KEEPALIVE;
  58. String _mqtt_topic;
  59. String _mqtt_topic_json;
  60. String _mqtt_setter;
  61. String _mqtt_getter;
  62. bool _mqtt_forward;
  63. String _mqtt_user;
  64. String _mqtt_pass;
  65. String _mqtt_will;
  66. String _mqtt_server;
  67. uint16_t _mqtt_port;
  68. String _mqtt_clientid;
  69. std::forward_list<heartbeat::Callback> _mqtt_heartbeat_callbacks;
  70. heartbeat::Mode _mqtt_heartbeat_mode;
  71. heartbeat::Seconds _mqtt_heartbeat_interval;
  72. String _mqtt_payload_online;
  73. String _mqtt_payload_offline;
  74. std::forward_list<mqtt_callback_f> _mqtt_callbacks;
  75. // -----------------------------------------------------------------------------
  76. // JSON payload
  77. // -----------------------------------------------------------------------------
  78. struct MqttPayload {
  79. MqttPayload() = delete;
  80. MqttPayload(const MqttPayload&) = default;
  81. // TODO: replace String implementation with Core v3 (or just use newer Core)
  82. // 2.7.x still has basic Arduino String move ctor that is not noexcept
  83. MqttPayload(MqttPayload&& other) noexcept :
  84. _topic(std::move(other._topic)),
  85. _message(std::move(other._message))
  86. {}
  87. template <typename Topic, typename Message>
  88. MqttPayload(Topic&& topic, Message&& message) :
  89. _topic(std::forward<Topic>(topic)),
  90. _message(std::forward<Message>(message))
  91. {}
  92. const String& topic() const {
  93. return _topic;
  94. }
  95. const String& message() const {
  96. return _message;
  97. }
  98. private:
  99. String _topic;
  100. String _message;
  101. };
  102. size_t _mqtt_json_payload_count { 0ul };
  103. std::forward_list<MqttPayload> _mqtt_json_payload;
  104. Ticker _mqtt_json_payload_flush;
  105. // -----------------------------------------------------------------------------
  106. // Secure client handlers
  107. // -----------------------------------------------------------------------------
  108. #if SECURE_CLIENT == SECURE_CLIENT_AXTLS
  109. SecureClientConfig _mqtt_sc_config {
  110. "MQTT",
  111. []() -> String {
  112. return _mqtt_server;
  113. },
  114. []() -> int {
  115. return getSetting("mqttScCheck", MQTT_SECURE_CLIENT_CHECK);
  116. },
  117. []() -> String {
  118. return getSetting("mqttFP", MQTT_SSL_FINGERPRINT);
  119. },
  120. true
  121. };
  122. #endif
  123. #if SECURE_CLIENT == SECURE_CLIENT_BEARSSL
  124. SecureClientConfig _mqtt_sc_config {
  125. "MQTT",
  126. []() -> int {
  127. return getSetting("mqttScCheck", MQTT_SECURE_CLIENT_CHECK);
  128. },
  129. []() -> PGM_P {
  130. return _mqtt_client_trusted_root_ca;
  131. },
  132. []() -> String {
  133. return getSetting("mqttFP", MQTT_SSL_FINGERPRINT);
  134. },
  135. []() -> uint16_t {
  136. return getSetting("mqttScMFLN", MQTT_SECURE_CLIENT_MFLN);
  137. },
  138. true
  139. };
  140. #endif
  141. // -----------------------------------------------------------------------------
  142. // Client configuration & setup
  143. // -----------------------------------------------------------------------------
  144. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  145. void _mqttSetupAsyncClient(bool secure = false) {
  146. _mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
  147. _mqtt.setClientId(_mqtt_clientid.c_str());
  148. _mqtt.setKeepAlive(_mqtt_keepalive);
  149. _mqtt.setCleanSession(false);
  150. _mqtt.setWill(_mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, _mqtt_payload_offline.c_str());
  151. if (_mqtt_user.length() && _mqtt_pass.length()) {
  152. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
  153. _mqtt.setCredentials(_mqtt_user.c_str(), _mqtt_pass.c_str());
  154. }
  155. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  156. if (secure) {
  157. DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
  158. _mqtt.setSecure(secure);
  159. }
  160. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  161. _mqtt.connect();
  162. }
  163. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  164. #if (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  165. WiFiClient& _mqttGetClient(bool secure) {
  166. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  167. return (secure ? _mqtt_client_secure->get() : _mqtt_client);
  168. #else
  169. return _mqtt_client;
  170. #endif
  171. }
  172. bool _mqttSetupSyncClient(bool secure = false) {
  173. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  174. if (secure) {
  175. if (!_mqtt_client_secure) _mqtt_client_secure = std::make_unique<SecureClient>(_mqtt_sc_config);
  176. return _mqtt_client_secure->beforeConnected();
  177. }
  178. #endif
  179. return true;
  180. }
  181. bool _mqttConnectSyncClient(bool secure = false) {
  182. bool result = false;
  183. #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  184. _mqtt.begin(_mqtt_server.c_str(), _mqtt_port, _mqttGetClient(secure));
  185. _mqtt.setWill(_mqtt_will.c_str(), _mqtt_payload_offline.c_str(), _mqtt_retain, _mqtt_qos);
  186. _mqtt.setKeepAlive(_mqtt_keepalive);
  187. result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str());
  188. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  189. _mqtt.setClient(_mqttGetClient(secure));
  190. _mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
  191. if (_mqtt_user.length() && _mqtt_pass.length()) {
  192. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
  193. result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, _mqtt_payload_offline.c_str());
  194. } else {
  195. result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, _mqtt_payload_offline.c_str());
  196. }
  197. #endif
  198. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  199. if (result && secure) {
  200. result = _mqtt_client_secure->afterConnected();
  201. }
  202. #endif
  203. return result;
  204. }
  205. #endif // (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  206. void _mqttPlaceholders(String& text) {
  207. text.replace("{hostname}", getSetting("hostname"));
  208. text.replace("{magnitude}", "#");
  209. String mac = WiFi.macAddress();
  210. mac.replace(":", "");
  211. text.replace("{mac}", mac);
  212. }
  213. template<typename T>
  214. void _mqttApplySetting(T& current, T& updated) {
  215. if (current != updated) {
  216. current = std::move(updated);
  217. mqttDisconnect();
  218. }
  219. }
  220. template<typename T>
  221. void _mqttApplySetting(T& current, const T& updated) {
  222. if (current != updated) {
  223. current = updated;
  224. mqttDisconnect();
  225. }
  226. }
  227. template<typename T>
  228. void _mqttApplyTopic(T& current, const char* magnitude) {
  229. String updated = mqttTopic(magnitude, false);
  230. if (current != updated) {
  231. mqttFlush();
  232. current = std::move(updated);
  233. }
  234. }
  235. void _mqttConfigure() {
  236. // Enable only when server is set
  237. {
  238. const String server = getSetting("mqttServer", MQTT_SERVER);
  239. const auto port = getSetting("mqttPort", static_cast<uint16_t>(MQTT_PORT));
  240. bool enabled = false;
  241. if (server.length()) {
  242. enabled = getSetting("mqttEnabled", 1 == MQTT_ENABLED);
  243. }
  244. _mqttApplySetting(_mqtt_server, server);
  245. _mqttApplySetting(_mqtt_enabled, enabled);
  246. _mqttApplySetting(_mqtt_port, port);
  247. if (!enabled) return;
  248. }
  249. // Get base topic and apply placeholders
  250. {
  251. String topic = getSetting("mqttTopic", MQTT_TOPIC);
  252. if (topic.endsWith("/")) topic.remove(topic.length()-1);
  253. // Replace things inside curly braces (like {hostname}, {mac} etc.)
  254. _mqttPlaceholders(topic);
  255. if (topic.indexOf("#") == -1) topic.concat("/#");
  256. _mqttApplySetting(_mqtt_topic, topic);
  257. }
  258. // Getter and setter
  259. {
  260. String setter = getSetting("mqttSetter", MQTT_SETTER);
  261. String getter = getSetting("mqttGetter", MQTT_GETTER);
  262. bool forward = !setter.equals(getter) && RELAY_REPORT_STATUS;
  263. _mqttApplySetting(_mqtt_setter, setter);
  264. _mqttApplySetting(_mqtt_getter, getter);
  265. _mqttApplySetting(_mqtt_forward, forward);
  266. }
  267. // MQTT options
  268. {
  269. String user = getSetting("mqttUser", MQTT_USER);
  270. _mqttPlaceholders(user);
  271. String pass = getSetting("mqttPassword", MQTT_PASS);
  272. const auto qos = getSetting("mqttQoS", MQTT_QOS);
  273. const bool retain = getSetting("mqttRetain", 1 == MQTT_RETAIN);
  274. // Note: MQTT spec defines this as 2 bytes
  275. const auto keepalive = constrain(
  276. getSetting("mqttKeep", MQTT_KEEPALIVE),
  277. 0, std::numeric_limits<uint16_t>::max()
  278. );
  279. String id = getSetting("mqttClientID", getIdentifier());
  280. _mqttPlaceholders(id);
  281. _mqttApplySetting(_mqtt_user, user);
  282. _mqttApplySetting(_mqtt_pass, pass);
  283. _mqttApplySetting(_mqtt_qos, qos);
  284. _mqttApplySetting(_mqtt_retain, retain);
  285. _mqttApplySetting(_mqtt_keepalive, keepalive);
  286. _mqttApplySetting(_mqtt_clientid, id);
  287. _mqttApplyTopic(_mqtt_will, MQTT_TOPIC_STATUS);
  288. }
  289. // MQTT JSON
  290. {
  291. _mqttApplySetting(_mqtt_use_json, getSetting("mqttUseJson", 1 == MQTT_USE_JSON));
  292. _mqttApplyTopic(_mqtt_topic_json, MQTT_TOPIC_JSON);
  293. }
  294. _mqttApplySetting(_mqtt_heartbeat_mode,
  295. getSetting("mqttHbMode", heartbeat::currentMode()));
  296. _mqttApplySetting(_mqtt_heartbeat_interval,
  297. getSetting("mqttHbIntvl", heartbeat::currentInterval()));
  298. // Skip messages in a small window right after the connection
  299. _mqtt_skip_time = getSetting("mqttSkipTime", MQTT_SKIP_TIME);
  300. // Custom payload strings
  301. settingsProcessConfig({
  302. {_mqtt_payload_online, "mqttPayloadOnline", MQTT_STATUS_ONLINE},
  303. {_mqtt_payload_offline, "mqttPayloadOffline", MQTT_STATUS_OFFLINE}
  304. });
  305. // Reset reconnect delay to reconnect sooner
  306. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  307. }
  308. void _mqttBackwards() {
  309. String mqttTopic = getSetting("mqttTopic", MQTT_TOPIC);
  310. if (mqttTopic.indexOf("{identifier}") > 0) {
  311. mqttTopic.replace("{identifier}", "{hostname}");
  312. setSetting("mqttTopic", mqttTopic);
  313. }
  314. }
  315. void _mqttInfo() {
  316. // Build information
  317. {
  318. #define __MQTT_INFO_STR(X) #X
  319. #define _MQTT_INFO_STR(X) __MQTT_INFO_STR(X)
  320. DEBUG_MSG_P(PSTR(
  321. "[MQTT] "
  322. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  323. "AsyncMqttClient"
  324. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  325. "Arduino-MQTT"
  326. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  327. "PubSubClient"
  328. #endif
  329. ", SSL "
  330. #if SECURE_CLIENT != SEURE_CLIENT_NONE
  331. "ENABLED"
  332. #else
  333. "DISABLED"
  334. #endif
  335. ", Autoconnect "
  336. #if MQTT_AUTOCONNECT
  337. "ENABLED"
  338. #else
  339. "DISABLED"
  340. #endif
  341. ", Buffer size " _MQTT_INFO_STR(MQTT_BUFFER_MAX_SIZE) " bytes"
  342. "\n"
  343. ));
  344. #undef _MQTT_INFO_STR
  345. #undef __MQTT_INFO_STR
  346. }
  347. // Notify about the general state of the client
  348. {
  349. const __FlashStringHelper* enabled = _mqtt_enabled
  350. ? F("ENABLED")
  351. : F("DISABLED");
  352. const __FlashStringHelper* state = nullptr;
  353. switch (_mqtt_state) {
  354. case AsyncClientState::Connecting:
  355. state = F("CONNECTING");
  356. break;
  357. case AsyncClientState::Connected:
  358. state = F("CONNECTED");
  359. break;
  360. case AsyncClientState::Disconnected:
  361. state = F("DISCONNECTED");
  362. break;
  363. case AsyncClientState::Disconnecting:
  364. state = F("DISCONNECTING");
  365. break;
  366. default:
  367. state = F("WAITING");
  368. break;
  369. }
  370. DEBUG_MSG_P(PSTR("[MQTT] Client %s, %s\n"),
  371. String(enabled).c_str(),
  372. String(state).c_str()
  373. );
  374. if (_mqtt_enabled && (_mqtt_state != AsyncClientState::Connected)) {
  375. DEBUG_MSG_P(PSTR("[MQTT] Retrying, Last %u with Delay %u (Step %u)\n"),
  376. _mqtt_last_connection,
  377. _mqtt_reconnect_delay,
  378. MQTT_RECONNECT_DELAY_STEP
  379. );
  380. }
  381. }
  382. }
  383. // -----------------------------------------------------------------------------
  384. // WEB
  385. // -----------------------------------------------------------------------------
  386. #if WEB_SUPPORT
  387. bool _mqttWebSocketOnKeyCheck(const char * key, JsonVariant& value) {
  388. return (strncmp(key, "mqtt", 3) == 0);
  389. }
  390. void _mqttWebSocketOnVisible(JsonObject& root) {
  391. root["mqttVisible"] = 1;
  392. #if ASYNC_TCP_SSL_ENABLED
  393. root["mqttsslVisible"] = 1;
  394. #endif
  395. }
  396. void _mqttWebSocketOnData(JsonObject& root) {
  397. root["mqttStatus"] = mqttConnected();
  398. }
  399. void _mqttWebSocketOnConnected(JsonObject& root) {
  400. root["mqttEnabled"] = mqttEnabled();
  401. root["mqttServer"] = getSetting("mqttServer", MQTT_SERVER);
  402. root["mqttPort"] = getSetting("mqttPort", MQTT_PORT);
  403. root["mqttUser"] = getSetting("mqttUser", MQTT_USER);
  404. root["mqttClientID"] = getSetting("mqttClientID");
  405. root["mqttPassword"] = getSetting("mqttPassword", MQTT_PASS);
  406. root["mqttKeep"] = _mqtt_keepalive;
  407. root["mqttRetain"] = _mqtt_retain;
  408. root["mqttQoS"] = _mqtt_qos;
  409. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  410. root["mqttUseSSL"] = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED);
  411. root["mqttFP"] = getSetting("mqttFP", MQTT_SSL_FINGERPRINT);
  412. #endif
  413. root["mqttTopic"] = getSetting("mqttTopic", MQTT_TOPIC);
  414. root["mqttUseJson"] = getSetting("mqttUseJson", 1 == MQTT_USE_JSON);
  415. }
  416. #endif
  417. // -----------------------------------------------------------------------------
  418. // SETTINGS
  419. // -----------------------------------------------------------------------------
  420. #if TERMINAL_SUPPORT
  421. void _mqttInitCommands() {
  422. terminalRegisterCommand(F("MQTT.RESET"), [](const terminal::CommandContext&) {
  423. _mqttConfigure();
  424. mqttDisconnect();
  425. terminalOK();
  426. });
  427. terminalRegisterCommand(F("MQTT.INFO"), [](const terminal::CommandContext&) {
  428. _mqttInfo();
  429. terminalOK();
  430. });
  431. }
  432. #endif // TERMINAL_SUPPORT
  433. // -----------------------------------------------------------------------------
  434. // MQTT Callbacks
  435. // -----------------------------------------------------------------------------
  436. void _mqttCallback(unsigned int type, const char * topic, const char * payload) {
  437. if (type == MQTT_CONNECT_EVENT) {
  438. mqttSubscribe(MQTT_TOPIC_ACTION);
  439. }
  440. if (type == MQTT_MESSAGE_EVENT) {
  441. String t = mqttMagnitude(topic);
  442. if (t.equals(MQTT_TOPIC_ACTION)) {
  443. rpcHandleAction(payload);
  444. }
  445. }
  446. }
  447. bool _mqttHeartbeat(heartbeat::Mask mask) {
  448. // Backported from the older utils implementation.
  449. // Wait until the time is synced to avoid sending partial report *and*
  450. // as a result, wait until the next interval to actually send the datetime string.
  451. #if NTP_SUPPORT
  452. if ((mask & heartbeat::Report::Datetime) && !ntpSynced()) {
  453. return false;
  454. }
  455. #endif
  456. if (!mqttConnected()) {
  457. return false;
  458. }
  459. // TODO: rework old HEARTBEAT_REPEAT_STATUS?
  460. // for example: send full report once, send only the dynamic data after that
  461. // (interval, hostname, description, ssid, bssid, ip, mac, rssi, uptime, datetime, heap, loadavg, vcc)
  462. // otherwise, it is still possible by setting everything to 0 *but* the Report::Status bit
  463. // TODO: per-module mask?
  464. // TODO: simply send static data with onConnected, and the rest from here?
  465. if (mask & heartbeat::Report::Status)
  466. mqttSendStatus();
  467. if (mask & heartbeat::Report::Interval)
  468. mqttSend(MQTT_TOPIC_INTERVAL, String(_mqtt_heartbeat_interval.count()).c_str());
  469. if (mask & heartbeat::Report::App)
  470. mqttSend(MQTT_TOPIC_APP, APP_NAME);
  471. if (mask & heartbeat::Report::Version)
  472. mqttSend(MQTT_TOPIC_VERSION, getVersion().c_str());
  473. if (mask & heartbeat::Report::Board)
  474. mqttSend(MQTT_TOPIC_BOARD, getBoardName().c_str());
  475. if (mask & heartbeat::Report::Hostname)
  476. mqttSend(MQTT_TOPIC_HOSTNAME, getSetting("hostname", getIdentifier()).c_str());
  477. if (mask & heartbeat::Report::Description) {
  478. auto desc = getSetting("desc");
  479. if (desc.length()) {
  480. mqttSend(MQTT_TOPIC_DESCRIPTION, desc.c_str());
  481. }
  482. }
  483. if (mask & heartbeat::Report::Ssid)
  484. mqttSend(MQTT_TOPIC_SSID, WiFi.SSID().c_str());
  485. if (mask & heartbeat::Report::Bssid)
  486. mqttSend(MQTT_TOPIC_BSSID, WiFi.BSSIDstr().c_str());
  487. if (mask & heartbeat::Report::Ip)
  488. mqttSend(MQTT_TOPIC_IP, getIP().c_str());
  489. if (mask & heartbeat::Report::Mac)
  490. mqttSend(MQTT_TOPIC_MAC, WiFi.macAddress().c_str());
  491. if (mask & heartbeat::Report::Rssi)
  492. mqttSend(MQTT_TOPIC_RSSI, String(WiFi.RSSI()).c_str());
  493. if (mask & heartbeat::Report::Uptime)
  494. mqttSend(MQTT_TOPIC_UPTIME, String(systemUptime()).c_str());
  495. #if NTP_SUPPORT
  496. if (mask & heartbeat::Report::Datetime)
  497. mqttSend(MQTT_TOPIC_DATETIME, ntpDateTime().c_str());
  498. #endif
  499. if (mask & heartbeat::Report::Freeheap) {
  500. auto stats = systemHeapStats();
  501. mqttSend(MQTT_TOPIC_FREEHEAP, String(stats.available).c_str());
  502. }
  503. if (mask & heartbeat::Report::Loadavg)
  504. mqttSend(MQTT_TOPIC_LOADAVG, String(systemLoadAverage()).c_str());
  505. if ((mask & heartbeat::Report::Vcc) && (ADC_MODE_VALUE == ADC_VCC))
  506. mqttSend(MQTT_TOPIC_VCC, String(ESP.getVcc()).c_str());
  507. auto status = mqttConnected();
  508. for (auto& cb : _mqtt_heartbeat_callbacks) {
  509. status = status && cb(mask);
  510. }
  511. return status;
  512. }
  513. void _mqttOnConnect() {
  514. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  515. _mqtt_last_connection = millis();
  516. _mqtt_state = AsyncClientState::Connected;
  517. systemHeartbeat(_mqttHeartbeat, _mqtt_heartbeat_mode, _mqtt_heartbeat_interval);
  518. DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
  519. // Clean subscriptions
  520. mqttUnsubscribeRaw("#");
  521. // Notify all subscribers about the connection
  522. for (auto& callback : _mqtt_callbacks) {
  523. callback(MQTT_CONNECT_EVENT, nullptr, nullptr);
  524. }
  525. }
  526. void _mqttOnDisconnect() {
  527. // Reset reconnection delay
  528. _mqtt_last_connection = millis();
  529. _mqtt_state = AsyncClientState::Disconnected;
  530. systemStopHeartbeat(_mqttHeartbeat);
  531. DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
  532. // Notify all subscribers about the disconnect
  533. for (auto& callback : _mqtt_callbacks) {
  534. callback(MQTT_DISCONNECT_EVENT, nullptr, nullptr);
  535. }
  536. }
  537. // Force-skip everything received in a short window right after connecting to avoid syncronization issues.
  538. bool _mqttMaybeSkipRetained(char* topic) {
  539. if (_mqtt_skip_messages && (millis() - _mqtt_last_connection < _mqtt_skip_time)) {
  540. DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic);
  541. return true;
  542. }
  543. _mqtt_skip_messages = false;
  544. return false;
  545. }
  546. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  547. // MQTT Broker can sometimes send messages in bulk. Even when message size is less than MQTT_BUFFER_MAX_SIZE, we *could*
  548. // receive a message with `len != total`, this requiring buffering of the received data. Prepare a static memory to store the
  549. // data until `(len + index) == total`.
  550. // TODO: One pending issue is streaming arbitrary data (e.g. binary, for OTA). We always set '\0' and API consumer expects C-String.
  551. // In that case, there could be MQTT_MESSAGE_RAW_EVENT and this callback only trigger on small messages.
  552. // TODO: Current callback model does not allow to pass message length. Instead, implement a topic filter and record all subscriptions. That way we don't need to filter out events and could implement per-event callbacks.
  553. void _mqttOnMessageAsync(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
  554. if (!len || (len > MQTT_BUFFER_MAX_SIZE) || (total > MQTT_BUFFER_MAX_SIZE)) return;
  555. if (_mqttMaybeSkipRetained(topic)) return;
  556. static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0};
  557. memmove(message + index, (char *) payload, len);
  558. // Not done yet
  559. if (total != (len + index)) {
  560. DEBUG_MSG_P(PSTR("[MQTT] Buffered %s => %u / %u bytes\n"), topic, len, total);
  561. return;
  562. }
  563. message[len + index] = '\0';
  564. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
  565. // Call subscribers with the message buffer
  566. for (auto& callback : _mqtt_callbacks) {
  567. callback(MQTT_MESSAGE_EVENT, topic, message);
  568. }
  569. }
  570. #else
  571. // Sync client already implements buffering, but we still need to add '\0' because API consumer expects C-String :/
  572. // TODO: consider reworking this (and async counterpart), giving callback func length of the message.
  573. void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
  574. if (!len || (len > MQTT_BUFFER_MAX_SIZE)) return;
  575. if (_mqttMaybeSkipRetained(topic)) return;
  576. static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0};
  577. memmove(message, (char *) payload, len);
  578. message[len] = '\0';
  579. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
  580. // Call subscribers with the message buffer
  581. for (auto& callback : _mqtt_callbacks) {
  582. callback(MQTT_MESSAGE_EVENT, topic, message);
  583. }
  584. }
  585. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  586. // -----------------------------------------------------------------------------
  587. // Public API
  588. // -----------------------------------------------------------------------------
  589. /**
  590. Returns the magnitude part of a topic
  591. @param topic the full MQTT topic
  592. @return String object with the magnitude part.
  593. */
  594. String mqttMagnitude(const char* topic) {
  595. String pattern = _mqtt_topic + _mqtt_setter;
  596. int position = pattern.indexOf("#");
  597. if (position == -1) return String();
  598. String start = pattern.substring(0, position);
  599. String end = pattern.substring(position + 1);
  600. String magnitude = String(topic);
  601. if (magnitude.startsWith(start) && magnitude.endsWith(end)) {
  602. magnitude.replace(start, "");
  603. magnitude.replace(end, "");
  604. } else {
  605. magnitude = String();
  606. }
  607. return magnitude;
  608. }
  609. /**
  610. Returns a full MQTT topic from the magnitude
  611. @param magnitude the magnitude part of the topic.
  612. @param is_set whether to build a command topic (true)
  613. or a state topic (false).
  614. @return String full MQTT topic.
  615. */
  616. String mqttTopic(const char * magnitude, bool is_set) {
  617. String output = _mqtt_topic;
  618. output.replace("#", magnitude);
  619. output += is_set ? _mqtt_setter : _mqtt_getter;
  620. return output;
  621. }
  622. /**
  623. Returns a full MQTT topic from the magnitude
  624. @param magnitude the magnitude part of the topic.
  625. @param index index of the magnitude when more than one such magnitudes.
  626. @param is_set whether to build a command topic (true)
  627. or a state topic (false).
  628. @return String full MQTT topic.
  629. */
  630. String mqttTopic(const char * magnitude, unsigned int index, bool is_set) {
  631. char buffer[strlen(magnitude)+5];
  632. snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), magnitude, index);
  633. return mqttTopic(buffer, is_set);
  634. }
  635. // -----------------------------------------------------------------------------
  636. bool mqttSendRaw(const char * topic, const char * message, bool retain) {
  637. constexpr size_t MessageLogMax { 128ul };
  638. if (_mqtt.connected()) {
  639. const unsigned int packetId {
  640. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  641. _mqtt.publish(topic, _mqtt_qos, retain, message)
  642. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  643. _mqtt.publish(topic, message, retain, _mqtt_qos)
  644. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  645. _mqtt.publish(topic, message, retain)
  646. #endif
  647. };
  648. const size_t message_len = strlen(message);
  649. if (message_len > MessageLogMax) {
  650. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => (%u bytes) (PID %u)\n"), topic, message_len, packetId);
  651. } else {
  652. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %u)\n"), topic, message, packetId);
  653. }
  654. return (packetId > 0);
  655. }
  656. return false;
  657. }
  658. bool mqttSendRaw(const char * topic, const char * message) {
  659. return mqttSendRaw(topic, message, _mqtt_retain);
  660. }
  661. void mqttSend(const char * topic, const char * message, bool force, bool retain) {
  662. // TODO: refactor JSON mode to trigger WS-like status payloads instead sending single topic+message?
  663. // (i.e. instead of {"relay/0": "1", ...} have {"relays": ["1"], ...})
  664. // Heartbeat handles periodic status dumps for everything, mqttSend alternative simply notifies the module to send it's status data
  665. if (!force && _mqtt_use_json) {
  666. mqttEnqueue(topic, message);
  667. _mqtt_json_payload_flush.once_ms(MQTT_USE_JSON_DELAY, mqttFlush);
  668. return;
  669. }
  670. mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain);
  671. }
  672. void mqttSend(const char * topic, const char * message, bool force) {
  673. mqttSend(topic, message, force, _mqtt_retain);
  674. }
  675. void mqttSend(const char * topic, const char * message) {
  676. mqttSend(topic, message, false);
  677. }
  678. void mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) {
  679. char buffer[strlen(topic)+5];
  680. snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
  681. mqttSend(buffer, message, force, retain);
  682. }
  683. void mqttSend(const char * topic, unsigned int index, const char * message, bool force) {
  684. mqttSend(topic, index, message, force, _mqtt_retain);
  685. }
  686. void mqttSend(const char * topic, unsigned int index, const char * message) {
  687. mqttSend(topic, index, message, false);
  688. }
  689. // -----------------------------------------------------------------------------
  690. constexpr size_t MqttJsonPayloadBufferSize { 1024ul };
  691. void mqttFlush() {
  692. if (!_mqtt.connected()) {
  693. return;
  694. }
  695. if (_mqtt_json_payload.empty()) {
  696. return;
  697. }
  698. DynamicJsonBuffer jsonBuffer(MqttJsonPayloadBufferSize);
  699. JsonObject& root = jsonBuffer.createObject();
  700. #if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME
  701. if (ntpSynced()) {
  702. root[MQTT_TOPIC_DATETIME] = ntpDateTime();
  703. }
  704. #endif
  705. #if MQTT_ENQUEUE_MAC
  706. root[MQTT_TOPIC_MAC] = WiFi.macAddress();
  707. #endif
  708. #if MQTT_ENQUEUE_HOSTNAME
  709. root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname", getIdentifier());
  710. #endif
  711. #if MQTT_ENQUEUE_IP
  712. root[MQTT_TOPIC_IP] = getIP();
  713. #endif
  714. #if MQTT_ENQUEUE_MESSAGE_ID
  715. root[MQTT_TOPIC_MESSAGE_ID] = (Rtcmem->mqtt)++;
  716. #endif
  717. for (auto& payload : _mqtt_json_payload) {
  718. root[payload.topic().c_str()] = payload.message().c_str();
  719. }
  720. String output;
  721. root.printTo(output);
  722. jsonBuffer.clear();
  723. _mqtt_json_payload_count = 0;
  724. _mqtt_json_payload.clear();
  725. mqttSendRaw(_mqtt_topic_json.c_str(), output.c_str(), false);
  726. }
  727. void mqttEnqueue(const char* topic, const char* message) {
  728. // Queue is not meant to send message "offline"
  729. // We must prevent the queue does not get full while offline
  730. if (_mqtt.connected()) {
  731. if (_mqtt_json_payload_count >= MQTT_QUEUE_MAX_SIZE) {
  732. mqttFlush();
  733. }
  734. _mqtt_json_payload.remove_if([topic](const MqttPayload& payload) {
  735. return payload.topic() == topic;
  736. });
  737. _mqtt_json_payload.emplace_front(topic, message);
  738. ++_mqtt_json_payload_count;
  739. }
  740. }
  741. // -----------------------------------------------------------------------------
  742. void mqttSubscribeRaw(const char * topic) {
  743. if (_mqtt.connected() && (strlen(topic) > 0)) {
  744. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  745. unsigned int packetId = _mqtt.subscribe(topic, _mqtt_qos);
  746. DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId);
  747. #else // Arduino-MQTT or PubSubClient
  748. _mqtt.subscribe(topic, _mqtt_qos);
  749. DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic);
  750. #endif
  751. }
  752. }
  753. void mqttSubscribe(const char * topic) {
  754. mqttSubscribeRaw(mqttTopic(topic, true).c_str());
  755. }
  756. void mqttUnsubscribeRaw(const char * topic) {
  757. if (_mqtt.connected() && (strlen(topic) > 0)) {
  758. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  759. unsigned int packetId = _mqtt.unsubscribe(topic);
  760. DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s (PID %d)\n"), topic, packetId);
  761. #else // Arduino-MQTT or PubSubClient
  762. _mqtt.unsubscribe(topic);
  763. DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s\n"), topic);
  764. #endif
  765. }
  766. }
  767. void mqttUnsubscribe(const char * topic) {
  768. mqttUnsubscribeRaw(mqttTopic(topic, true).c_str());
  769. }
  770. // -----------------------------------------------------------------------------
  771. void mqttEnabled(bool status) {
  772. _mqtt_enabled = status;
  773. }
  774. bool mqttEnabled() {
  775. return _mqtt_enabled;
  776. }
  777. bool mqttConnected() {
  778. return _mqtt.connected();
  779. }
  780. void mqttDisconnect() {
  781. if (_mqtt.connected()) {
  782. DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n"));
  783. _mqtt.disconnect();
  784. }
  785. }
  786. bool mqttForward() {
  787. return _mqtt_forward;
  788. }
  789. void mqttRegister(mqtt_callback_f callback) {
  790. _mqtt_callbacks.push_front(callback);
  791. }
  792. void mqttSetBroker(IPAddress ip, uint16_t port) {
  793. setSetting("mqttServer", ip.toString());
  794. _mqtt_server = ip.toString();
  795. setSetting("mqttPort", port);
  796. _mqtt_port = port;
  797. mqttEnabled(1 == MQTT_AUTOCONNECT);
  798. }
  799. void mqttSetBrokerIfNone(IPAddress ip, uint16_t port) {
  800. if (getSetting("mqttServer", MQTT_SERVER).length() == 0) {
  801. mqttSetBroker(ip, port);
  802. }
  803. }
  804. const String& mqttPayloadOnline() {
  805. return _mqtt_payload_online;
  806. }
  807. const String& mqttPayloadOffline() {
  808. return _mqtt_payload_offline;
  809. }
  810. const char* mqttPayloadStatus(bool status) {
  811. return status ? _mqtt_payload_online.c_str() : _mqtt_payload_offline.c_str();
  812. }
  813. void mqttSendStatus() {
  814. mqttSend(MQTT_TOPIC_STATUS, _mqtt_payload_online.c_str(), true);
  815. }
  816. // -----------------------------------------------------------------------------
  817. // Initialization
  818. // -----------------------------------------------------------------------------
  819. void _mqttConnect() {
  820. // Do not connect if disabled
  821. if (!_mqtt_enabled) return;
  822. // Do not connect if already connected or still trying to connect
  823. if (_mqtt.connected() || (_mqtt_state != AsyncClientState::Disconnected)) return;
  824. // Check reconnect interval
  825. if (millis() - _mqtt_last_connection < _mqtt_reconnect_delay) return;
  826. // Increase the reconnect delay
  827. _mqtt_reconnect_delay += MQTT_RECONNECT_DELAY_STEP;
  828. if (_mqtt_reconnect_delay > MQTT_RECONNECT_DELAY_MAX) {
  829. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MAX;
  830. }
  831. DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%hu\n"), _mqtt_server.c_str(), _mqtt_port);
  832. DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid.c_str());
  833. DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), _mqtt_qos);
  834. DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %c\n"), _mqtt_retain ? 'Y' : 'N');
  835. DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %hu (s)\n"), _mqtt_keepalive);
  836. DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will.c_str());
  837. _mqtt_state = AsyncClientState::Connecting;
  838. _mqtt_skip_messages = (_mqtt_skip_time > 0);
  839. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  840. const bool secure = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED);
  841. #else
  842. const bool secure = false;
  843. #endif
  844. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  845. _mqttSetupAsyncClient(secure);
  846. #elif (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  847. if (_mqttSetupSyncClient(secure) && _mqttConnectSyncClient(secure)) {
  848. _mqttOnConnect();
  849. } else {
  850. DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n"));
  851. _mqttOnDisconnect();
  852. }
  853. #else
  854. #error "please check that MQTT_LIBRARY is valid"
  855. #endif
  856. }
  857. void mqttLoop() {
  858. if (WiFi.status() != WL_CONNECTED) return;
  859. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  860. _mqttConnect();
  861. #else // MQTT_LIBRARY != MQTT_LIBRARY_ASYNCMQTTCLIENT
  862. if (_mqtt.connected()) {
  863. _mqtt.loop();
  864. } else {
  865. if (_mqtt_state != AsyncClientState::Disconnected) {
  866. _mqttOnDisconnect();
  867. }
  868. _mqttConnect();
  869. }
  870. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  871. }
  872. void mqttHeartbeat(heartbeat::Callback callback) {
  873. _mqtt_heartbeat_callbacks.push_front(callback);
  874. }
  875. void mqttSetup() {
  876. _mqttBackwards();
  877. _mqttInfo();
  878. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  879. // XXX: should not place this in config, addServerFingerprint does not check for duplicates
  880. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  881. {
  882. if (_mqtt_sc_config.on_fingerprint) {
  883. const String fingerprint = _mqtt_sc_config.on_fingerprint();
  884. uint8_t buffer[20] = {0};
  885. if (sslFingerPrintArray(fingerprint.c_str(), buffer)) {
  886. _mqtt.addServerFingerprint(buffer);
  887. }
  888. }
  889. }
  890. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  891. _mqtt.onMessage(_mqttOnMessageAsync);
  892. _mqtt.onConnect([](bool) {
  893. _mqttOnConnect();
  894. });
  895. _mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) {
  896. DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %u\n"), packetId);
  897. });
  898. _mqtt.onPublish([](uint16_t packetId) {
  899. DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %u\n"), packetId);
  900. });
  901. _mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {
  902. switch (reason) {
  903. case AsyncMqttClientDisconnectReason::TCP_DISCONNECTED:
  904. DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n"));
  905. break;
  906. case AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED:
  907. DEBUG_MSG_P(PSTR("[MQTT] Identifier Rejected\n"));
  908. break;
  909. case AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE:
  910. DEBUG_MSG_P(PSTR("[MQTT] Server unavailable\n"));
  911. break;
  912. case AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS:
  913. DEBUG_MSG_P(PSTR("[MQTT] Malformed credentials\n"));
  914. break;
  915. case AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED:
  916. DEBUG_MSG_P(PSTR("[MQTT] Not authorized\n"));
  917. break;
  918. case AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT:
  919. #if ASYNC_TCP_SSL_ENABLED
  920. DEBUG_MSG_P(PSTR("[MQTT] Bad fingerprint\n"));
  921. #endif
  922. break;
  923. case AsyncMqttClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION:
  924. // This is never used by the AsyncMqttClient source
  925. #if 0
  926. DEBUG_MSG_P(PSTR("[MQTT] Unacceptable protocol version\n"));
  927. #endif
  928. break;
  929. case AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE:
  930. DEBUG_MSG_P(PSTR("[MQTT] Connect packet too big\n"));
  931. break;
  932. }
  933. _mqttOnDisconnect();
  934. });
  935. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  936. _mqtt.onMessageAdvanced([](MQTTClient *client, char topic[], char payload[], int length) {
  937. _mqttOnMessage(topic, payload, length);
  938. });
  939. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  940. _mqtt.setCallback([](char* topic, byte* payload, unsigned int length) {
  941. _mqttOnMessage(topic, (char *) payload, length);
  942. });
  943. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  944. _mqttConfigure();
  945. mqttRegister(_mqttCallback);
  946. #if WEB_SUPPORT
  947. wsRegister()
  948. .onVisible(_mqttWebSocketOnVisible)
  949. .onData(_mqttWebSocketOnData)
  950. .onConnected(_mqttWebSocketOnConnected)
  951. .onKeyCheck(_mqttWebSocketOnKeyCheck);
  952. mqttRegister([](unsigned int type, const char*, const char*) {
  953. if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) {
  954. wsPost(_mqttWebSocketOnData);
  955. }
  956. });
  957. #endif
  958. #if TERMINAL_SUPPORT
  959. _mqttInitCommands();
  960. #endif
  961. // Main callbacks
  962. espurnaRegisterLoop(mqttLoop);
  963. espurnaRegisterReload(_mqttConfigure);
  964. }
  965. #endif // MQTT_SUPPORT