@ -13,6 +13,7 @@ Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
# include <ESP8266mDNS.h>
# include <ArduinoJson.h>
# include <vector>
# include <utility>
# include <Ticker.h>
# if MQTT_USE_ASYNC // Using AsyncMqttClient
@ -46,10 +47,12 @@ String _mqtt_topic_json;
String _mqtt_setter ;
String _mqtt_getter ;
bool _mqtt_forward ;
char * _mqtt_user = 0 ;
char * _mqtt_pass = 0 ;
char * _mqtt_will ;
char * _mqtt_clientid ;
String _mqtt_user ;
String _mqtt_pass ;
String _mqtt_will ;
String _mqtt_server ;
uint16_t _mqtt_port ;
String _mqtt_clientid ;
std : : vector < mqtt_callback_f > _mqtt_callbacks ;
@ -82,41 +85,23 @@ void _mqttConnect() {
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MAX ;
}
String h = getSetting ( " mqttServer " , MQTT_SERVER ) ;
# if MDNS_CLIENT_SUPPORT
h = mdnsResolve ( h ) ;
_mqtt_server = mdnsResolve ( _mqtt_server ) ;
# endif
char * host = strdup ( h . c_str ( ) ) ;
unsigned int port = getSetting ( " mqttPort " , MQTT_PORT ) . toInt ( ) ;
if ( _mqtt_user ) free ( _mqtt_user ) ;
if ( _mqtt_pass ) free ( _mqtt_pass ) ;
if ( _mqtt_will ) free ( _mqtt_will ) ;
if ( _mqtt_clientid ) free ( _mqtt_clientid ) ;
String user = getSetting ( " mqttUser " , MQTT_USER ) ;
_mqttPlaceholders ( & user ) ;
_mqtt_user = strdup ( user . c_str ( ) ) ;
_mqtt_pass = strdup ( getSetting ( " mqttPassword " , MQTT_PASS ) . c_str ( ) ) ;
_mqtt_will = strdup ( mqttTopic ( MQTT_TOPIC_STATUS , false ) . c_str ( ) ) ;
String clientid = getSetting ( " mqttClientID " , getIdentifier ( ) ) ;
_mqttPlaceholders ( & clientid ) ;
_mqtt_clientid = strdup ( clientid . c_str ( ) ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Connecting to broker at %s:%d \n " ) , host , port ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Connecting to broker at %s:%u \n " ) , _mqtt_server . c_str ( ) , _mqtt_port ) ;
# if MQTT_USE_ASYNC
_mqtt_connecting = true ;
_mqtt . setServer ( host , port ) ;
_mqtt . setClientId ( _mqtt_clientid ) ;
_mqtt . setServer ( _mqtt_server . c_str ( ) , _mqtt_port ) ;
_mqtt . setClientId ( _mqtt_clientid . c_str ( ) ) ;
_mqtt . setKeepAlive ( _mqtt_keepalive ) ;
_mqtt . setCleanSession ( false ) ;
_mqtt . setWill ( _mqtt_will , _mqtt_qos , _mqtt_retain , " 0 " ) ;
if ( ( strlen ( _mqtt_user ) > 0 ) & & ( strlen ( _mqtt_pass ) > 0 ) ) {
DEBUG_MSG_P ( PSTR ( " [MQTT] Connecting as user %s \n " ) , _mqtt_user ) ;
_mqtt . setCredentials ( _mqtt_user , _mqtt_pass ) ;
_mqtt . setWill ( _mqtt_will . c_str ( ) , _mqtt_qos , _mqtt_retain , " 0 " ) ;
if ( _mqtt_user . length ( ) & & _mqtt_pass . length ( ) ) {
DEBUG_MSG_P ( PSTR ( " [MQTT] Connecting as user %s \n " ) , _mqtt_user . c_str ( ) ) ;
_mqtt . setCredentials ( _mqtt_user . c_str ( ) , _mqtt_pass . c_str ( ) ) ;
}
# if ASYNC_TCP_SSL_ENABLED
@ -135,11 +120,11 @@ void _mqttConnect() {
# endif // ASYNC_TCP_SSL_ENABLED
DEBUG_MSG_P ( PSTR ( " [MQTT] Client ID: %s \n " ) , _mqtt_clientid ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Client ID: %s \n " ) , _mqtt_clientid . c_str ( ) ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] QoS: %d \n " ) , _mqtt_qos ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Retain flag: %d \n " ) , _mqtt_retain ? 1 : 0 ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Keepalive time: %ds \n " ) , _mqtt_keepalive ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Will topic: %s \n " ) , _mqtt_will ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Will topic: %s \n " ) , _mqtt_will . c_str ( ) ) ;
_mqtt . connect ( ) ;
@ -152,10 +137,10 @@ void _mqttConnect() {
bool secure = getSetting ( " mqttUseSSL " , MQTT_SSL_ENABLED ) . toInt ( ) = = 1 ;
if ( secure ) {
DEBUG_MSG_P ( PSTR ( " [MQTT] Using SSL \n " ) ) ;
if ( _mqtt_client_secure . connect ( host , port ) ) {
if ( _mqtt_client_secure . connect ( _mqtt_server . c_str ( ) , _mqtt_ port) ) {
char fp [ 60 ] = { 0 } ;
if ( sslFingerPrintChar ( getSetting ( " mqttFP " , MQTT_SSL_FINGERPRINT ) . c_str ( ) , fp ) ) {
if ( _mqtt_client_secure . verify ( fp , host ) ) {
if ( _mqtt_client_secure . verify ( fp , _mqtt_server . c_str ( ) ) ) {
_mqtt . setClient ( _mqtt_client_secure ) ;
} else {
DEBUG_MSG_P ( PSTR ( " [MQTT] Invalid fingerprint \n " ) ) ;
@ -184,20 +169,20 @@ void _mqttConnect() {
if ( response ) {
_mqtt . setServer ( host , port ) ;
_mqtt . setServer ( _mqtt_server . c_str ( ) , _mqtt_ port) ;
if ( ( strlen ( _mqtt_user ) > 0 ) & & ( strlen ( _mqtt_pass ) > 0 ) ) {
DEBUG_MSG_P ( PSTR ( " [MQTT] Connecting as user %s \n " ) , _mqtt_user ) ;
response = _mqtt . connect ( _mqtt_clientid , _mqtt_user , _mqtt_pass , _mqtt_will , _mqtt_qos , _mqtt_retain , " 0 " ) ;
if ( _mqtt_user . length ( ) & & _mqtt_pass . length ( ) ) {
DEBUG_MSG_P ( PSTR ( " [MQTT] Connecting as user %s \n " ) , _mqtt_user . c_str ( ) ) ;
response = _mqtt . connect ( _mqtt_clientid . c_str ( ) , _mqtt_user . c_str ( ) , _mqtt_pass . c_str ( ) , _mqtt_will . c_str ( ) , _mqtt_qos , _mqtt_retain , " 0 " ) ;
} else {
response = _mqtt . connect ( _mqtt_clientid , _mqtt_will , _mqtt_qos , _mqtt_retain , " 0 " ) ;
response = _mqtt . connect ( _mqtt_clientid . c_str ( ) , _mqtt_will . c_str ( ) , _mqtt_qos , _mqtt_retain , " 0 " ) ;
}
DEBUG_MSG_P ( PSTR ( " [MQTT] Client ID: %s \n " ) , _mqtt_clientid ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Client ID: %s \n " ) , _mqtt_clientid . c_str ( ) ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] QoS: %d \n " ) , _mqtt_qos ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Retain flag: %d \n " ) , _mqtt_retain ? 1 : 0 ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Keepalive time: %ds \n " ) , _mqtt_keepalive ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Will topic: %s \n " ) , _mqtt_will ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Will topic: %s \n " ) , _mqtt_will . c_str ( ) ) ;
}
@ -210,50 +195,114 @@ void _mqttConnect() {
# endif // MQTT_USE_ASYNC
free ( host ) ;
}
void _mqttPlaceholders ( String * text ) {
text - > replace ( " {hostname} " , getSetting ( " hostname " ) ) ;
text - > replace ( " {magnitude} " , " # " ) ;
void _mqttPlaceholders ( String & text ) {
text . replace ( " {hostname} " , getSetting ( " hostname " ) ) ;
text . replace ( " {magnitude} " , " # " ) ;
String mac = WiFi . macAddress ( ) ;
mac . replace ( " : " , " " ) ;
text - > replace ( " {mac} " , mac ) ;
text . replace ( " {mac} " , mac ) ;
}
template < typename T >
void _mqttApplySetting ( T & current , T & updated ) {
if ( current ! = updated ) {
current = std : : move ( updated ) ;
mqttDisconnect ( ) ;
}
}
template < typename T >
void _mqttApplySetting ( T & current , const T & updated ) {
if ( current ! = updated ) {
current = updated ;
mqttDisconnect ( ) ;
}
}
template < typename T >
void _mqttApplyTopic ( T & current , const char * magnitude ) {
String updated = mqttTopic ( magnitude , false ) ;
if ( current ! = updated ) {
mqttFlush ( ) ;
current = std : : move ( updated ) ;
}
}
void _mqttConfigure ( ) {
// Get base topic
_mqtt_topic = getSetting ( " mqttTopic " , MQTT_TOPIC ) ;
if ( _mqtt_topic . endsWith ( " / " ) ) _mqtt_topic . remove ( _mqtt_topic . length ( ) - 1 ) ;
// Enable only when server is set
{
String server = getSetting ( " mqttServer " , MQTT_SERVER ) ;
uint16_t port = getSetting ( " mqttPort " , MQTT_PORT ) . toInt ( ) ;
bool enabled = false ;
if ( server . length ( ) ) {
enabled = getSetting ( " mqttEnabled " , MQTT_ENABLED ) . toInt ( ) = = 1 ;
}
_mqttApplySetting ( _mqtt_server , server ) ;
_mqttApplySetting ( _mqtt_enabled , enabled ) ;
_mqttApplySetting ( _mqtt_port , port ) ;
if ( ! enabled ) return ;
}
// Get base topic and apply placeholders
{
String topic = getSetting ( " mqttTopic " , MQTT_TOPIC ) ;
if ( topic . endsWith ( " / " ) ) topic . remove ( topic . length ( ) - 1 ) ;
// Placeholders
_mqttPlaceholders ( & _mqtt_topic ) ;
if ( _mqtt_topic . indexOf ( " # " ) = = - 1 ) _mqtt_topic = _mqtt_topic + " /# " ;
// Replace things inside curly braces (like {hostname}, {mac} etc.)
_mqttPlaceholders ( topic ) ;
// Getters and setters
_mqtt_setter = getSetting ( " mqttSetter " , MQTT_SETTER ) ;
_mqtt_getter = getSetting ( " mqttGetter " , MQTT_GETTER ) ;
_mqtt_forward = ! _mqtt_getter . equals ( _mqtt_setter ) & & RELAY_REPORT_STATUS ;
if ( topic . indexOf ( " # " ) = = - 1 ) topic . concat ( " /# " ) ;
_mqttApplySetting ( _mqtt_topic , topic ) ;
_mqttApplyTopic ( _mqtt_will , MQTT_TOPIC_STATUS ) ;
}
// Getter and setter
{
String setter = getSetting ( " mqttSetter " , MQTT_SETTER ) ;
String getter = getSetting ( " mqttGetter " , MQTT_GETTER ) ;
bool forward = ! setter . equals ( getter ) & & RELAY_REPORT_STATUS ;
_mqttApplySetting ( _mqtt_setter , setter ) ;
_mqttApplySetting ( _mqtt_getter , getter ) ;
_mqttApplySetting ( _mqtt_forward , forward ) ;
}
// MQTT options
_mqtt_qos = getSetting ( " mqttQoS " , MQTT_QOS ) . toInt ( ) ;
_mqtt_retain = getSetting ( " mqttRetain " , MQTT_RETAIN ) . toInt ( ) = = 1 ;
_mqtt_keepalive = getSetting ( " mqttKeep " , MQTT_KEEPALIVE ) . toInt ( ) ;
if ( getSetting ( " mqttClientID " ) . length ( ) = = 0 ) delSetting ( " mqttClientID " ) ;
// Enable
if ( getSetting ( " mqttServer " , MQTT_SERVER ) . length ( ) = = 0 ) {
mqttEnabled ( false ) ;
} else {
_mqtt_enabled = getSetting ( " mqttEnabled " , MQTT_ENABLED ) . toInt ( ) = = 1 ;
{
String user = getSetting ( " mqttUser " , MQTT_USER ) ;
_mqttPlaceholders ( user ) ;
String pass = getSetting ( " mqttPassword " , MQTT_PASS ) ;
unsigned char qos = getSetting ( " mqttQoS " , MQTT_QOS ) . toInt ( ) ;
bool retain = getSetting ( " mqttRetain " , MQTT_RETAIN ) . toInt ( ) = = 1 ;
unsigned long keepalive = getSetting ( " mqttKeep " , MQTT_KEEPALIVE ) . toInt ( ) ;
String id = getSetting ( " mqttClientID " , getIdentifier ( ) ) ;
_mqttPlaceholders ( id ) ;
_mqttApplySetting ( _mqtt_user , user ) ;
_mqttApplySetting ( _mqtt_pass , pass ) ;
_mqttApplySetting ( _mqtt_qos , qos ) ;
_mqttApplySetting ( _mqtt_retain , retain ) ;
_mqttApplySetting ( _mqtt_keepalive , keepalive ) ;
_mqttApplySetting ( _mqtt_clientid , id ) ;
}
// MQTT JSON
{
_mqttApplySetting ( _mqtt_use_json , getSetting ( " mqttUseJson " , MQTT_USE_JSON ) . toInt ( ) = = 1 ) ;
_mqttApplyTopic ( _mqtt_topic_json , MQTT_TOPIC_JSON ) ;
}
_mqtt_use_json = ( getSetting ( " mqttUseJson " , MQTT_USE_JSON ) . toInt ( ) = = 1 ) ;
mqttQueueTopic ( MQTT_TOPIC_JSON ) ;
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN ;
@ -491,9 +540,6 @@ void mqttSend(const char * topic, const char * message, bool force, bool retain)
// Equeue message
if ( useJson ) {
// Set default queue topic
mqttQueueTopic ( MQTT_TOPIC_JSON ) ;
// Enqueue new message
mqttEnqueue ( topic , message ) ;
@ -608,14 +654,6 @@ void mqttFlush() {
}
void mqttQueueTopic ( const char * topic ) {
String t = mqttTopic ( topic , false ) ;
if ( ! t . equals ( _mqtt_topic_json ) ) {
mqttFlush ( ) ;
_mqtt_topic_json = t ;
}
}
int8_t mqttEnqueue ( const char * topic , const char * message , unsigned char parent ) {
// Queue is not meant to send message "offline"
@ -709,7 +747,11 @@ void mqttRegister(mqtt_callback_f callback) {
void mqttSetBroker ( IPAddress ip , unsigned int port ) {
setSetting ( " mqttServer " , ip . toString ( ) ) ;
_mqtt_server = ip . toString ( ) ;
setSetting ( " mqttPort " , port ) ;
_mqtt_port = port ;
mqttEnabled ( MQTT_AUTOCONNECT ) ;
}
@ -717,11 +759,6 @@ void mqttSetBrokerIfNone(IPAddress ip, unsigned int port) {
if ( getSetting ( " mqttServer " , MQTT_SERVER ) . length ( ) = = 0 ) mqttSetBroker ( ip , port ) ;
}
void mqttReset ( ) {
_mqttConfigure ( ) ;
mqttDisconnect ( ) ;
}
// -----------------------------------------------------------------------------
// Initialization
// -----------------------------------------------------------------------------