Mirror of espurna firmware for wireless switches and more
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1836 lines
53 KiB

6 years ago
6 years ago
6 years ago
  1. /*
  2. MQTT MODULE
  3. Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
  4. Updated secure client support by Niek van der Maas < mail at niekvandermaas dot nl>
  5. */
  6. #include "espurna.h"
  7. #if MQTT_SUPPORT
  8. #include <forward_list>
  9. #include <utility>
  10. #include "system.h"
  11. #include "mdns.h"
  12. #include "mqtt.h"
  13. #include "ntp.h"
  14. #include "rpc.h"
  15. #include "rtcmem.h"
  16. #include "ws.h"
  17. #include "libs/AsyncClientHelpers.h"
  18. #include "libs/SecureClientHelpers.h"
  19. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  20. #include <ESPAsyncTCP.h>
  21. #include <AsyncMqttClient.h>
  22. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  23. #include <MQTTClient.h>
  24. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  25. #include <PubSubClient.h>
  26. #endif
  27. // -----------------------------------------------------------------------------
  28. namespace {
  29. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  30. AsyncMqttClient _mqtt;
  31. #else // MQTT_LIBRARY_ARDUINOMQTT / MQTT_LIBRARY_PUBSUBCLIENT
  32. WiFiClient _mqtt_client;
  33. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  34. std::unique_ptr<SecureClient> _mqtt_client_secure = nullptr;
  35. #if MQTT_SECURE_CLIENT_INCLUDE_CA
  36. #include "static/mqtt_client_trusted_root_ca.h" // Assumes this header file defines a _mqtt_client_trusted_root_ca[] PROGMEM = "...PEM data..."
  37. #else
  38. #include "static/letsencrypt_isrgroot_pem.h" // Default to LetsEncrypt X3 certificate
  39. #define _mqtt_client_trusted_root_ca _ssl_letsencrypt_isrg_x3_ca
  40. #endif // MQTT_SECURE_CLIENT_INCLUDE_CA
  41. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  42. #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  43. MQTTClient _mqtt(MQTT_BUFFER_MAX_SIZE);
  44. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  45. PubSubClient _mqtt;
  46. #endif
  47. #endif // MQTT_LIBRARY == MQTT_ASYNCMQTTCLIENT
  48. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  49. struct MqttPidCallbackHandler {
  50. uint16_t pid;
  51. MqttPidCallback callback;
  52. };
  53. using MqttPidCallbacks = std::forward_list<MqttPidCallbackHandler>;
  54. MqttPidCallbacks _mqtt_publish_callbacks;
  55. MqttPidCallbacks _mqtt_subscribe_callbacks;
  56. #endif
  57. std::forward_list<espurna::heartbeat::Callback> _mqtt_heartbeat_callbacks;
  58. espurna::heartbeat::Mode _mqtt_heartbeat_mode;
  59. espurna::duration::Seconds _mqtt_heartbeat_interval;
  60. String _mqtt_payload_online;
  61. String _mqtt_payload_offline;
  62. std::forward_list<MqttCallback> _mqtt_callbacks;
  63. } // namespace
  64. // -----------------------------------------------------------------------------
  65. // Settings
  66. // -----------------------------------------------------------------------------
  67. namespace mqtt {
  68. using KeepAlive = std::chrono::duration<uint16_t>;
  69. } // namespace mqtt
  70. namespace espurna {
  71. namespace settings {
  72. namespace internal {
  73. template<>
  74. mqtt::KeepAlive convert(const String& value) {
  75. return mqtt::KeepAlive { convert<uint16_t>(value) };
  76. }
  77. String serialize(mqtt::KeepAlive value) {
  78. return serialize(value.count());
  79. }
  80. } // namespace internal
  81. } // namespace settings
  82. } // namespace espurna
  83. namespace mqtt {
  84. namespace build {
  85. namespace {
  86. static constexpr espurna::duration::Milliseconds SkipTime { MQTT_SKIP_TIME };
  87. static constexpr espurna::duration::Milliseconds ReconnectDelayMin { MQTT_RECONNECT_DELAY_MIN };
  88. static constexpr espurna::duration::Milliseconds ReconnectDelayMax { MQTT_RECONNECT_DELAY_MAX };
  89. static constexpr espurna::duration::Milliseconds ReconnectStep { MQTT_RECONNECT_DELAY_STEP };
  90. static constexpr size_t MessageLogMax { 128ul };
  91. PROGMEM_STRING(Server, MQTT_SERVER);
  92. constexpr uint16_t port() {
  93. return MQTT_PORT;
  94. }
  95. constexpr bool enabled() {
  96. return 1 == MQTT_ENABLED;
  97. }
  98. constexpr bool autoconnect() {
  99. return 1 == MQTT_AUTOCONNECT;
  100. }
  101. PROGMEM_STRING(Topic, MQTT_TOPIC);
  102. PROGMEM_STRING(Getter, MQTT_GETTER);
  103. PROGMEM_STRING(Setter, MQTT_SETTER);
  104. PROGMEM_STRING(User, MQTT_USER);
  105. PROGMEM_STRING(Password, MQTT_PASS);
  106. constexpr int qos() {
  107. return MQTT_QOS;
  108. }
  109. constexpr bool retain() {
  110. return 1 == MQTT_RETAIN;
  111. }
  112. static constexpr KeepAlive KeepaliveMin { 15 };
  113. static constexpr KeepAlive KeepaliveMax{ KeepAlive::max() };
  114. constexpr KeepAlive keepalive() {
  115. return KeepAlive { MQTT_KEEPALIVE };
  116. }
  117. static_assert(keepalive() >= KeepaliveMin, "");
  118. static_assert(keepalive() <= KeepaliveMax, "");
  119. PROGMEM_STRING(TopicWill, MQTT_TOPIC_STATUS);
  120. constexpr bool json() {
  121. return 1 == MQTT_USE_JSON;
  122. }
  123. static constexpr auto JsonDelay = espurna::duration::Milliseconds(MQTT_USE_JSON_DELAY);
  124. PROGMEM_STRING(TopicJson, MQTT_TOPIC_JSON);
  125. constexpr espurna::duration::Milliseconds skipTime() {
  126. return espurna::duration::Milliseconds(MQTT_SKIP_TIME);
  127. }
  128. PROGMEM_STRING(PayloadOnline, MQTT_STATUS_ONLINE);
  129. PROGMEM_STRING(PayloadOffline, MQTT_STATUS_OFFLINE);
  130. constexpr bool secure() {
  131. return 1 == MQTT_SSL_ENABLED;
  132. }
  133. int secureClientCheck() {
  134. return MQTT_SECURE_CLIENT_CHECK;
  135. }
  136. PROGMEM_STRING(Fingerprint, MQTT_SSL_FINGERPRINT);
  137. constexpr uint16_t mfln() {
  138. return MQTT_SECURE_CLIENT_MFLN;
  139. }
  140. } // namespace
  141. } // namespace build
  142. namespace settings {
  143. namespace keys {
  144. namespace {
  145. PROGMEM_STRING(Server, "mqttServer");
  146. PROGMEM_STRING(Port, "mqttPort");
  147. PROGMEM_STRING(Enabled, "mqttEnabled");
  148. PROGMEM_STRING(Autoconnect, "mqttAutoconnect");
  149. PROGMEM_STRING(Topic, "mqttTopic");
  150. PROGMEM_STRING(Getter, "mqttGetter");
  151. PROGMEM_STRING(Setter, "mqttSetter");
  152. PROGMEM_STRING(User, "mqttUser");
  153. PROGMEM_STRING(Password, "mqttPassword");
  154. PROGMEM_STRING(QoS, "mqttQoS");
  155. PROGMEM_STRING(Retain, "mqttRetain");
  156. PROGMEM_STRING(Keepalive, "mqttKeep");
  157. PROGMEM_STRING(ClientId, "mqttClientID");
  158. PROGMEM_STRING(TopicWill, "mqttWill");
  159. PROGMEM_STRING(UseJson, "mqttUseJson");
  160. PROGMEM_STRING(TopicJson, "mqttJson");
  161. PROGMEM_STRING(HeartbeatMode, "mqttHbMode");
  162. PROGMEM_STRING(HeartbeatInterval, "mqttHbIntvl");
  163. PROGMEM_STRING(SkipTime, "mqttSkipTime");
  164. PROGMEM_STRING(PayloadOnline, "mqttPayloadOnline");
  165. PROGMEM_STRING(PayloadOffline, "mqttPayloadOffline");
  166. PROGMEM_STRING(Secure, "mqttUseSSL");
  167. PROGMEM_STRING(Fingerprint, "mqttFP");
  168. PROGMEM_STRING(SecureClientCheck, "mqttScCheck");
  169. PROGMEM_STRING(SecureClientMfln, "mqttScMFLN");
  170. } // namespace
  171. } // namespace keys
  172. namespace {
  173. String server() {
  174. return getSetting(keys::Server, espurna::StringView(build::Server));
  175. }
  176. uint16_t port() {
  177. return getSetting(keys::Port, build::port());
  178. }
  179. bool enabled() {
  180. return getSetting(keys::Enabled, build::enabled());
  181. }
  182. bool autoconnect() {
  183. return getSetting(keys::Autoconnect, build::autoconnect());
  184. }
  185. String topic() {
  186. return getSetting(keys::Topic, espurna::StringView(build::Topic));
  187. }
  188. String getter() {
  189. return getSetting(keys::Getter, espurna::StringView(build::Getter));
  190. }
  191. String setter() {
  192. return getSetting(keys::Setter, espurna::StringView(build::Setter));
  193. }
  194. String user() {
  195. return getSetting(keys::User, espurna::StringView(build::User));
  196. }
  197. String password() {
  198. return getSetting(keys::Password, espurna::StringView(build::Password));
  199. }
  200. int qos() {
  201. return getSetting(keys::QoS, build::qos());
  202. }
  203. bool retain() {
  204. return getSetting(keys::Retain, build::retain());
  205. }
  206. KeepAlive keepalive() {
  207. return std::clamp(
  208. getSetting(keys::Keepalive, build::keepalive()),
  209. build::KeepaliveMin, build::KeepaliveMax);
  210. }
  211. String clientId() {
  212. return getSetting(keys::ClientId, systemIdentifier());
  213. }
  214. String topicWill() {
  215. return getSetting(keys::TopicWill, espurna::StringView(build::TopicWill));
  216. }
  217. bool json() {
  218. return getSetting(keys::UseJson, build::json());
  219. }
  220. String topicJson() {
  221. return getSetting(keys::TopicJson, espurna::StringView(build::TopicJson));
  222. }
  223. espurna::heartbeat::Mode heartbeatMode() {
  224. return getSetting(keys::HeartbeatMode, espurna::heartbeat::currentMode());
  225. }
  226. espurna::duration::Seconds heartbeatInterval() {
  227. return getSetting(keys::HeartbeatInterval, espurna::heartbeat::currentInterval());
  228. }
  229. espurna::duration::Milliseconds skipTime() {
  230. return getSetting(keys::SkipTime, build::skipTime());
  231. }
  232. String payloadOnline() {
  233. return getSetting(keys::PayloadOnline, espurna::StringView(build::PayloadOnline));
  234. }
  235. String payloadOffline() {
  236. return getSetting(keys::PayloadOffline, espurna::StringView(build::PayloadOffline));
  237. }
  238. [[gnu::unused]]
  239. bool secure() {
  240. return getSetting(keys::Secure, build::secure());
  241. }
  242. [[gnu::unused]]
  243. int secureClientCheck() {
  244. return getSetting(keys::SecureClientCheck, build::secureClientCheck());
  245. }
  246. [[gnu::unused]]
  247. String fingerprint() {
  248. return getSetting(keys::Fingerprint, espurna::StringView(build::Fingerprint));
  249. }
  250. [[gnu::unused]]
  251. uint16_t mfln() {
  252. return getSetting(keys::SecureClientMfln, build::mfln());
  253. }
  254. } // namespace
  255. namespace query {
  256. namespace {
  257. namespace internal {
  258. #define EXACT_VALUE(NAME, FUNC)\
  259. String NAME () {\
  260. return espurna::settings::internal::serialize(FUNC());\
  261. }
  262. EXACT_VALUE(port, settings::port)
  263. EXACT_VALUE(enabled, settings::enabled)
  264. EXACT_VALUE(autoconnect, settings::autoconnect)
  265. EXACT_VALUE(qos, settings::qos)
  266. EXACT_VALUE(retain, settings::retain)
  267. EXACT_VALUE(keepalive, settings::keepalive)
  268. EXACT_VALUE(json, settings::json)
  269. EXACT_VALUE(heartbeatMode, settings::heartbeatMode)
  270. EXACT_VALUE(heartbeatInterval, settings::heartbeatInterval)
  271. EXACT_VALUE(skipTime, settings::skipTime)
  272. #undef EXACT_VALUE
  273. } // namespace internal
  274. static constexpr espurna::settings::query::Setting Settings[] PROGMEM {
  275. {keys::Server, settings::server},
  276. {keys::Port, internal::port},
  277. {keys::Enabled, internal::enabled},
  278. {keys::Autoconnect, internal::autoconnect},
  279. {keys::Topic, settings::topic},
  280. {keys::Getter, settings::getter},
  281. {keys::Setter, settings::setter},
  282. {keys::User, settings::user},
  283. {keys::Password, settings::password},
  284. {keys::QoS, internal::qos},
  285. {keys::Retain, internal::retain},
  286. {keys::Keepalive, internal::keepalive},
  287. {keys::ClientId, settings::clientId},
  288. {keys::TopicWill, settings::topicWill},
  289. {keys::UseJson, internal::json},
  290. {keys::TopicJson, settings::topicJson},
  291. {keys::HeartbeatMode, internal::heartbeatMode},
  292. {keys::HeartbeatInterval, internal::heartbeatInterval},
  293. {keys::SkipTime, internal::skipTime},
  294. {keys::PayloadOnline, settings::payloadOnline},
  295. {keys::PayloadOffline, settings::payloadOffline},
  296. };
  297. bool checkSamePrefix(espurna::StringView key) {
  298. return espurna::settings::query::samePrefix(key, STRING_VIEW("mqtt"));
  299. }
  300. String findValueFrom(espurna::StringView key) {
  301. return espurna::settings::query::Setting::findValueFrom(Settings, key);
  302. }
  303. void setup() {
  304. ::settingsRegisterQueryHandler({
  305. .check = checkSamePrefix,
  306. .get = findValueFrom
  307. });
  308. }
  309. } // namespace
  310. } // namespace query
  311. } // namespace settings
  312. } // namespace mqtt
  313. namespace {
  314. using MqttTimeSource = espurna::time::CoreClock;
  315. MqttTimeSource::time_point _mqtt_last_connection{};
  316. MqttTimeSource::duration _mqtt_skip_time { mqtt::build::SkipTime };
  317. MqttTimeSource::duration _mqtt_reconnect_delay { mqtt::build::ReconnectDelayMin };
  318. AsyncClientState _mqtt_state { AsyncClientState::Disconnected };
  319. bool _mqtt_skip_messages { false };
  320. bool _mqtt_enabled { mqtt::build::enabled() };
  321. bool _mqtt_use_json { mqtt::build::json() };
  322. bool _mqtt_forward { false };
  323. struct MqttConnectionSettings {
  324. bool retain { mqtt::build::retain() };
  325. int qos { mqtt::build::qos() };
  326. mqtt::KeepAlive keepalive { mqtt::build::keepalive() };
  327. String topic;
  328. String topic_json;
  329. String setter;
  330. String getter;
  331. String user;
  332. String pass;
  333. String will;
  334. String server;
  335. uint16_t port { 0 };
  336. String clientId;
  337. };
  338. static MqttConnectionSettings _mqtt_settings;
  339. template <typename Lhs, typename Rhs>
  340. static void _mqttApplySetting(Lhs& lhs, Rhs&& rhs) {
  341. if (lhs != rhs) {
  342. lhs = std::forward<Rhs>(rhs);
  343. mqttDisconnect();
  344. }
  345. }
  346. // Can't have **any** MQTT placeholders but our own `{magnitude}`
  347. bool _mqttValidTopicString(espurna::StringView value) {
  348. size_t hash = 0;
  349. size_t plus = 0;
  350. for (auto it = value.begin(); it != value.end(); ++it) {
  351. switch (*it) {
  352. case '#':
  353. ++hash;
  354. break;
  355. case '+':
  356. ++plus;
  357. break;
  358. }
  359. }
  360. return (hash <= 1) && (plus == 0);
  361. }
  362. bool _mqttApplyValidTopicString(String& lhs, String&& rhs) {
  363. if (_mqttValidTopicString(rhs)) {
  364. _mqttApplySetting(lhs, std::move(rhs));
  365. return true;
  366. }
  367. mqttDisconnect();
  368. return false;
  369. }
  370. } // namespace
  371. // -----------------------------------------------------------------------------
  372. // JSON payload
  373. // -----------------------------------------------------------------------------
  374. namespace {
  375. struct MqttPayload {
  376. MqttPayload() = delete;
  377. MqttPayload(const MqttPayload&) = default;
  378. // TODO: replace String implementation with Core v3 (or just use newer Core)
  379. // 2.7.x still has basic Arduino String move ctor that is not noexcept
  380. MqttPayload(MqttPayload&& other) noexcept :
  381. _topic(std::move(other._topic)),
  382. _message(std::move(other._message))
  383. {}
  384. template <typename Topic, typename Message>
  385. MqttPayload(Topic&& topic, Message&& message) :
  386. _topic(std::forward<Topic>(topic)),
  387. _message(std::forward<Message>(message))
  388. {}
  389. const String& topic() const {
  390. return _topic;
  391. }
  392. const String& message() const {
  393. return _message;
  394. }
  395. private:
  396. String _topic;
  397. String _message;
  398. };
  399. size_t _mqtt_json_payload_count { 0ul };
  400. std::forward_list<MqttPayload> _mqtt_json_payload;
  401. espurna::timer::SystemTimer _mqtt_json_payload_flush;
  402. } // namespace
  403. // -----------------------------------------------------------------------------
  404. // Secure client handlers
  405. // -----------------------------------------------------------------------------
  406. namespace {
  407. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  408. SecureClientConfig _mqtt_sc_config {
  409. .tag = "MQTT",
  410. #if SECURE_CLIENT == SECURE_CLIENT_AXTLS
  411. .on_host = []() -> String {
  412. return _mqtt_server;
  413. },
  414. #endif
  415. .on_check = mqtt::settings::secureClientCheck,
  416. #if SECURE_CLIENT == SECURE_CLIENT_BEARSSL
  417. .on_certificate = []() -> const char* {
  418. return _mqtt_client_trusted_root_ca;
  419. },
  420. #endif
  421. .on_fingerprint = mqtt::settings::fingerprint,
  422. #if SECURE_CLIENT == SECURE_CLIENT_BEARSSL
  423. .on_mfln = mqtt::settings::mfln,
  424. #endif
  425. .debug = true,
  426. };
  427. #endif
  428. } // namespace
  429. // -----------------------------------------------------------------------------
  430. // Client configuration & setup
  431. // -----------------------------------------------------------------------------
  432. namespace {
  433. // TODO: MQTT standard has some weird rules about session persistance on the broker
  434. // ref. 3.1.2.4 Clean Session, where we are uniquely identified by the client-id:
  435. // - subscriptions that are no longer useful are still there
  436. // unsub # will be acked, but we were never subbed to # to begin with ...
  437. // - we *will* receive messages that were sent using qos 1 or 2 while we were offline
  438. // which is only sort-of good, but MQTT broker v3 will never timeout those messages.
  439. // this would be the main reason for turning ON the clean session
  440. // - connecting with clean session ON will purge existing session *and* also prevent
  441. // the broker from caching the messages after the current connection ends.
  442. // there is no middle-ground, where previous session is removed but the current one is preserved
  443. // so, turning it ON <-> OFF during runtime is not very useful :/
  444. //
  445. // Pending MQTT v5 client
  446. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  447. void _mqttSetupAsyncClient(bool secure = false) {
  448. _mqtt.setServer(_mqtt_settings.server.c_str(), _mqtt_settings.port);
  449. _mqtt.setClientId(_mqtt_settings.clientId.c_str());
  450. _mqtt.setKeepAlive(_mqtt_settings.keepalive.count());
  451. _mqtt.setCleanSession(false);
  452. _mqtt.setWill(
  453. _mqtt_settings.will.c_str(),
  454. _mqtt_settings.qos,
  455. _mqtt_settings.retain,
  456. _mqtt_payload_offline.c_str());
  457. if (_mqtt_settings.user.length() && _mqtt_settings.pass.length()) {
  458. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_settings.user.c_str());
  459. _mqtt.setCredentials(
  460. _mqtt_settings.user.c_str(),
  461. _mqtt_settings.pass.c_str());
  462. }
  463. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  464. if (secure) {
  465. DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
  466. _mqtt.setSecure(secure);
  467. }
  468. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  469. _mqtt.connect();
  470. }
  471. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  472. #if (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  473. WiFiClient& _mqttGetClient(bool secure) {
  474. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  475. return (secure ? _mqtt_client_secure->get() : _mqtt_client);
  476. #else
  477. return _mqtt_client;
  478. #endif
  479. }
  480. bool _mqttSetupSyncClient(bool secure = false) {
  481. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  482. if (secure) {
  483. if (!_mqtt_client_secure) _mqtt_client_secure = std::make_unique<SecureClient>(_mqtt_sc_config);
  484. return _mqtt_client_secure->beforeConnected();
  485. }
  486. #endif
  487. return true;
  488. }
  489. bool _mqttConnectSyncClient(bool secure = false) {
  490. bool result = false;
  491. #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  492. _mqtt.begin(_mqtt_settings.server.c_str(),
  493. _mqtt_settings.port,
  494. _mqttGetClient(secure));
  495. _mqtt.setWill(_mqtt_settings.will.c_str(),
  496. _mqtt_payload_offline.c_str(),
  497. _mqtt_settings.retain, _mqtt_settings.qos);
  498. _mqtt.setKeepAlive(_mqtt_settings.keepalive.count());
  499. result = _mqtt.connect(
  500. _mqtt_settings.clientId.c_str(),
  501. _mqtt_settings.user.c_str(),
  502. _mqtt_settings.pass.c_str());
  503. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  504. _mqtt.setClient(_mqttGetClient(secure));
  505. _mqtt.setServer(_mqtt_settings.server.c_str(), _mqtt_port);
  506. if (_mqtt_settings.user.length() && _mqtt_settings.pass.length()) {
  507. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_settings.user.c_str());
  508. result = _mqtt.connect(
  509. _mqtt_settings.clientid.c_str(),
  510. _mqtt_settings.user.c_str(),
  511. _mqtt_settings.pass.c_str(),
  512. _mqtt_settings.will.c_str(),
  513. _mqtt_settings.qos,
  514. _mqtt_settings.retain,
  515. _mqtt_payload_offline.c_str());
  516. } else {
  517. result = _mqtt.connect(
  518. _mqtt_settings.clientid.c_str(),
  519. _mqtt_settings.will.c_str(),
  520. _mqtt_settings.qos,
  521. _mqtt_settings.retain,
  522. _mqtt_payload_offline.c_str());
  523. }
  524. #endif
  525. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  526. if (result && secure) {
  527. result = _mqtt_client_secure->afterConnected();
  528. }
  529. #endif
  530. return result;
  531. }
  532. #endif // (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  533. String _mqttPlaceholders(String text) {
  534. static const String mac = String(systemChipId());
  535. text.replace(F("{mac}"), mac);
  536. text.replace(F("{hostname}"), systemHostname());
  537. text.replace(F("{magnitude}"), F("#"));
  538. return text;
  539. }
  540. #if MDNS_SERVER_SUPPORT
  541. void _mqttMdnsSchedule();
  542. void _mqttMdnsStop();
  543. #endif
  544. void _mqttConfigure() {
  545. // Make sure we have both the server to connect to things are enabled
  546. {
  547. _mqttApplySetting(_mqtt_settings.server, mqtt::settings::server());
  548. _mqttApplySetting(_mqtt_settings.port, mqtt::settings::port());
  549. _mqttApplySetting(_mqtt_enabled, mqtt::settings::enabled());
  550. #if MDNS_SERVER_SUPPORT
  551. if (!_mqtt_enabled) {
  552. _mqttMdnsStop();
  553. }
  554. #endif
  555. if (!_mqtt_settings.server.length()) {
  556. #if MDNS_SERVER_SUPPORT
  557. // But, start mdns discovery when it would've been enabled
  558. if (_mqtt_enabled && mqtt::settings::autoconnect()) {
  559. _mqttMdnsSchedule();
  560. }
  561. #endif
  562. return;
  563. }
  564. }
  565. // Get base topic and apply placeholders
  566. {
  567. // Replace things inside curly braces (like {hostname}, {mac} etc.)
  568. auto topic = _mqttPlaceholders(mqtt::settings::topic());
  569. if (!_mqttValidTopicString(topic)) {
  570. mqttDisconnect();
  571. return;
  572. }
  573. // Topic **must** end with some kind of word
  574. if (topic.endsWith("/")) {
  575. topic.remove(topic.length() - 1);
  576. }
  577. // For simple topics, sssume right-hand side contains magnitude
  578. if (topic.indexOf("#") == -1) {
  579. topic.concat("/#");
  580. }
  581. _mqttApplySetting(_mqtt_settings.topic, std::move(topic));
  582. }
  583. // Getter and setter
  584. _mqttApplyValidTopicString(_mqtt_settings.getter, mqtt::settings::getter());
  585. _mqttApplyValidTopicString(_mqtt_settings.setter, mqtt::settings::setter());
  586. _mqttApplySetting(_mqtt_forward,
  587. !_mqtt_settings.setter.equals(_mqtt_settings.getter));
  588. // Last will aka status topic
  589. // (note that *must* be after topic updates)
  590. _mqttApplyValidTopicString(_mqtt_settings.will,
  591. mqttTopic(mqtt::settings::topicWill()));
  592. // MQTT options
  593. _mqttApplySetting(_mqtt_settings.user, _mqttPlaceholders(mqtt::settings::user()));
  594. _mqttApplySetting(_mqtt_settings.pass, mqtt::settings::password());
  595. _mqttApplySetting(_mqtt_settings.clientId, _mqttPlaceholders(mqtt::settings::clientId()));
  596. _mqttApplySetting(_mqtt_settings.qos, mqtt::settings::qos());
  597. _mqttApplySetting(_mqtt_settings.retain, mqtt::settings::retain());
  598. _mqttApplySetting(_mqtt_settings.keepalive, mqtt::settings::keepalive());
  599. // MQTT JSON
  600. _mqttApplySetting(_mqtt_use_json, mqtt::settings::json());
  601. _mqttApplyValidTopicString(_mqtt_settings.topic_json,
  602. mqttTopic(mqtt::settings::topicJson()));
  603. // Heartbeat messages
  604. _mqttApplySetting(_mqtt_heartbeat_mode, mqtt::settings::heartbeatMode());
  605. _mqttApplySetting(_mqtt_heartbeat_interval, mqtt::settings::heartbeatInterval());
  606. _mqtt_skip_time = mqtt::settings::skipTime();
  607. // Custom payload strings
  608. _mqtt_payload_online = mqtt::settings::payloadOnline();
  609. _mqtt_payload_offline = mqtt::settings::payloadOffline();
  610. // Reset reconnect delay to reconnect sooner
  611. _mqtt_reconnect_delay = mqtt::build::ReconnectDelayMin;
  612. }
  613. #if MDNS_SERVER_SUPPORT
  614. constexpr auto MqttMdnsDiscoveryInterval = espurna::duration::Seconds(15);
  615. espurna::timer::SystemTimer _mqtt_mdns_discovery;
  616. void _mqttMdnsStop() {
  617. _mqtt_mdns_discovery.stop();
  618. }
  619. void _mqttMdnsDiscovery();
  620. void _mqttMdnsSchedule() {
  621. _mqtt_mdns_discovery.once(MqttMdnsDiscoveryInterval, _mqttMdnsDiscovery);
  622. }
  623. void _mqttMdnsDiscovery() {
  624. if (mdnsRunning()) {
  625. DEBUG_MSG_P(PSTR("[MQTT] Querying MDNS service _mqtt._tcp\n"));
  626. auto found = mdnsServiceQuery("mqtt", "tcp", [](String&& server, uint16_t port) {
  627. DEBUG_MSG_P(PSTR("[MQTT] MDNS found broker at %s:%hu\n"), server.c_str(), port);
  628. setSetting("mqttServer", server);
  629. setSetting("mqttPort", port);
  630. return true;
  631. });
  632. if (found) {
  633. _mqttMdnsStop();
  634. _mqttConfigure();
  635. return;
  636. }
  637. }
  638. _mqttMdnsSchedule();
  639. }
  640. #endif
  641. void _mqttBackwards() {
  642. auto topic = mqtt::settings::topic();
  643. if (topic.indexOf("{identifier}") > 0) {
  644. topic.replace("{identifier}", "{hostname}");
  645. setSetting("mqttTopic", topic);
  646. }
  647. }
  648. #define __MQTT_INFO_STR(X) #X
  649. #define _MQTT_INFO_STR(X) __MQTT_INFO_STR(X)
  650. alignas(4) static constexpr char MqttBuild[] PROGMEM_STRING_ATTR {
  651. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  652. "AsyncMqttClient"
  653. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  654. "Arduino-MQTT"
  655. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  656. "PubSubClient"
  657. #endif
  658. #if SECURE_CLIENT != SEURE_CLIENT_NONE
  659. " (w/ SECURE CLIENT)"
  660. #endif
  661. " Buffer size " _MQTT_INFO_STR(MQTT_BUFFER_MAX_SIZE) " (bytes)"
  662. };
  663. #undef _MQTT_INFO_STR
  664. #undef __MQTT_INFO_STR
  665. constexpr espurna::StringView _mqttBuildInfo() {
  666. return MqttBuild;
  667. }
  668. String _mqttClientState(AsyncClientState state) {
  669. espurna::StringView out;
  670. switch (state) {
  671. case AsyncClientState::Connecting:
  672. out = STRING_VIEW("CONNECTING");
  673. break;
  674. case AsyncClientState::Connected:
  675. out = STRING_VIEW("CONNECTED");
  676. break;
  677. case AsyncClientState::Disconnected:
  678. out = STRING_VIEW("DISCONNECTED");
  679. break;
  680. case AsyncClientState::Disconnecting:
  681. out = STRING_VIEW("DISCONNECTING");
  682. break;
  683. default:
  684. out = STRING_VIEW("WAITING");
  685. break;
  686. }
  687. return out.toString();
  688. }
  689. String _mqttClientInfo(bool enabled, AsyncClientState state) {
  690. String out;
  691. if (_mqtt_enabled) {
  692. out += _mqttClientState(state);
  693. } else {
  694. out += STRING_VIEW("DISABLED");
  695. }
  696. return out;
  697. }
  698. String _mqttClientInfo() {
  699. return _mqttClientInfo(_mqtt_enabled, _mqtt_state);
  700. }
  701. void _mqttInfo() {
  702. constexpr auto build = _mqttBuildInfo();
  703. DEBUG_MSG_P(PSTR("[MQTT] %.*s\n"), build.length(), build.data());
  704. const auto client = _mqttClientInfo();
  705. DEBUG_MSG_P(PSTR("[MQTT] Client %.*s\n"), client.length(), client.c_str());
  706. if (_mqtt_enabled && (_mqtt_state != AsyncClientState::Connected)) {
  707. DEBUG_MSG_P(PSTR("[MQTT] Retrying, Last %u with Delay %u (Step %u)\n"),
  708. _mqtt_last_connection.time_since_epoch().count(),
  709. _mqtt_reconnect_delay.count(),
  710. mqtt::build::ReconnectStep.count());
  711. }
  712. }
  713. } // namespace
  714. // -----------------------------------------------------------------------------
  715. // WEB
  716. // -----------------------------------------------------------------------------
  717. namespace {
  718. #if WEB_SUPPORT
  719. bool _mqttWebSocketOnKeyCheck(espurna::StringView key, const JsonVariant&) {
  720. return mqtt::settings::query::checkSamePrefix(key);
  721. }
  722. void _mqttWebSocketOnVisible(JsonObject& root) {
  723. wsPayloadModule(root, PSTR("mqtt"));
  724. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  725. wsPayloadModule(root, PSTR("mqttssl"));
  726. #endif
  727. }
  728. void _mqttWebSocketOnData(JsonObject& root) {
  729. root[F("mqttStatus")] = mqttConnected();
  730. }
  731. void _mqttWebSocketOnConnected(JsonObject& root) {
  732. using namespace mqtt::settings::keys;
  733. using mqtt::settings::keys::Server;
  734. root[FPSTR(Enabled)] = mqttEnabled();
  735. root[FPSTR(Server)] = mqtt::settings::server();
  736. root[FPSTR(Port)] = mqtt::settings::port();
  737. root[FPSTR(User)] = mqtt::settings::user();
  738. root[FPSTR(Password)] = mqtt::settings::password();
  739. root[FPSTR(Retain)] = mqtt::settings::retain();
  740. root[FPSTR(Keepalive)] = mqtt::settings::keepalive().count();
  741. root[FPSTR(ClientId)] = mqtt::settings::clientId();
  742. root[FPSTR(QoS)] = mqtt::settings::qos();
  743. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  744. root[FPSTR(Secure)] = mqtt::settings::secure();
  745. root[FPSTR(Fingerprint)] = mqtt::settings::fingerprint();
  746. #endif
  747. root[FPSTR(Topic)] = mqtt::settings::topic();
  748. root[FPSTR(UseJson)] = mqtt::settings::json();
  749. }
  750. #endif
  751. } // namespace
  752. // -----------------------------------------------------------------------------
  753. // SETTINGS
  754. // -----------------------------------------------------------------------------
  755. #if TERMINAL_SUPPORT
  756. namespace {
  757. PROGMEM_STRING(MqttCommand, "MQTT");
  758. static void _mqttCommand(::terminal::CommandContext&& ctx) {
  759. constexpr auto build = _mqttBuildInfo();
  760. ctx.output.printf_P(PSTR("%.*s\n"), build.length(), build.c_str());
  761. const auto client = _mqttClientInfo();
  762. ctx.output.printf_P(PSTR("client %.*s\n"), client.length(), client.c_str());
  763. settingsDump(ctx, mqtt::settings::query::Settings);
  764. terminalOK(ctx);
  765. }
  766. PROGMEM_STRING(MqttCommandReset, "MQTT.RESET");
  767. static void _mqttCommandReset(::terminal::CommandContext&& ctx) {
  768. _mqttConfigure();
  769. mqttDisconnect();
  770. terminalOK(ctx);
  771. }
  772. PROGMEM_STRING(MqttCommandSend, "MQTT.SEND");
  773. static void _mqttCommandSend(::terminal::CommandContext&& ctx) {
  774. if (ctx.argv.size() == 3) {
  775. if (mqttSend(ctx.argv[1].c_str(), ctx.argv[2].c_str(), false, false)) {
  776. terminalOK(ctx);
  777. } else {
  778. terminalError(ctx, F("Cannot queue the message"));
  779. }
  780. return;
  781. }
  782. terminalError(ctx, F("MQTT.SEND <topic> <payload>"));
  783. }
  784. static constexpr ::terminal::Command MqttCommands[] PROGMEM {
  785. {MqttCommand, _mqttCommand},
  786. {MqttCommandReset, _mqttCommandReset},
  787. {MqttCommandSend, _mqttCommandSend},
  788. };
  789. void _mqttCommandsSetup() {
  790. espurna::terminal::add(MqttCommands);
  791. }
  792. } // namespace
  793. #endif // TERMINAL_SUPPORT
  794. // -----------------------------------------------------------------------------
  795. // MQTT Callbacks
  796. // -----------------------------------------------------------------------------
  797. namespace {
  798. void _mqttCallback(unsigned int type, espurna::StringView topic, espurna::StringView payload) {
  799. if (type == MQTT_CONNECT_EVENT) {
  800. mqttSubscribe(MQTT_TOPIC_ACTION);
  801. }
  802. if (type == MQTT_MESSAGE_EVENT) {
  803. auto t = mqttMagnitude(topic);
  804. if (t.equals(MQTT_TOPIC_ACTION)) {
  805. rpcHandleAction(payload);
  806. }
  807. }
  808. }
  809. bool _mqttHeartbeat(espurna::heartbeat::Mask mask) {
  810. // No point retrying, since we will be re-scheduled on connection
  811. if (!mqttConnected()) {
  812. return true;
  813. }
  814. #if NTP_SUPPORT
  815. // Backported from the older utils implementation.
  816. // Wait until the time is synced to avoid sending partial report *and*
  817. // as a result, wait until the next interval to actually send the datetime string.
  818. if ((mask & espurna::heartbeat::Report::Datetime) && !ntpSynced()) {
  819. return false;
  820. }
  821. #endif
  822. // TODO: rework old HEARTBEAT_REPEAT_STATUS?
  823. // for example: send full report once, send only the dynamic data after that
  824. // (interval, hostname, description, ssid, bssid, ip, mac, rssi, uptime, datetime, heap, loadavg, vcc)
  825. // otherwise, it is still possible by setting everything to 0 *but* the Report::Status bit
  826. // TODO: per-module mask?
  827. // TODO: simply send static data with onConnected, and the rest from here?
  828. if (mask & espurna::heartbeat::Report::Status)
  829. mqttSendStatus();
  830. if (mask & espurna::heartbeat::Report::Interval)
  831. mqttSend(MQTT_TOPIC_INTERVAL, String(_mqtt_heartbeat_interval.count()).c_str());
  832. const auto app = buildApp();
  833. if (mask & espurna::heartbeat::Report::App)
  834. mqttSend(MQTT_TOPIC_APP, String(app.name).c_str());
  835. if (mask & espurna::heartbeat::Report::Version)
  836. mqttSend(MQTT_TOPIC_VERSION, String(app.version).c_str());
  837. if (mask & espurna::heartbeat::Report::Board)
  838. mqttSend(MQTT_TOPIC_BOARD, systemDevice().c_str());
  839. if (mask & espurna::heartbeat::Report::Hostname)
  840. mqttSend(MQTT_TOPIC_HOSTNAME, systemHostname().c_str());
  841. if (mask & espurna::heartbeat::Report::Description) {
  842. const auto value = systemDescription();
  843. if (value.length()) {
  844. mqttSend(MQTT_TOPIC_DESCRIPTION, value.c_str());
  845. }
  846. }
  847. if (mask & espurna::heartbeat::Report::Ssid)
  848. mqttSend(MQTT_TOPIC_SSID, WiFi.SSID().c_str());
  849. if (mask & espurna::heartbeat::Report::Bssid)
  850. mqttSend(MQTT_TOPIC_BSSID, WiFi.BSSIDstr().c_str());
  851. if (mask & espurna::heartbeat::Report::Ip)
  852. mqttSend(MQTT_TOPIC_IP, wifiStaIp().toString().c_str());
  853. if (mask & espurna::heartbeat::Report::Mac)
  854. mqttSend(MQTT_TOPIC_MAC, WiFi.macAddress().c_str());
  855. if (mask & espurna::heartbeat::Report::Rssi)
  856. mqttSend(MQTT_TOPIC_RSSI, String(WiFi.RSSI()).c_str());
  857. if (mask & espurna::heartbeat::Report::Uptime)
  858. mqttSend(MQTT_TOPIC_UPTIME, String(systemUptime().count()).c_str());
  859. #if NTP_SUPPORT
  860. if (mask & espurna::heartbeat::Report::Datetime)
  861. mqttSend(MQTT_TOPIC_DATETIME, ntpDateTime().c_str());
  862. #endif
  863. if (mask & espurna::heartbeat::Report::Freeheap) {
  864. const auto stats = systemHeapStats();
  865. mqttSend(MQTT_TOPIC_FREEHEAP, String(stats.available).c_str());
  866. }
  867. if (mask & espurna::heartbeat::Report::Loadavg)
  868. mqttSend(MQTT_TOPIC_LOADAVG, String(systemLoadAverage()).c_str());
  869. if ((mask & espurna::heartbeat::Report::Vcc) && (ADC_MODE_VALUE == ADC_VCC))
  870. mqttSend(MQTT_TOPIC_VCC, String(ESP.getVcc()).c_str());
  871. auto status = mqttConnected();
  872. for (auto& cb : _mqtt_heartbeat_callbacks) {
  873. status = status && cb(mask);
  874. }
  875. return status;
  876. }
  877. void _mqttOnConnect() {
  878. _mqtt_reconnect_delay = mqtt::build::ReconnectDelayMin;
  879. _mqtt_last_connection = MqttTimeSource::now();
  880. _mqtt_state = AsyncClientState::Connected;
  881. systemHeartbeat(_mqttHeartbeat, _mqtt_heartbeat_mode, _mqtt_heartbeat_interval);
  882. // Notify all subscribers about the connection
  883. for (const auto callback : _mqtt_callbacks) {
  884. callback(MQTT_CONNECT_EVENT,
  885. espurna::StringView(),
  886. espurna::StringView());
  887. }
  888. DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
  889. }
  890. void _mqttOnDisconnect() {
  891. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  892. _mqtt_publish_callbacks.clear();
  893. _mqtt_subscribe_callbacks.clear();
  894. #endif
  895. _mqtt_last_connection = MqttTimeSource::now();
  896. _mqtt_state = AsyncClientState::Disconnected;
  897. systemStopHeartbeat(_mqttHeartbeat);
  898. // Notify all subscribers about the disconnect
  899. for (const auto callback : _mqtt_callbacks) {
  900. callback(MQTT_DISCONNECT_EVENT,
  901. espurna::StringView(),
  902. espurna::StringView());
  903. }
  904. DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
  905. }
  906. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  907. // Run the associated callback when message PID is acknowledged by the broker
  908. void _mqttPidCallback(MqttPidCallbacks& callbacks, uint16_t pid) {
  909. if (callbacks.empty()) {
  910. return;
  911. }
  912. auto end = callbacks.end();
  913. auto prev = callbacks.before_begin();
  914. auto it = callbacks.begin();
  915. while (it != end) {
  916. if ((*it).pid == pid) {
  917. (*it).callback();
  918. it = callbacks.erase_after(prev);
  919. } else {
  920. prev = it;
  921. ++it;
  922. }
  923. }
  924. }
  925. #endif
  926. // Force-skip everything received in a short window right after connecting to avoid syncronization issues.
  927. bool _mqttMaybeSkipRetained(char* topic) {
  928. if (_mqtt_skip_messages && (MqttTimeSource::now() - _mqtt_last_connection < _mqtt_skip_time)) {
  929. DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic);
  930. return true;
  931. }
  932. _mqtt_skip_messages = false;
  933. return false;
  934. }
  935. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  936. // MQTT Broker can sometimes send messages in bulk. Even when message size is less than MQTT_BUFFER_MAX_SIZE, we *could*
  937. // receive a message with `len != total`, this requiring buffering of the received data. Prepare a static memory to store the
  938. // data until `(len + index) == total`.
  939. // TODO: One pending issue is streaming arbitrary data (e.g. binary, for OTA). We always set '\0' and API consumer expects C-String.
  940. // In that case, there could be MQTT_MESSAGE_RAW_EVENT and this callback only trigger on small messages.
  941. // TODO: Current callback model does not allow to pass message length. Instead, implement a topic filter and record all subscriptions. That way we don't need to filter out events and could implement per-event callbacks.
  942. void _mqttOnMessageAsync(char* topic, char* payload, AsyncMqttClientMessageProperties, size_t len, size_t index, size_t total) {
  943. static constexpr size_t BufferSize { MQTT_BUFFER_MAX_SIZE };
  944. static_assert(BufferSize > 0, "");
  945. if (!len || (len > BufferSize) || (total > BufferSize)) {
  946. return;
  947. }
  948. if (_mqttMaybeSkipRetained(topic)) {
  949. return;
  950. }
  951. alignas(4) static char buffer[((BufferSize + 3) & ~3) + 4] = {0};
  952. std::copy(payload, payload + len, &buffer[index]);
  953. // Not done yet
  954. if (total != (len + index)) {
  955. DEBUG_MSG_P(PSTR("[MQTT] Buffered %s => %u / %u bytes\n"), topic, len, total);
  956. return;
  957. }
  958. buffer[len + index] = '\0';
  959. if (len < mqtt::build::MessageLogMax) {
  960. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, buffer);
  961. } else {
  962. DEBUG_MSG_P(PSTR("[MQTT] Received %s => (%u bytes)\n"), topic, len);
  963. }
  964. auto topic_view = espurna::StringView{ topic };
  965. auto message_view = espurna::StringView{ &buffer[0], &buffer[total] };
  966. for (const auto callback : _mqtt_callbacks) {
  967. callback(MQTT_MESSAGE_EVENT, topic_view, message_view);
  968. }
  969. }
  970. #else
  971. // Sync client already implements buffering, but we still need to add '\0' because API consumer expects C-String :/
  972. // TODO: consider reworking this (and async counterpart), giving callback func length of the message.
  973. void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
  974. if (!len || (len > MQTT_BUFFER_MAX_SIZE)) return;
  975. if (_mqttMaybeSkipRetained(topic)) return;
  976. static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0};
  977. memmove(message, (char *) payload, len);
  978. message[len] = '\0';
  979. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
  980. // Call subscribers with the message buffer
  981. for (auto& callback : _mqtt_callbacks) {
  982. callback(MQTT_MESSAGE_EVENT, topic, message);
  983. }
  984. }
  985. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  986. } // namespace
  987. // -----------------------------------------------------------------------------
  988. // Public API
  989. // -----------------------------------------------------------------------------
  990. // Return {magnitude} (aka #) part of the topic string
  991. // e.g.
  992. // * <TOPIC>/#/set - generic topic placement
  993. // ^
  994. // * <LHS>/#/<RHS>/set - when {magnitude} is used
  995. // ^
  996. // * #/<RHS>/set - when magnitude is at the start
  997. // ^
  998. // * #/set - when *only* {magnitude} is used (or, empty topic string)
  999. // ^
  1000. // Depends on the topic and setter settings values.
  1001. // Note that function is ignoring the fact that these strings may not contain the
  1002. // root topic b/c MQTT handles that instead of us (and it's good idea to trust it).
  1003. espurna::StringView mqttMagnitude(espurna::StringView topic) {
  1004. using espurna::StringView;
  1005. StringView out;
  1006. const auto pattern = _mqtt_settings.topic + _mqtt_settings.setter;
  1007. auto it = std::find(pattern.begin(), pattern.end(), '#');
  1008. if (it == pattern.end()) {
  1009. return out;
  1010. }
  1011. const auto start = StringView(pattern.begin(), it);
  1012. if (start.length()) {
  1013. topic = StringView(topic.begin() + start.length(), topic.end());
  1014. }
  1015. const auto end = StringView(it + 1, pattern.end());
  1016. if (end.length()) {
  1017. topic = StringView(topic.begin(), topic.end() - end.length());
  1018. }
  1019. out = StringView(topic.begin(), topic.end());
  1020. return out;
  1021. }
  1022. // Creates a proper MQTT topic for on the given 'magnitude'
  1023. static String _mqttTopicWith(String magnitude) {
  1024. String out;
  1025. out.reserve(magnitude.length()
  1026. + _mqtt_settings.topic.length()
  1027. + _mqtt_settings.setter.length()
  1028. + _mqtt_settings.getter.length());
  1029. out += _mqtt_settings.topic;
  1030. out.replace("#", magnitude);
  1031. return out;
  1032. }
  1033. // When magnitude is a status topic aka getter
  1034. static String _mqttTopicGetter(String magnitude) {
  1035. return _mqttTopicWith(magnitude) + _mqtt_settings.getter;
  1036. }
  1037. // When magnitude is an input topic aka setter
  1038. String _mqttTopicSetter(String magnitude) {
  1039. return _mqttTopicWith(magnitude) + _mqtt_settings.setter;
  1040. }
  1041. // When magnitude is indexed, append its index to the topic
  1042. static String _mqttTopicIndexed(String topic, size_t index) {
  1043. return topic + '/' + String(index, 10);
  1044. }
  1045. String mqttTopic(const String& magnitude) {
  1046. return _mqttTopicGetter(magnitude);
  1047. }
  1048. String mqttTopic(const String& magnitude, size_t index) {
  1049. return _mqttTopicGetter(_mqttTopicIndexed(magnitude, index));
  1050. }
  1051. String mqttTopicSetter(const String& magnitude) {
  1052. return _mqttTopicSetter(magnitude);
  1053. }
  1054. String mqttTopicSetter(const String& magnitude, size_t index) {
  1055. return _mqttTopicSetter(_mqttTopicIndexed(magnitude, index));
  1056. }
  1057. // -----------------------------------------------------------------------------
  1058. uint16_t mqttSendRaw(const char* topic, const char* message, bool retain, int qos) {
  1059. if (_mqtt.connected()) {
  1060. const unsigned int packetId {
  1061. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  1062. _mqtt.publish(topic, qos, retain, message)
  1063. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  1064. _mqtt.publish(topic, message, retain, qos)
  1065. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  1066. _mqtt.publish(topic, message, retain)
  1067. #endif
  1068. };
  1069. #if DEBUG_SUPPORT
  1070. {
  1071. const size_t len = strlen(message);
  1072. auto begin = message;
  1073. auto end = message + len;
  1074. if ((len > mqtt::build::MessageLogMax) || (end != std::find(begin, end, '\n'))) {
  1075. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => (%u bytes) (PID %u)\n"), topic, len, packetId);
  1076. } else {
  1077. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %u)\n"), topic, message, packetId);
  1078. }
  1079. }
  1080. #endif
  1081. return packetId;
  1082. }
  1083. return false;
  1084. }
  1085. uint16_t mqttSendRaw(const char* topic, const char* message, bool retain) {
  1086. return mqttSendRaw(topic, message, retain, _mqtt_settings.qos);
  1087. }
  1088. uint16_t mqttSendRaw(const char* topic, const char* message) {
  1089. return mqttSendRaw(topic, message, _mqtt_settings.retain);
  1090. }
  1091. bool mqttSend(const char* topic, const char* message, bool force, bool retain) {
  1092. if (!force && _mqtt_use_json) {
  1093. mqttEnqueue(topic, message);
  1094. _mqtt_json_payload_flush.once(mqtt::build::JsonDelay, mqttFlush);
  1095. return true;
  1096. }
  1097. return mqttSendRaw(mqttTopic(topic).c_str(), message, retain) > 0;
  1098. }
  1099. bool mqttSend(const char* topic, const char* message, bool force) {
  1100. return mqttSend(topic, message, force, _mqtt_settings.retain);
  1101. }
  1102. bool mqttSend(const char* topic, const char* message) {
  1103. return mqttSend(topic, message, false);
  1104. }
  1105. bool mqttSend(const char* topic, unsigned int index, const char* message, bool force, bool retain) {
  1106. const size_t TopicLen { strlen(topic) };
  1107. String out;
  1108. out.reserve(TopicLen + 5);
  1109. out.concat(topic, TopicLen);
  1110. out += '/';
  1111. out += index;
  1112. return mqttSend(out.c_str(), message, force, retain);
  1113. }
  1114. bool mqttSend(const char* topic, unsigned int index, const char* message, bool force) {
  1115. return mqttSend(topic, index, message, force, _mqtt_settings.retain);
  1116. }
  1117. bool mqttSend(const char* topic, unsigned int index, const char* message) {
  1118. return mqttSend(topic, index, message, false);
  1119. }
  1120. // -----------------------------------------------------------------------------
  1121. constexpr size_t MqttJsonPayloadBufferSize { 1024ul };
  1122. void mqttFlush() {
  1123. if (!_mqtt.connected()) {
  1124. return;
  1125. }
  1126. if (_mqtt_json_payload.empty()) {
  1127. return;
  1128. }
  1129. DynamicJsonBuffer jsonBuffer(MqttJsonPayloadBufferSize);
  1130. JsonObject& root = jsonBuffer.createObject();
  1131. #if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME
  1132. if (ntpSynced()) {
  1133. root[MQTT_TOPIC_DATETIME] = ntpDateTime();
  1134. }
  1135. #endif
  1136. #if MQTT_ENQUEUE_MAC
  1137. root[MQTT_TOPIC_MAC] = WiFi.macAddress();
  1138. #endif
  1139. #if MQTT_ENQUEUE_HOSTNAME
  1140. root[MQTT_TOPIC_HOSTNAME] = systemHostname();
  1141. #endif
  1142. #if MQTT_ENQUEUE_IP
  1143. root[MQTT_TOPIC_IP] = wifiStaIp().toString();
  1144. #endif
  1145. #if MQTT_ENQUEUE_MESSAGE_ID
  1146. root[MQTT_TOPIC_MESSAGE_ID] = (Rtcmem->mqtt)++;
  1147. #endif
  1148. // ref. https://github.com/xoseperez/espurna/issues/2503
  1149. // pretend that the message is already a valid json value
  1150. // when the string looks like a number
  1151. // ([0-9] with an optional decimal separator [.])
  1152. for (auto& payload : _mqtt_json_payload) {
  1153. const char* const topic { payload.topic().c_str() };
  1154. const char* const message { payload.message().c_str() };
  1155. if (isNumber(payload.message())) {
  1156. root[topic] = RawJson(message);
  1157. } else {
  1158. root[topic] = message;
  1159. }
  1160. }
  1161. String output;
  1162. root.printTo(output);
  1163. jsonBuffer.clear();
  1164. _mqtt_json_payload_count = 0;
  1165. _mqtt_json_payload.clear();
  1166. mqttSendRaw(_mqtt_settings.topic_json.c_str(), output.c_str(), false);
  1167. }
  1168. void mqttEnqueue(espurna::StringView topic, espurna::StringView payload) {
  1169. // Queue is not meant to send message "offline"
  1170. // We must prevent the queue does not get full while offline
  1171. if (_mqtt.connected()) {
  1172. if (_mqtt_json_payload_count >= MQTT_QUEUE_MAX_SIZE) {
  1173. mqttFlush();
  1174. }
  1175. _mqtt_json_payload.remove_if(
  1176. [topic](const MqttPayload& payload) {
  1177. return topic == payload.topic();
  1178. });
  1179. _mqtt_json_payload.emplace_front(
  1180. topic.toString(), payload.toString());
  1181. ++_mqtt_json_payload_count;
  1182. }
  1183. }
  1184. // -----------------------------------------------------------------------------
  1185. // Only async client returns resulting PID, sync libraries return either success (1) or failure (0)
  1186. uint16_t mqttSubscribeRaw(const char* topic, int qos) {
  1187. uint16_t pid { 0u };
  1188. if (_mqtt.connected() && (strlen(topic) > 0)) {
  1189. pid = _mqtt.subscribe(topic, qos);
  1190. DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, pid);
  1191. }
  1192. return pid;
  1193. }
  1194. uint16_t mqttSubscribeRaw(const char* topic) {
  1195. return mqttSubscribeRaw(topic, _mqtt_settings.qos);
  1196. }
  1197. bool mqttSubscribe(const char* topic) {
  1198. return mqttSubscribeRaw(mqttTopicSetter(topic).c_str(), _mqtt_settings.qos);
  1199. }
  1200. uint16_t mqttUnsubscribeRaw(const char* topic) {
  1201. uint16_t pid { 0u };
  1202. if (_mqtt.connected() && (strlen(topic) > 0)) {
  1203. pid = _mqtt.unsubscribe(topic);
  1204. DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing from %s (PID %d)\n"), topic, pid);
  1205. }
  1206. return pid;
  1207. }
  1208. bool mqttUnsubscribe(const char* topic) {
  1209. return mqttUnsubscribeRaw(mqttTopicSetter(topic).c_str());
  1210. }
  1211. // -----------------------------------------------------------------------------
  1212. void mqttEnabled(bool status) {
  1213. _mqtt_enabled = status;
  1214. }
  1215. bool mqttEnabled() {
  1216. return _mqtt_enabled;
  1217. }
  1218. bool mqttConnected() {
  1219. return _mqtt.connected();
  1220. }
  1221. void mqttDisconnect() {
  1222. if (_mqtt.connected()) {
  1223. DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n"));
  1224. _mqtt.disconnect();
  1225. }
  1226. }
  1227. bool mqttForward() {
  1228. return _mqtt_forward;
  1229. }
  1230. /**
  1231. Register a persistent lifecycle callback
  1232. @param standalone function pointer
  1233. */
  1234. void mqttRegister(MqttCallback callback) {
  1235. _mqtt_callbacks.push_front(callback);
  1236. }
  1237. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  1238. /**
  1239. Register a temporary publish callback
  1240. @param callable object
  1241. */
  1242. void mqttOnPublish(uint16_t pid, MqttPidCallback callback) {
  1243. _mqtt_publish_callbacks.push_front(
  1244. MqttPidCallbackHandler{
  1245. .pid = pid,
  1246. .callback = std::move(callback),
  1247. });
  1248. }
  1249. /**
  1250. Register a temporary subscribe callback
  1251. @param callable object
  1252. */
  1253. void mqttOnSubscribe(uint16_t pid, MqttPidCallback callback) {
  1254. _mqtt_subscribe_callbacks.push_front(
  1255. MqttPidCallbackHandler{
  1256. .pid = pid,
  1257. .callback = std::move(callback),
  1258. });
  1259. }
  1260. #endif
  1261. // TODO: these strings are only updated after running the configuration routine and when MQTT is *enabled*
  1262. const String& mqttPayloadOnline() {
  1263. return _mqtt_payload_online;
  1264. }
  1265. const String& mqttPayloadOffline() {
  1266. return _mqtt_payload_offline;
  1267. }
  1268. const char* mqttPayloadStatus(bool status) {
  1269. return status ? _mqtt_payload_online.c_str() : _mqtt_payload_offline.c_str();
  1270. }
  1271. void mqttSendStatus() {
  1272. mqttSendRaw(_mqtt_settings.will.c_str(), _mqtt_payload_online.c_str(), true);
  1273. }
  1274. // -----------------------------------------------------------------------------
  1275. // Initialization
  1276. // -----------------------------------------------------------------------------
  1277. namespace {
  1278. void _mqttConnect() {
  1279. // Do not connect if already connected or still trying to connect
  1280. if (_mqtt.connected() || (_mqtt_state != AsyncClientState::Disconnected)) return;
  1281. // Do not connect if disabled or no WiFi
  1282. if (!_mqtt_enabled || (!wifiConnected())) return;
  1283. // Check reconnect interval
  1284. if (MqttTimeSource::now() - _mqtt_last_connection < _mqtt_reconnect_delay) return;
  1285. // Increase the reconnect delay each attempt
  1286. _mqtt_reconnect_delay += mqtt::build::ReconnectStep;
  1287. _mqtt_reconnect_delay = std::clamp(_mqtt_reconnect_delay,
  1288. mqtt::build::ReconnectDelayMin, mqtt::build::ReconnectDelayMax);
  1289. DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%hu\n"),
  1290. _mqtt_settings.server.c_str(), _mqtt_settings.port);
  1291. _mqtt_state = AsyncClientState::Connecting;
  1292. _mqtt_skip_messages = (_mqtt_skip_time.count() > 0);
  1293. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  1294. const bool secure = mqtt::settings::secure();
  1295. #else
  1296. const bool secure = false;
  1297. #endif
  1298. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  1299. _mqttSetupAsyncClient(secure);
  1300. #elif (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT)
  1301. if (_mqttSetupSyncClient(secure) && _mqttConnectSyncClient(secure)) {
  1302. _mqttOnConnect();
  1303. } else {
  1304. DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n"));
  1305. _mqttOnDisconnect();
  1306. }
  1307. #else
  1308. #error "please check that MQTT_LIBRARY is valid"
  1309. #endif
  1310. }
  1311. } // namespace
  1312. void mqttLoop() {
  1313. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  1314. _mqttConnect();
  1315. #else
  1316. if (_mqtt.connected()) {
  1317. _mqtt.loop();
  1318. } else {
  1319. if (_mqtt_state != AsyncClientState::Disconnected) {
  1320. _mqttOnDisconnect();
  1321. }
  1322. _mqttConnect();
  1323. }
  1324. #endif
  1325. }
  1326. void mqttHeartbeat(espurna::heartbeat::Callback callback) {
  1327. _mqtt_heartbeat_callbacks.push_front(callback);
  1328. }
  1329. void mqttSetup() {
  1330. _mqttBackwards();
  1331. _mqttInfo();
  1332. mqtt::settings::query::setup();
  1333. #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  1334. // XXX: should not place this in config, addServerFingerprint does not check for duplicates
  1335. #if SECURE_CLIENT != SECURE_CLIENT_NONE
  1336. {
  1337. if (_mqtt_sc_config.on_fingerprint) {
  1338. const String fingerprint = _mqtt_sc_config.on_fingerprint();
  1339. uint8_t buffer[20] = {0};
  1340. if (sslFingerPrintArray(fingerprint.c_str(), buffer)) {
  1341. _mqtt.addServerFingerprint(buffer);
  1342. }
  1343. }
  1344. }
  1345. #endif // SECURE_CLIENT != SECURE_CLIENT_NONE
  1346. _mqtt.onMessage(_mqttOnMessageAsync);
  1347. _mqtt.onConnect([](bool) {
  1348. _mqttOnConnect();
  1349. });
  1350. _mqtt.onSubscribe([](uint16_t pid, int) {
  1351. _mqttPidCallback(_mqtt_subscribe_callbacks, pid);
  1352. });
  1353. _mqtt.onPublish([](uint16_t pid) {
  1354. _mqttPidCallback(_mqtt_publish_callbacks, pid);
  1355. });
  1356. _mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {
  1357. switch (reason) {
  1358. case AsyncMqttClientDisconnectReason::TCP_DISCONNECTED:
  1359. DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n"));
  1360. break;
  1361. case AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED:
  1362. DEBUG_MSG_P(PSTR("[MQTT] Identifier Rejected\n"));
  1363. break;
  1364. case AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE:
  1365. DEBUG_MSG_P(PSTR("[MQTT] Server unavailable\n"));
  1366. break;
  1367. case AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS:
  1368. DEBUG_MSG_P(PSTR("[MQTT] Malformed credentials\n"));
  1369. break;
  1370. case AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED:
  1371. DEBUG_MSG_P(PSTR("[MQTT] Not authorized\n"));
  1372. break;
  1373. case AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT:
  1374. #if ASYNC_TCP_SSL_ENABLED
  1375. DEBUG_MSG_P(PSTR("[MQTT] Bad fingerprint\n"));
  1376. #endif
  1377. break;
  1378. case AsyncMqttClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION:
  1379. // This is never used by the AsyncMqttClient source
  1380. #if 0
  1381. DEBUG_MSG_P(PSTR("[MQTT] Unacceptable protocol version\n"));
  1382. #endif
  1383. break;
  1384. case AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE:
  1385. DEBUG_MSG_P(PSTR("[MQTT] Connect packet too big\n"));
  1386. break;
  1387. }
  1388. _mqttOnDisconnect();
  1389. });
  1390. #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT
  1391. _mqtt.onMessageAdvanced([](MQTTClient* , char topic[], char payload[], int length) {
  1392. _mqttOnMessage(topic, payload, length);
  1393. });
  1394. #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT
  1395. _mqtt.setCallback([](char* topic, byte* payload, unsigned int length) {
  1396. _mqttOnMessage(topic, (char *) payload, length);
  1397. });
  1398. #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
  1399. _mqttConfigure();
  1400. mqttRegister(_mqttCallback);
  1401. #if WEB_SUPPORT
  1402. wsRegister()
  1403. .onVisible(_mqttWebSocketOnVisible)
  1404. .onData(_mqttWebSocketOnData)
  1405. .onConnected(_mqttWebSocketOnConnected)
  1406. .onKeyCheck(_mqttWebSocketOnKeyCheck);
  1407. mqttRegister([](unsigned int type, espurna::StringView, espurna::StringView) {
  1408. if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) {
  1409. wsPost(_mqttWebSocketOnData);
  1410. }
  1411. });
  1412. #endif
  1413. #if TERMINAL_SUPPORT
  1414. _mqttCommandsSetup();
  1415. #endif
  1416. // Main callbacks
  1417. espurnaRegisterLoop(mqttLoop);
  1418. espurnaRegisterReload(_mqttConfigure);
  1419. }
  1420. #endif // MQTT_SUPPORT