Browse Source

Refactor MQTT module

i18n
Xose Pérez 6 years ago
parent
commit
a716633c1b
3 changed files with 272 additions and 262 deletions
  1. +10
    -10
      code/espurna/homeassitant.ino
  2. +261
    -248
      code/espurna/mqtt.ino
  3. +1
    -4
      code/espurna/ws.ino

+ 10
- 10
code/espurna/homeassitant.ino View File

@ -41,11 +41,11 @@ void _haSend() {
root["platform"] = "mqtt"; root["platform"] = "mqtt";
if (relayCount()) { if (relayCount()) {
root["state_topic"] = getTopic(MQTT_TOPIC_RELAY, 0, false);
root["command_topic"] = getTopic(MQTT_TOPIC_RELAY, 0, true);
root["state_topic"] = mqttGetTopic(MQTT_TOPIC_RELAY, 0, false);
root["command_topic"] = mqttGetTopic(MQTT_TOPIC_RELAY, 0, true);
root["payload_on"] = String("1"); root["payload_on"] = String("1");
root["payload_off"] = String("0"); root["payload_off"] = String("0");
root["availability_topic"] = getTopic(MQTT_TOPIC_STATUS, false);
root["availability_topic"] = mqttGetTopic(MQTT_TOPIC_STATUS, false);
root["payload_available"] = String("1"); root["payload_available"] = String("1");
root["payload_not_available"] = String("0"); root["payload_not_available"] = String("0");
} }
@ -53,16 +53,16 @@ void _haSend() {
#if LIGHT_PROVIDER != LIGHT_PROVIDER_NONE #if LIGHT_PROVIDER != LIGHT_PROVIDER_NONE
if (lightHasColor()) { if (lightHasColor()) {
root["brightness_state_topic"] = getTopic(MQTT_TOPIC_BRIGHTNESS, false);
root["brightness_command_topic"] = getTopic(MQTT_TOPIC_BRIGHTNESS, true);
root["rgb_state_topic"] = getTopic(MQTT_TOPIC_COLOR_RGB, false);
root["rgb_command_topic"] = getTopic(MQTT_TOPIC_COLOR_RGB, true);
root["color_temp_command_topic"] = getTopic(MQTT_TOPIC_MIRED, true);
root["brightness_state_topic"] = mqttGetTopic(MQTT_TOPIC_BRIGHTNESS, false);
root["brightness_command_topic"] = mqttGetTopic(MQTT_TOPIC_BRIGHTNESS, true);
root["rgb_state_topic"] = mqttGetTopic(MQTT_TOPIC_COLOR_RGB, false);
root["rgb_command_topic"] = mqttGetTopic(MQTT_TOPIC_COLOR_RGB, true);
root["color_temp_command_topic"] = mqttGetTopic(MQTT_TOPIC_MIRED, true);
} }
if (lightChannels() > 3) { if (lightChannels() > 3) {
root["white_value_state_topic"] = getTopic(MQTT_TOPIC_CHANNEL, 3, false);
root["white_value_command_topic"] = getTopic(MQTT_TOPIC_CHANNEL, 3, true);
root["white_value_state_topic"] = mqttGetTopic(MQTT_TOPIC_CHANNEL, 3, false);
root["white_value_command_topic"] = mqttGetTopic(MQTT_TOPIC_CHANNEL, 3, true);
} }
#endif // LIGHT_PROVIDER != LIGHT_PROVIDER_NONE #endif // LIGHT_PROVIDER != LIGHT_PROVIDER_NONE


+ 261
- 248
code/espurna/mqtt.ino View File

@ -60,57 +60,9 @@ std::vector<mqtt_message_t> _mqtt_queue;
Ticker _mqtt_flush_ticker; Ticker _mqtt_flush_ticker;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Public API
// Private
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool mqttConnected() {
return _mqtt.connected();
}
void mqttDisconnect() {
if (_mqtt.connected()) {
DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n"));
_mqtt.disconnect();
}
}
bool mqttForward() {
return _mqtt_forward;
}
String mqttSubtopic(char * topic) {
String response;
String t = String(topic);
if (t.startsWith(_mqtt_topic) && t.endsWith(_mqtt_setter)) {
response = t.substring(_mqtt_topic.length(), t.length() - _mqtt_setter.length());
}
return response;
}
void mqttSendRaw(const char * topic, const char * message) {
if (_mqtt.connected()) {
#if MQTT_USE_ASYNC
unsigned int packetId = _mqtt.publish(topic, _mqtt_qos, _mqtt_retain, message);
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %d)\n"), topic, message, packetId);
#else
_mqtt.publish(topic, message, _mqtt_retain);
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s\n"), topic, message);
#endif
}
}
String getTopic(const char * topic, bool set) {
String output = _mqtt_topic + String(topic);
if (set) output += _mqtt_setter;
return output;
}
String getTopic(const char * topic, unsigned int index, bool set) {
char buffer[strlen(topic)+5];
snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
return getTopic(buffer, set);
}
void _mqttFlush() { void _mqttFlush() {
if (_mqtt_queue.size() == 0) return; if (_mqtt_queue.size() == 0) return;
@ -141,190 +93,7 @@ void _mqttFlush() {
} }
void mqttSend(const char * topic, const char * message, bool force) {
bool useJson = force ? false : _mqtt_use_json;
if (useJson) {
if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) _mqttFlush();
mqtt_message_t element;
element.topic = strdup(topic);
element.message = strdup(message);
_mqtt_queue.push_back(element);
_mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, _mqttFlush);
} else {
String path = _mqtt_topic + String(topic) + _mqtt_getter;
mqttSendRaw(path.c_str(), message);
}
}
void mqttSend(const char * topic, const char * message) {
mqttSend(topic, message, false);
}
void mqttSend(const char * topic, unsigned int index, const char * message, bool force) {
char buffer[strlen(topic)+5];
snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
mqttSend(buffer, message, force);
}
void mqttSend(const char * topic, unsigned int index, const char * message) {
mqttSend(topic, index, message, false);
}
void mqttSubscribeRaw(const char * topic) {
if (_mqtt.connected() && (strlen(topic) > 0)) {
#if MQTT_USE_ASYNC
unsigned int packetId = _mqtt.subscribe(topic, _mqtt_qos);
DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId);
#else
_mqtt.subscribe(topic, _mqtt_qos);
DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic);
#endif
}
}
void mqttSubscribe(const char * topic) {
String path = _mqtt_topic + String(topic) + _mqtt_setter;
mqttSubscribeRaw(path.c_str());
}
void mqttUnsubscribeRaw(const char * topic) {
if (_mqtt.connected() && (strlen(topic) > 0)) {
#if MQTT_USE_ASYNC
unsigned int packetId = _mqtt.unsubscribe(topic);
DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s (PID %d)\n"), topic, packetId);
#else
_mqtt.unsubscribe(topic);
DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s\n"), topic);
#endif
}
}
void mqttRegister(mqtt_callback_f callback) {
_mqtt_callbacks.push_back(callback);
}
// -----------------------------------------------------------------------------
// Callbacks
// -----------------------------------------------------------------------------
#if WEB_SUPPORT
void _mqttWebSocketOnSend(JsonObject& root) {
root["mqttVisible"] = 1;
root["mqttStatus"] = mqttConnected();
root["mqttEnabled"] = mqttEnabled();
root["mqttServer"] = getSetting("mqttServer", MQTT_SERVER);
root["mqttPort"] = getSetting("mqttPort", MQTT_PORT);
root["mqttUser"] = getSetting("mqttUser");
root["mqttClientID"] = getSetting("mqttClientID");
root["mqttPassword"] = getSetting("mqttPassword");
root["mqttKeep"] = _mqtt_keepalive;
root["mqttRetain"] = _mqtt_retain;
root["mqttQoS"] = _mqtt_qos;
#if ASYNC_TCP_SSL_ENABLED
root["mqttsslVisible"] = 1;
root["mqttUseSSL"] = getSetting("mqttUseSSL", 0).toInt() == 1;
root["mqttFP"] = getSetting("mqttFP");
#endif
root["mqttTopic"] = getSetting("mqttTopic", MQTT_TOPIC);
root["mqttUseJson"] = getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1;
}
#endif
void _mqttCallback(unsigned int type, const char * topic, const char * payload) {
if (type == MQTT_CONNECT_EVENT) {
// Subscribe to internal action topics
mqttSubscribe(MQTT_TOPIC_ACTION);
// Send heartbeat messages
heartbeat();
}
if (type == MQTT_MESSAGE_EVENT) {
// Match topic
String t = mqttSubtopic((char *) topic);
// Actions
if (t.equals(MQTT_TOPIC_ACTION)) {
if (strcmp(payload, MQTT_ACTION_RESET) == 0) {
deferredReset(100, CUSTOM_RESET_MQTT);
}
}
}
}
void _mqttOnConnect() {
DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
#if MQTT_SKIP_RETAINED
_mqtt_connected_at = millis();
#endif
// Clean subscriptions
mqttUnsubscribeRaw("#");
// Send connect event to subscribers
for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
(_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL);
}
}
void _mqttOnDisconnect() {
DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
// Send disconnect event to subscribers
for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
(_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL);
}
}
void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
if (len == 0) return;
char message[len + 1];
strlcpy(message, (char *) payload, len + 1);
#if MQTT_SKIP_RETAINED
if (millis() - _mqtt_connected_at < MQTT_SKIP_TIME) {
DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s - SKIPPED\n"), topic, message);
return;
}
#endif
DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
// Send message event to subscribers
for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
(_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic, message);
}
}
void mqttEnabled(bool status) {
_mqtt_enabled = status;
setSetting("mqttEnabled", status ? 1 : 0);
}
bool mqttEnabled() {
return _mqtt_enabled;
}
void mqttConnect() {
void _mqttConnect() {
// Do not connect if disabled // Do not connect if disabled
if (!_mqtt_enabled) return; if (!_mqtt_enabled) return;
@ -467,7 +236,7 @@ void mqttConnect() {
} }
void mqttConfigure() {
void _mqttConfigure() {
// Replace identifier // Replace identifier
_mqtt_topic = getSetting("mqttTopic", MQTT_TOPIC); _mqtt_topic = getSetting("mqttTopic", MQTT_TOPIC);
@ -497,18 +266,37 @@ void mqttConfigure() {
} }
void mqttSetBroker(IPAddress ip, unsigned int port) {
setSetting("mqttServer", ip.toString());
setSetting("mqttPort", port);
mqttEnabled(MQTT_AUTOCONNECT);
}
// -----------------------------------------------------------------------------
// WEB
// -----------------------------------------------------------------------------
void mqttSetBrokerIfNone(IPAddress ip, unsigned int port) {
if (!hasSetting("mqttServer")) mqttSetBroker(ip, port);
#if WEB_SUPPORT
void _mqttWebSocketOnSend(JsonObject& root) {
root["mqttVisible"] = 1;
root["mqttStatus"] = mqttConnected();
root["mqttEnabled"] = mqttEnabled();
root["mqttServer"] = getSetting("mqttServer", MQTT_SERVER);
root["mqttPort"] = getSetting("mqttPort", MQTT_PORT);
root["mqttUser"] = getSetting("mqttUser");
root["mqttClientID"] = getSetting("mqttClientID");
root["mqttPassword"] = getSetting("mqttPassword");
root["mqttKeep"] = _mqtt_keepalive;
root["mqttRetain"] = _mqtt_retain;
root["mqttQoS"] = _mqtt_qos;
#if ASYNC_TCP_SSL_ENABLED
root["mqttsslVisible"] = 1;
root["mqttUseSSL"] = getSetting("mqttUseSSL", 0).toInt() == 1;
root["mqttFP"] = getSetting("mqttFP");
#endif
root["mqttTopic"] = getSetting("mqttTopic", MQTT_TOPIC);
root["mqttUseJson"] = getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1;
} }
#endif
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Comands
// SETTINGS
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#if TERMINAL_SUPPORT #if TERMINAL_SUPPORT
@ -516,7 +304,7 @@ void mqttSetBrokerIfNone(IPAddress ip, unsigned int port) {
void _mqttInitCommands() { void _mqttInitCommands() {
settingsRegisterCommand(F("MQTT.RESET"), [](Embedis* e) { settingsRegisterCommand(F("MQTT.RESET"), [](Embedis* e) {
mqttConfigure();
_mqttConfigure();
mqttDisconnect(); mqttDisconnect();
DEBUG_MSG_P(PSTR("+OK\n")); DEBUG_MSG_P(PSTR("+OK\n"));
}); });
@ -526,7 +314,232 @@ void _mqttInitCommands() {
#endif // TERMINAL_SUPPORT #endif // TERMINAL_SUPPORT
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Setup
// MQTT Callbacks
// -----------------------------------------------------------------------------
void _mqttCallback(unsigned int type, const char * topic, const char * payload) {
if (type == MQTT_CONNECT_EVENT) {
// Subscribe to internal action topics
mqttSubscribe(MQTT_TOPIC_ACTION);
// Send heartbeat messages
heartbeat();
}
if (type == MQTT_MESSAGE_EVENT) {
// Match topic
String t = mqttSubtopic((char *) topic);
// Actions
if (t.equals(MQTT_TOPIC_ACTION)) {
if (strcmp(payload, MQTT_ACTION_RESET) == 0) {
deferredReset(100, CUSTOM_RESET_MQTT);
}
}
}
}
void _mqttOnConnect() {
DEBUG_MSG_P(PSTR("[MQTT] Connected!\n"));
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
#if MQTT_SKIP_RETAINED
_mqtt_connected_at = millis();
#endif
// Clean subscriptions
mqttUnsubscribeRaw("#");
// Send connect event to subscribers
for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
(_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL);
}
}
void _mqttOnDisconnect() {
DEBUG_MSG_P(PSTR("[MQTT] Disconnected!\n"));
// Send disconnect event to subscribers
for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
(_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL);
}
}
void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
if (len == 0) return;
char message[len + 1];
strlcpy(message, (char *) payload, len + 1);
#if MQTT_SKIP_RETAINED
if (millis() - _mqtt_connected_at < MQTT_SKIP_TIME) {
DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s - SKIPPED\n"), topic, message);
return;
}
#endif
DEBUG_MSG_P(PSTR("[MQTT] Received %s => %s\n"), topic, message);
// Send message event to subscribers
for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
(_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic, message);
}
}
// -----------------------------------------------------------------------------
// Public API
// -----------------------------------------------------------------------------
void mqttEnabled(bool status) {
_mqtt_enabled = status;
setSetting("mqttEnabled", status ? 1 : 0);
}
bool mqttEnabled() {
return _mqtt_enabled;
}
bool mqttConnected() {
return _mqtt.connected();
}
void mqttDisconnect() {
if (_mqtt.connected()) {
DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n"));
_mqtt.disconnect();
}
}
bool mqttForward() {
return _mqtt_forward;
}
String mqttSubtopic(char * topic) {
String response;
String t = String(topic);
if (t.startsWith(_mqtt_topic) && t.endsWith(_mqtt_setter)) {
response = t.substring(_mqtt_topic.length(), t.length() - _mqtt_setter.length());
}
return response;
}
String mqttGetTopic(const char * topic, bool set) {
String output = _mqtt_topic + String(topic);
if (set) output += _mqtt_setter;
return output;
}
String mqttGetTopic(const char * topic, unsigned int index, bool set) {
char buffer[strlen(topic)+5];
snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
return mqttGetTopic(buffer, set);
}
void mqttSendRaw(const char * topic, const char * message) {
if (_mqtt.connected()) {
#if MQTT_USE_ASYNC
unsigned int packetId = _mqtt.publish(topic, _mqtt_qos, _mqtt_retain, message);
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s (PID %d)\n"), topic, message, packetId);
#else
_mqtt.publish(topic, message, _mqtt_retain);
DEBUG_MSG_P(PSTR("[MQTT] Sending %s => %s\n"), topic, message);
#endif
}
}
void mqttSend(const char * topic, const char * message, bool force) {
bool useJson = force ? false : _mqtt_use_json;
if (useJson) {
if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) _mqttFlush();
mqtt_message_t element;
element.topic = strdup(topic);
element.message = strdup(message);
_mqtt_queue.push_back(element);
_mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, _mqttFlush);
} else {
String path = _mqtt_topic + String(topic) + _mqtt_getter;
mqttSendRaw(path.c_str(), message);
}
}
void mqttSend(const char * topic, const char * message) {
mqttSend(topic, message, false);
}
void mqttSend(const char * topic, unsigned int index, const char * message, bool force) {
char buffer[strlen(topic)+5];
snprintf_P(buffer, sizeof(buffer), PSTR("%s/%d"), topic, index);
mqttSend(buffer, message, force);
}
void mqttSend(const char * topic, unsigned int index, const char * message) {
mqttSend(topic, index, message, false);
}
void mqttSubscribeRaw(const char * topic) {
if (_mqtt.connected() && (strlen(topic) > 0)) {
#if MQTT_USE_ASYNC
unsigned int packetId = _mqtt.subscribe(topic, _mqtt_qos);
DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s (PID %d)\n"), topic, packetId);
#else
_mqtt.subscribe(topic, _mqtt_qos);
DEBUG_MSG_P(PSTR("[MQTT] Subscribing to %s\n"), topic);
#endif
}
}
void mqttSubscribe(const char * topic) {
String path = _mqtt_topic + String(topic) + _mqtt_setter;
mqttSubscribeRaw(path.c_str());
}
void mqttUnsubscribeRaw(const char * topic) {
if (_mqtt.connected() && (strlen(topic) > 0)) {
#if MQTT_USE_ASYNC
unsigned int packetId = _mqtt.unsubscribe(topic);
DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s (PID %d)\n"), topic, packetId);
#else
_mqtt.unsubscribe(topic);
DEBUG_MSG_P(PSTR("[MQTT] Unsubscribing to %s\n"), topic);
#endif
}
}
void mqttRegister(mqtt_callback_f callback) {
_mqtt_callbacks.push_back(callback);
}
void mqttSetBroker(IPAddress ip, unsigned int port) {
setSetting("mqttServer", ip.toString());
setSetting("mqttPort", port);
mqttEnabled(MQTT_AUTOCONNECT);
}
void mqttSetBrokerIfNone(IPAddress ip, unsigned int port) {
if (!hasSetting("mqttServer")) mqttSetBroker(ip, port);
}
void mqttReset() {
_mqttConfigure();
mqttDisconnect();
}
// -----------------------------------------------------------------------------
// Initialization
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void mqttSetup() { void mqttSetup() {
@ -583,12 +596,12 @@ void mqttSetup() {
#endif // MQTT_USE_ASYNC #endif // MQTT_USE_ASYNC
mqttConfigure();
_mqttConfigure();
mqttRegister(_mqttCallback); mqttRegister(_mqttCallback);
#if WEB_SUPPORT #if WEB_SUPPORT
wsOnSendRegister(_mqttWebSocketOnSend); wsOnSendRegister(_mqttWebSocketOnSend);
wsOnAfterParseRegister(mqttConfigure);
wsOnAfterParseRegister(_mqttConfigure);
#endif #endif
#if TERMINAL_SUPPORT #if TERMINAL_SUPPORT
@ -603,7 +616,7 @@ void mqttLoop() {
#if MQTT_USE_ASYNC #if MQTT_USE_ASYNC
mqttConnect();
_mqttConnect();
#else // not MQTT_USE_ASYNC #else // not MQTT_USE_ASYNC
@ -618,7 +631,7 @@ void mqttLoop() {
_mqtt_connected = false; _mqtt_connected = false;
} }
mqttConnect();
_mqttConnect();
} }


+ 1
- 4
code/espurna/ws.ino View File

@ -182,10 +182,7 @@ void _wsParse(AsyncWebSocketClient *client, uint8_t * payload, size_t length) {
// This should got to callback as well // This should got to callback as well
// but first change management has to be in place // but first change management has to be in place
#if MQTT_SUPPORT #if MQTT_SUPPORT
if (changedMQTT) {
mqttConfigure();
mqttDisconnect();
}
if (changedMQTT) mqttReset();
#endif #endif
// Persist settings // Persist settings


Loading…
Cancel
Save