/* MQTT MODULE Copyright (C) 2016-2019 by Xose PĂ©rez Updated secure client support by Niek van der Maas < mail at niekvandermaas dot nl> */ #include "mqtt.h" #if MQTT_SUPPORT #include #include #include #include "system.h" #include "mdns.h" #include "mqtt.h" #include "ntp.h" #include "rpc.h" #include "rtcmem.h" #include "ws.h" #include "libs/AsyncClientHelpers.h" #include "libs/SecureClientHelpers.h" #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT #include #include #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT #include #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT #include #endif // ----------------------------------------------------------------------------- #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT AsyncMqttClient _mqtt; #else // MQTT_LIBRARY_ARDUINOMQTT / MQTT_LIBRARY_PUBSUBCLIENT WiFiClient _mqtt_client; #if SECURE_CLIENT != SECURE_CLIENT_NONE std::unique_ptr _mqtt_client_secure = nullptr; #if MQTT_SECURE_CLIENT_INCLUDE_CA #include "static/mqtt_client_trusted_root_ca.h" // Assumes this header file defines a _mqtt_client_trusted_root_ca[] PROGMEM = "...PEM data..." #else #include "static/letsencrypt_isrgroot_pem.h" // Default to LetsEncrypt X3 certificate #define _mqtt_client_trusted_root_ca _ssl_letsencrypt_isrg_x3_ca #endif // MQTT_SECURE_CLIENT_INCLUDE_CA #endif // SECURE_CLIENT != SECURE_CLIENT_NONE #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT MQTTClient _mqtt(MQTT_BUFFER_MAX_SIZE); #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT PubSubClient _mqtt; #endif #endif // MQTT_LIBRARY == MQTT_ASYNCMQTTCLIENT unsigned long _mqtt_last_connection = 0; AsyncClientState _mqtt_state = AsyncClientState::Disconnected; bool _mqtt_skip_messages = false; unsigned long _mqtt_skip_time = MQTT_SKIP_TIME; unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN; bool _mqtt_enabled = MQTT_ENABLED; bool _mqtt_use_json = false; bool _mqtt_retain = MQTT_RETAIN; int _mqtt_qos = MQTT_QOS; int _mqtt_keepalive = MQTT_KEEPALIVE; String _mqtt_topic; String _mqtt_topic_json; String _mqtt_setter; String _mqtt_getter; bool _mqtt_forward; String _mqtt_user; String _mqtt_pass; String _mqtt_will; String _mqtt_server; uint16_t _mqtt_port; String _mqtt_clientid; #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT struct MqttPidCallback { uint16_t pid; mqtt_pid_callback_f run; }; using MqttPidCallbacks = std::forward_list; MqttPidCallbacks _mqtt_publish_callbacks; MqttPidCallbacks _mqtt_subscribe_callbacks; #endif std::forward_list _mqtt_heartbeat_callbacks; heartbeat::Mode _mqtt_heartbeat_mode; heartbeat::Seconds _mqtt_heartbeat_interval; String _mqtt_payload_online; String _mqtt_payload_offline; std::forward_list _mqtt_callbacks; // ----------------------------------------------------------------------------- // JSON payload // ----------------------------------------------------------------------------- struct MqttPayload { MqttPayload() = delete; MqttPayload(const MqttPayload&) = default; // TODO: replace String implementation with Core v3 (or just use newer Core) // 2.7.x still has basic Arduino String move ctor that is not noexcept MqttPayload(MqttPayload&& other) noexcept : _topic(std::move(other._topic)), _message(std::move(other._message)) {} template MqttPayload(Topic&& topic, Message&& message) : _topic(std::forward(topic)), _message(std::forward(message)) {} const String& topic() const { return _topic; } const String& message() const { return _message; } private: String _topic; String _message; }; size_t _mqtt_json_payload_count { 0ul }; std::forward_list _mqtt_json_payload; Ticker _mqtt_json_payload_flush; // ----------------------------------------------------------------------------- // Secure client handlers // ----------------------------------------------------------------------------- #if SECURE_CLIENT == SECURE_CLIENT_AXTLS SecureClientConfig _mqtt_sc_config { "MQTT", []() -> String { return _mqtt_server; }, []() -> int { return getSetting("mqttScCheck", MQTT_SECURE_CLIENT_CHECK); }, []() -> String { return getSetting("mqttFP", MQTT_SSL_FINGERPRINT); }, true }; #endif #if SECURE_CLIENT == SECURE_CLIENT_BEARSSL SecureClientConfig _mqtt_sc_config { "MQTT", []() -> int { return getSetting("mqttScCheck", MQTT_SECURE_CLIENT_CHECK); }, []() -> PGM_P { return _mqtt_client_trusted_root_ca; }, []() -> String { return getSetting("mqttFP", MQTT_SSL_FINGERPRINT); }, []() -> uint16_t { return getSetting("mqttScMFLN", MQTT_SECURE_CLIENT_MFLN); }, true }; #endif // ----------------------------------------------------------------------------- // Client configuration & setup // ----------------------------------------------------------------------------- #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT void _mqttSetupAsyncClient(bool secure = false) { _mqtt.setServer(_mqtt_server.c_str(), _mqtt_port); _mqtt.setClientId(_mqtt_clientid.c_str()); _mqtt.setKeepAlive(_mqtt_keepalive); _mqtt.setCleanSession(false); _mqtt.setWill(_mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, _mqtt_payload_offline.c_str()); if (_mqtt_user.length() && _mqtt_pass.length()) { DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str()); _mqtt.setCredentials(_mqtt_user.c_str(), _mqtt_pass.c_str()); } #if SECURE_CLIENT != SECURE_CLIENT_NONE if (secure) { DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n")); _mqtt.setSecure(secure); } #endif // SECURE_CLIENT != SECURE_CLIENT_NONE _mqtt.connect(); } #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT #if (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT) WiFiClient& _mqttGetClient(bool secure) { #if SECURE_CLIENT != SECURE_CLIENT_NONE return (secure ? _mqtt_client_secure->get() : _mqtt_client); #else return _mqtt_client; #endif } bool _mqttSetupSyncClient(bool secure = false) { #if SECURE_CLIENT != SECURE_CLIENT_NONE if (secure) { if (!_mqtt_client_secure) _mqtt_client_secure = std::make_unique(_mqtt_sc_config); return _mqtt_client_secure->beforeConnected(); } #endif return true; } bool _mqttConnectSyncClient(bool secure = false) { bool result = false; #if MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT _mqtt.begin(_mqtt_server.c_str(), _mqtt_port, _mqttGetClient(secure)); _mqtt.setWill(_mqtt_will.c_str(), _mqtt_payload_offline.c_str(), _mqtt_retain, _mqtt_qos); _mqtt.setKeepAlive(_mqtt_keepalive); result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str()); #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT _mqtt.setClient(_mqttGetClient(secure)); _mqtt.setServer(_mqtt_server.c_str(), _mqtt_port); if (_mqtt_user.length() && _mqtt_pass.length()) { DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user.c_str()); result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_user.c_str(), _mqtt_pass.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, _mqtt_payload_offline.c_str()); } else { result = _mqtt.connect(_mqtt_clientid.c_str(), _mqtt_will.c_str(), _mqtt_qos, _mqtt_retain, _mqtt_payload_offline.c_str()); } #endif #if SECURE_CLIENT != SECURE_CLIENT_NONE if (result && secure) { result = _mqtt_client_secure->afterConnected(); } #endif return result; } #endif // (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT) void _mqttPlaceholders(String& text) { text.replace("{hostname}", getSetting("hostname")); text.replace("{magnitude}", "#"); String mac = WiFi.macAddress(); mac.replace(":", ""); text.replace("{mac}", mac); } template void _mqttApplySetting(T& current, T& updated) { if (current != updated) { current = std::move(updated); mqttDisconnect(); } } template void _mqttApplySetting(T& current, const T& updated) { if (current != updated) { current = updated; mqttDisconnect(); } } template void _mqttApplyTopic(T& current, const char* magnitude) { String updated = mqttTopic(magnitude, false); if (current != updated) { mqttFlush(); current = std::move(updated); } } void _mqttConfigure() { // Enable only when server is set { const String server = getSetting("mqttServer", MQTT_SERVER); const auto port = getSetting("mqttPort", static_cast(MQTT_PORT)); bool enabled = false; if (server.length()) { enabled = getSetting("mqttEnabled", 1 == MQTT_ENABLED); } _mqttApplySetting(_mqtt_server, server); _mqttApplySetting(_mqtt_enabled, enabled); _mqttApplySetting(_mqtt_port, port); if (!enabled) return; } // Get base topic and apply placeholders { String topic = getSetting("mqttTopic", MQTT_TOPIC); if (topic.endsWith("/")) topic.remove(topic.length()-1); // Replace things inside curly braces (like {hostname}, {mac} etc.) _mqttPlaceholders(topic); if (topic.indexOf("#") == -1) topic.concat("/#"); _mqttApplySetting(_mqtt_topic, topic); } // Getter and setter { String setter = getSetting("mqttSetter", MQTT_SETTER); String getter = getSetting("mqttGetter", MQTT_GETTER); bool forward = !setter.equals(getter) && RELAY_REPORT_STATUS; _mqttApplySetting(_mqtt_setter, setter); _mqttApplySetting(_mqtt_getter, getter); _mqttApplySetting(_mqtt_forward, forward); } // MQTT options { String user = getSetting("mqttUser", MQTT_USER); _mqttPlaceholders(user); String pass = getSetting("mqttPassword", MQTT_PASS); const auto qos = getSetting("mqttQoS", MQTT_QOS); const bool retain = getSetting("mqttRetain", 1 == MQTT_RETAIN); // Note: MQTT spec defines this as 2 bytes const auto keepalive = constrain( getSetting("mqttKeep", MQTT_KEEPALIVE), 0, std::numeric_limits::max() ); String id = getSetting("mqttClientID", getIdentifier()); _mqttPlaceholders(id); _mqttApplySetting(_mqtt_user, user); _mqttApplySetting(_mqtt_pass, pass); _mqttApplySetting(_mqtt_qos, qos); _mqttApplySetting(_mqtt_retain, retain); _mqttApplySetting(_mqtt_keepalive, keepalive); _mqttApplySetting(_mqtt_clientid, id); _mqttApplyTopic(_mqtt_will, MQTT_TOPIC_STATUS); } // MQTT JSON { _mqttApplySetting(_mqtt_use_json, getSetting("mqttUseJson", 1 == MQTT_USE_JSON)); _mqttApplyTopic(_mqtt_topic_json, MQTT_TOPIC_JSON); } _mqttApplySetting(_mqtt_heartbeat_mode, getSetting("mqttHbMode", heartbeat::currentMode())); _mqttApplySetting(_mqtt_heartbeat_interval, getSetting("mqttHbIntvl", heartbeat::currentInterval())); // Skip messages in a small window right after the connection _mqtt_skip_time = getSetting("mqttSkipTime", MQTT_SKIP_TIME); // Custom payload strings settingsProcessConfig({ {_mqtt_payload_online, "mqttPayloadOnline", MQTT_STATUS_ONLINE}, {_mqtt_payload_offline, "mqttPayloadOffline", MQTT_STATUS_OFFLINE} }); // Reset reconnect delay to reconnect sooner _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN; } void _mqttBackwards() { String mqttTopic = getSetting("mqttTopic", MQTT_TOPIC); if (mqttTopic.indexOf("{identifier}") > 0) { mqttTopic.replace("{identifier}", "{hostname}"); setSetting("mqttTopic", mqttTopic); } } void _mqttInfo() { // Build information { #define __MQTT_INFO_STR(X) #X #define _MQTT_INFO_STR(X) __MQTT_INFO_STR(X) DEBUG_MSG_P(PSTR( "[MQTT] " #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT "AsyncMqttClient" #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT "Arduino-MQTT" #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT "PubSubClient" #endif ", SSL " #if SECURE_CLIENT != SEURE_CLIENT_NONE "ENABLED" #else "DISABLED" #endif ", Autoconnect " #if MQTT_AUTOCONNECT "ENABLED" #else "DISABLED" #endif ", Buffer size " _MQTT_INFO_STR(MQTT_BUFFER_MAX_SIZE) " bytes" "\n" )); #undef _MQTT_INFO_STR #undef __MQTT_INFO_STR } // Notify about the general state of the client { const __FlashStringHelper* enabled = _mqtt_enabled ? F("ENABLED") : F("DISABLED"); const __FlashStringHelper* state = nullptr; switch (_mqtt_state) { case AsyncClientState::Connecting: state = F("CONNECTING"); break; case AsyncClientState::Connected: state = F("CONNECTED"); break; case AsyncClientState::Disconnected: state = F("DISCONNECTED"); break; case AsyncClientState::Disconnecting: state = F("DISCONNECTING"); break; default: state = F("WAITING"); break; } DEBUG_MSG_P(PSTR("[MQTT] Client %s, %s\n"), String(enabled).c_str(), String(state).c_str() ); if (_mqtt_enabled && (_mqtt_state != AsyncClientState::Connected)) { DEBUG_MSG_P(PSTR("[MQTT] Retrying, Last %u with Delay %u (Step %u)\n"), _mqtt_last_connection, _mqtt_reconnect_delay, MQTT_RECONNECT_DELAY_STEP ); } } } // ----------------------------------------------------------------------------- // WEB // ----------------------------------------------------------------------------- #if WEB_SUPPORT bool _mqttWebSocketOnKeyCheck(const char * key, JsonVariant& value) { return (strncmp(key, "mqtt", 3) == 0); } void _mqttWebSocketOnVisible(JsonObject& root) { root["mqttVisible"] = 1; #if ASYNC_TCP_SSL_ENABLED root["mqttsslVisible"] = 1; #endif } void _mqttWebSocketOnData(JsonObject& root) { root["mqttStatus"] = mqttConnected(); } void _mqttWebSocketOnConnected(JsonObject& root) { root["mqttEnabled"] = mqttEnabled(); root["mqttServer"] = getSetting("mqttServer", MQTT_SERVER); root["mqttPort"] = getSetting("mqttPort", MQTT_PORT); root["mqttUser"] = getSetting("mqttUser", MQTT_USER); root["mqttClientID"] = getSetting("mqttClientID"); root["mqttPassword"] = getSetting("mqttPassword", MQTT_PASS); root["mqttKeep"] = _mqtt_keepalive; root["mqttRetain"] = _mqtt_retain; root["mqttQoS"] = _mqtt_qos; #if SECURE_CLIENT != SECURE_CLIENT_NONE root["mqttUseSSL"] = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED); root["mqttFP"] = getSetting("mqttFP", MQTT_SSL_FINGERPRINT); #endif root["mqttTopic"] = getSetting("mqttTopic", MQTT_TOPIC); root["mqttUseJson"] = getSetting("mqttUseJson", 1 == MQTT_USE_JSON); } #endif // ----------------------------------------------------------------------------- // SETTINGS // ----------------------------------------------------------------------------- #if TERMINAL_SUPPORT void _mqttInitCommands() { terminalRegisterCommand(F("MQTT.RESET"), [](const terminal::CommandContext&) { _mqttConfigure(); mqttDisconnect(); terminalOK(); }); terminalRegisterCommand(F("MQTT.INFO"), [](const terminal::CommandContext&) { _mqttInfo(); terminalOK(); }); } #endif // TERMINAL_SUPPORT // ----------------------------------------------------------------------------- // MQTT Callbacks // ----------------------------------------------------------------------------- void _mqttCallback(unsigned int type, const char * topic, const char * payload) { if (type == MQTT_CONNECT_EVENT) { mqttSubscribe(MQTT_TOPIC_ACTION); } if (type == MQTT_MESSAGE_EVENT) { String t = mqttMagnitude(topic); if (t.equals(MQTT_TOPIC_ACTION)) { rpcHandleAction(payload); } } } bool _mqttHeartbeat(heartbeat::Mask mask) { // No point retrying, since we will be re-scheduled on connection if (!mqttConnected()) { return true; } #if NTP_SUPPORT // Backported from the older utils implementation. // Wait until the time is synced to avoid sending partial report *and* // as a result, wait until the next interval to actually send the datetime string. if ((mask & heartbeat::Report::Datetime) && !ntpSynced()) { return false; } #endif // TODO: rework old HEARTBEAT_REPEAT_STATUS? // for example: send full report once, send only the dynamic data after that // (interval, hostname, description, ssid, bssid, ip, mac, rssi, uptime, datetime, heap, loadavg, vcc) // otherwise, it is still possible by setting everything to 0 *but* the Report::Status bit // TODO: per-module mask? // TODO: simply send static data with onConnected, and the rest from here? if (mask & heartbeat::Report::Status) mqttSendStatus(); if (mask & heartbeat::Report::Interval) mqttSend(MQTT_TOPIC_INTERVAL, String(_mqtt_heartbeat_interval.count()).c_str()); if (mask & heartbeat::Report::App) mqttSend(MQTT_TOPIC_APP, APP_NAME); if (mask & heartbeat::Report::Version) mqttSend(MQTT_TOPIC_VERSION, getVersion().c_str()); if (mask & heartbeat::Report::Board) mqttSend(MQTT_TOPIC_BOARD, getBoardName().c_str()); if (mask & heartbeat::Report::Hostname) mqttSend(MQTT_TOPIC_HOSTNAME, getSetting("hostname", getIdentifier()).c_str()); if (mask & heartbeat::Report::Description) { auto desc = getSetting("desc"); if (desc.length()) { mqttSend(MQTT_TOPIC_DESCRIPTION, desc.c_str()); } } if (mask & heartbeat::Report::Ssid) mqttSend(MQTT_TOPIC_SSID, WiFi.SSID().c_str()); if (mask & heartbeat::Report::Bssid) mqttSend(MQTT_TOPIC_BSSID, WiFi.BSSIDstr().c_str()); if (mask & heartbeat::Report::Ip) mqttSend(MQTT_TOPIC_IP, getIP().c_str()); if (mask & heartbeat::Report::Mac) mqttSend(MQTT_TOPIC_MAC, WiFi.macAddress().c_str()); if (mask & heartbeat::Report::Rssi) mqttSend(MQTT_TOPIC_RSSI, String(WiFi.RSSI()).c_str()); if (mask & heartbeat::Report::Uptime) mqttSend(MQTT_TOPIC_UPTIME, String(systemUptime()).c_str()); #if NTP_SUPPORT if (mask & heartbeat::Report::Datetime) mqttSend(MQTT_TOPIC_DATETIME, ntpDateTime().c_str()); #endif if (mask & heartbeat::Report::Freeheap) { auto stats = systemHeapStats(); mqttSend(MQTT_TOPIC_FREEHEAP, String(stats.available).c_str()); } if (mask & heartbeat::Report::Loadavg) mqttSend(MQTT_TOPIC_LOADAVG, String(systemLoadAverage()).c_str()); if ((mask & heartbeat::Report::Vcc) && (ADC_MODE_VALUE == ADC_VCC)) mqttSend(MQTT_TOPIC_VCC, String(ESP.getVcc()).c_str()); auto status = mqttConnected(); for (auto& cb : _mqtt_heartbeat_callbacks) { status = status && cb(mask); } return status; } void _mqttOnConnect() { _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN; _mqtt_last_connection = millis(); _mqtt_state = AsyncClientState::Connected; systemHeartbeat(_mqttHeartbeat, _mqtt_heartbeat_mode, _mqtt_heartbeat_interval); // Notify all subscribers about the connection for (auto& callback : _mqtt_callbacks) { callback(MQTT_CONNECT_EVENT, nullptr, nullptr); } DEBUG_MSG_P(PSTR("[MQTT] Connected!\n")); } void _mqttOnDisconnect() { #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT _mqtt_publish_callbacks.clear(); _mqtt_subscribe_callbacks.clear(); #endif _mqtt_last_connection = millis(); _mqtt_state = AsyncClientState::Disconnected; systemStopHeartbeat(_mqttHeartbeat); // Notify all subscribers about the disconnect for (auto& callback : _mqtt_callbacks) { callback(MQTT_DISCONNECT_EVENT, nullptr, nullptr); } DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n")); } #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT // Run the associated callback when message PID is acknowledged by the broker void _mqttPidCallback(MqttPidCallbacks& callbacks, uint16_t pid) { if (callbacks.empty()) { return; } auto end = callbacks.end(); auto prev = callbacks.before_begin(); auto it = callbacks.begin(); while (it != end) { if ((*it).pid == pid) { (*it).run(); it = callbacks.erase_after(prev); } else { prev = it; ++it; } } } #endif // Force-skip everything received in a short window right after connecting to avoid syncronization issues. bool _mqttMaybeSkipRetained(char* topic) { if (_mqtt_skip_messages && (millis() - _mqtt_last_connection < _mqtt_skip_time)) { DEBUG_MSG_P(PSTR("[MQTT] Received %s - SKIPPED\n"), topic); return true; } _mqtt_skip_messages = false; return false; } #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT // MQTT Broker can sometimes send messages in bulk. Even when message size is less than MQTT_BUFFER_MAX_SIZE, we *could* // receive a message with `len != total`, this requiring buffering of the received data. Prepare a static memory to store the // data until `(len + index) == total`. // TODO: One pending issue is streaming arbitrary data (e.g. binary, for OTA). We always set '\0' and API consumer expects C-String. // In that case, there could be MQTT_MESSAGE_RAW_EVENT and this callback only trigger on small messages. // TODO: Current callback model does not allow to pass message length. Instead, implement a topic filter and record all subscriptions. That way we don't need to filter out events and could implement per-event callbacks. void _mqttOnMessageAsync(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { if (!len || (len > MQTT_BUFFER_MAX_SIZE) || (total > MQTT_BUFFER_MAX_SIZE)) return; if (_mqttMaybeSkipRetained(topic)) return; static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0}; memmove(message + index, (char *) payload, len); // Not done yet if (total != (len + index)) { DEBUG_MSG_P(PSTR("[MQTT] Buffered %s => %u / %u bytes\n"), topic, len, total); return; } message[len + index] = '\0'; DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message); // Call subscribers with the message buffer for (auto& callback : _mqtt_callbacks) { callback(MQTT_MESSAGE_EVENT, topic, message); } } #else // Sync client already implements buffering, but we still need to add '\0' because API consumer expects C-String :/ // TODO: consider reworking this (and async counterpart), giving callback func length of the message. void _mqttOnMessage(char* topic, char* payload, unsigned int len) { if (!len || (len > MQTT_BUFFER_MAX_SIZE)) return; if (_mqttMaybeSkipRetained(topic)) return; static char message[((MQTT_BUFFER_MAX_SIZE + 1) + 31) & -32] = {0}; memmove(message, (char *) payload, len); message[len] = '\0'; DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message); // Call subscribers with the message buffer for (auto& callback : _mqtt_callbacks) { callback(MQTT_MESSAGE_EVENT, topic, message); } } #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT // ----------------------------------------------------------------------------- // Public API // ----------------------------------------------------------------------------- /** Returns the magnitude part of a topic @param topic the full MQTT topic @return String object with the magnitude part. */ String mqttMagnitude(const char* topic) { String output; String pattern = _mqtt_topic + _mqtt_setter; int position = pattern.indexOf("#"); if (position >= 0) { String start = pattern.substring(0, position); String end = pattern.substring(position + 1); String magnitude(topic); if (magnitude.startsWith(start) && magnitude.endsWith(end)) { magnitude.replace(start, ""); magnitude.replace(end, ""); output = std::move(magnitude); } } return output; } /** Returns a full MQTT topic from the magnitude @param magnitude the magnitude part of the topic. @param is_set whether to build a command topic (true) or a state topic (false). @return String full MQTT topic. */ String mqttTopic(const char* magnitude, bool is_set) { String output; output.reserve(strlen(magnitude) + _mqtt_topic.length() + _mqtt_setter.length() + _mqtt_getter.length()); output += _mqtt_topic; output.replace("#", magnitude); output += is_set ? _mqtt_setter : _mqtt_getter; return output; } /** Returns a full MQTT topic from the magnitude @param magnitude the magnitude part of the topic. @param index index of the magnitude when more than one such magnitudes. @param is_set whether to build a command topic (true) or a state topic (false). @return String full MQTT topic. */ String mqttTopic(const char* magnitude, unsigned int index, bool is_set) { String output; output.reserve(strlen(magnitude) + (sizeof(decltype(index)) * 4)); output += magnitude; output += '/'; output += index; return mqttTopic(output.c_str(), is_set); } // ----------------------------------------------------------------------------- uint16_t mqttSendRaw(const char * topic, const char * message, bool retain, int qos) { constexpr size_t MessageLogMax { 128ul }; if (_mqtt.connected()) { const unsigned int packetId { #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT _mqtt.publish(topic, qos, retain, message) #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT _mqtt.publish(topic, message, retain, qos) #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT _mqtt.publish(topic, message, retain) #endif }; const size_t message_len = strlen(message); if (message_len > MessageLogMax) { DEBUG_MSG_P(PSTR("[MQTT] Sending %s => (%u bytes) (PID %u)\n"), topic, message_len, packetId); } else { DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %u)\n"), topic, message, packetId); } return packetId; } return false; } uint16_t mqttSendRaw(const char * topic, const char * message, bool retain) { return mqttSendRaw(topic, message, retain, _mqtt_qos); } uint16_t mqttSendRaw(const char * topic, const char * message) { return mqttSendRaw(topic, message, _mqtt_retain, _mqtt_qos); } bool mqttSend(const char * topic, const char * message, bool force, bool retain) { if (!force && _mqtt_use_json) { mqttEnqueue(topic, message); _mqtt_json_payload_flush.once_ms(MQTT_USE_JSON_DELAY, mqttFlush); return true; } return mqttSendRaw(mqttTopic(topic, false).c_str(), message, retain) > 0; } bool mqttSend(const char * topic, const char * message, bool force) { return mqttSend(topic, message, force, _mqtt_retain); } bool mqttSend(const char * topic, const char * message) { return mqttSend(topic, message, false); } bool mqttSend(const char * topic, unsigned int index, const char * message, bool force, bool retain) { char buffer[strlen(topic)+5]; snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index); return mqttSend(buffer, message, force, retain); } bool mqttSend(const char * topic, unsigned int index, const char * message, bool force) { return mqttSend(topic, index, message, force, _mqtt_retain); } bool mqttSend(const char * topic, unsigned int index, const char * message) { return mqttSend(topic, index, message, false); } // ----------------------------------------------------------------------------- constexpr size_t MqttJsonPayloadBufferSize { 1024ul }; void mqttFlush() { if (!_mqtt.connected()) { return; } if (_mqtt_json_payload.empty()) { return; } DynamicJsonBuffer jsonBuffer(MqttJsonPayloadBufferSize); JsonObject& root = jsonBuffer.createObject(); #if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME if (ntpSynced()) { root[MQTT_TOPIC_DATETIME] = ntpDateTime(); } #endif #if MQTT_ENQUEUE_MAC root[MQTT_TOPIC_MAC] = WiFi.macAddress(); #endif #if MQTT_ENQUEUE_HOSTNAME root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname", getIdentifier()); #endif #if MQTT_ENQUEUE_IP root[MQTT_TOPIC_IP] = getIP(); #endif #if MQTT_ENQUEUE_MESSAGE_ID root[MQTT_TOPIC_MESSAGE_ID] = (Rtcmem->mqtt)++; #endif for (auto& payload : _mqtt_json_payload) { root[payload.topic().c_str()] = payload.message().c_str(); } String output; root.printTo(output); jsonBuffer.clear(); _mqtt_json_payload_count = 0; _mqtt_json_payload.clear(); mqttSendRaw(_mqtt_topic_json.c_str(), output.c_str(), false); } void mqttEnqueue(const char* topic, const char* message) { // Queue is not meant to send message "offline" // We must prevent the queue does not get full while offline if (_mqtt.connected()) { if (_mqtt_json_payload_count >= MQTT_QUEUE_MAX_SIZE) { mqttFlush(); } _mqtt_json_payload.remove_if([topic](const MqttPayload& payload) { return payload.topic() == topic; }); _mqtt_json_payload.emplace_front(topic, message); ++_mqtt_json_payload_count; } } // ----------------------------------------------------------------------------- // Only async client returns resulting PID, sync libraries return either success (1) or failure (0) uint16_t mqttSubscribeRaw(const char* topic, int qos) { uint16_t pid { 0u }; if (_mqtt.connected() && (strlen(topic) > 0)) { pid = _mqtt.subscribe(topic, qos); DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, pid); } return pid; } uint16_t mqttSubscribeRaw(const char* topic) { return mqttSubscribeRaw(topic, _mqtt_qos); } bool mqttSubscribe(const char * topic) { return mqttSubscribeRaw(mqttTopic(topic, true).c_str(), _mqtt_qos); } uint16_t mqttUnsubscribeRaw(const char * topic) { uint16_t pid { 0u }; if (_mqtt.connected() && (strlen(topic) > 0)) { pid = _mqtt.unsubscribe(topic); DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing from %s (PID %d)\n"), topic, pid); } return pid; } bool mqttUnsubscribe(const char * topic) { return mqttUnsubscribeRaw(mqttTopic(topic, true).c_str()); } // ----------------------------------------------------------------------------- void mqttEnabled(bool status) { _mqtt_enabled = status; } bool mqttEnabled() { return _mqtt_enabled; } bool mqttConnected() { return _mqtt.connected(); } void mqttDisconnect() { if (_mqtt.connected()) { DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n")); _mqtt.disconnect(); } } bool mqttForward() { return _mqtt_forward; } /** Register a persistent lifecycle callback @param standalone function pointer */ void mqttRegister(mqtt_callback_f callback) { _mqtt_callbacks.push_front(callback); } #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT /** Register a temporary publish callback @param callable object */ void mqttOnPublish(uint16_t pid, mqtt_pid_callback_f callback) { auto callable = MqttPidCallback { pid, callback }; _mqtt_publish_callbacks.push_front(std::move(callable)); } /** Register a temporary subscribe callback @param callable object */ void mqttOnSubscribe(uint16_t pid, mqtt_pid_callback_f callback) { auto callable = MqttPidCallback { pid, callback }; _mqtt_subscribe_callbacks.push_front(std::move(callable)); } #endif void mqttSetBroker(IPAddress ip, uint16_t port) { setSetting("mqttServer", ip.toString()); _mqtt_server = ip.toString(); setSetting("mqttPort", port); _mqtt_port = port; mqttEnabled(1 == MQTT_AUTOCONNECT); } void mqttSetBrokerIfNone(IPAddress ip, uint16_t port) { if (getSetting("mqttServer", MQTT_SERVER).length() == 0) { mqttSetBroker(ip, port); } } // TODO: these strings are only updated after running the configuration routine and when MQTT is *enabled* const String& mqttPayloadOnline() { return _mqtt_payload_online; } const String& mqttPayloadOffline() { return _mqtt_payload_offline; } const char* mqttPayloadStatus(bool status) { return status ? _mqtt_payload_online.c_str() : _mqtt_payload_offline.c_str(); } void mqttSendStatus() { mqttSend(MQTT_TOPIC_STATUS, _mqtt_payload_online.c_str(), true); } // ----------------------------------------------------------------------------- // Initialization // ----------------------------------------------------------------------------- void _mqttConnect() { // Do not connect if already connected or still trying to connect if (_mqtt.connected() || (_mqtt_state != AsyncClientState::Disconnected)) return; // Do not connect if disabled or no WiFi if (!_mqtt_enabled || (WiFi.status() != WL_CONNECTED)) return; // Check reconnect interval if (millis() - _mqtt_last_connection < _mqtt_reconnect_delay) return; // Increase the reconnect delay _mqtt_reconnect_delay += MQTT_RECONNECT_DELAY_STEP; if (_mqtt_reconnect_delay > MQTT_RECONNECT_DELAY_MAX) { _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MAX; } DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%hu\n"), _mqtt_server.c_str(), _mqtt_port); DEBUG_MSG_P(PSTR("[MQTT] Client ID: %s\n"), _mqtt_clientid.c_str()); DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), _mqtt_qos); DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %c\n"), _mqtt_retain ? 'Y' : 'N'); DEBUG_MSG_P(PSTR("[MQTT] Keepalive time: %hu (s)\n"), _mqtt_keepalive); DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will.c_str()); _mqtt_state = AsyncClientState::Connecting; _mqtt_skip_messages = (_mqtt_skip_time > 0); #if SECURE_CLIENT != SECURE_CLIENT_NONE const bool secure = getSetting("mqttUseSSL", 1 == MQTT_SSL_ENABLED); #else const bool secure = false; #endif #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT _mqttSetupAsyncClient(secure); #elif (MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT) || (MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT) if (_mqttSetupSyncClient(secure) && _mqttConnectSyncClient(secure)) { _mqttOnConnect(); } else { DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n")); _mqttOnDisconnect(); } #else #error "please check that MQTT_LIBRARY is valid" #endif } void mqttLoop() { #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT _mqttConnect(); #else if (_mqtt.connected()) { _mqtt.loop(); } else { if (_mqtt_state != AsyncClientState::Disconnected) { _mqttOnDisconnect(); } _mqttConnect(); } #endif } void mqttHeartbeat(heartbeat::Callback callback) { _mqtt_heartbeat_callbacks.push_front(callback); } void mqttSetup() { _mqttBackwards(); _mqttInfo(); #if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT // XXX: should not place this in config, addServerFingerprint does not check for duplicates #if SECURE_CLIENT != SECURE_CLIENT_NONE { if (_mqtt_sc_config.on_fingerprint) { const String fingerprint = _mqtt_sc_config.on_fingerprint(); uint8_t buffer[20] = {0}; if (sslFingerPrintArray(fingerprint.c_str(), buffer)) { _mqtt.addServerFingerprint(buffer); } } } #endif // SECURE_CLIENT != SECURE_CLIENT_NONE _mqtt.onMessage(_mqttOnMessageAsync); _mqtt.onConnect([](bool) { _mqttOnConnect(); }); _mqtt.onSubscribe([](uint16_t pid, int) { _mqttPidCallback(_mqtt_subscribe_callbacks, pid); }); _mqtt.onPublish([](uint16_t pid) { _mqttPidCallback(_mqtt_publish_callbacks, pid); }); _mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) { switch (reason) { case AsyncMqttClientDisconnectReason::TCP_DISCONNECTED: DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n")); break; case AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED: DEBUG_MSG_P(PSTR("[MQTT] Identifier Rejected\n")); break; case AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE: DEBUG_MSG_P(PSTR("[MQTT] Server unavailable\n")); break; case AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS: DEBUG_MSG_P(PSTR("[MQTT] Malformed credentials\n")); break; case AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED: DEBUG_MSG_P(PSTR("[MQTT] Not authorized\n")); break; case AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT: #if ASYNC_TCP_SSL_ENABLED DEBUG_MSG_P(PSTR("[MQTT] Bad fingerprint\n")); #endif break; case AsyncMqttClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION: // This is never used by the AsyncMqttClient source #if 0 DEBUG_MSG_P(PSTR("[MQTT] Unacceptable protocol version\n")); #endif break; case AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE: DEBUG_MSG_P(PSTR("[MQTT] Connect packet too big\n")); break; } _mqttOnDisconnect(); }); #elif MQTT_LIBRARY == MQTT_LIBRARY_ARDUINOMQTT _mqtt.onMessageAdvanced([](MQTTClient* , char topic[], char payload[], int length) { _mqttOnMessage(topic, payload, length); }); #elif MQTT_LIBRARY == MQTT_LIBRARY_PUBSUBCLIENT _mqtt.setCallback([](char* topic, byte* payload, unsigned int length) { _mqttOnMessage(topic, (char *) payload, length); }); #endif // MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT _mqttConfigure(); mqttRegister(_mqttCallback); #if WEB_SUPPORT wsRegister() .onVisible(_mqttWebSocketOnVisible) .onData(_mqttWebSocketOnData) .onConnected(_mqttWebSocketOnConnected) .onKeyCheck(_mqttWebSocketOnKeyCheck); mqttRegister([](unsigned int type, const char*, const char*) { if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) { wsPost(_mqttWebSocketOnData); } }); #endif #if TERMINAL_SUPPORT _mqttInitCommands(); #endif // Main callbacks espurnaRegisterLoop(mqttLoop); espurnaRegisterReload(_mqttConfigure); } #endif // MQTT_SUPPORT