@ -72,12 +72,12 @@ namespace {
# if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
struct MqttPidCallback {
struct MqttPidCallbackHandler {
uint16_t pid ;
mqtt_pid_callback_f run ;
MqttPidCallback callback ;
} ;
using MqttPidCallbacks = std : : forward_list < MqttPidCallback > ;
using MqttPidCallbacks = std : : forward_list < MqttPidCallbackHandler > ;
MqttPidCallbacks _mqtt_publish_callbacks ;
MqttPidCallbacks _mqtt_subscribe_callbacks ;
@ -91,7 +91,7 @@ espurna::duration::Seconds _mqtt_heartbeat_interval;
String _mqtt_payload_online ;
String _mqtt_payload_offline ;
std : : forward_list < mqtt_callback_f > _mqtt_callbacks ;
std : : forward_list < MqttCallback > _mqtt_callbacks ;
} // namespace
@ -124,17 +124,15 @@ namespace mqtt {
namespace build {
namespace {
constexpr espurna : : duration : : Milliseconds SkipTime { MQTT_SKIP_TIME } ;
static constexpr espurna : : duration : : Milliseconds SkipTime { MQTT_SKIP_TIME } ;
constexpr espurna : : duration : : Milliseconds ReconnectDelayMin { MQTT_RECONNECT_DELAY_MIN } ;
constexpr espurna : : duration : : Milliseconds ReconnectDelayMax { MQTT_RECONNECT_DELAY_MAX } ;
constexpr espurna : : duration : : Milliseconds ReconnectStep { MQTT_RECONNECT_DELAY_STEP } ;
static constexpr espurna : : duration : : Milliseconds ReconnectDelayMin { MQTT_RECONNECT_DELAY_MIN } ;
static constexpr espurna : : duration : : Milliseconds ReconnectDelayMax { MQTT_RECONNECT_DELAY_MAX } ;
static constexpr espurna : : duration : : Milliseconds ReconnectStep { MQTT_RECONNECT_DELAY_STEP } ;
constexpr size_t MessageLogMax { 128ul } ;
static constexpr size_t MessageLogMax { 128ul } ;
const __FlashStringHelper * server ( ) {
return F ( MQTT_SERVER ) ;
}
alignas ( 4 ) static constexpr char Server [ ] PROGMEM = MQTT_SERVER ;
constexpr uint16_t port ( ) {
return MQTT_PORT ;
@ -148,25 +146,12 @@ constexpr bool autoconnect() {
return 1 = = MQTT_AUTOCONNECT ;
}
const __FlashStringHelper * topic ( ) {
return F ( MQTT_TOPIC ) ;
}
const __FlashStringHelper * getter ( ) {
return F ( MQTT_GETTER ) ;
}
const __FlashStringHelper * setter ( ) {
return F ( MQTT_SETTER ) ;
}
alignas ( 4 ) static constexpr char Topic [ ] PROGMEM = MQTT_TOPIC ;
alignas ( 4 ) static constexpr char Getter [ ] PROGMEM = MQTT_GETTER ;
alignas ( 4 ) static constexpr char Setter [ ] PROGMEM = MQTT_SETTER ;
const __FlashStringHelper * user ( ) {
return F ( MQTT_USER ) ;
}
const __FlashStringHelper * password ( ) {
return F ( MQTT_PASS ) ;
}
alignas ( 4 ) static constexpr char User [ ] PROGMEM = MQTT_USER ;
alignas ( 4 ) static constexpr char Password [ ] PROGMEM = MQTT_PASS ;
constexpr int qos ( ) {
return MQTT_QOS ;
@ -176,8 +161,8 @@ constexpr bool retain() {
return 1 = = MQTT_RETAIN ;
}
constexpr KeepAlive KeepaliveMin { 15 } ;
constexpr KeepAlive KeepaliveMax { KeepAlive : : max ( ) } ;
static constexpr KeepAlive KeepaliveMin { 15 } ;
static constexpr KeepAlive KeepaliveMax { KeepAlive : : max ( ) } ;
constexpr KeepAlive keepalive ( ) {
return KeepAlive { MQTT_KEEPALIVE } ;
@ -186,29 +171,21 @@ constexpr KeepAlive keepalive() {
static_assert ( keepalive ( ) > = KeepaliveMin , " " ) ;
static_assert ( keepalive ( ) < = KeepaliveMax , " " ) ;
const __FlashStringHelper * topicWill ( ) {
return F ( MQTT_TOPIC_STATUS ) ;
}
alignas ( 4 ) static constexpr char TopicWill [ ] PROGMEM = MQTT_TOPIC_STATUS ;
constexpr bool json ( ) {
return 1 = = MQTT_USE_JSON ;
}
const __FlashStringHelper * topicJson ( ) {
return F ( MQTT_TOPIC_JSON ) ;
}
static constexpr auto JsonDelay = espurna : : duration : : Milliseconds ( MQTT_USE_JSON_DELAY ) ;
alignas ( 4 ) static constexpr char TopicJson [ ] PROGMEM = MQTT_TOPIC_JSON ;
constexpr espurna : : duration : : Milliseconds skipTime ( ) {
return espurna : : duration : : Milliseconds ( MQTT_SKIP_TIME ) ;
}
const __FlashStringHelper * payloadOnline ( ) {
return F ( MQTT_STATUS_ONLINE ) ;
}
const __FlashStringHelper * payloadOffline ( ) {
return F ( MQTT_STATUS_OFFLINE ) ;
}
alignas ( 4 ) static constexpr char PayloadOnline [ ] PROGMEM = MQTT_STATUS_ONLINE ;
alignas ( 4 ) static constexpr char PayloadOffline [ ] PROGMEM = MQTT_STATUS_OFFLINE ;
constexpr bool secure ( ) {
return 1 = = MQTT_SSL_ENABLED ;
@ -218,9 +195,7 @@ int secureClientCheck() {
return MQTT_SECURE_CLIENT_CHECK ;
}
const __FlashStringHelper * fingerprint ( ) {
return F ( MQTT_SSL_FINGERPRINT ) ;
}
alignas ( 4 ) static constexpr char Fingerprint [ ] PROGMEM = MQTT_SSL_FINGERPRINT ;
constexpr uint16_t mfln ( ) {
return MQTT_SECURE_CLIENT_MFLN ;
@ -272,7 +247,7 @@ alignas(4) static constexpr char SecureClientMfln[] PROGMEM = "mqttScMFLN";
namespace {
String server ( ) {
return getSetting ( keys : : Server , build : : server ( ) ) ;
return getSetting ( keys : : Server , espurna : : StringView ( build : : Server ) ) ;
}
uint16_t port ( ) {
@ -288,23 +263,23 @@ bool autoconnect() {
}
String topic ( ) {
return getSetting ( keys : : Topic , build : : topic ( ) ) ;
return getSetting ( keys : : Topic , espurna : : StringView ( build : : Topic ) ) ;
}
String getter ( ) {
return getSetting ( keys : : Getter , build : : getter ( ) ) ;
return getSetting ( keys : : Getter , espurna : : StringView ( build : : Getter ) ) ;
}
String setter ( ) {
return getSetting ( keys : : Setter , build : : setter ( ) ) ;
return getSetting ( keys : : Setter , espurna : : StringView ( build : : Setter ) ) ;
}
String user ( ) {
return getSetting ( keys : : User , build : : user ( ) ) ;
return getSetting ( keys : : User , espurna : : StringView ( build : : User ) ) ;
}
String password ( ) {
return getSetting ( keys : : Password , build : : password ( ) ) ;
return getSetting ( keys : : Password , espurna : : StringView ( build : : Password ) ) ;
}
int qos ( ) {
@ -326,7 +301,7 @@ String clientId() {
}
String topicWill ( ) {
return getSetting ( keys : : TopicWill , build : : topicWill ( ) ) ;
return getSetting ( keys : : TopicWill , espurna : : StringView ( build : : TopicWill ) ) ;
}
bool json ( ) {
@ -334,7 +309,7 @@ bool json() {
}
String topicJson ( ) {
return getSetting ( keys : : TopicJson , build : : topicJson ( ) ) ;
return getSetting ( keys : : TopicJson , espurna : : StringView ( build : : TopicJson ) ) ;
}
espurna : : heartbeat : : Mode heartbeatMode ( ) {
@ -350,11 +325,11 @@ espurna::duration::Milliseconds skipTime() {
}
String payloadOnline ( ) {
return getSetting ( keys : : PayloadOnline , build : : payloadOnline ( ) ) ;
return getSetting ( keys : : PayloadOnline , espurna : : StringView ( build : : PayloadOnline ) ) ;
}
String payloadOffline ( ) {
return getSetting ( keys : : PayloadOffline , build : : payloadOffline ( ) ) ;
return getSetting ( keys : : PayloadOffline , espurna : : StringView ( build : : PayloadOffline ) ) ;
}
[[gnu::unused]]
@ -369,7 +344,7 @@ int secureClientCheck() {
[[gnu::unused]]
String fingerprint ( ) {
return getSetting ( keys : : Fingerprint , build : : fingerprint ( ) ) ;
return getSetting ( keys : : Fingerprint , espurna : : StringView ( build : : Fingerprint ) ) ;
}
[[gnu::unused]]
@ -487,13 +462,32 @@ static void _mqttApplySetting(Lhs& lhs, Rhs&& rhs) {
}
}
template < typename Rhs >
static void _mqttApplyTopic ( String & lhs , Rhs & & rhs ) {
auto topic = mqttTopic ( rhs , false ) ;
if ( lhs ! = topic ) {
mqttFlush ( ) ;
lhs = std : : move ( topic ) ;
// Can't have **any** MQTT placeholders but our own `{magnitude}`
bool _mqttValidTopicString ( espurna : : StringView value ) {
size_t hash = 0 ;
size_t plus = 0 ;
for ( auto it = value . begin ( ) ; it ! = value . end ( ) ; + + it ) {
switch ( * it ) {
case ' # ' :
+ + hash ;
break ;
case ' + ' :
+ + plus ;
break ;
}
}
return ( hash < = 1 ) & & ( plus = = 0 ) ;
}
bool _mqttApplyValidTopicString ( String & lhs , String & & rhs ) {
if ( _mqttValidTopicString ( rhs ) ) {
_mqttApplySetting ( lhs , std : : move ( rhs ) ) ;
return true ;
}
mqttDisconnect ( ) ;
return false ;
}
} // namespace
@ -715,6 +709,8 @@ void _mqttMdnsStop();
void _mqttConfigure ( ) {
_mqtt_enabled = false ;
// Make sure we have both the server to connect to things are enabled
{
_mqttApplySetting ( _mqtt_settings . server , mqtt : : settings : : server ( ) ) ;
@ -734,7 +730,6 @@ void _mqttConfigure() {
_mqttMdnsSchedule ( ) ;
}
# endif
_mqtt_enabled = false ;
return ;
}
}
@ -743,22 +738,34 @@ void _mqttConfigure() {
{
// Replace things inside curly braces (like {hostname}, {mac} etc.)
auto topic = _mqttPlaceholders ( mqtt : : settings : : topic ( ) ) ;
if ( ! _mqttValidTopicString ( topic ) ) {
mqttDisconnect ( ) ;
return ;
}
// Topic **must** end with some kind of word
if ( topic . endsWith ( " / " ) ) {
topic . remove ( topic . length ( ) - 1 ) ;
}
// For simple topics, sssume right-hand side contains magnitude
if ( topic . indexOf ( " # " ) = = - 1 ) {
topic . concat ( " /# " ) ;
}
_mqttApplySetting ( _mqtt_settings . topic , topic ) ;
_mqttApplySetting ( _mqtt_settings . topic , std : : move ( topic ) ) ;
}
// Getter and setter
_mqttApplySett ing ( _mqtt_settings . getter , mqtt : : settings : : getter ( ) ) ;
_mqttApplySett ing ( _mqtt_settings . setter , mqtt : : settings : : setter ( ) ) ;
_mqttApplyValidTopicStr ing ( _mqtt_settings . getter , mqtt : : settings : : getter ( ) ) ;
_mqttApplyValidTopicStr ing ( _mqtt_settings . setter , mqtt : : settings : : setter ( ) ) ;
_mqttApplySetting ( _mqtt_forward ,
! _mqtt_settings . setter . equals ( _mqtt_settings . getter ) ) ;
! _mqtt_settings . setter . equals ( _mqtt_settings . getter ) ) ;
// Last will aka status topic
// (note that *must* be after topic updates)
_mqttApplyValidTopicString ( _mqtt_settings . will ,
mqttTopic ( mqtt : : settings : : topicWill ( ) ) ) ;
// MQTT options
_mqttApplySetting ( _mqtt_settings . user , _mqttPlaceholders ( mqtt : : settings : : user ( ) ) ) ;
@ -770,12 +777,10 @@ void _mqttConfigure() {
_mqttApplySetting ( _mqtt_settings . retain , mqtt : : settings : : retain ( ) ) ;
_mqttApplySetting ( _mqtt_settings . keepalive , mqtt : : settings : : keepalive ( ) ) ;
_mqttApplyTopic ( _mqtt_settings . will , mqtt : : settings : : topicWill ( ) ) ;
// MQTT JSON
_mqttApplySetting ( _mqtt_use_json , mqtt : : settings : : json ( ) ) ;
if ( _mqtt_use_json ) {
_mqttApplyTopic ( _mqtt_settings . topic_json , mqtt : : settings : : topicJson ( ) ) ;
_mqttApplyValid TopicString ( _mqtt_settings . topic_json , mqtt : : settings : : topicJson ( ) ) ;
}
// Heartbeat messages
@ -1006,13 +1011,13 @@ void _mqttCommandsSetup() {
namespace {
void _mqttCallback ( unsigned int type , const char * topic , char * payload ) {
void _mqttCallback ( unsigned int type , espurna : : StringView topic , espurna : : StringView payload ) {
if ( type = = MQTT_CONNECT_EVENT ) {
mqttSubscribe ( MQTT_TOPIC_ACTION ) ;
}
if ( type = = MQTT_MESSAGE_EVENT ) {
String t = mqttMagnitude ( topic ) ;
auto t = mqttMagnitude ( topic ) ;
if ( t . equals ( MQTT_TOPIC_ACTION ) ) {
rpcHandleAction ( payload ) ;
}
@ -1117,8 +1122,10 @@ void _mqttOnConnect() {
systemHeartbeat ( _mqttHeartbeat , _mqtt_heartbeat_mode , _mqtt_heartbeat_interval ) ;
// Notify all subscribers about the connection
for ( auto & callback : _mqtt_callbacks ) {
callback ( MQTT_CONNECT_EVENT , nullptr , nullptr ) ;
for ( const auto callback : _mqtt_callbacks ) {
callback ( MQTT_CONNECT_EVENT ,
espurna : : StringView ( ) ,
espurna : : StringView ( ) ) ;
}
DEBUG_MSG_P ( PSTR ( " [MQTT] Connected! \n " ) ) ;
@ -1136,8 +1143,10 @@ void _mqttOnDisconnect() {
systemStopHeartbeat ( _mqttHeartbeat ) ;
// Notify all subscribers about the disconnect
for ( auto & callback : _mqtt_callbacks ) {
callback ( MQTT_DISCONNECT_EVENT , nullptr , nullptr ) ;
for ( const auto callback : _mqtt_callbacks ) {
callback ( MQTT_DISCONNECT_EVENT ,
espurna : : StringView ( ) ,
espurna : : StringView ( ) ) ;
}
DEBUG_MSG_P ( PSTR ( " [MQTT] Disconnected! \n " ) ) ;
@ -1158,7 +1167,7 @@ void _mqttPidCallback(MqttPidCallbacks& callbacks, uint16_t pid) {
while ( it ! = end ) {
if ( ( * it ) . pid = = pid ) {
( * it ) . run ( ) ;
( * it ) . callback ( ) ;
it = callbacks . erase_after ( prev ) ;
} else {
prev = it ;
@ -1191,29 +1200,38 @@ bool _mqttMaybeSkipRetained(char* topic) {
// TODO: Current callback model does not allow to pass message length. Instead, implement a topic filter and record all subscriptions. That way we don't need to filter out events and could implement per-event callbacks.
void _mqttOnMessageAsync ( char * topic , char * payload , AsyncMqttClientMessageProperties , size_t len , size_t index , size_t total ) {
if ( ! len | | ( len > MQTT_BUFFER_MAX_SIZE ) | | ( total > MQTT_BUFFER_MAX_SIZE ) ) return ;
if ( _mqttMaybeSkipRetained ( topic ) ) return ;
static constexpr size_t BufferSize { MQTT_BUFFER_MAX_SIZE } ;
static_assert ( BufferSize > 0 , " " ) ;
static char message [ ( ( MQTT_BUFFER_MAX_SIZE + 1 ) + 31 ) & - 32 ] = { 0 } ;
memmove ( message + index , ( char * ) payload , len ) ;
if ( ! len | | ( len > BufferSize ) | | ( total > BufferSize ) ) {
return ;
}
if ( _mqttMaybeSkipRetained ( topic ) ) {
return ;
}
alignas ( 4 ) static char buffer [ ( ( BufferSize + 3 ) & ~ 3 ) + 4 ] = { 0 } ;
std : : copy ( payload , payload + len , buffer ) ;
// Not done yet
if ( total ! = ( len + index ) ) {
DEBUG_MSG_P ( PSTR ( " [MQTT] Buffered %s => %u / %u bytes \n " ) , topic , len , total ) ;
return ;
}
message [ len + index ] = ' \0 ' ;
buffer [ len + index ] = ' \0 ' ;
if ( len < mqtt : : build : : MessageLogMax ) {
DEBUG_MSG_P ( PSTR ( " [MQTT] Received %s => %s \n " ) , topic , message ) ;
DEBUG_MSG_P ( PSTR ( " [MQTT] Received %s => %s \n " ) , topic , buffer ) ;
} else {
DEBUG_MSG_P ( PSTR ( " [MQTT] Received %s => (%u bytes) \n " ) , topic , len ) ;
}
// Call subscribers with the message buffer
for ( auto & callback : _mqtt_callbacks ) {
callback ( MQTT_MESSAGE_EVENT , topic , message ) ;
auto topic_view = espurna : : StringView { topic } ;
auto message_view = espurna : : StringView { & buffer [ 0 ] , & buffer [ total ] } ;
for ( const auto callback : _mqtt_callbacks ) {
callback ( MQTT_MESSAGE_EVENT , topic_view , message_view ) ;
}
}
# else
@ -1247,89 +1265,91 @@ void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
// Public API
// -----------------------------------------------------------------------------
/**
Returns the magnitude part of a topic
@ param topic the full MQTT topic
@ return String object with the magnitude part .
*/
String mqttMagnitude ( const char * topic ) {
String output ;
String pattern = _mqtt_settings . topic + _mqtt_settings . setter ;
int position = pattern . indexOf ( " # " ) ;
if ( position > = 0 ) {
String start = pattern . substring ( 0 , position ) ;
String end = pattern . substring ( position + 1 ) ;
// Return {magnitude} (aka #) part of the topic string
// e.g.
// * <TOPIC>/#/set - generic topic placement
// ^
// * <LHS>/#/<RHS>/set - when {magnitude} is used
// ^
// * #/<RHS>/set - when magnitude is at the start
// ^
// * #/set - when *only* {magnitude} is used (or, empty topic string)
// ^
// Depends on the topic and setter settings values.
// Note that function is ignoring the fact that these strings may not contain the
// root topic b/c MQTT handles that instead of us (and it's good idea to trust it).
espurna : : StringView mqttMagnitude ( espurna : : StringView topic ) {
using espurna : : StringView ;
StringView out ;
const auto pattern = _mqtt_settings . topic + _mqtt_settings . setter ;
auto it = std : : find ( pattern . begin ( ) , pattern . end ( ) , ' # ' ) ;
if ( it = = pattern . end ( ) ) {
return out ;
}
String magnitude ( topic ) ;
if ( magnitude . startsWith ( start ) & & magnitude . endsWith ( end ) ) {
output = std : : move ( magnitude ) ;
output . replace ( start , " " ) ;
output . replace ( end , " " ) ;
}
const auto start = StringView ( pattern . begin ( ) , it ) ;
if ( start . length ( ) ) {
topic = StringView ( topic . begin ( ) + start . length ( ) , topic . end ( ) ) ;
}
return output ;
}
const auto end = StringView ( it + 1 , pattern . end ( ) ) ;
if ( end . length ( ) ) {
topic = StringView ( topic . begin ( ) , topic . end ( ) - end . length ( ) ) ;
}
// Retrieve lefthand side of the extracted magnitude value
espurna : : StringView mqttMagnitudeTail ( espurna : : StringView magnitude , espurna : : StringView topic ) {
return espurna : : StringView ( magnitude . begin ( ) + topic . length ( ) , magnitude . end ( ) ) ;
out = StringView ( topic . begin ( ) , topic . end ( ) ) ;
return out ;
}
/**
Returns a full MQTT topic from the magnitude
@ param magnitude the magnitude part of the topic .
@ param is_set whether to build a command topic ( true )
or a state topic ( false ) .
@ return String full MQTT topic .
*/
String mqttTopic ( const String & magnitude , bool is_set ) {
String output ;
output . reserve ( magnitude . length ( )
// Creates a proper MQTT topic for on the given 'magnitude'
static String _mqttTopicWith ( String magnitude ) {
String out ;
out . reserve ( magnitude . length ( )
+ _mqtt_settings . topic . length ( )
+ _mqtt_settings . setter . length ( )
+ _mqtt_settings . getter . length ( ) ) ;
output + = _mqtt_settings . topic ;
output . replace ( " # " , magnitude ) ;
output + = is_set ? _mqtt_settings . setter : _mqtt_settings . getter ;
out + = _mqtt_settings . topic ;
out . replace ( " # " , magnitude ) ;
return output ;
return out ;
}
String mqttTopic ( const char * magnitude , bool is_set ) {
return mqttTopic ( String ( magnitude ) , is_set ) ;
// When magnitude is a status topic aka getter
static String _mqttTopicGetter ( String magnitude ) {
return _mqttTopicWith ( magnitude ) + _mqtt_settings . getter ;
}
/**
Returns a full MQTT topic from the magnitude
// When magnitude is an input topic aka setter
String _mqttTopicSetter ( String magnitude ) {
return _mqttTopicWith ( magnitude ) + _mqtt_settings . setter ;
}
@ param magnitude the magnitude part of the topic .
@ param index index of the magnitude when more than one such magnitudes .
@ param is_set whether to build a command topic ( true )
or a state topic ( false ) .
@ return String full MQTT topic .
*/
String mqttTopic ( const String & magnitude , unsigned int index , bool is_set ) {
String output ;
output . reserve ( magnitude . length ( ) + ( sizeof ( decltype ( index ) ) * 4 ) ) ;
output + = magnitude ;
output + = ' / ' ;
output + = index ;
return mqttTopic ( output , is_set ) ;
// When magnitude is indexed, append its index to the topic
static String _mqttTopicIndexed ( String topic , size_t index ) {
return topic + ' / ' + String ( index , 10 ) ;
}
String mqttTopic ( const char * magnitude , unsigned int index , bool is_set ) {
return mqttTopic ( String ( magnitude ) , index , is_set ) ;
String mqttTopic ( const String & magnitude ) {
return _mqttTopicGetter ( magnitude ) ;
}
String mqttTopic ( const String & magnitude , size_t index ) {
return _mqttTopicGetter ( _mqttTopicIndexed ( magnitude , index ) ) ;
}
String mqttTopicSetter ( const String & magnitude ) {
return _mqttTopicSetter ( magnitude ) ;
}
String mqttTopicSetter ( const String & magnitude , size_t index ) {
return _mqttTopicSetter ( _mqttTopicIndexed ( magnitude , index ) ) ;
}
// -----------------------------------------------------------------------------
uint16_t mqttSendRaw ( const char * topic , const char * message , bool retain , int qos ) {
uint16_t mqttSendRaw ( const char * topic , const char * message , bool retain , int qos ) {
if ( _mqtt . connected ( ) ) {
const unsigned int packetId {
# if MQTT_LIBRARY == MQTT_LIBRARY_ASYNCMQTTCLIENT
@ -1362,34 +1382,33 @@ uint16_t mqttSendRaw(const char * topic, const char * message, bool retain, int
return false ;
}
uint16_t mqttSendRaw ( const char * topic , const char * message , bool retain ) {
uint16_t mqttSendRaw ( const char * topic , const char * message , bool retain ) {
return mqttSendRaw ( topic , message , retain , _mqtt_settings . qos ) ;
}
uint16_t mqttSendRaw ( const char * topic , const char * message ) {
uint16_t mqttSendRaw ( const char * topic , const char * message ) {
return mqttSendRaw ( topic , message , _mqtt_settings . retain ) ;
}
bool mqttSend ( const char * topic , const char * message , bool force , bool retain ) {
bool mqttSend ( const char * topic , const char * message , bool force , bool retain ) {
if ( ! force & & _mqtt_use_json ) {
mqttEnqueue ( topic , message ) ;
_mqtt_json_payload_flush . once (
espurna : : duration : : Milliseconds ( MQTT_USE_JSON_DELAY ) , mqttFlush ) ;
_mqtt_json_payload_flush . once ( mqtt : : build : : JsonDelay , mqttFlush ) ;
return true ;
}
return mqttSendRaw ( mqttTopic ( topic , false ) . c_str ( ) , message , retain ) > 0 ;
return mqttSendRaw ( mqttTopic ( topic ) . c_str ( ) , message , retain ) > 0 ;
}
bool mqttSend ( const char * topic , const char * message , bool force ) {
bool mqttSend ( const char * topic , const char * message , bool force ) {
return mqttSend ( topic , message , force , _mqtt_settings . retain ) ;
}
bool mqttSend ( const char * topic , const char * message ) {
bool mqttSend ( const char * topic , const char * message ) {
return mqttSend ( topic , message , false ) ;
}
bool mqttSend ( const char * topic , unsigned int index , const char * message , bool force , bool retain ) {
bool mqttSend ( const char * topic , unsigned int index , const char * message , bool force , bool retain ) {
const size_t TopicLen { strlen ( topic ) } ;
String out ;
out . reserve ( TopicLen + 5 ) ;
@ -1401,11 +1420,11 @@ bool mqttSend(const char * topic, unsigned int index, const char * message, bool
return mqttSend ( out . c_str ( ) , message , force , retain ) ;
}
bool mqttSend ( const char * topic , unsigned int index , const char * message , bool force ) {
bool mqttSend ( const char * topic , unsigned int index , const char * message , bool force ) {
return mqttSend ( topic , index , message , force , _mqtt_settings . retain ) ;
}
bool mqttSend ( const char * topic , unsigned int index , const char * message ) {
bool mqttSend ( const char * topic , unsigned int index , const char * message ) {
return mqttSend ( topic , index , message , false ) ;
}
@ -1467,7 +1486,7 @@ void mqttFlush() {
mqttSendRaw ( _mqtt_settings . topic_json . c_str ( ) , output . c_str ( ) , false ) ;
}
void mqttEnqueue ( const char * topic , const char * message ) {
void mqttEnqueue ( espurna : : StringView topic , espurna : : StringView payload ) {
// Queue is not meant to send message "offline"
// We must prevent the queue does not get full while offline
if ( _mqtt . connected ( ) ) {
@ -1475,11 +1494,13 @@ void mqttEnqueue(const char* topic, const char* message) {
mqttFlush ( ) ;
}
_mqtt_json_payload . remove_if ( [ topic ] ( const MqttPayload & payload ) {
return payload . topic ( ) = = topic ;
} ) ;
_mqtt_json_payload . remove_if (
[ topic ] ( const MqttPayload & payload ) {
return topic = = payload . topic ( ) ;
} ) ;
_mqtt_json_payload . emplace_front ( topic , message ) ;
_mqtt_json_payload . emplace_front (
topic . toString ( ) , payload . toString ( ) ) ;
+ + _mqtt_json_payload_count ;
}
}
@ -1502,11 +1523,11 @@ uint16_t mqttSubscribeRaw(const char* topic) {
return mqttSubscribeRaw ( topic , _mqtt_settings . qos ) ;
}
bool mqttSubscribe ( const char * topic ) {
return mqttSubscribeRaw ( mqttTopic ( topic , true ) . c_str ( ) , _mqtt_settings . qos ) ;
bool mqttSubscribe ( const char * topic ) {
return mqttSubscribeRaw ( mqttTopicSetter ( topic ) . c_str ( ) , _mqtt_settings . qos ) ;
}
uint16_t mqttUnsubscribeRaw ( const char * topic ) {
uint16_t mqttUnsubscribeRaw ( const char * topic ) {
uint16_t pid { 0u } ;
if ( _mqtt . connected ( ) & & ( strlen ( topic ) > 0 ) ) {
pid = _mqtt . unsubscribe ( topic ) ;
@ -1516,8 +1537,8 @@ uint16_t mqttUnsubscribeRaw(const char * topic) {
return pid ;
}
bool mqttUnsubscribe ( const char * topic ) {
return mqttUnsubscribeRaw ( mqttTopic ( topic , true ) . c_str ( ) ) ;
bool mqttUnsubscribe ( const char * topic ) {
return mqttUnsubscribeRaw ( mqttTopicSetter ( topic ) . c_str ( ) ) ;
}
// -----------------------------------------------------------------------------
@ -1550,7 +1571,7 @@ bool mqttForward() {
@ param standalone function pointer
*/
void mqttRegister ( mqtt_callback_f callback ) {
void mqttRegister ( MqttCallback callback ) {
_mqtt_callbacks . push_front ( callback ) ;
}
@ -1561,9 +1582,12 @@ void mqttRegister(mqtt_callback_f callback) {
@ param callable object
*/
void mqttOnPublish ( uint16_t pid , mqtt_pid_callback_f callback ) {
auto callable = MqttPidCallback { pid , callback } ;
_mqtt_publish_callbacks . push_front ( std : : move ( callable ) ) ;
void mqttOnPublish ( uint16_t pid , MqttPidCallback callback ) {
_mqtt_publish_callbacks . push_front (
MqttPidCallbackHandler {
. pid = pid ,
. callback = std : : move ( callback ) ,
} ) ;
}
/**
@ -1571,9 +1595,12 @@ void mqttOnPublish(uint16_t pid, mqtt_pid_callback_f callback) {
@ param callable object
*/
void mqttOnSubscribe ( uint16_t pid , mqtt_pid_callback_f callback ) {
auto callable = MqttPidCallback { pid , callback } ;
_mqtt_subscribe_callbacks . push_front ( std : : move ( callable ) ) ;
void mqttOnSubscribe ( uint16_t pid , MqttPidCallback callback ) {
_mqtt_subscribe_callbacks . push_front (
MqttPidCallbackHandler {
. pid = pid ,
. callback = std : : move ( callback ) ,
} ) ;
}
# endif
@ -1772,7 +1799,7 @@ void mqttSetup() {
. onConnected ( _mqttWebSocketOnConnected )
. onKeyCheck ( _mqttWebSocketOnKeyCheck ) ;
mqttRegister ( [ ] ( unsigned int type , const char * , char * ) {
mqttRegister ( [ ] ( unsigned int type , espurna : : StringView , espurna : : StringView ) {
if ( ( type = = MQTT_CONNECT_EVENT ) | | ( type = = MQTT_DISCONNECT_EVENT ) ) {
wsPost ( _mqttWebSocketOnData ) ;
}