Fork of the espurna firmware for `mhsw` switches
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

529 lines
15 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. /*
  2. MQTT MODULE
  3. Copyright (C) 2016-2017 by Xose Pérez <xose dot perez at gmail dot com>
  4. */
  5. #include <ESP8266WiFi.h>
  6. #include <ArduinoJson.h>
  7. #include <vector>
  8. #include <Ticker.h>
  9. const char *mqtt_user = 0;
  10. const char *mqtt_pass = 0;
  11. #if MQTT_USE_ASYNC // Using AsyncMqttClient
  12. #include <AsyncMqttClient.h>
  13. AsyncMqttClient mqtt;
  14. #else // Using PubSubClient
  15. #include <PubSubClient.h>
  16. PubSubClient mqtt;
  17. bool _mqttConnected = false;
  18. WiFiClient _mqttClient;
  19. #if ASYNC_TCP_SSL_ENABLED
  20. WiFiClientSecure _mqttClientSecure;
  21. #endif // ASYNC_TCP_SSL_ENABLED
  22. #endif // MQTT_USE_ASYNC
  23. bool _mqttEnabled = MQTT_ENABLED;
  24. String _mqttTopic;
  25. String _mqttSetter;
  26. String _mqttGetter;
  27. bool _mqttForward;
  28. char *_mqttUser = 0;
  29. char *_mqttPass = 0;
  30. char *_mqttWill;
  31. #if MQTT_SKIP_RETAINED
  32. unsigned long _mqttConnectedAt = 0;
  33. #endif
  34. std::vector<void (*)(unsigned int, const char *, const char *)> _mqtt_callbacks;
  35. typedef struct {
  36. char * topic;
  37. char * message;
  38. } mqtt_message_t;
  39. std::vector<mqtt_message_t> _mqtt_queue;
  40. Ticker _mqttFlushTicker;
  41. // -----------------------------------------------------------------------------
  42. // Public API
  43. // -----------------------------------------------------------------------------
  44. bool mqttConnected() {
  45. return mqtt.connected();
  46. }
  47. void mqttDisconnect() {
  48. if (mqtt.connected()) mqtt.disconnect();
  49. }
  50. bool mqttForward() {
  51. return _mqttForward;
  52. }
  53. String mqttSubtopic(char * topic) {
  54. String response;
  55. String t = String(topic);
  56. if (t.startsWith(_mqttTopic) && t.endsWith(_mqttSetter)) {
  57. response = t.substring(_mqttTopic.length(), t.length() - _mqttSetter.length());
  58. }
  59. return response;
  60. }
  61. void mqttSendRaw(const char * topic, const char * message) {
  62. if (mqtt.connected()) {
  63. #if MQTT_USE_ASYNC
  64. unsigned int packetId = mqtt.publish(topic, MQTT_QOS, MQTT_RETAIN, message);
  65. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %d)\n"), topic, message, packetId);
  66. #else
  67. mqtt.publish(topic, message, MQTT_RETAIN);
  68. DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s\n"), topic, message);
  69. #endif
  70. }
  71. }
  72. void _mqttFlush() {
  73. if (_mqtt_queue.size() == 0) return;
  74. DynamicJsonBuffer jsonBuffer;
  75. JsonObject& root = jsonBuffer.createObject();
  76. for (unsigned char i=0; i<_mqtt_queue.size(); i++) {
  77. mqtt_message_t element = _mqtt_queue[i];
  78. root[element.topic] = element.message;
  79. }
  80. #if NTP_SUPPORT
  81. if (ntpConnected()) root[MQTT_TOPIC_TIME] = ntpDateTime();
  82. #endif
  83. root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname");
  84. root[MQTT_TOPIC_IP] = getIP();
  85. String output;
  86. root.printTo(output);
  87. String path = _mqttTopic + String(MQTT_TOPIC_JSON);
  88. mqttSendRaw(path.c_str(), output.c_str());
  89. for (unsigned char i = 0; i < _mqtt_queue.size(); i++) {
  90. mqtt_message_t element = _mqtt_queue[i];
  91. free(element.topic);
  92. free(element.message);
  93. }
  94. _mqtt_queue.clear();
  95. }
  96. void mqttSend(const char * topic, const char * message, bool force) {
  97. bool useJson = force ? false : getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1;
  98. if (useJson) {
  99. mqtt_message_t element;
  100. element.topic = strdup(topic);
  101. element.message = strdup(message);
  102. _mqtt_queue.push_back(element);
  103. _mqttFlushTicker.once_ms(MQTT_USE_JSON_DELAY, _mqttFlush);
  104. } else {
  105. String path = _mqttTopic + String(topic) + _mqttGetter;
  106. mqttSendRaw(path.c_str(), message);
  107. }
  108. }
  109. void mqttSend(const char * topic, const char * message) {
  110. mqttSend(topic, message, false);
  111. }
  112. void mqttSend(const char * topic, unsigned int index, const char * message, bool force) {
  113. char buffer[strlen(topic)+5];
  114. snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
  115. mqttSend(buffer, message, force);
  116. }
  117. void mqttSend(const char * topic, unsigned int index, const char * message) {
  118. mqttSend(topic, index, message, false);
  119. }
  120. void mqttSubscribeRaw(const char * topic) {
  121. if (mqtt.connected() && (strlen(topic) > 0)) {
  122. #if MQTT_USE_ASYNC
  123. unsigned int packetId = mqtt.subscribe(topic, MQTT_QOS);
  124. DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId);
  125. #else
  126. mqtt.subscribe(topic, MQTT_QOS);
  127. DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic);
  128. #endif
  129. }
  130. }
  131. void mqttSubscribe(const char * topic) {
  132. String path = _mqttTopic + String(topic) + _mqttSetter;
  133. mqttSubscribeRaw(path.c_str());
  134. }
  135. void mqttRegister(void (*callback)(unsigned int, const char *, const char *)) {
  136. _mqtt_callbacks.push_back(callback);
  137. }
  138. // -----------------------------------------------------------------------------
  139. // Callbacks
  140. // -----------------------------------------------------------------------------
  141. void _mqttCallback(unsigned int type, const char * topic, const char * payload) {
  142. if (type == MQTT_CONNECT_EVENT) {
  143. mqttSubscribe(MQTT_TOPIC_ACTION);
  144. }
  145. if (type == MQTT_MESSAGE_EVENT) {
  146. // Match topic
  147. String t = mqttSubtopic((char *) topic);
  148. // Actions
  149. if (t.equals(MQTT_TOPIC_ACTION)) {
  150. if (strcmp(payload, MQTT_ACTION_RESET) == 0) {
  151. customReset(CUSTOM_RESET_MQTT);
  152. ESP.restart();
  153. }
  154. }
  155. }
  156. }
  157. void _mqttOnConnect() {
  158. DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
  159. #if MQTT_SKIP_RETAINED
  160. _mqttConnectedAt = millis();
  161. #endif
  162. // Send first Heartbeat
  163. heartbeat();
  164. // Send connect event to subscribers
  165. for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
  166. (*_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL);
  167. }
  168. }
  169. void _mqttOnDisconnect() {
  170. DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
  171. // Send disconnect event to subscribers
  172. for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
  173. (*_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL);
  174. }
  175. }
  176. void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
  177. if (len == 0) return;
  178. char message[len + 1];
  179. strlcpy(message, (char *) payload, len + 1);
  180. #if MQTT_SKIP_RETAINED
  181. if (millis() - _mqttConnectedAt < MQTT_SKIP_TIME) {
  182. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s - SKIPPED\n"), topic, message);
  183. return;
  184. }
  185. #endif
  186. DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
  187. // Send message event to subscribers
  188. for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
  189. (*_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic, message);
  190. }
  191. }
  192. #if MQTT_USE_ASYNC
  193. bool mqttFormatFP(const char * fingerprint, unsigned char * bytearray) {
  194. // check length (20 2-character digits ':' or ' ' separated => 20*2+19 = 59)
  195. if (strlen(fingerprint) != 59) return false;
  196. DEBUG_MSG_P(PSTR("[MQTT] Fingerprint %s\n"), fingerprint);
  197. // walk the fingerprint
  198. for (unsigned int i=0; i<20; i++) {
  199. bytearray[i] = strtol(fingerprint + 3*i, NULL, 16);
  200. }
  201. return true;
  202. }
  203. #else
  204. bool mqttFormatFP(const char * fingerprint, char * destination) {
  205. // check length (20 2-character digits ':' or ' ' separated => 20*2+19 = 59)
  206. if (strlen(fingerprint) != 59) return false;
  207. DEBUG_MSG_P(PSTR("[MQTT] Fingerprint %s\n"), fingerprint);
  208. // copy it
  209. strncpy(destination, fingerprint, 59);
  210. // walk the fingerprint replacing ':' for ' '
  211. for (unsigned char i = 0; i<59; i++) {
  212. if (destination[i] == ':') destination[i] = ' ';
  213. }
  214. return true;
  215. }
  216. #endif
  217. void mqttEnabled(bool status) {
  218. _mqttEnabled = status;
  219. setSetting("mqttEnabled", status ? 1 : 0);
  220. }
  221. bool mqttEnabled() {
  222. return _mqttEnabled;
  223. }
  224. void mqttConnect() {
  225. if (_mqttEnabled & !mqtt.connected()) {
  226. // Disable MQTT after MQTT_MAX_TRIES attemps in a row
  227. #if MQTT_MAX_TRIES > 0
  228. static unsigned int tries = 0;
  229. static unsigned long last_try = millis();
  230. if (millis() - last_try < MQTT_TRY_INTERVAL) {
  231. if (++tries > MQTT_MAX_TRIES) {
  232. DEBUG_MSG_P(PSTR("[MQTT] MQTT_MAX_TRIES met, disabling MQTT\n"));
  233. mqttEnabled(false);
  234. tries = 0;
  235. return;
  236. }
  237. } else {
  238. tries = 0;
  239. }
  240. last_try = millis();
  241. #endif
  242. if (_mqttUser) free(_mqttUser);
  243. if (_mqttPass) free(_mqttPass);
  244. char * host = strdup(getSetting("mqttServer", MQTT_SERVER).c_str());
  245. if (strlen(host) == 0) return;
  246. unsigned int port = getSetting("mqttPort", MQTT_PORT).toInt();
  247. _mqttUser = strdup(getSetting("mqttUser").c_str());
  248. _mqttPass = strdup(getSetting("mqttPassword").c_str());
  249. if (_mqttWill) free(_mqttWill);
  250. _mqttWill = strdup((_mqttTopic + MQTT_TOPIC_STATUS).c_str());
  251. DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%d\n"), host, port);
  252. #if MQTT_USE_ASYNC
  253. mqtt.setServer(host, port);
  254. mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false);
  255. mqtt.setWill(_mqttWill, MQTT_QOS, MQTT_RETAIN, "0");
  256. if ((strlen(_mqttUser) > 0) && (strlen(_mqttPass) > 0)) {
  257. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqttUser);
  258. mqtt.setCredentials(_mqttUser, _mqttPass);
  259. }
  260. #if ASYNC_TCP_SSL_ENABLED
  261. bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
  262. mqtt.setSecure(secure);
  263. if (secure) {
  264. DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
  265. unsigned char fp[20] = {0};
  266. if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
  267. mqtt.addServerFingerprint(fp);
  268. } else {
  269. DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n"));
  270. }
  271. }
  272. #endif // ASYNC_TCP_SSL_ENABLED
  273. mqtt.connect();
  274. #else // not MQTT_USE_ASYNC
  275. bool response = true;
  276. #if ASYNC_TCP_SSL_ENABLED
  277. bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
  278. if (secure) {
  279. DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
  280. if (_mqttClientSecure.connect(host, port)) {
  281. char fp[60] = {0};
  282. if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
  283. if (_mqttClientSecure.verify(fp, host)) {
  284. mqtt.setClient(_mqttClientSecure);
  285. } else {
  286. DEBUG_MSG_P(PSTR("[MQTT] Invalid fingerprint\n"));
  287. response = false;
  288. }
  289. } else {
  290. DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n"));
  291. response = false;
  292. }
  293. } else {
  294. DEBUG_MSG_P(PSTR("[MQTT] Client connection failed\n"));
  295. response = false;
  296. }
  297. } else {
  298. mqtt.setClient(_mqttClient);
  299. }
  300. #else // not ASYNC_TCP_SSL_ENABLED
  301. mqtt.setClient(_mqttClient);
  302. #endif // ASYNC_TCP_SSL_ENABLED
  303. if (response) {
  304. mqtt.setServer(host, port);
  305. if ((strlen(_mqttUser) > 0) && (strlen(_mqttPass) > 0)) {
  306. DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqttUser);
  307. response = mqtt.connect(getIdentifier().c_str(), _mqttUser, _mqttPass, _mqttWill, MQTT_QOS, MQTT_RETAIN, "0");
  308. } else {
  309. response = mqtt.connect(getIdentifier().c_str(), _mqttWill, MQTT_QOS, MQTT_RETAIN, "0");
  310. }
  311. }
  312. if (response) {
  313. _mqttOnConnect();
  314. _mqttConnected = true;
  315. } else {
  316. DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n"));
  317. }
  318. #endif // MQTT_USE_ASYNC
  319. free(host);
  320. }
  321. }
  322. void mqttConfigure() {
  323. // Replace identifier
  324. _mqttTopic = getSetting("mqttTopic", MQTT_TOPIC);
  325. _mqttTopic.replace("{identifier}", getSetting("hostname"));
  326. if (!_mqttTopic.endsWith("/")) _mqttTopic = _mqttTopic + "/";
  327. // Getters and setters
  328. _mqttSetter = getSetting("mqttSetter", MQTT_USE_SETTER);
  329. _mqttGetter = getSetting("mqttGetter", MQTT_USE_GETTER);
  330. _mqttForward = !_mqttGetter.equals(_mqttSetter);
  331. // Enable
  332. if (getSetting("mqttServer", MQTT_SERVER).length() == 0) {
  333. mqttEnabled(false);
  334. } else {
  335. _mqttEnabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1;
  336. }
  337. }
  338. void mqttSetup() {
  339. #if MQTT_USE_ASYNC
  340. mqtt.onConnect([](bool sessionPresent) {
  341. _mqttOnConnect();
  342. });
  343. mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {
  344. if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) {
  345. DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n"));
  346. }
  347. if (reason == AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED) {
  348. DEBUG_MSG_P(PSTR("[MQTT] Identifier Rejected\n"));
  349. }
  350. if (reason == AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE) {
  351. DEBUG_MSG_P(PSTR("[MQTT] Server unavailable\n"));
  352. }
  353. if (reason == AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS) {
  354. DEBUG_MSG_P(PSTR("[MQTT] Malformed credentials\n"));
  355. }
  356. if (reason == AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED) {
  357. DEBUG_MSG_P(PSTR("[MQTT] Not authorized\n"));
  358. }
  359. #if ASYNC_TCP_SSL_ENABLED
  360. if (reason == AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT) {
  361. DEBUG_MSG_P(PSTR("[MQTT] Bad fingerprint\n"));
  362. }
  363. #endif
  364. _mqttOnDisconnect();
  365. });
  366. mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
  367. _mqttOnMessage(topic, payload, len);
  368. });
  369. mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) {
  370. DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %d\n"), packetId);
  371. });
  372. mqtt.onPublish([](uint16_t packetId) {
  373. DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %d\n"), packetId);
  374. });
  375. #else
  376. mqtt.setCallback([](char* topic, byte* payload, unsigned int length) {
  377. _mqttOnMessage(topic, (char *) payload, length);
  378. });
  379. #endif
  380. mqttConfigure();
  381. mqttRegister(_mqttCallback);
  382. }
  383. void mqttLoop() {
  384. if (!_mqttEnabled) return;
  385. if (WiFi.status() == WL_CONNECTED) {
  386. if (!mqtt.connected()) {
  387. #if not MQTT_USE_ASYNC
  388. if (_mqttConnected) {
  389. _mqttOnDisconnect();
  390. _mqttConnected = false;
  391. }
  392. #endif
  393. static unsigned long last = 0;
  394. if (millis() - last > MQTT_RECONNECT_DELAY) {
  395. last = millis();
  396. mqttConnect();
  397. }
  398. #if not MQTT_USE_ASYNC
  399. } else {
  400. mqtt.loop();
  401. #endif
  402. }
  403. }
  404. }