Fork of the espurna firmware for `mhsw` switches
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

1335 lines
40 KiB

7 years ago
Terminal: change command-line parser (#2247) Change the underlying command line handling: - switch to a custom parser, inspired by redis / sds - update terminalRegisterCommand signature, pass only bare minimum - clean-up `help` & `commands`. update settings `set`, `get` and `del` - allow our custom test suite to run command-line tests - clean-up Stream IO to allow us to print large things into debug stream (for example, `eeprom.dump`) - send parsing errors to the debug log As a proof of concept, introduce `TERMINAL_MQTT_SUPPORT` and `TERMINAL_WEB_API_SUPPORT` - MQTT subscribes to the `<root>/cmd/set` and sends response to the `<root>/cmd`. We can't output too much, as we don't have any large-send API. - Web API listens to the `/api/cmd?apikey=...&line=...` (or PUT, params inside the body). This one is intended as a possible replacement of the `API_SUPPORT`. Internals introduce a 'task' around the AsyncWebServerRequest object that will simulate what WiFiClient does and push data into it continuously, switching between CONT and SYS. Both are experimental. We only accept a single command and not every command is updated to use Print `ctx.output` object. We are also somewhat limited by the Print / Stream overall, perhaps I am overestimating the usefulness of Arduino compatibility to such an extent :) Web API handler can also sometimes show only part of the result, whenever the command tries to yield() by itself waiting for something. Perhaps we would need to create a custom request handler for that specific use-case.
4 years ago
Terminal: change command-line parser (#2247) Change the underlying command line handling: - switch to a custom parser, inspired by redis / sds - update terminalRegisterCommand signature, pass only bare minimum - clean-up `help` & `commands`. update settings `set`, `get` and `del` - allow our custom test suite to run command-line tests - clean-up Stream IO to allow us to print large things into debug stream (for example, `eeprom.dump`) - send parsing errors to the debug log As a proof of concept, introduce `TERMINAL_MQTT_SUPPORT` and `TERMINAL_WEB_API_SUPPORT` - MQTT subscribes to the `<root>/cmd/set` and sends response to the `<root>/cmd`. We can't output too much, as we don't have any large-send API. - Web API listens to the `/api/cmd?apikey=...&line=...` (or PUT, params inside the body). This one is intended as a possible replacement of the `API_SUPPORT`. Internals introduce a 'task' around the AsyncWebServerRequest object that will simulate what WiFiClient does and push data into it continuously, switching between CONT and SYS. Both are experimental. We only accept a single command and not every command is updated to use Print `ctx.output` object. We are also somewhat limited by the Print / Stream overall, perhaps I am overestimating the usefulness of Arduino compatibility to such an extent :) Web API handler can also sometimes show only part of the result, whenever the command tries to yield() by itself waiting for something. Perhaps we would need to create a custom request handler for that specific use-case.
4 years ago
7 years ago
7 years ago
  1. /*
  2. MQTT MODULE
  3. Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
  4. Updated secure client support by Niek van der Maas < mail at niekvandermaas dot nl>
  5. */
  6. #include "mqtt.h"
  7. #if MQTT_SUPPORT
  8. #include <forward_list>
  9. #include <utility>
  10. #include <Ticker.h>
  11. #include "system.h"
  12. #include "mdns.h"
  13. #include "mqtt.h"
  14. #include "ntp.h"
  15. #include "rpc.h"
  16. #include "rtcmem.h"
  17. #include "ws.h"
  18. #include "libs/AsyncClientHelpers.h"
  19. #include "libs/SecureClientHelpers.h"
  20. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  21. #include <ESPAsyncTCP.h>
  22. #include <AsyncMqttClient.h>
  23. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  24. #include <MQTTClient.h>
  25. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  26. #include <PubSubClient.h>
  27. #endif
  28. // -----------------------------------------------------------------------------
  29. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  30. AsyncMqttClient _mqtt;
  31. #else // MQTT_LIBRARY_ARDUINOMQTT / MQTT_LIBRARY_PUBSUBCLIENT
  32. WiFiClient _mqtt_client;
  33. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  34. std::unique_ptr<SecureClient> _mqtt_client_secure = nullptr;
  35. #if MQTT_SECURE_CLIENT_INCLUDE_CA
  36. #include "static/mqtt_client_trusted_root_ca.h" // Assumes this header file defines a _mqtt_client_trusted_root_ca[] PROGMEM = "...PEM data..."
  37. #else
  38. #include "static/letsencrypt_isrgroot_pem.h" // Default to LetsEncrypt X3 certificate
  39. #define _mqtt_client_trusted_root_ca _ssl_letsencrypt_isrg_x3_ca
  40. #endif // MQTT_SECURE_CLIENT_INCLUDE_CA
  41. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  42. #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  43. MQTTClient _mqtt(MQTT_BUFFER_MAX_SIZE);
  44. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  45. PubSubClient _mqtt;
  46. #endif
  47. #endif // MQTT_LIBRARY == MQTT_ASYNCMQTTCLIENT
  48. unsigned long _mqtt_last_connection = 0;
  49. AsyncClientState _mqtt_state = AsyncClientState::Disconnected;
  50. bool _mqtt_skip_messages = false;
  51. unsigned long _mqtt_skip_time = MQTT_SKIP_TIME;
  52. unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  53. bool _mqtt_enabled = MQTT_ENABLED;
  54. bool _mqtt_use_json = false;
  55. bool _mqtt_retain = MQTT_RETAIN;
  56. int _mqtt_qos = MQTT_QOS;
  57. int _mqtt_keepalive = MQTT_KEEPALIVE;
  58. String _mqtt_topic;
  59. String _mqtt_topic_json;
  60. String _mqtt_setter;
  61. String _mqtt_getter;
  62. bool _mqtt_forward;
  63. String _mqtt_user;
  64. String _mqtt_pass;
  65. String _mqtt_will;
  66. String _mqtt_server;
  67. uint16_t _mqtt_port;
  68. String _mqtt_clientid;
  69. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  70. struct MqttPidCallback {
  71. uint16_t pid;
  72. mqtt_pid_callback_f run;
  73. };
  74. using MqttPidCallbacks = std::forward_list<MqttPidCallback>;
  75. MqttPidCallbacks _mqtt_publish_callbacks;
  76. MqttPidCallbacks _mqtt_subscribe_callbacks;
  77. #endif
  78. std::forward_list<heartbeat::Callback> _mqtt_heartbeat_callbacks;
  79. heartbeat::Mode _mqtt_heartbeat_mode;
  80. heartbeat::Seconds _mqtt_heartbeat_interval;
  81. String _mqtt_payload_online;
  82. String _mqtt_payload_offline;
  83. std::forward_list<mqtt_callback_f> _mqtt_callbacks;
  84. // -----------------------------------------------------------------------------
  85. // JSON payload
  86. // -----------------------------------------------------------------------------
  87. struct MqttPayload {
  88. MqttPayload() = delete;
  89. MqttPayload(const MqttPayload&) = default;
  90. // TODO: replace String implementation with Core v3 (or just use newer Core)
  91. // 2.7.x still has basic Arduino String move ctor that is not noexcept
  92. MqttPayload(MqttPayload&& other) noexcept :
  93. _topic(std::move(other._topic)),
  94. _message(std::move(other._message))
  95. {}
  96. template <typename Topic, typename Message>
  97. MqttPayload(Topic&& topic, Message&& message) :
  98. _topic(std::forward<Topic>(topic)),
  99. _message(std::forward<Message>(message))
  100. {}
  101. const String& topic() const {
  102. return _topic;
  103. }
  104. const String& message() const {
  105. return _message;
  106. }
  107. private:
  108. String _topic;
  109. String _message;
  110. };
  111. size_t _mqtt_json_payload_count { 0ul };
  112. std::forward_list<MqttPayload> _mqtt_json_payload;
  113. Ticker _mqtt_json_payload_flush;
  114. // -----------------------------------------------------------------------------
  115. // Secure client handlers
  116. // -----------------------------------------------------------------------------
  117. #if SECURE_CLIENT == SECURE_CLIENT_AXTLS
  118. SecureClientConfig _mqtt_sc_config {
  119. "MQTT",
  120. []() -> String {
  121. return _mqtt_server;
  122. },
  123. []() -> int {
  124. return getSetting("mqttScCheck", MQTT_SECURE_CLIENT_CHECK);
  125. },
  126. []() -> String {
  127. return getSetting("mqttFP", MQTT_SSL_FINGERPRINT);
  128. },
  129. true
  130. };
  131. #endif
  132. #if SECURE_CLIENT == SECURE_CLIENT_BEARSSL
  133. SecureClientConfig _mqtt_sc_config {
  134. "MQTT",
  135. []() -> int {
  136. return getSetting("mqttScCheck", MQTT_SECURE_CLIENT_CHECK);
  137. },
  138. []() -> PGM_P {
  139. return _mqtt_client_trusted_root_ca;
  140. },
  141. []() -> String {
  142. return getSetting("mqttFP", MQTT_SSL_FINGERPRINT);
  143. },
  144. []() -> uint16_t {
  145. return getSetting("mqttScMFLN", MQTT_SECURE_CLIENT_MFLN);
  146. },
  147. true
  148. };
  149. #endif
  150. // -----------------------------------------------------------------------------
  151. // Client configuration & setup
  152. // -----------------------------------------------------------------------------
  153. // TODO: MQTT standard has some weird rules about session persistence on the broker
  154. // ref. 3.1.2.4 Clean Session, where we are uniquely identified by the client-id:
  155. // - subscriptions that are no longer useful are still there
  156. // unsub # will be acked, but we were never subbed to # to begin with ...
  157. // - we *will* receive messages that were sent using qos 1 or 2 while we were offline
  158. // which is only sort-of good, but MQTT broker v3 will never timeout those messages.
  159. // this would be the main reason for turning ON the clean session
  160. // - connecting with clean session ON will purge existing session *and* also prevent
  161. // the broker from caching the messages after the current connection ends.
  162. // there is no middle-ground, where previous session is removed but the current one is preserved
  163. // so, turning it ON <-> OFF during runtime is not very useful :/
  164. //
  165. // Pending MQTT v5 client
  166. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  167. void _mqttSetupAsyncClient(bool secure = false) {
  168. _mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
  169. _mqtt.setClientId(_mqtt_clientid.c_str());
  170. _mqtt.setKeepAlive(_mqtt_keepalive);
  171. _mqtt.setCleanSession(false);
  172. _mqtt.setWill(_mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, _mqtt_payload_offline.c_str());
  173. if (_mqtt_user.length() && _mqtt_pass.length()) {
  174. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
  175. _mqtt.setCredentials(_mqtt_user.c_str(), _mqtt_pass.c_str());
  176. }
  177. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  178. if (secure) {
  179. DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
  180. _mqtt.setSecure(secure);
  181. }
  182. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  183. _mqtt.connect();
  184. }
  185. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  186. #if (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  187. WiFiClient& _mqttGetClient(bool secure) {
  188. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  189. return (secure ? _mqtt_client_secure->get() : _mqtt_client);
  190. #else
  191. return _mqtt_client;
  192. #endif
  193. }
  // Prepare the transport before connecting. For a secure connection the
  // SecureClient is created on first use and its beforeConnected() result is
  // returned; a plain connection needs no preparation and always succeeds.
  bool _mqttSetupSyncClient(bool secure = false) {
  #if SECURE_CLIENT != SECURE_CLIENT_NONE
      if (!secure) {
          return true;
      }

      if (!_mqtt_client_secure) {
          _mqtt_client_secure = std::make_unique<SecureClient>(_mqtt_sc_config);
      }

      return _mqtt_client_secure->beforeConnected();
  #else
      return true;
  #endif
  }
  203. bool _mqttConnectSyncClient(bool secure = false) {
  204. bool result = false;
  205. #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  206. _mqtt.begin(_mqtt_server.c_str(), _mqtt_port, _mqttGetClient(secure));
  207. _mqtt.setWill(_mqtt_will.c_str(), _mqtt_payload_offline.c_str(), _mqtt_retain, _mqtt_qos);
  208. _mqtt.setKeepAlive(_mqtt_keepalive);
  209. result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str());
  210. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  211. _mqtt.setClient(_mqttGetClient(secure));
  212. _mqtt.setServer(_mqtt_server.c_str(), _mqtt_port);
  213. if (_mqtt_user.length() && _mqtt_pass.length()) {
  214. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str());
  215. result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, _mqtt_payload_offline.c_str());
  216. } else {
  217. result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, _mqtt_payload_offline.c_str());
  218. }
  219. #endif
  220. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  221. if (result && secure) {
  222. result = _mqtt_client_secure->afterConnected();
  223. }
  224. #endif
  225. return result;
  226. }
  227. #endif // (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  228. void _mqttPlaceholders(String& text) {
  229. text.replace("{hostname}", getSetting("hostname"));
  230. text.replace("{magnitude}", "#");
  231. String mac = WiFi.macAddress();
  232. mac.replace(":", "");
  233. text.replace("{mac}", mac);
  234. }
  235. template<typename T>
  236. void _mqttApplySetting(T& current, T& updated) {
  237. if (current != updated) {
  238. current = std::move(updated);
  239. mqttDisconnect();
  240. }
  241. }
  242. template<typename T>
  243. void _mqttApplySetting(T& current, const T& updated) {
  244. if (current != updated) {
  245. current = updated;
  246. mqttDisconnect();
  247. }
  248. }
  249. template<typename T>
  250. void _mqttApplyTopic(T& current, const char* magnitude) {
  251. String updated = mqttTopic(magnitude, false);
  252. if (current != updated) {
  253. mqttFlush();
  254. current = std::move(updated);
  255. }
  256. }
  257. void _mqttConfigure() {
  258. // Enable only when server is set
  259. {
  260. const String server = getSetting("mqttServer", MQTT_SERVER);
  261. const auto port = getSetting("mqttPort", static_cast<uint16_t>(MQTT_PORT));
  262. bool enabled = false;
  263. if (server.length()) {
  264. enabled = getSetting("mqttEnabled", 1 == MQTT_ENABLED);
  265. }
  266. _mqttApplySetting(_mqtt_server, server);
  267. _mqttApplySetting(_mqtt_enabled, enabled);
  268. _mqttApplySetting(_mqtt_port, port);
  269. if (!enabled) return;
  270. }
  271. // Get base topic and apply placeholders
  272. {
  273. String topic = getSetting("mqttTopic", MQTT_TOPIC);
  274. if (topic.endsWith("/")) topic.remove(topic.length()-1);
  275. // Replace things inside curly braces (like {hostname}, {mac} etc.)
  276. _mqttPlaceholders(topic);
  277. if (topic.indexOf("#") == -1) topic.concat("/#");
  278. _mqttApplySetting(_mqtt_topic, topic);
  279. }
  280. // Getter and setter
  281. {
  282. String setter = getSetting("mqttSetter", MQTT_SETTER);
  283. String getter = getSetting("mqttGetter", MQTT_GETTER);
  284. bool forward = !setter.equals(getter);
  285. _mqttApplySetting(_mqtt_setter, setter);
  286. _mqttApplySetting(_mqtt_getter, getter);
  287. _mqttApplySetting(_mqtt_forward, forward);
  288. }
  289. // MQTT options
  290. {
  291. String user = getSetting("mqttUser", MQTT_USER);
  292. _mqttPlaceholders(user);
  293. String pass = getSetting("mqttPassword", MQTT_PASS);
  294. const auto qos = getSetting("mqttQoS", MQTT_QOS);
  295. const bool retain = getSetting("mqttRetain", 1 == MQTT_RETAIN);
  296. // Note: MQTT spec defines this as 2 bytes
  297. const auto keepalive = constrain(
  298. getSetting("mqttKeep", MQTT_KEEPALIVE),
  299. 0, std::numeric_limits<uint16_t>::max()
  300. );
  301. String id = getSetting("mqttClientID", getIdentifier());
  302. _mqttPlaceholders(id);
  303. _mqttApplySetting(_mqtt_user, user);
  304. _mqttApplySetting(_mqtt_pass, pass);
  305. _mqttApplySetting(_mqtt_qos, qos);
  306. _mqttApplySetting(_mqtt_retain, retain);
  307. _mqttApplySetting(_mqtt_keepalive, keepalive);
  308. _mqttApplySetting(_mqtt_clientid, id);
  309. _mqttApplyTopic(_mqtt_will, MQTT_TOPIC_STATUS);
  310. }
  311. // MQTT JSON
  312. {
  313. _mqttApplySetting(_mqtt_use_json, getSetting("mqttUseJson", 1 == MQTT_USE_JSON));
  314. _mqttApplyTopic(_mqtt_topic_json, MQTT_TOPIC_JSON);
  315. }
  316. _mqttApplySetting(_mqtt_heartbeat_mode,
  317. getSetting("mqttHbMode", heartbeat::currentMode()));
  318. _mqttApplySetting(_mqtt_heartbeat_interval,
  319. getSetting("mqttHbIntvl", heartbeat::currentInterval()));
  320. // Skip messages in a small window right after the connection
  321. _mqtt_skip_time = getSetting("mqttSkipTime", MQTT_SKIP_TIME);
  322. // Custom payload strings
  323. settingsProcessConfig({
  324. {_mqtt_payload_online, "mqttPayloadOnline", MQTT_STATUS_ONLINE},
  325. {_mqtt_payload_offline, "mqttPayloadOffline", MQTT_STATUS_OFFLINE}
  326. });
  327. // Reset reconnect delay to reconnect sooner
  328. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  329. }
  330. void _mqttBackwards() {
  331. String mqttTopic = getSetting("mqttTopic", MQTT_TOPIC);
  332. if (mqttTopic.indexOf("{identifier}") > 0) {
  333. mqttTopic.replace("{identifier}", "{hostname}");
  334. setSetting("mqttTopic", mqttTopic);
  335. }
  336. }
  337. void _mqttInfo() {
  338. // Build information
  339. {
  340. #define __MQTT_INFO_STR(X) #X
  341. #define _MQTT_INFO_STR(X) __MQTT_INFO_STR(X)
  342. DEBUG_MSG_P(PSTR(
  343. "[MQTT] "
  344. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  345. "AsyncMqttClient"
  346. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  347. "Arduino-MQTT"
  348. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  349. "PubSubClient"
  350. #endif
  351. ", SSL "
  352. #if SECURE_CLIENT != SEURE_CLIENT_NONE
  353. "ENABLED"
  354. #else
  355. "DISABLED"
  356. #endif
  357. ", Autoconnect "
  358. #if MQTT_AUTOCONNECT
  359. "ENABLED"
  360. #else
  361. "DISABLED"
  362. #endif
  363. ", Buffer size " _MQTT_INFO_STR(MQTT_BUFFER_MAX_SIZE) " bytes"
  364. "\n"
  365. ));
  366. #undef _MQTT_INFO_STR
  367. #undef __MQTT_INFO_STR
  368. }
  369. // Notify about the general state of the client
  370. {
  371. const __FlashStringHelper* enabled = _mqtt_enabled
  372. ? F("ENABLED")
  373. : F("DISABLED");
  374. const __FlashStringHelper* state = nullptr;
  375. switch (_mqtt_state) {
  376. case AsyncClientState::Connecting:
  377. state = F("CONNECTING");
  378. break;
  379. case AsyncClientState::Connected:
  380. state = F("CONNECTED");
  381. break;
  382. case AsyncClientState::Disconnected:
  383. state = F("DISCONNECTED");
  384. break;
  385. case AsyncClientState::Disconnecting:
  386. state = F("DISCONNECTING");
  387. break;
  388. default:
  389. state = F("WAITING");
  390. break;
  391. }
  392. DEBUG_MSG_P(PSTR("[MQTT] Client %s, %s\n"),
  393. String(enabled).c_str(),
  394. String(state).c_str()
  395. );
  396. if (_mqtt_enabled && (_mqtt_state != AsyncClientState::Connected)) {
  397. DEBUG_MSG_P(PSTR("[MQTT] Retrying, Last %u with Delay %u (Step %u)\n"),
  398. _mqtt_last_connection,
  399. _mqtt_reconnect_delay,
  400. MQTT_RECONNECT_DELAY_STEP
  401. );
  402. }
  403. }
  404. }
  405. // -----------------------------------------------------------------------------
  406. // WEB
  407. // -----------------------------------------------------------------------------
  408. #if WEB_SUPPORT
  409. bool _mqttWebSocketOnKeyCheck(const char * key, JsonVariant& value) {
  410. return (strncmp(key, "mqtt", 3) == 0);
  411. }
  412. void _mqttWebSocketOnVisible(JsonObject& root) {
  413. root["mqttVisible"] = 1;
  414. #if ASYNC_TCP_SSL_ENABLED
  415. root["mqttsslVisible"] = 1;
  416. #endif
  417. }
  418. void _mqttWebSocketOnData(JsonObject& root) {
  419. root["mqttStatus"] = mqttConnected();
  420. }
  421. void _mqttWebSocketOnConnected(JsonObject& root) {
  422. root["mqttEnabled"] = mqttEnabled();
  423. root["mqttServer"] = getSetting("mqttServer", MQTT_SERVER);
  424. root["mqttPort"] = getSetting("mqttPort", MQTT_PORT);
  425. root["mqttUser"] = getSetting("mqttUser", MQTT_USER);
  426. root["mqttClientID"] = getSetting("mqttClientID");
  427. root["mqttPassword"] = getSetting("mqttPassword", MQTT_PASS);
  428. root["mqttKeep"] = _mqtt_keepalive;
  429. root["mqttRetain"] = _mqtt_retain;
  430. root["mqttQoS"] = _mqtt_qos;
  431. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  432. root["mqttUseSSL"] = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED);
  433. root["mqttFP"] = getSetting("mqttFP", MQTT_SSL_FINGERPRINT);
  434. #endif
  435. root["mqttTopic"] = getSetting("mqttTopic", MQTT_TOPIC);
  436. root["mqttUseJson"] = getSetting("mqttUseJson", 1 == MQTT_USE_JSON);
  437. }
  438. #endif
  439. // -----------------------------------------------------------------------------
  440. // SETTINGS
  441. // -----------------------------------------------------------------------------
  442. #if TERMINAL_SUPPORT
  443. void _mqttInitCommands() {
  444. terminalRegisterCommand(F("MQTT.RESET"), [](const terminal::CommandContext&) {
  445. _mqttConfigure();
  446. mqttDisconnect();
  447. terminalOK();
  448. });
  449. terminalRegisterCommand(F("MQTT.INFO"), [](const terminal::CommandContext&) {
  450. _mqttInfo();
  451. terminalOK();
  452. });
  453. }
  454. #endif // TERMINAL_SUPPORT
  455. // -----------------------------------------------------------------------------
  456. // MQTT Callbacks
  457. // -----------------------------------------------------------------------------
  458. void _mqttCallback(unsigned int type, const char * topic, const char * payload) {
  459. if (type == MQTT_CONNECT_EVENT) {
  460. mqttSubscribe(MQTT_TOPIC_ACTION);
  461. }
  462. if (type == MQTT_MESSAGE_EVENT) {
  463. String t = mqttMagnitude(topic);
  464. if (t.equals(MQTT_TOPIC_ACTION)) {
  465. rpcHandleAction(payload);
  466. }
  467. }
  468. }
  469. bool _mqttHeartbeat(heartbeat::Mask mask) {
  470. // No point retrying, since we will be re-scheduled on connection
  471. if (!mqttConnected()) {
  472. return true;
  473. }
  474. #if NTP_SUPPORT
  475. // Backported from the older utils implementation.
  476. // Wait until the time is synced to avoid sending partial report *and*
  477. // as a result, wait until the next interval to actually send the datetime string.
  478. if ((mask & heartbeat::Report::Datetime) && !ntpSynced()) {
  479. return false;
  480. }
  481. #endif
  482. // TODO: rework old HEARTBEAT_REPEAT_STATUS?
  483. // for example: send full report once, send only the dynamic data after that
  484. // (interval, hostname, description, ssid, bssid, ip, mac, rssi, uptime, datetime, heap, loadavg, vcc)
  485. // otherwise, it is still possible by setting everything to 0 *but* the Report::Status bit
  486. // TODO: per-module mask?
  487. // TODO: simply send static data with onConnected, and the rest from here?
  488. if (mask & heartbeat::Report::Status)
  489. mqttSendStatus();
  490. if (mask & heartbeat::Report::Interval)
  491. mqttSend(MQTT_TOPIC_INTERVAL, String(_mqtt_heartbeat_interval.count()).c_str());
  492. if (mask & heartbeat::Report::App)
  493. mqttSend(MQTT_TOPIC_APP, APP_NAME);
  494. if (mask & heartbeat::Report::Version)
  495. mqttSend(MQTT_TOPIC_VERSION, getVersion().c_str());
  496. if (mask & heartbeat::Report::Board)
  497. mqttSend(MQTT_TOPIC_BOARD, getBoardName().c_str());
  498. if (mask & heartbeat::Report::Hostname)
  499. mqttSend(MQTT_TOPIC_HOSTNAME, getSetting("hostname", getIdentifier()).c_str());
  500. if (mask & heartbeat::Report::Description) {
  501. auto desc = getSetting("desc");
  502. if (desc.length()) {
  503. mqttSend(MQTT_TOPIC_DESCRIPTION, desc.c_str());
  504. }
  505. }
  506. if (mask & heartbeat::Report::Ssid)
  507. mqttSend(MQTT_TOPIC_SSID, WiFi.SSID().c_str());
  508. if (mask & heartbeat::Report::Bssid)
  509. mqttSend(MQTT_TOPIC_BSSID, WiFi.BSSIDstr().c_str());
  510. if (mask & heartbeat::Report::Ip)
  511. mqttSend(MQTT_TOPIC_IP, wifiStaIp().toString().c_str());
  512. if (mask & heartbeat::Report::Mac)
  513. mqttSend(MQTT_TOPIC_MAC, WiFi.macAddress().c_str());
  514. if (mask & heartbeat::Report::Rssi)
  515. mqttSend(MQTT_TOPIC_RSSI, String(WiFi.RSSI()).c_str());
  516. if (mask & heartbeat::Report::Uptime)
  517. mqttSend(MQTT_TOPIC_UPTIME, String(systemUptime()).c_str());
  518. #if NTP_SUPPORT
  519. if (mask & heartbeat::Report::Datetime)
  520. mqttSend(MQTT_TOPIC_DATETIME, ntpDateTime().c_str());
  521. #endif
  522. if (mask & heartbeat::Report::Freeheap) {
  523. auto stats = systemHeapStats();
  524. mqttSend(MQTT_TOPIC_FREEHEAP, String(stats.available).c_str());
  525. }
  526. if (mask & heartbeat::Report::Loadavg)
  527. mqttSend(MQTT_TOPIC_LOADAVG, String(systemLoadAverage()).c_str());
  528. if ((mask & heartbeat::Report::Vcc) && (ADC_MODE_VALUE == ADC_VCC))
  529. mqttSend(MQTT_TOPIC_VCC, String(ESP.getVcc()).c_str());
  530. auto status = mqttConnected();
  531. for (auto& cb : _mqtt_heartbeat_callbacks) {
  532. status = status && cb(mask);
  533. }
  534. return status;
  535. }
  536. void _mqttOnConnect() {
  537. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
  538. _mqtt_last_connection = millis();
  539. _mqtt_state = AsyncClientState::Connected;
  540. systemHeartbeat(_mqttHeartbeat, _mqtt_heartbeat_mode, _mqtt_heartbeat_interval);
  541. // Notify all subscribers about the connection
  542. for (auto& callback : _mqtt_callbacks) {
  543. callback(MQTT_CONNECT_EVENT, nullptr, nullptr);
  544. }
  545. DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
  546. }
  547. void _mqttOnDisconnect() {
  548. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  549. _mqtt_publish_callbacks.clear();
  550. _mqtt_subscribe_callbacks.clear();
  551. #endif
  552. _mqtt_last_connection = millis();
  553. _mqtt_state = AsyncClientState::Disconnected;
  554. systemStopHeartbeat(_mqttHeartbeat);
  555. // Notify all subscribers about the disconnect
  556. for (auto& callback : _mqtt_callbacks) {
  557. callback(MQTT_DISCONNECT_EVENT, nullptr, nullptr);
  558. }
  559. DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
  560. }
  561. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  562. // Run the associated callback when message PID is acknowledged by the broker
  563. void _mqttPidCallback(MqttPidCallbacks& callbacks, uint16_t pid) {
  564. if (callbacks.empty()) {
  565. return;
  566. }
  567. auto end = callbacks.end();
  568. auto prev = callbacks.before_begin();
  569. auto it = callbacks.begin();
  570. while (it != end) {
  571. if ((*it).pid == pid) {
  572. (*it).run();
  573. it = callbacks.erase_after(prev);
  574. } else {
  575. prev = it;
  576. ++it;
  577. }
  578. }
  579. }
  580. #endif
  581. // Force-skip everything received in a short window right after connecting to avoid synchronization issues.
  582. bool _mqttMaybeSkipRetained(char* topic) {
  583. if (_mqtt_skip_messages && (millis() - _mqtt_last_connection < _mqtt_skip_time)) {
  584. DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic);
  585. return true;
  586. }
  587. _mqtt_skip_messages = false;
  588. return false;
  589. }
  590. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  591. // MQTT Broker can sometimes send messages in bulk. Even when message size is less than MQTT_BUFFER_MAX_SIZE, we *could*
  592. // receive a message with `len != total`, this requiring buffering of the received data. Prepare a static memory to store the
  593. // data until `(len + index) == total`.
  594. // TODO: One pending issue is streaming arbitrary data (e.g. binary, for OTA). We always set '\0' and API consumer expects C-String.
  595. // In that case, there could be MQTT_MESSAGE_RAW_EVENT and this callback only trigger on small messages.
  596. // TODO: Current callback model does not allow to pass message length. Instead, implement a topic filter and record all subscriptions. That way we don't need to filter out events and could implement per-event callbacks.
  597. void _mqttOnMessageAsync(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
  598. if (!len || (len > MQTT_BUFFER_MAX_SIZE) || (total > MQTT_BUFFER_MAX_SIZE)) return;
  599. if (_mqttMaybeSkipRetained(topic)) return;
  600. static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0};
  601. memmove(message + index, (char *) payload, len);
  602. // Not done yet
  603. if (total != (len + index)) {
  604. DEBUG_MSG_P(PSTR("[MQTT] Buffered %s => %u / %u bytes\n"), topic, len, total);
  605. return;
  606. }
  607. message[len + index] = '\0';
  608. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
  609. // Call subscribers with the message buffer
  610. for (auto& callback : _mqtt_callbacks) {
  611. callback(MQTT_MESSAGE_EVENT, topic, message);
  612. }
  613. }
  614. #else
  615. // Sync client already implements buffering, but we still need to add '\0' because API consumer expects C-String :/
  616. // TODO: consider reworking this (and async counterpart), giving callback func length of the message.
  617. void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
  618. if (!len || (len > MQTT_BUFFER_MAX_SIZE)) return;
  619. if (_mqttMaybeSkipRetained(topic)) return;
  620. static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0};
  621. memmove(message, (char *) payload, len);
  622. message[len] = '\0';
  623. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
  624. // Call subscribers with the message buffer
  625. for (auto& callback : _mqtt_callbacks) {
  626. callback(MQTT_MESSAGE_EVENT, topic, message);
  627. }
  628. }
  629. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  630. // -----------------------------------------------------------------------------
  631. // Public API
  632. // -----------------------------------------------------------------------------
  633. /**
  634. Returns the magnitude part of a topic
  635. @param topic the full MQTT topic
  636. @return String object with the magnitude part.
  637. */
  638. String mqttMagnitude(const char* topic) {
  639. String output;
  640. String pattern = _mqtt_topic + _mqtt_setter;
  641. int position = pattern.indexOf("#");
  642. if (position >= 0) {
  643. String start = pattern.substring(0, position);
  644. String end = pattern.substring(position + 1);
  645. String magnitude(topic);
  646. if (magnitude.startsWith(start) && magnitude.endsWith(end)) {
  647. magnitude.replace(start, "");
  648. magnitude.replace(end, "");
  649. output = std::move(magnitude);
  650. }
  651. }
  652. return output;
  653. }
  654. /**
  655. Returns a full MQTT topic from the magnitude
  656. @param magnitude the magnitude part of the topic.
  657. @param is_set whether to build a command topic (true)
  658. or a state topic (false).
  659. @return String full MQTT topic.
  660. */
  661. String mqttTopic(const char* magnitude, bool is_set) {
  662. String output;
  663. output.reserve(strlen(magnitude)
  664. + _mqtt_topic.length()
  665. + _mqtt_setter.length()
  666. + _mqtt_getter.length());
  667. output += _mqtt_topic;
  668. output.replace("#", magnitude);
  669. output += is_set ? _mqtt_setter : _mqtt_getter;
  670. return output;
  671. }
  672. /**
  673. Returns a full MQTT topic from the magnitude
  674. @param magnitude the magnitude part of the topic.
  675. @param index index of the magnitude when more than one such magnitudes.
  676. @param is_set whether to build a command topic (true)
  677. or a state topic (false).
  678. @return String full MQTT topic.
  679. */
  680. String mqttTopic(const char* magnitude, unsigned int index, bool is_set) {
  681. String output;
  682. output.reserve(strlen(magnitude) + (sizeof(decltype(index)) * 4));
  683. output += magnitude;
  684. output += '/';
  685. output += index;
  686. return mqttTopic(output.c_str(), is_set);
  687. }
  688. // -----------------------------------------------------------------------------
  689. uint16_t mqttSendRaw(const char * topic, const char * message, bool retain, int qos) {
  690. constexpr size_t MessageLogMax { 128ul };
  691. if (_mqtt.connected()) {
  692. const unsigned int packetId {
  693. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  694. _mqtt.publish(topic, qos, retain, message)
  695. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  696. _mqtt.publish(topic, message, retain, qos)
  697. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  698. _mqtt.publish(topic, message, retain)
  699. #endif
  700. };
  701. const size_t message_len = strlen(message);
  702. if (message_len > MessageLogMax) {
  703. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => (%u bytes) (PID %u)\n"), topic, message_len, packetId);
  704. } else {
  705. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %u)\n"), topic, message, packetId);
  706. }
  707. return packetId;
  708. }
  709. return false;
  710. }
  711. uint16_t mqttSendRaw(const char * topic, const char * message, bool retain) {
  712. return mqttSendRaw(topic, message, retain, _mqtt_qos);
  713. }
  714. uint16_t mqttSendRaw(const char * topic, const char * message) {
  715. return mqttSendRaw(topic, message, _mqtt_retain, _mqtt_qos);
  716. }
  717. bool mqttSend(const char * topic, const char * message, bool force, bool retain) {
  718. if (!force && _mqtt_use_json) {
  719. mqttEnqueue(topic, message);
  720. _mqtt_json_payload_flush.once_ms(MQTT_USE_JSON_DELAY, mqttFlush);
  721. return true;
  722. }
  723. return mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain) > 0;
  724. }
  725. bool mqttSend(const char * topic, const char * message, bool force) {
  726. return mqttSend(topic, message, force, _mqtt_retain);
  727. }
  728. bool mqttSend(const char * topic, const char * message) {
  729. return mqttSend(topic, message, false);
  730. }
  731. bool mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) {
  732. char buffer[strlen(topic)+5];
  733. snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
  734. return mqttSend(buffer, message, force, retain);
  735. }
  736. bool mqttSend(const char * topic, unsigned int index, const char * message, bool force) {
  737. return mqttSend(topic, index, message, force, _mqtt_retain);
  738. }
  739. bool mqttSend(const char * topic, unsigned int index, const char * message) {
  740. return mqttSend(topic, index, message, false);
  741. }
  742. // -----------------------------------------------------------------------------
  743. constexpr size_t MqttJsonPayloadBufferSize { 1024ul };
  744. void mqttFlush() {
  745. if (!_mqtt.connected()) {
  746. return;
  747. }
  748. if (_mqtt_json_payload.empty()) {
  749. return;
  750. }
  751. DynamicJsonBuffer jsonBuffer(MqttJsonPayloadBufferSize);
  752. JsonObject& root = jsonBuffer.createObject();
  753. #if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME
  754. if (ntpSynced()) {
  755. root[MQTT_TOPIC_DATETIME] = ntpDateTime();
  756. }
  757. #endif
  758. #if MQTT_ENQUEUE_MAC
  759. root[MQTT_TOPIC_MAC] = WiFi.macAddress();
  760. #endif
  761. #if MQTT_ENQUEUE_HOSTNAME
  762. root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname", getIdentifier());
  763. #endif
  764. #if MQTT_ENQUEUE_IP
  765. root[MQTT_TOPIC_IP] = wifiStaIp().toString();
  766. #endif
  767. #if MQTT_ENQUEUE_MESSAGE_ID
  768. root[MQTT_TOPIC_MESSAGE_ID] = (Rtcmem->mqtt)++;
  769. #endif
  770. for (auto& payload : _mqtt_json_payload) {
  771. root[payload.topic().c_str()] = payload.message().c_str();
  772. }
  773. String output;
  774. root.printTo(output);
  775. jsonBuffer.clear();
  776. _mqtt_json_payload_count = 0;
  777. _mqtt_json_payload.clear();
  778. mqttSendRaw(_mqtt_topic_json.c_str(), output.c_str(), false);
  779. }
  780. void mqttEnqueue(const char* topic, const char* message) {
  781. // Queue is not meant to send message "offline"
  782. // We must prevent the queue does not get full while offline
  783. if (_mqtt.connected()) {
  784. if (_mqtt_json_payload_count >= MQTT_QUEUE_MAX_SIZE) {
  785. mqttFlush();
  786. }
  787. _mqtt_json_payload.remove_if([topic](const MqttPayload& payload) {
  788. return payload.topic() == topic;
  789. });
  790. _mqtt_json_payload.emplace_front(topic, message);
  791. ++_mqtt_json_payload_count;
  792. }
  793. }
  794. // -----------------------------------------------------------------------------
  795. // Only async client returns resulting PID, sync libraries return either success (1) or failure (0)
  796. uint16_t mqttSubscribeRaw(const char* topic, int qos) {
  797. uint16_t pid { 0u };
  798. if (_mqtt.connected() && (strlen(topic) > 0)) {
  799. pid = _mqtt.subscribe(topic, qos);
  800. DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, pid);
  801. }
  802. return pid;
  803. }
  804. uint16_t mqttSubscribeRaw(const char* topic) {
  805. return mqttSubscribeRaw(topic, _mqtt_qos);
  806. }
  807. bool mqttSubscribe(const char * topic) {
  808. return mqttSubscribeRaw(mqttTopic(topic, true).c_str(), _mqtt_qos);
  809. }
  810. uint16_t mqttUnsubscribeRaw(const char * topic) {
  811. uint16_t pid { 0u };
  812. if (_mqtt.connected() && (strlen(topic) > 0)) {
  813. pid = _mqtt.unsubscribe(topic);
  814. DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing from %s (PID %d)\n"), topic, pid);
  815. }
  816. return pid;
  817. }
  818. bool mqttUnsubscribe(const char * topic) {
  819. return mqttUnsubscribeRaw(mqttTopic(topic, true).c_str());
  820. }
  821. // -----------------------------------------------------------------------------
  822. void mqttEnabled(bool status) {
  823. _mqtt_enabled = status;
  824. }
  825. bool mqttEnabled() {
  826. return _mqtt_enabled;
  827. }
  828. bool mqttConnected() {
  829. return _mqtt.connected();
  830. }
  831. void mqttDisconnect() {
  832. if (_mqtt.connected()) {
  833. DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n"));
  834. _mqtt.disconnect();
  835. }
  836. }
  837. bool mqttForward() {
  838. return _mqtt_forward;
  839. }
  840. /**
  841. Register a persistent lifecycle callback
  842. @param standalone function pointer
  843. */
  844. void mqttRegister(mqtt_callback_f callback) {
  845. _mqtt_callbacks.push_front(callback);
  846. }
  847. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  848. /**
  849. Register a temporary publish callback
  850. @param callable object
  851. */
  852. void mqttOnPublish(uint16_t pid, mqtt_pid_callback_f callback) {
  853. auto callable = MqttPidCallback { pid, callback };
  854. _mqtt_publish_callbacks.push_front(std::move(callable));
  855. }
  856. /**
  857. Register a temporary subscribe callback
  858. @param callable object
  859. */
  860. void mqttOnSubscribe(uint16_t pid, mqtt_pid_callback_f callback) {
  861. auto callable = MqttPidCallback { pid, callback };
  862. _mqtt_subscribe_callbacks.push_front(std::move(callable));
  863. }
  864. #endif
  865. void mqttSetBroker(IPAddress ip, uint16_t port) {
  866. setSetting("mqttServer", ip.toString());
  867. _mqtt_server = ip.toString();
  868. setSetting("mqttPort", port);
  869. _mqtt_port = port;
  870. mqttEnabled(1 == MQTT_AUTOCONNECT);
  871. }
  872. void mqttSetBrokerIfNone(IPAddress ip, uint16_t port) {
  873. if (getSetting("mqttServer", MQTT_SERVER).length() == 0) {
  874. mqttSetBroker(ip, port);
  875. }
  876. }
  877. // TODO: these strings are only updated after running the configuration routine and when MQTT is *enabled*
  878. const String& mqttPayloadOnline() {
  879. return _mqtt_payload_online;
  880. }
  881. const String& mqttPayloadOffline() {
  882. return _mqtt_payload_offline;
  883. }
  884. const char* mqttPayloadStatus(bool status) {
  885. return status ? _mqtt_payload_online.c_str() : _mqtt_payload_offline.c_str();
  886. }
  887. void mqttSendStatus() {
  888. mqttSend(MQTT_TOPIC_STATUS, _mqtt_payload_online.c_str(), true);
  889. }
  890. // -----------------------------------------------------------------------------
  891. // Initialization
  892. // -----------------------------------------------------------------------------
  893. void _mqttConnect() {
  894. // Do not connect if already connected or still trying to connect
  895. if (_mqtt.connected() || (_mqtt_state != AsyncClientState::Disconnected)) return;
  896. // Do not connect if disabled or no WiFi
  897. if (!_mqtt_enabled || (!wifiConnected())) return;
  898. // Check reconnect interval
  899. if (millis() - _mqtt_last_connection < _mqtt_reconnect_delay) return;
  900. // Increase the reconnect delay
  901. _mqtt_reconnect_delay += MQTT_RECONNECT_DELAY_STEP;
  902. if (_mqtt_reconnect_delay > MQTT_RECONNECT_DELAY_MAX) {
  903. _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MAX;
  904. }
  905. DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%hu\n"), _mqtt_server.c_str(), _mqtt_port);
  906. DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid.c_str());
  907. DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), _mqtt_qos);
  908. DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %c\n"), _mqtt_retain ? 'Y' : 'N');
  909. DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %hu (s)\n"), _mqtt_keepalive);
  910. DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will.c_str());
  911. _mqtt_state = AsyncClientState::Connecting;
  912. _mqtt_skip_messages = (_mqtt_skip_time > 0);
  913. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  914. const bool secure = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED);
  915. #else
  916. const bool secure = false;
  917. #endif
  918. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  919. _mqttSetupAsyncClient(secure);
  920. #elif (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  921. if (_mqttSetupSyncClient(secure) && _mqttConnectSyncClient(secure)) {
  922. _mqttOnConnect();
  923. } else {
  924. DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n"));
  925. _mqttOnDisconnect();
  926. }
  927. #else
  928. #error "please check that MQTT_LIBRARY is valid"
  929. #endif
  930. }
  931. void mqttLoop() {
  932. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  933. _mqttConnect();
  934. #else
  935. if (_mqtt.connected()) {
  936. _mqtt.loop();
  937. } else {
  938. if (_mqtt_state != AsyncClientState::Disconnected) {
  939. _mqttOnDisconnect();
  940. }
  941. _mqttConnect();
  942. }
  943. #endif
  944. }
  945. void mqttHeartbeat(heartbeat::Callback callback) {
  946. _mqtt_heartbeat_callbacks.push_front(callback);
  947. }
  /**
      Module entry point.

      Installs the client library handlers (selected by MQTT_LIBRARY), runs
      the configuration routine, registers the module's own MQTT callback and,
      depending on the build flags, the WebSocket handlers and terminal
      commands. Finally, registers the loop and reload callbacks.
  */
  void mqttSetup() {

      _mqttBackwards();
      _mqttInfo();

      #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT

          // XXX: should not place this in config, addServerFingerprint does not check for duplicates
          #if SECURE_CLIENT != SECURE_CLIENT_NONE
          {
              // Fingerprint is only added when the secure client config provides a getter
              // and the returned string could be converted into the 20-byte array
              if (_mqtt_sc_config.on_fingerprint) {
                  const String fingerprint = _mqtt_sc_config.on_fingerprint();
                  uint8_t buffer[20] = {0};
                  if (sslFingerPrintArray(fingerprint.c_str(), buffer)) {
                      _mqtt.addServerFingerprint(buffer);
                  }
              }
          }
          #endif // SECURE_CLIENT != SECURE_CLIENT_NONE

          _mqtt.onMessage(_mqttOnMessageAsync);

          // The boolean argument of the connect handler is ignored
          _mqtt.onConnect([](bool) {
              _mqttOnConnect();
          });

          // Temporary PID callbacks, see mqttOnSubscribe() and mqttOnPublish()
          _mqtt.onSubscribe([](uint16_t pid, int) {
              _mqttPidCallback(_mqtt_subscribe_callbacks, pid);
          });

          _mqtt.onPublish([](uint16_t pid) {
              _mqttPidCallback(_mqtt_publish_callbacks, pid);
          });

          // Log the reason (when there is a message for it), then run the common disconnection handler
          _mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {

              switch (reason) {
                  case AsyncMqttClientDisconnectReason::TCP_DISCONNECTED:
                      DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n"));
                      break;

                  case AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED:
                      DEBUG_MSG_P(PSTR("[MQTT] Identifier Rejected\n"));
                      break;

                  case AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE:
                      DEBUG_MSG_P(PSTR("[MQTT] Server unavailable\n"));
                      break;

                  case AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS:
                      DEBUG_MSG_P(PSTR("[MQTT] Malformed credentials\n"));
                      break;

                  case AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED:
                      DEBUG_MSG_P(PSTR("[MQTT] Not authorized\n"));
                      break;

                  // Only logged when the TCP library is built with SSL support
                  case AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT:
                      #if ASYNC_TCP_SSL_ENABLED
                          DEBUG_MSG_P(PSTR("[MQTT] Bad fingerprint\n"));
                      #endif
                      break;

                  case AsyncMqttClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION:
                      // This is never used by the AsyncMqttClient source
                      #if 0
                          DEBUG_MSG_P(PSTR("[MQTT] Unacceptable protocol version\n"));
                      #endif
                      break;

                  case AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE:
                      DEBUG_MSG_P(PSTR("[MQTT] Connect packet too big\n"));
                      break;
              }

              _mqttOnDisconnect();

          });

      #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT

          // Client pointer argument is not used, message goes to the common handler
          _mqtt.onMessageAdvanced([](MQTTClient* , char topic[], char payload[], int length) {
              _mqttOnMessage(topic, payload, length);
          });

      #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT

          // Payload bytes are handed over to the common handler as a char pointer
          _mqtt.setCallback([](char* topic, byte* payload, unsigned int length) {
              _mqttOnMessage(topic, (char *) payload, length);
          });

      #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT

      _mqttConfigure();
      mqttRegister(_mqttCallback);

      #if WEB_SUPPORT
          wsRegister()
              .onVisible(_mqttWebSocketOnVisible)
              .onData(_mqttWebSocketOnData)
              .onConnected(_mqttWebSocketOnConnected)
              .onKeyCheck(_mqttWebSocketOnKeyCheck);

          // Post the WebSocket data callback on every connection and disconnection event
          mqttRegister([](unsigned int type, const char*, const char*) {
              if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) {
                  wsPost(_mqttWebSocketOnData);
              }
          });
      #endif

      #if TERMINAL_SUPPORT
          _mqttInitCommands();
      #endif

      // Main callbacks
      espurnaRegisterLoop(mqttLoop);
      espurnaRegisterReload(_mqttConfigure);

  }
  1038. #endif // MQTT_SUPPORT