|
@ -12,37 +12,35 @@ Copyright (C) 2016-2017 by Xose Pérez <xose dot perez at gmail dot com> |
|
|
#include <vector>
|
|
|
#include <vector>
|
|
|
#include <Ticker.h>
|
|
|
#include <Ticker.h>
|
|
|
|
|
|
|
|
|
const char *mqtt_user = 0; |
|
|
|
|
|
const char *mqtt_pass = 0; |
|
|
|
|
|
|
|
|
|
|
|
#if MQTT_USE_ASYNC // Using AsyncMqttClient
|
|
|
#if MQTT_USE_ASYNC // Using AsyncMqttClient
|
|
|
|
|
|
|
|
|
#include <AsyncMqttClient.h>
|
|
|
#include <AsyncMqttClient.h>
|
|
|
AsyncMqttClient mqtt; |
|
|
|
|
|
|
|
|
AsyncMqttClient _mqtt; |
|
|
|
|
|
|
|
|
#else // Using PubSubClient
|
|
|
#else // Using PubSubClient
|
|
|
|
|
|
|
|
|
#include <PubSubClient.h>
|
|
|
#include <PubSubClient.h>
|
|
|
PubSubClient mqtt; |
|
|
|
|
|
bool _mqttConnected = false; |
|
|
|
|
|
|
|
|
PubSubClient _mqtt; |
|
|
|
|
|
bool _mqtt_connected = false; |
|
|
|
|
|
|
|
|
WiFiClient _mqttClient; |
|
|
|
|
|
|
|
|
WiFiClient _mqtt_client; |
|
|
#if ASYNC_TCP_SSL_ENABLED
|
|
|
#if ASYNC_TCP_SSL_ENABLED
|
|
|
WiFiClientSecure _mqttClientSecure; |
|
|
|
|
|
|
|
|
WiFiClientSecure _mqtt_client_secure; |
|
|
#endif // ASYNC_TCP_SSL_ENABLED
|
|
|
#endif // ASYNC_TCP_SSL_ENABLED
|
|
|
|
|
|
|
|
|
#endif // MQTT_USE_ASYNC
|
|
|
#endif // MQTT_USE_ASYNC
|
|
|
|
|
|
|
|
|
bool _mqttEnabled = MQTT_ENABLED; |
|
|
|
|
|
String _mqttTopic; |
|
|
|
|
|
String _mqttSetter; |
|
|
|
|
|
String _mqttGetter; |
|
|
|
|
|
bool _mqttForward; |
|
|
|
|
|
char *_mqttUser = 0; |
|
|
|
|
|
char *_mqttPass = 0; |
|
|
|
|
|
char *_mqttWill; |
|
|
|
|
|
|
|
|
bool _mqtt_enabled = MQTT_ENABLED; |
|
|
|
|
|
unsigned char _mqtt_connection_tries = 0; |
|
|
|
|
|
String _mqtt_topic; |
|
|
|
|
|
String _mqtt_setter; |
|
|
|
|
|
String _mqtt_getter; |
|
|
|
|
|
bool _mqtt_forward; |
|
|
|
|
|
char *_mqtt_user = 0; |
|
|
|
|
|
char *_mqtt_pass = 0; |
|
|
|
|
|
char *_mqtt_will; |
|
|
#if MQTT_SKIP_RETAINED
|
|
|
#if MQTT_SKIP_RETAINED
|
|
|
unsigned long _mqttConnectedAt = 0; |
|
|
|
|
|
|
|
|
unsigned long _mqtt_connected_at = 0; |
|
|
#endif
|
|
|
#endif
|
|
|
|
|
|
|
|
|
std::vector<void (*)(unsigned int, const char *, const char *)> _mqtt_callbacks; |
|
|
std::vector<void (*)(unsigned int, const char *, const char *)> _mqtt_callbacks; |
|
@ -52,51 +50,51 @@ typedef struct { |
|
|
char * message; |
|
|
char * message; |
|
|
} mqtt_message_t; |
|
|
} mqtt_message_t; |
|
|
std::vector<mqtt_message_t> _mqtt_queue; |
|
|
std::vector<mqtt_message_t> _mqtt_queue; |
|
|
Ticker _mqttFlushTicker; |
|
|
|
|
|
|
|
|
Ticker _mqtt_flush_ticker; |
|
|
|
|
|
|
|
|
// -----------------------------------------------------------------------------
|
|
|
// -----------------------------------------------------------------------------
|
|
|
// Public API
|
|
|
// Public API
|
|
|
// -----------------------------------------------------------------------------
|
|
|
// -----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
bool mqttConnected() { |
|
|
bool mqttConnected() { |
|
|
return mqtt.connected(); |
|
|
|
|
|
|
|
|
return _mqtt.connected(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void mqttDisconnect() { |
|
|
void mqttDisconnect() { |
|
|
if (mqtt.connected()) { |
|
|
|
|
|
|
|
|
if (_mqtt.connected()) { |
|
|
DEBUG_MSG_P("[MQTT] Disconnecting\n"); |
|
|
DEBUG_MSG_P("[MQTT] Disconnecting\n"); |
|
|
mqtt.disconnect(); |
|
|
|
|
|
|
|
|
_mqtt.disconnect(); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
bool mqttForward() { |
|
|
bool mqttForward() { |
|
|
return _mqttForward; |
|
|
|
|
|
|
|
|
return _mqtt_forward; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
String mqttSubtopic(char * topic) { |
|
|
String mqttSubtopic(char * topic) { |
|
|
String response; |
|
|
String response; |
|
|
String t = String(topic); |
|
|
String t = String(topic); |
|
|
if (t.startsWith(_mqttTopic) && t.endsWith(_mqttSetter)) { |
|
|
|
|
|
response = t.substring(_mqttTopic.length(), t.length() - _mqttSetter.length()); |
|
|
|
|
|
|
|
|
if (t.startsWith(_mqtt_topic) && t.endsWith(_mqtt_setter)) { |
|
|
|
|
|
response = t.substring(_mqtt_topic.length(), t.length() - _mqtt_setter.length()); |
|
|
} |
|
|
} |
|
|
return response; |
|
|
return response; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void mqttSendRaw(const char * topic, const char * message) { |
|
|
void mqttSendRaw(const char * topic, const char * message) { |
|
|
if (mqtt.connected()) { |
|
|
|
|
|
|
|
|
if (_mqtt.connected()) { |
|
|
#if MQTT_USE_ASYNC
|
|
|
#if MQTT_USE_ASYNC
|
|
|
unsigned int packetId = mqtt.publish(topic, MQTT_QOS, MQTT_RETAIN, message); |
|
|
|
|
|
|
|
|
unsigned int packetId = _mqtt.publish(topic, MQTT_QOS, MQTT_RETAIN, message); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %d)\n"), topic, message, packetId); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %d)\n"), topic, message, packetId); |
|
|
#else
|
|
|
#else
|
|
|
mqtt.publish(topic, message, MQTT_RETAIN); |
|
|
|
|
|
|
|
|
_mqtt.publish(topic, message, MQTT_RETAIN); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s\n"), topic, message); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s\n"), topic, message); |
|
|
#endif
|
|
|
#endif
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
String getTopic(const char * topic, bool set) { |
|
|
String getTopic(const char * topic, bool set) { |
|
|
String output = _mqttTopic + String(topic); |
|
|
|
|
|
if (set) output += _mqttSetter; |
|
|
|
|
|
|
|
|
String output = _mqtt_topic + String(topic); |
|
|
|
|
|
if (set) output += _mqtt_setter; |
|
|
return output; |
|
|
return output; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -124,7 +122,7 @@ void _mqttFlush() { |
|
|
|
|
|
|
|
|
String output; |
|
|
String output; |
|
|
root.printTo(output); |
|
|
root.printTo(output); |
|
|
String path = _mqttTopic + String(MQTT_TOPIC_JSON); |
|
|
|
|
|
|
|
|
String path = _mqtt_topic + String(MQTT_TOPIC_JSON); |
|
|
mqttSendRaw(path.c_str(), output.c_str()); |
|
|
mqttSendRaw(path.c_str(), output.c_str()); |
|
|
|
|
|
|
|
|
for (unsigned char i = 0; i < _mqtt_queue.size(); i++) { |
|
|
for (unsigned char i = 0; i < _mqtt_queue.size(); i++) { |
|
@ -143,9 +141,9 @@ void mqttSend(const char * topic, const char * message, bool force) { |
|
|
element.topic = strdup(topic); |
|
|
element.topic = strdup(topic); |
|
|
element.message = strdup(message); |
|
|
element.message = strdup(message); |
|
|
_mqtt_queue.push_back(element); |
|
|
_mqtt_queue.push_back(element); |
|
|
_mqttFlushTicker.once_ms(MQTT_USE_JSON_DELAY, _mqttFlush); |
|
|
|
|
|
|
|
|
_mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, _mqttFlush); |
|
|
} else { |
|
|
} else { |
|
|
String path = _mqttTopic + String(topic) + _mqttGetter; |
|
|
|
|
|
|
|
|
String path = _mqtt_topic + String(topic) + _mqtt_getter; |
|
|
mqttSendRaw(path.c_str(), message); |
|
|
mqttSendRaw(path.c_str(), message); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -165,19 +163,19 @@ void mqttSend(const char * topic, unsigned int index, const char * message) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void mqttSubscribeRaw(const char * topic) { |
|
|
void mqttSubscribeRaw(const char * topic) { |
|
|
if (mqtt.connected() && (strlen(topic) > 0)) { |
|
|
|
|
|
|
|
|
if (_mqtt.connected() && (strlen(topic) > 0)) { |
|
|
#if MQTT_USE_ASYNC
|
|
|
#if MQTT_USE_ASYNC
|
|
|
unsigned int packetId = mqtt.subscribe(topic, MQTT_QOS); |
|
|
|
|
|
|
|
|
unsigned int packetId = _mqtt.subscribe(topic, MQTT_QOS); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId); |
|
|
#else
|
|
|
#else
|
|
|
mqtt.subscribe(topic, MQTT_QOS); |
|
|
|
|
|
|
|
|
_mqtt.subscribe(topic, MQTT_QOS); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic); |
|
|
#endif
|
|
|
#endif
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void mqttSubscribe(const char * topic) { |
|
|
void mqttSubscribe(const char * topic) { |
|
|
String path = _mqttTopic + String(topic) + _mqttSetter; |
|
|
|
|
|
|
|
|
String path = _mqtt_topic + String(topic) + _mqtt_setter; |
|
|
mqttSubscribeRaw(path.c_str()); |
|
|
mqttSubscribeRaw(path.c_str()); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -220,7 +218,7 @@ void _mqttOnConnect() { |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Connected!\n")); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Connected!\n")); |
|
|
|
|
|
|
|
|
#if MQTT_SKIP_RETAINED
|
|
|
#if MQTT_SKIP_RETAINED
|
|
|
_mqttConnectedAt = millis(); |
|
|
|
|
|
|
|
|
_mqtt_connected_at = millis(); |
|
|
#endif
|
|
|
#endif
|
|
|
|
|
|
|
|
|
// Send first Heartbeat
|
|
|
// Send first Heartbeat
|
|
@ -252,7 +250,7 @@ void _mqttOnMessage(char* topic, char* payload, unsigned int len) { |
|
|
strlcpy(message, (char *) payload, len + 1); |
|
|
strlcpy(message, (char *) payload, len + 1); |
|
|
|
|
|
|
|
|
#if MQTT_SKIP_RETAINED
|
|
|
#if MQTT_SKIP_RETAINED
|
|
|
if (millis() - _mqttConnectedAt < MQTT_SKIP_TIME) { |
|
|
|
|
|
|
|
|
if (millis() - _mqtt_connected_at < MQTT_SKIP_TIME) { |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s - SKIPPED\n"), topic, message); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s - SKIPPED\n"), topic, message); |
|
|
return; |
|
|
return; |
|
|
} |
|
|
} |
|
@ -308,67 +306,66 @@ bool mqttFormatFP(const char * fingerprint, char * destination) { |
|
|
#endif
|
|
|
#endif
|
|
|
|
|
|
|
|
|
void mqttEnabled(bool status) { |
|
|
void mqttEnabled(bool status) { |
|
|
_mqttEnabled = status; |
|
|
|
|
|
|
|
|
_mqtt_enabled = status; |
|
|
setSetting("mqttEnabled", status ? 1 : 0); |
|
|
setSetting("mqttEnabled", status ? 1 : 0); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
bool mqttEnabled() { |
|
|
bool mqttEnabled() { |
|
|
return _mqttEnabled; |
|
|
|
|
|
|
|
|
return _mqtt_enabled; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void mqttConnect() { |
|
|
void mqttConnect() { |
|
|
|
|
|
|
|
|
if (_mqttEnabled & !mqtt.connected()) { |
|
|
|
|
|
|
|
|
if (_mqtt_enabled & !_mqtt.connected()) { |
|
|
|
|
|
|
|
|
// Disable MQTT after MQTT_MAX_TRIES attemps in a row
|
|
|
// Disable MQTT after MQTT_MAX_TRIES attemps in a row
|
|
|
#if MQTT_MAX_TRIES > 0
|
|
|
#if MQTT_MAX_TRIES > 0
|
|
|
static unsigned int tries = 0; |
|
|
|
|
|
static unsigned long last_try = millis(); |
|
|
static unsigned long last_try = millis(); |
|
|
if (millis() - last_try < MQTT_TRY_INTERVAL) { |
|
|
if (millis() - last_try < MQTT_TRY_INTERVAL) { |
|
|
if (++tries > MQTT_MAX_TRIES) { |
|
|
|
|
|
|
|
|
if (++_mqtt_connection_tries > MQTT_MAX_TRIES) { |
|
|
DEBUG_MSG_P(PSTR("[MQTT] MQTT_MAX_TRIES met, disabling MQTT\n")); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] MQTT_MAX_TRIES met, disabling MQTT\n")); |
|
|
mqttEnabled(false); |
|
|
mqttEnabled(false); |
|
|
tries = 0; |
|
|
|
|
|
|
|
|
_mqtt_connection_tries = 0; |
|
|
return; |
|
|
return; |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
tries = 0; |
|
|
|
|
|
|
|
|
_mqtt_connection_tries = 0; |
|
|
} |
|
|
} |
|
|
last_try = millis(); |
|
|
last_try = millis(); |
|
|
#endif
|
|
|
#endif
|
|
|
|
|
|
|
|
|
if (_mqttUser) free(_mqttUser); |
|
|
|
|
|
if (_mqttPass) free(_mqttPass); |
|
|
|
|
|
|
|
|
if (_mqtt_user) free(_mqtt_user); |
|
|
|
|
|
if (_mqtt_pass) free(_mqtt_pass); |
|
|
|
|
|
|
|
|
char * host = strdup(getSetting("mqttServer", MQTT_SERVER).c_str()); |
|
|
char * host = strdup(getSetting("mqttServer", MQTT_SERVER).c_str()); |
|
|
if (strlen(host) == 0) return; |
|
|
if (strlen(host) == 0) return; |
|
|
unsigned int port = getSetting("mqttPort", MQTT_PORT).toInt(); |
|
|
unsigned int port = getSetting("mqttPort", MQTT_PORT).toInt(); |
|
|
_mqttUser = strdup(getSetting("mqttUser").c_str()); |
|
|
|
|
|
_mqttPass = strdup(getSetting("mqttPassword").c_str()); |
|
|
|
|
|
if (_mqttWill) free(_mqttWill); |
|
|
|
|
|
_mqttWill = strdup((_mqttTopic + MQTT_TOPIC_STATUS).c_str()); |
|
|
|
|
|
|
|
|
_mqtt_user = strdup(getSetting("mqttUser").c_str()); |
|
|
|
|
|
_mqtt_pass = strdup(getSetting("mqttPassword").c_str()); |
|
|
|
|
|
if (_mqtt_will) free(_mqtt_will); |
|
|
|
|
|
_mqtt_will = strdup((_mqtt_topic + MQTT_TOPIC_STATUS).c_str()); |
|
|
|
|
|
|
|
|
DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%d\n"), host, port); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%d\n"), host, port); |
|
|
|
|
|
|
|
|
#if MQTT_USE_ASYNC
|
|
|
#if MQTT_USE_ASYNC
|
|
|
|
|
|
|
|
|
mqtt.setServer(host, port); |
|
|
|
|
|
mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false); |
|
|
|
|
|
mqtt.setWill(_mqttWill, MQTT_QOS, MQTT_RETAIN, "0"); |
|
|
|
|
|
if ((strlen(_mqttUser) > 0) && (strlen(_mqttPass) > 0)) { |
|
|
|
|
|
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqttUser); |
|
|
|
|
|
mqtt.setCredentials(_mqttUser, _mqttPass); |
|
|
|
|
|
|
|
|
_mqtt.setServer(host, port); |
|
|
|
|
|
_mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false); |
|
|
|
|
|
_mqtt.setWill(_mqtt_will, MQTT_QOS, MQTT_RETAIN, "0"); |
|
|
|
|
|
if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) { |
|
|
|
|
|
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user); |
|
|
|
|
|
_mqtt.setCredentials(_mqtt_user, _mqtt_pass); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
#if ASYNC_TCP_SSL_ENABLED
|
|
|
#if ASYNC_TCP_SSL_ENABLED
|
|
|
|
|
|
|
|
|
bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1; |
|
|
bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1; |
|
|
mqtt.setSecure(secure); |
|
|
|
|
|
|
|
|
_mqtt.setSecure(secure); |
|
|
if (secure) { |
|
|
if (secure) { |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n")); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n")); |
|
|
unsigned char fp[20] = {0}; |
|
|
unsigned char fp[20] = {0}; |
|
|
if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) { |
|
|
if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) { |
|
|
mqtt.addServerFingerprint(fp); |
|
|
|
|
|
|
|
|
_mqtt.addServerFingerprint(fp); |
|
|
} else { |
|
|
} else { |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n")); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n")); |
|
|
} |
|
|
} |
|
@ -376,7 +373,7 @@ void mqttConnect() { |
|
|
|
|
|
|
|
|
#endif // ASYNC_TCP_SSL_ENABLED
|
|
|
#endif // ASYNC_TCP_SSL_ENABLED
|
|
|
|
|
|
|
|
|
mqtt.connect(); |
|
|
|
|
|
|
|
|
_mqtt.connect(); |
|
|
|
|
|
|
|
|
#else // not MQTT_USE_ASYNC
|
|
|
#else // not MQTT_USE_ASYNC
|
|
|
|
|
|
|
|
@ -387,11 +384,11 @@ void mqttConnect() { |
|
|
bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1; |
|
|
bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1; |
|
|
if (secure) { |
|
|
if (secure) { |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n")); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n")); |
|
|
if (_mqttClientSecure.connect(host, port)) { |
|
|
|
|
|
|
|
|
if (_mqtt_client_secure.connect(host, port)) { |
|
|
char fp[60] = {0}; |
|
|
char fp[60] = {0}; |
|
|
if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) { |
|
|
if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) { |
|
|
if (_mqttClientSecure.verify(fp, host)) { |
|
|
|
|
|
mqtt.setClient(_mqttClientSecure); |
|
|
|
|
|
|
|
|
if (_mqtt_client_secure.verify(fp, host)) { |
|
|
|
|
|
_mqtt.setClient(_mqtt_client_secure); |
|
|
} else { |
|
|
} else { |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Invalid fingerprint\n")); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Invalid fingerprint\n")); |
|
|
response = false; |
|
|
response = false; |
|
@ -406,31 +403,31 @@ void mqttConnect() { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} else { |
|
|
} else { |
|
|
mqtt.setClient(_mqttClient); |
|
|
|
|
|
|
|
|
_mqtt.setClient(_mqtt_client); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
#else // not ASYNC_TCP_SSL_ENABLED
|
|
|
#else // not ASYNC_TCP_SSL_ENABLED
|
|
|
|
|
|
|
|
|
mqtt.setClient(_mqttClient); |
|
|
|
|
|
|
|
|
_mqtt.setClient(_mqtt_client); |
|
|
|
|
|
|
|
|
#endif // ASYNC_TCP_SSL_ENABLED
|
|
|
#endif // ASYNC_TCP_SSL_ENABLED
|
|
|
|
|
|
|
|
|
if (response) { |
|
|
if (response) { |
|
|
|
|
|
|
|
|
mqtt.setServer(host, port); |
|
|
|
|
|
|
|
|
_mqtt.setServer(host, port); |
|
|
|
|
|
|
|
|
if ((strlen(_mqttUser) > 0) && (strlen(_mqttPass) > 0)) { |
|
|
|
|
|
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqttUser); |
|
|
|
|
|
response = mqtt.connect(getIdentifier().c_str(), _mqttUser, _mqttPass, _mqttWill, MQTT_QOS, MQTT_RETAIN, "0"); |
|
|
|
|
|
|
|
|
if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) { |
|
|
|
|
|
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user); |
|
|
|
|
|
response = _mqtt.connect(getIdentifier().c_str(), _mqtt_user, _mqtt_pass, _mqtt_will, MQTT_QOS, MQTT_RETAIN, "0"); |
|
|
} else { |
|
|
} else { |
|
|
response = mqtt.connect(getIdentifier().c_str(), _mqttWill, MQTT_QOS, MQTT_RETAIN, "0"); |
|
|
|
|
|
|
|
|
response = _mqtt.connect(getIdentifier().c_str(), _mqtt_will, MQTT_QOS, MQTT_RETAIN, "0"); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (response) { |
|
|
if (response) { |
|
|
_mqttOnConnect(); |
|
|
_mqttOnConnect(); |
|
|
_mqttConnected = true; |
|
|
|
|
|
|
|
|
_mqtt_connected = true; |
|
|
} else { |
|
|
} else { |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n")); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n")); |
|
|
} |
|
|
} |
|
@ -446,20 +443,21 @@ void mqttConnect() { |
|
|
void mqttConfigure() { |
|
|
void mqttConfigure() { |
|
|
|
|
|
|
|
|
// Replace identifier
|
|
|
// Replace identifier
|
|
|
_mqttTopic = getSetting("mqttTopic", MQTT_TOPIC); |
|
|
|
|
|
_mqttTopic.replace("{identifier}", getSetting("hostname")); |
|
|
|
|
|
if (!_mqttTopic.endsWith("/")) _mqttTopic = _mqttTopic + "/"; |
|
|
|
|
|
|
|
|
_mqtt_topic = getSetting("mqttTopic", MQTT_TOPIC); |
|
|
|
|
|
_mqtt_topic.replace("{identifier}", getSetting("hostname")); |
|
|
|
|
|
if (!_mqtt_topic.endsWith("/")) _mqtt_topic = _mqtt_topic + "/"; |
|
|
|
|
|
|
|
|
// Getters and setters
|
|
|
// Getters and setters
|
|
|
_mqttSetter = getSetting("mqttSetter", MQTT_USE_SETTER); |
|
|
|
|
|
_mqttGetter = getSetting("mqttGetter", MQTT_USE_GETTER); |
|
|
|
|
|
_mqttForward = !_mqttGetter.equals(_mqttSetter); |
|
|
|
|
|
|
|
|
_mqtt_setter = getSetting("mqttSetter", MQTT_USE_SETTER); |
|
|
|
|
|
_mqtt_getter = getSetting("mqttGetter", MQTT_USE_GETTER); |
|
|
|
|
|
_mqtt_forward = !_mqtt_getter.equals(_mqtt_setter); |
|
|
|
|
|
|
|
|
// Enable
|
|
|
// Enable
|
|
|
|
|
|
_mqtt_connection_tries = 0; |
|
|
if (getSetting("mqttServer", MQTT_SERVER).length() == 0) { |
|
|
if (getSetting("mqttServer", MQTT_SERVER).length() == 0) { |
|
|
mqttEnabled(false); |
|
|
mqttEnabled(false); |
|
|
} else { |
|
|
} else { |
|
|
_mqttEnabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1; |
|
|
|
|
|
|
|
|
_mqtt_enabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
@ -491,10 +489,10 @@ void mqttSetup() { |
|
|
|
|
|
|
|
|
DEBUG_MSG_P(PSTR("[MQTT] Using ASYNC MQTT library\n")); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Using ASYNC MQTT library\n")); |
|
|
|
|
|
|
|
|
mqtt.onConnect([](bool sessionPresent) { |
|
|
|
|
|
|
|
|
_mqtt.onConnect([](bool sessionPresent) { |
|
|
_mqttOnConnect(); |
|
|
_mqttOnConnect(); |
|
|
}); |
|
|
}); |
|
|
mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) { |
|
|
|
|
|
|
|
|
_mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) { |
|
|
if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) { |
|
|
if (reason == AsyncMqttClientDisconnectReason::TCP_DISCONNECTED) { |
|
|
DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n")); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] TCP Disconnected\n")); |
|
|
} |
|
|
} |
|
@ -517,13 +515,13 @@ void mqttSetup() { |
|
|
#endif
|
|
|
#endif
|
|
|
_mqttOnDisconnect(); |
|
|
_mqttOnDisconnect(); |
|
|
}); |
|
|
}); |
|
|
mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { |
|
|
|
|
|
|
|
|
_mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { |
|
|
_mqttOnMessage(topic, payload, len); |
|
|
_mqttOnMessage(topic, payload, len); |
|
|
}); |
|
|
}); |
|
|
mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) { |
|
|
|
|
|
|
|
|
_mqtt.onSubscribe([](uint16_t packetId, uint8_t qos) { |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %d\n"), packetId); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Subscribe ACK for PID %d\n"), packetId); |
|
|
}); |
|
|
}); |
|
|
mqtt.onPublish([](uint16_t packetId) { |
|
|
|
|
|
|
|
|
_mqtt.onPublish([](uint16_t packetId) { |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %d\n"), packetId); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Publish ACK for PID %d\n"), packetId); |
|
|
}); |
|
|
}); |
|
|
|
|
|
|
|
@ -531,7 +529,7 @@ void mqttSetup() { |
|
|
|
|
|
|
|
|
DEBUG_MSG_P(PSTR("[MQTT] Using SYNC MQTT library\n")); |
|
|
DEBUG_MSG_P(PSTR("[MQTT] Using SYNC MQTT library\n")); |
|
|
|
|
|
|
|
|
mqtt.setCallback([](char* topic, byte* payload, unsigned int length) { |
|
|
|
|
|
|
|
|
_mqtt.setCallback([](char* topic, byte* payload, unsigned int length) { |
|
|
_mqttOnMessage(topic, (char *) payload, length); |
|
|
_mqttOnMessage(topic, (char *) payload, length); |
|
|
}); |
|
|
}); |
|
|
|
|
|
|
|
@ -546,9 +544,9 @@ void mqttLoop() { |
|
|
|
|
|
|
|
|
#if MQTT_USE_ASYNC
|
|
|
#if MQTT_USE_ASYNC
|
|
|
|
|
|
|
|
|
if (!_mqttEnabled) return; |
|
|
|
|
|
|
|
|
if (!_mqtt_enabled) return; |
|
|
if (WiFi.status() != WL_CONNECTED) return; |
|
|
if (WiFi.status() != WL_CONNECTED) return; |
|
|
if (mqtt.connected) return; |
|
|
|
|
|
|
|
|
if (_mqtt.connected) return; |
|
|
|
|
|
|
|
|
static unsigned long last = 0; |
|
|
static unsigned long last = 0; |
|
|
if (millis() - last > MQTT_RECONNECT_DELAY) { |
|
|
if (millis() - last > MQTT_RECONNECT_DELAY) { |
|
@ -560,18 +558,18 @@ void mqttLoop() { |
|
|
|
|
|
|
|
|
if (WiFi.status() != WL_CONNECTED) return; |
|
|
if (WiFi.status() != WL_CONNECTED) return; |
|
|
|
|
|
|
|
|
if (mqtt.connected()) { |
|
|
|
|
|
|
|
|
if (_mqtt.connected()) { |
|
|
|
|
|
|
|
|
mqtt.loop(); |
|
|
|
|
|
|
|
|
_mqtt.loop(); |
|
|
|
|
|
|
|
|
} else { |
|
|
} else { |
|
|
|
|
|
|
|
|
if (_mqttConnected) { |
|
|
|
|
|
|
|
|
if (_mqtt_connected) { |
|
|
_mqttOnDisconnect(); |
|
|
_mqttOnDisconnect(); |
|
|
_mqttConnected = false; |
|
|
|
|
|
|
|
|
_mqtt_connected = false; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (_mqttEnabled) { |
|
|
|
|
|
|
|
|
if (_mqtt_enabled) { |
|
|
static unsigned long last = 0; |
|
|
static unsigned long last = 0; |
|
|
if (millis() - last > MQTT_RECONNECT_DELAY) { |
|
|
if (millis() - last > MQTT_RECONNECT_DELAY) { |
|
|
last = millis(); |
|
|
last = millis(); |
|
|