Browse Source

Publish/subscriber pattern for MQTT events (relay & websockets)

fastled
Xose Pérez 8 years ago
parent
commit
e8b1a0c41d
4 changed files with 142 additions and 95 deletions
  1. +4
    -0
      code/src/config/general.h
  2. +28
    -43
      code/src/mqtt.ino
  3. +94
    -51
      code/src/relay.ino
  4. +16
    -1
      code/src/web.ino

+ 4
- 0
code/src/config/general.h View File

@ -63,6 +63,10 @@
#define MQTT_FSVERSION_TOPIC "/fsversion" #define MQTT_FSVERSION_TOPIC "/fsversion"
#define MQTT_HEARTBEAT_TOPIC "/heartbeat" #define MQTT_HEARTBEAT_TOPIC "/heartbeat"
#define MQTT_CONNECT_EVENT 0
#define MQTT_DISCONNECT_EVENT 1
#define MQTT_MESSAGE_EVENT 2
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// NTP // NTP
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------


+ 28
- 43
code/src/mqtt.ino View File

@ -9,11 +9,12 @@ Copyright (C) 2016 by Xose Pérez <xose dot perez at gmail dot com>
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
#include <AsyncMqttClient.h> #include <AsyncMqttClient.h>
#include <vector>
AsyncMqttClient mqtt; AsyncMqttClient mqtt;
String mqttTopic; String mqttTopic;
bool isCallbackMessage = false;
std::vector<void (*)(unsigned int, const char *, const char *)> _mqtt_callbacks;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// MQTT // MQTT
@ -33,22 +34,26 @@ void buildTopics() {
mqttTopic.replace("{identifier}", getSetting("hostname")); mqttTopic.replace("{identifier}", getSetting("hostname"));
} }
void mqttSend(char * topic, char * message) {
void mqttSend(const char * topic, const char * message) {
if (!mqtt.connected()) return; if (!mqtt.connected()) return;
if (isCallbackMessage) return;
String path = mqttTopic + String(topic); String path = mqttTopic + String(topic);
DEBUG_MSG("[MQTT] Sending %s %s\n", (char *) path.c_str(), message); DEBUG_MSG("[MQTT] Sending %s %s\n", (char *) path.c_str(), message);
mqtt.publish(path.c_str(), MQTT_QOS, MQTT_RETAIN, message); mqtt.publish(path.c_str(), MQTT_QOS, MQTT_RETAIN, message);
} }
void _mqttOnConnect(bool sessionPresent) {
void mqttSubscribe(const char * topic) {
String path = mqttTopic + String(topic);
DEBUG_MSG("[MQTT] Subscribing to %s\n", (char *) path.c_str());
mqtt.subscribe(path.c_str(), MQTT_QOS);
}
char buffer[50];
void mqttRegister(void (*callback)(unsigned int, const char *, const char *)) {
_mqtt_callbacks.push_back(callback);
}
DEBUG_MSG("[MQTT] Connected!\n");
void _mqttOnConnect(bool sessionPresent) {
// Send status via webSocket
wsSend((char *) "{\"mqttStatus\": true}");
DEBUG_MSG("[MQTT] Connected!\n");
// Build MQTT topics // Build MQTT topics
buildTopics(); buildTopics();
@ -56,57 +61,37 @@ void _mqttOnConnect(bool sessionPresent) {
// Say hello and report our IP and VERSION // Say hello and report our IP and VERSION
mqttSend((char *) MQTT_IP_TOPIC, (char *) getIP().c_str()); mqttSend((char *) MQTT_IP_TOPIC, (char *) getIP().c_str());
mqttSend((char *) MQTT_VERSION_TOPIC, (char *) APP_VERSION); mqttSend((char *) MQTT_VERSION_TOPIC, (char *) APP_VERSION);
char buffer[50];
getFSVersion(buffer); getFSVersion(buffer);
mqttSend((char *) MQTT_FSVERSION_TOPIC, buffer); mqttSend((char *) MQTT_FSVERSION_TOPIC, buffer);
// Publish current relay status
relayMQTT();
// Subscribe to relay topics
sprintf(buffer, "%s/relay/#", mqttTopic.c_str());
DEBUG_MSG("[MQTT] Subscribing to %s\n", buffer);
mqtt.subscribe(buffer, MQTT_QOS);
// Send connect event to subscribers
for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
(*_mqtt_callbacks[i])(MQTT_CONNECT_EVENT, NULL, NULL);
}
} }
void _mqttOnDisconnect(AsyncMqttClientDisconnectReason reason) { void _mqttOnDisconnect(AsyncMqttClientDisconnectReason reason) {
// Send status via webSocket
wsSend((char *) "{\"mqttStatus\": false}");
DEBUG_MSG("[MQTT] Disconnected!\n");
// Send disconnect event to subscribers
for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
(*_mqtt_callbacks[i])(MQTT_DISCONNECT_EVENT, NULL, NULL);
}
} }
void _mqttOnMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { void _mqttOnMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
static bool isFirstMessage = true;
DEBUG_MSG("[MQTT] Received %s %c\n", topic, payload[0]); DEBUG_MSG("[MQTT] Received %s %c\n", topic, payload[0]);
// If relayMode is not SAME avoid responding to a retained message
if (isFirstMessage) {
isFirstMessage = false;
byte relayMode = getSetting("relayMode", String(RELAY_MODE)).toInt();
if (relayMode != RELAY_MODE_SAME) return;
}
// Get relay ID
unsigned int relayID = topic[strlen(topic)-1] - '0';
if (relayID >= relayCount()) relayID = 0;
// Action to perform
if ((char)payload[0] == '0') {
isCallbackMessage = true;
relayStatus(relayID, false);
}
if ((char)payload[0] == '1') {
isCallbackMessage = true;
relayStatus(relayID, true);
// Send message event to subscribers
// Topic is set to the specific part each one might be checking
for (unsigned char i = 0; i < _mqtt_callbacks.size(); i++) {
(*_mqtt_callbacks[i])(MQTT_MESSAGE_EVENT, topic + mqttTopic.length(), payload);
} }
if ((char)payload[0] == '2') {
relayToggle(relayID);
}
isCallbackMessage = false;
} }


+ 94
- 51
code/src/relay.ino View File

@ -50,28 +50,6 @@ void relayWS() {
wsSend((char *) output.c_str()); wsSend((char *) output.c_str());
} }
void relaySave() {
unsigned char bit = 1;
unsigned char mask = 0;
for (unsigned int i=0; i < _relays.size(); i++) {
if (relayStatus(i)) mask += bit;
bit += bit;
}
EEPROM.write(0, mask);
EEPROM.commit();
}
void relayRetrieve() {
recursive = true;
unsigned char bit = 1;
unsigned char mask = EEPROM.read(0);
for (unsigned int i=0; i < _relays.size(); i++) {
relayStatus(i, ((mask & bit) == bit));
bit += bit;
}
recursive = false;
}
bool relayStatus(unsigned char id) { bool relayStatus(unsigned char id) {
#ifdef SONOFF_DUAL #ifdef SONOFF_DUAL
return ((dualRelayStatus & (1 << id)) > 0); return ((dualRelayStatus & (1 << id)) > 0);
@ -80,6 +58,42 @@ bool relayStatus(unsigned char id) {
#endif #endif
} }
bool relayStatus(unsigned char id, bool status, bool report = true) {
bool changed = false;
if (relayStatus(id) != status) {
DEBUG_MSG("[RELAY] %d => %s\n", id, status ? "ON" : "OFF");
changed = true;
#ifdef SONOFF_DUAL
dualRelayStatus ^= (1 << id);
Serial.flush();
Serial.write(0xA0);
Serial.write(0x04);
Serial.write(dualRelayStatus);
Serial.write(0xA1);
Serial.flush();
#else
digitalWrite(_relays[id], status);
#endif
if (!recursive) {
relaySync(id);
relaySave();
}
}
if (report) relayMQTT(id);
if (!recursive) relayWS();
return changed;
}
void relaySync(unsigned char id) { void relaySync(unsigned char id) {
if (_relays.size() > 1) { if (_relays.size() > 1) {
@ -117,47 +131,74 @@ void relaySync(unsigned char id) {
} }
bool relayStatus(unsigned char id, bool status) {
void relaySave() {
unsigned char bit = 1;
unsigned char mask = 0;
for (unsigned int i=0; i < _relays.size(); i++) {
if (relayStatus(i)) mask += bit;
bit += bit;
}
EEPROM.write(0, mask);
EEPROM.commit();
}
void relayRetrieve() {
recursive = true;
unsigned char bit = 1;
unsigned char mask = EEPROM.read(0);
for (unsigned int i=0; i < _relays.size(); i++) {
relayStatus(i, ((mask & bit) == bit));
bit += bit;
}
recursive = false;
}
void relayToggle(unsigned char id) {
relayStatus(id, !relayStatus(id));
}
bool changed = false;
unsigned char relayCount() {
return _relays.size();
}
if (relayStatus(id) != status) {
void relayMQTTCallback(unsigned int type, const char * topic, const char * payload) {
DEBUG_MSG("[RELAY] %d => %s\n", id, status ? "ON" : "OFF");
changed = true;
static bool isFirstMessage = true;
#ifdef SONOFF_DUAL
if (type == MQTT_CONNECT_EVENT) {
relayMQTT();
mqttSubscribe("/relay/#");
}
dualRelayStatus ^= (1 << id);
Serial.flush();
Serial.write(0xA0);
Serial.write(0x04);
Serial.write(dualRelayStatus);
Serial.write(0xA1);
Serial.flush();
if (type == MQTT_MESSAGE_EVENT) {
#else
digitalWrite(_relays[id], status);
#endif
// Match topic
if (memcmp("/relay/", topic, 7) != 0) return;
if (!recursive) {
relaySync(id);
relaySave();
// If relayMode is not SAME avoid responding to a retained message
if (isFirstMessage) {
isFirstMessage = false;
byte relayMode = getSetting("relayMode", String(RELAY_MODE)).toInt();
if (relayMode != RELAY_MODE_SAME) return;
} }
}
// Get relay ID
unsigned int relayID = topic[strlen(topic)-1] - '0';
if (relayID >= relayCount()) relayID = 0;
relayMQTT(id);
if (!recursive) relayWS();
return changed;
}
// Action to perform
if ((char)payload[0] == '0') {
relayStatus(relayID, false, false);
}
if ((char)payload[0] == '1') {
relayStatus(relayID, true, false);
}
if ((char)payload[0] == '2') {
relayToggle(relayID);
}
void relayToggle(unsigned char id) {
relayStatus(id, !relayStatus(id));
}
}
unsigned char relayCount() {
return _relays.size();
} }
void relaySetup() { void relaySetup() {
@ -196,4 +237,6 @@ void relaySetup() {
if (relayMode == RELAY_MODE_SAME) relayRetrieve(); if (relayMode == RELAY_MODE_SAME) relayRetrieve();
mqttRegister(relayMQTTCallback);
} }

+ 16
- 1
code/src/web.ino View File

@ -39,6 +39,18 @@ bool wsSend(uint32_t client_id, char * payload) {
ws.text(client_id, payload); ws.text(client_id, payload);
} }
void wsMQTTCallback(unsigned int type, const char * topic, const char * payload) {
if (type == MQTT_CONNECT_EVENT) {
wsSend((char *) "{\"mqttStatus\": true}");
}
if (type == MQTT_DISCONNECT_EVENT) {
wsSend((char *) "{\"mqttStatus\": false}");
}
}
void _wsParse(uint32_t client_id, uint8_t * payload, size_t length) { void _wsParse(uint32_t client_id, uint8_t * payload, size_t length) {
// Parse JSON input // Parse JSON input
@ -448,8 +460,11 @@ ArRequestHandlerFunction _onRelayStatusWrapper(unsigned int relayID) {
void webSetup() { void webSetup() {
// Setup websocket plugin
// Setup websocket
ws.onEvent(_wsEvent); ws.onEvent(_wsEvent);
mqttRegister(wsMQTTCallback);
// Setup webserver
server.addHandler(&ws); server.addHandler(&ws);
// Serve home (basic authentication protection) // Serve home (basic authentication protection)


Loading…
Cancel
Save