Browse Source

Replace MQTT_MAX_TRIES disabling feature with an increasing reconnection interval (issue #215)

fastled
Xose Pérez 7 years ago
parent
commit
3c848d13b1
2 changed files with 103 additions and 118 deletions
  1. +5
    -3
      code/espurna/config/general.h
  2. +98
    -115
      code/espurna/mqtt.ino

+ 5
- 3
code/espurna/config/general.h View File

@ -390,9 +390,11 @@ PROGMEM const char* const custom_reset_string[] = {
#define MQTT_RETAIN true // MQTT retain flag
#define MQTT_QOS 0 // MQTT QoS value for all messages
#define MQTT_KEEPALIVE 30 // MQTT keepalive value
#define MQTT_RECONNECT_DELAY 10000 // Try to reconnect after 10s
#define MQTT_TRY_INTERVAL 30000 // Timeframe for disconnect retries
#define MQTT_MAX_TRIES 5 // After these many retries during the previous MQTT_TRY_INTERVAL the board will reset
#define MQTT_RECONNECT_DELAY_MIN 5000 // Try to reconnect in 5 seconds upon disconnection
#define MQTT_RECONNECT_DELAY_STEP 5000 // Increase the reconnect delay in 5 seconds after each failed attempt
#define MQTT_RECONNECT_DELAY_MAX 120000 // Set reconnect time to 2 minutes at most
#define MQTT_SKIP_RETAINED 1 // Skip retained messages on connection
#define MQTT_SKIP_TIME 1000 // Skip messages for 1 second anter connection


+ 98
- 115
code/espurna/mqtt.ino View File

@ -31,7 +31,7 @@ WiFiClientSecure _mqtt_client_secure;
#endif // MQTT_USE_ASYNC
bool _mqtt_enabled = MQTT_ENABLED;
unsigned char _mqtt_connection_tries = 0;
unsigned long _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
String _mqtt_topic;
String _mqtt_setter;
String _mqtt_getter;
@ -216,6 +216,7 @@ void _mqttCallback(unsigned int type, const char * topic, const char * payload)
void _mqttOnConnect() {
DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
#if MQTT_SKIP_RETAINED
_mqtt_connected_at = millis();
@ -316,137 +317,134 @@ bool mqttEnabled() {
void mqttConnect() {
if (_mqtt_enabled & !_mqtt.connected()) {
// Disable MQTT after MQTT_MAX_TRIES attemps in a row
#if MQTT_MAX_TRIES > 0
static unsigned long last_try = millis();
if (millis() - last_try < MQTT_TRY_INTERVAL) {
if (++_mqtt_connection_tries > MQTT_MAX_TRIES) {
DEBUG_MSG_P(PSTR("[MQTT] MQTT_MAX_TRIES met, disabling MQTT\n"));
mqttEnabled(false);
_mqtt_connection_tries = 0;
return;
}
} else {
_mqtt_connection_tries = 0;
}
last_try = millis();
#endif
// Do not connect if disabled
if (!_mqtt_enabled) return;
if (_mqtt_user) free(_mqtt_user);
if (_mqtt_pass) free(_mqtt_pass);
// Do not connect if already connected
if (_mqtt.connected()) return;
char * host = strdup(getSetting("mqttServer", MQTT_SERVER).c_str());
if (strlen(host) == 0) return;
unsigned int port = getSetting("mqttPort", MQTT_PORT).toInt();
_mqtt_user = strdup(getSetting("mqttUser", MQTT_USER).c_str());
_mqtt_pass = strdup(getSetting("mqttPassword", MQTT_PASS).c_str());
if (_mqtt_will) free(_mqtt_will);
_mqtt_will = strdup((_mqtt_topic + MQTT_TOPIC_STATUS).c_str());
// Check reconnect interval
static unsigned long last = 0;
if (millis() - last < _mqtt_reconnect_delay) return;
last = millis();
DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%d\n"), host, port);
// Increase the reconnect delay
_mqtt_reconnect_delay += MQTT_RECONNECT_DELAY_STEP;
if (_mqtt_reconnect_delay > MQTT_RECONNECT_DELAY_MAX) {
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MAX;
}
#if MQTT_USE_ASYNC
if (_mqtt_user) free(_mqtt_user);
if (_mqtt_pass) free(_mqtt_pass);
_mqtt.setServer(host, port);
_mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false);
_mqtt.setWill(_mqtt_will, MQTT_QOS, MQTT_RETAIN, "0");
if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user);
_mqtt.setCredentials(_mqtt_user, _mqtt_pass);
}
char * host = strdup(getSetting("mqttServer", MQTT_SERVER).c_str());
if (strlen(host) == 0) return;
unsigned int port = getSetting("mqttPort", MQTT_PORT).toInt();
_mqtt_user = strdup(getSetting("mqttUser", MQTT_USER).c_str());
_mqtt_pass = strdup(getSetting("mqttPassword", MQTT_PASS).c_str());
if (_mqtt_will) free(_mqtt_will);
_mqtt_will = strdup((_mqtt_topic + MQTT_TOPIC_STATUS).c_str());
#if ASYNC_TCP_SSL_ENABLED
DEBUG_MSG_P(PSTR("[MQTT] Connecting to broker at %s:%d\n"), host, port);
Serial.println(millis() / 1000);
bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
_mqtt.setSecure(secure);
if (secure) {
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
unsigned char fp[20] = {0};
if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
_mqtt.addServerFingerprint(fp);
} else {
DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n"));
}
#if MQTT_USE_ASYNC
_mqtt.setServer(host, port);
_mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false);
_mqtt.setWill(_mqtt_will, MQTT_QOS, MQTT_RETAIN, "0");
if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user);
_mqtt.setCredentials(_mqtt_user, _mqtt_pass);
}
#if ASYNC_TCP_SSL_ENABLED
bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
_mqtt.setSecure(secure);
if (secure) {
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
unsigned char fp[20] = {0};
if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
_mqtt.addServerFingerprint(fp);
} else {
DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n"));
}
}
#endif // ASYNC_TCP_SSL_ENABLED
#endif // ASYNC_TCP_SSL_ENABLED
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will);
DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), MQTT_QOS);
DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), MQTT_RETAIN);
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will);
DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), MQTT_QOS);
DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), MQTT_RETAIN);
_mqtt.connect();
_mqtt.connect();
#else // not MQTT_USE_ASYNC
#else // not MQTT_USE_ASYNC
bool response = true;
bool response = true;
#if ASYNC_TCP_SSL_ENABLED
#if ASYNC_TCP_SSL_ENABLED
bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
if (secure) {
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
if (_mqtt_client_secure.connect(host, port)) {
char fp[60] = {0};
if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
if (_mqtt_client_secure.verify(fp, host)) {
_mqtt.setClient(_mqtt_client_secure);
} else {
DEBUG_MSG_P(PSTR("[MQTT] Invalid fingerprint\n"));
response = false;
}
_mqtt_client_secure.stop();
yield();
bool secure = getSetting("mqttUseSSL", MQTT_SSL_ENABLED).toInt() == 1;
if (secure) {
DEBUG_MSG_P(PSTR("[MQTT] Using SSL\n"));
if (_mqtt_client_secure.connect(host, port)) {
char fp[60] = {0};
if (mqttFormatFP(getSetting("mqttFP", MQTT_SSL_FINGERPRINT).c_str(), fp)) {
if (_mqtt_client_secure.verify(fp, host)) {
_mqtt.setClient(_mqtt_client_secure);
} else {
DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n"));
DEBUG_MSG_P(PSTR("[MQTT] Invalid fingerprint\n"));
response = false;
}
_mqtt_client_secure.stop();
yield();
} else {
DEBUG_MSG_P(PSTR("[MQTT] Client connection failed\n"));
DEBUG_MSG_P(PSTR("[MQTT] Wrong fingerprint\n"));
response = false;
}
} else {
_mqtt.setClient(_mqtt_client);
DEBUG_MSG_P(PSTR("[MQTT] Client connection failed\n"));
response = false;
}
#else // not ASYNC_TCP_SSL_ENABLED
} else {
_mqtt.setClient(_mqtt_client);
}
#endif // ASYNC_TCP_SSL_ENABLED
if (response) {
#else // not ASYNC_TCP_SSL_ENABLED
_mqtt.setServer(host, port);
_mqtt.setClient(_mqtt_client);
if ((strlen(_mqtt_user) > 0) && (strlen(_mqtt_pass) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user);
response = _mqtt.connect(getIdentifier().c_str(), _mqtt_user, _mqtt_pass, _mqtt_will, MQTT_QOS, MQTT_RETAIN, "0");
} else {
response = _mqtt.connect(getIdentifier().c_str(), _mqtt_will, MQTT_QOS, MQTT_RETAIN, "0");
}
#endif // ASYNC_TCP_SSL_ENABLED
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will);
DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), MQTT_QOS);
DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), MQTT_RETAIN);
if (response) {
}
_mqtt.setServer(host, port);
if (response) {
_mqttOnConnect();
_mqtt_connected = true;
if ((strlen(_mqtt_user) >> <span class="mi">0) && (strlen(_mqtt_pass) > 0)) {
DEBUG_MSG_P(PSTR("[MQTT] Connecting as user %s\n"), _mqtt_user);
response = _mqtt.connect(getIdentifier().c_str(), _mqtt_user, _mqtt_pass, _mqtt_will, MQTT_QOS, MQTT_RETAIN, "0");
} else {
DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n"));
response = _mqtt.connect(getIdentifier().c_str(), _mqtt_will, MQTT_QOS, MQTT_RETAIN, "0");
}
#endif // MQTT_USE_ASYNC
DEBUG_MSG_P(PSTR("[MQTT] Will topic: %s\n"), _mqtt_will);
DEBUG_MSG_P(PSTR("[MQTT] QoS: %d\n"), MQTT_QOS);
DEBUG_MSG_P(PSTR("[MQTT] Retain flag: %d\n"), MQTT_RETAIN);
free(host);
}
}
if (response) {
_mqttOnConnect();
} else {
DEBUG_MSG_P(PSTR("[MQTT] Connection failed\n"));
}
#endif // MQTT_USE_ASYNC
free(host);
}
@ -463,7 +461,6 @@ void mqttConfigure() {
_mqtt_forward = !_mqtt_getter.equals(_mqtt_setter);
// Enable
_mqtt_connection_tries = 0;
if (getSetting("mqttServer", MQTT_SERVER).length() == 0) {
mqttEnabled(false);
} else {
@ -553,22 +550,14 @@ void mqttSetup() {
void mqttLoop() {
#if MQTT_USE_ASYNC
if (WiFi.status() != WL_CONNECTED) return;
if (!_mqtt_enabled) return;
if (WiFi.status() != WL_CONNECTED) return;
if (_mqtt.connected()) return;
#if MQTT_USE_ASYNC
static unsigned long last = 0;
if (millis() - last > MQTT_RECONNECT_DELAY) {
last = millis();
mqttConnect();
}
mqttConnect();
#else // not MQTT_USE_ASYNC
if (WiFi.status() != WL_CONNECTED) return;
if (_mqtt.connected()) {
_mqtt.loop();
@ -580,13 +569,7 @@ void mqttLoop() {
_mqtt_connected = false;
}
if (_mqtt_enabled) {
static unsigned long last = 0;
if (millis() - last > MQTT_RECONNECT_DELAY) {
last = millis();
mqttConnect();
}
}
mqttConnect();
}


Loading…
Cancel
Save