Browse Source

Using PubSubClient by default

fastled
Xose Pérez 7 years ago
parent
commit
04ba425678
3 changed files with 86 additions and 20 deletions
  1. +2
    -0
      code/espurna/config/general.h
  2. +83
    -20
      code/espurna/mqtt.ino
  3. +1
    -0
      code/platformio.ini

+ 2
- 0
code/espurna/config/general.h View File

@ -119,6 +119,8 @@
// MQTT // MQTT
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
#define MQTT_USE_ASYNC 0
#define MQTT_SERVER "" #define MQTT_SERVER ""
#define MQTT_PORT 1883 #define MQTT_PORT 1883
#define MQTT_TOPIC "/test/switch/{identifier}" #define MQTT_TOPIC "/test/switch/{identifier}"


+ 83
- 20
code/espurna/mqtt.ino View File

@ -7,10 +7,17 @@ Copyright (C) 2016-2017 by Xose Pérez <xose dot perez at gmail dot com>
*/ */
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
#include <AsyncMqttClient.h>
#include <vector> #include <vector>
#if MQTT_USE_ASYNC
#include <AsyncMqttClient.h>
AsyncMqttClient mqtt; AsyncMqttClient mqtt;
#else
#include <PubSubClient.h>
WiFiClient mqttWiFiClient;
PubSubClient mqtt(mqttWiFiClient);
bool _mqttConnected = false;
#endif
String mqttTopic; String mqttTopic;
std::vector<void (*)(unsigned int, const char *, const char *)> _mqtt_callbacks; std::vector<void (*)(unsigned int, const char *, const char *)> _mqtt_callbacks;
@ -19,7 +26,7 @@ std::vector<void (*)(unsigned int, const char *, const char *)> _mqtt_callbacks;
#endif #endif
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// MQTT
// Public API
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
bool mqttConnected() { bool mqttConnected() {
@ -44,7 +51,11 @@ char * mqttSubtopic(char * topic) {
void mqttSendRaw(const char * topic, const char * message) { void mqttSendRaw(const char * topic, const char * message) {
if (mqtt.connected()) { if (mqtt.connected()) {
DEBUG_MSG("[MQTT] Sending %s => %s\n", topic, message); DEBUG_MSG("[MQTT] Sending %s => %s\n", topic, message);
mqtt.publish(topic, MQTT_QOS, MQTT_RETAIN, message);
#if MQTT_USE_ASYNC
mqtt.publish(topic, MQTT_QOS, MQTT_RETAIN, message);
#else
mqtt.publish(topic, message, MQTT_RETAIN);
#endif
} }
} }
@ -65,11 +76,15 @@ void mqttSubscribe(const char * topic) {
mqttSubscribeRaw(path.c_str()); mqttSubscribeRaw(path.c_str());
} }
// -----------------------------------------------------------------------------
// Callbacks
// -----------------------------------------------------------------------------
void mqttRegister(void (*callback)(unsigned int, const char *, const char *)) { void mqttRegister(void (*callback)(unsigned int, const char *, const char *)) {
_mqtt_callbacks.push_back(callback); _mqtt_callbacks.push_back(callback);
} }
void _mqttOnConnect(bool sessionPresent) {
void _mqttOnConnect() {
DEBUG_MSG("[MQTT] Connected!\n"); DEBUG_MSG("[MQTT] Connected!\n");
@ -94,7 +109,7 @@ void _mqttOnConnect(bool sessionPresent) {
} }
void _mqttOnDisconnect(AsyncMqttClientDisconnectReason reason) {
void _mqttOnDisconnect() {
DEBUG_MSG("[MQTT] Disconnected!\n"); DEBUG_MSG("[MQTT] Disconnected!\n");
@ -105,10 +120,10 @@ void _mqttOnDisconnect(AsyncMqttClientDisconnectReason reason) {
} }
void _mqttOnMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
char message[len+1];
strlcpy(message, payload, len+1);
char message[len + 1];
strlcpy(message, (char *) payload, len + 1);
DEBUG_MSG("[MQTT] Received %s => %s", topic, message); DEBUG_MSG("[MQTT] Received %s => %s", topic, message);
#if MQTT_SKIP_RETAINED #if MQTT_SKIP_RETAINED
@ -122,7 +137,7 @@ void _mqttOnMessage(char* topic, char* payload, AsyncMqttClientMessageProperties
// Check system topics // Check system topics
char * p = mqttSubtopic(topic); char * p = mqttSubtopic(topic);
if (strcmp(p, MQTT_ACTION_TOPIC) == 0) { if (strcmp(p, MQTT_ACTION_TOPIC) == 0) {
if (strcmp(payload, MQTT_ACTION_RESET) == 0) {
if (strcmp(message, MQTT_ACTION_RESET) == 0) {
ESP.reset(); ESP.reset();
} }
} }
@ -167,14 +182,38 @@ void mqttConnect() {
DEBUG_MSG("[MQTT] Connecting to broker at %s", host); DEBUG_MSG("[MQTT] Connecting to broker at %s", host);
mqtt.setServer(host, port); mqtt.setServer(host, port);
mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false);
mqtt.setWill((mqttTopic + MQTT_HEARTBEAT_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0");
if ((strlen(user) > 0) && (strlen(pass) > 0)) {
DEBUG_MSG(" as user '%s'.", user);
mqtt.setCredentials(user, pass);
}
DEBUG_MSG("\n");
mqtt.connect();
#if MQTT_USE_ASYNC
mqtt.setKeepAlive(MQTT_KEEPALIVE).setCleanSession(false);
mqtt.setWill((mqttTopic + MQTT_HEARTBEAT_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0");
if ((strlen(user) > 0) && (strlen(pass) > 0)) {
DEBUG_MSG(" as user '%s'.", user);
mqtt.setCredentials(user, pass);
}
DEBUG_MSG("\n");
mqtt.connect();
#else
bool response;
if ((strlen(user) > 0) && (strlen(pass) > 0)) {
DEBUG_MSG(" as user '%s'\n", user);
response = mqtt.connect(getIdentifier().c_str(), user, pass, (mqttTopic + MQTT_HEARTBEAT_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0");
} else {
DEBUG_MSG("\n");
response = mqtt.connect(getIdentifier().c_str(), (mqttTopic + MQTT_HEARTBEAT_TOPIC).c_str(), MQTT_QOS, MQTT_RETAIN, "0");
}
if (response) {
_mqttOnConnect();
_mqttConnected = true;
} else {
DEBUG_MSG("[MQTT] Connection failed\n");
}
#endif
free(host); free(host);
free(user); free(user);
@ -185,9 +224,21 @@ void mqttConnect() {
} }
void mqttSetup() { void mqttSetup() {
mqtt.onConnect(_mqttOnConnect);
mqtt.onDisconnect(_mqttOnDisconnect);
mqtt.onMessage(_mqttOnMessage);
#if MQTT_USE_ASYNC
mqtt.onConnect([](bool sessionPresent) {
_mqttOnConnect();
});
mqtt.onDisconnect([](AsyncMqttClientDisconnectReason reason) {
_mqttOnDisconnect();
});
mqtt.onMessage([](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
_mqttOnMessage(topic, payload, len);
});
#else
mqtt.setCallback([](char* topic, byte* payload, unsigned int length) {
_mqttOnMessage(topic, (char *) payload, length);
});
#endif
buildTopics(); buildTopics();
} }
@ -199,12 +250,24 @@ void mqttLoop() {
if (!mqtt.connected()) { if (!mqtt.connected()) {
#if not MQTT_USE_ASYNC
if (_mqttConnected) {
_mqttOnDisconnect();
_mqttConnected = false;
}
#endif
unsigned long currPeriod = millis() / MQTT_RECONNECT_DELAY; unsigned long currPeriod = millis() / MQTT_RECONNECT_DELAY;
if (currPeriod != lastPeriod) { if (currPeriod != lastPeriod) {
lastPeriod = currPeriod; lastPeriod = currPeriod;
mqttConnect(); mqttConnect();
} }
#if not MQTT_USE_ASYNC
} else {
mqtt.loop();
#endif
} }
} }


+ 1
- 0
code/platformio.ini View File

@ -12,6 +12,7 @@ lib_deps =
https://github.com/me-no-dev/ESPAsyncTCP#36b6b5a https://github.com/me-no-dev/ESPAsyncTCP#36b6b5a
https://github.com/me-no-dev/ESPAsyncWebServer#bab5457 https://github.com/me-no-dev/ESPAsyncWebServer#bab5457
https://github.com/marvinroger/async-mqtt-client#f1b4576 https://github.com/marvinroger/async-mqtt-client#f1b4576
PubSubClient
Embedis Embedis
NtpClientLib NtpClientLib
OneWire OneWire


Loading…
Cancel
Save