Browse Source

New mqttEnqueue API

i18n
Xose Pérez 6 years ago
parent
commit
c5db2758de
2 changed files with 117 additions and 59 deletions
  1. +5
    -0
      code/espurna/config/general.h
  2. +112
    -59
      code/espurna/mqtt.ino

+ 5
- 0
code/espurna/config/general.h View File

@ -479,6 +479,11 @@ PROGMEM const char* const custom_reset_string[] = {
#define MQTT_USE_JSON_DELAY 100 // Wait this many ms before grouping messages #define MQTT_USE_JSON_DELAY 100 // Wait this many ms before grouping messages
#define MQTT_QUEUE_MAX_SIZE 10 // Size of the MQTT queue when MQTT_USE_JSON is enabled #define MQTT_QUEUE_MAX_SIZE 10 // Size of the MQTT queue when MQTT_USE_JSON is enabled
// These are the properties that will be send when useJson is true
#define MQTT_ENQUEUE_IP 1
#define MQTT_ENQUEUE_MAC 1
#define MQTT_ENQUEUE_HOSTNAME 1
#define MQTT_ENQUEUE_DATETIME 1
// These particles will be concatenated to the MQTT_TOPIC base to form the actual topic // These particles will be concatenated to the MQTT_TOPIC base to form the actual topic
#define MQTT_TOPIC_JSON "data" #define MQTT_TOPIC_JSON "data"


+ 112
- 59
code/espurna/mqtt.ino View File

@ -39,6 +39,7 @@ unsigned char _mqtt_qos = MQTT_QOS;
bool _mqtt_retain = MQTT_RETAIN; bool _mqtt_retain = MQTT_RETAIN;
unsigned char _mqtt_keepalive = MQTT_KEEPALIVE; unsigned char _mqtt_keepalive = MQTT_KEEPALIVE;
String _mqtt_topic; String _mqtt_topic;
String _mqtt_topic_json;
String _mqtt_setter; String _mqtt_setter;
String _mqtt_getter; String _mqtt_getter;
bool _mqtt_forward; bool _mqtt_forward;
@ -63,35 +64,6 @@ Ticker _mqtt_flush_ticker;
// Private // Private
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void _mqttFlush() {
if (_mqtt_queue.size() == 0) return;
DynamicJsonBuffer jsonBuffer;
JsonObject& root = jsonBuffer.createObject();
for (unsigned char i=0; i<_mqtt_queue.size(); i++) {
mqtt_message_t element = _mqtt_queue[i];
root[element.topic] = element.message;
}
#if NTP_SUPPORT
if (ntpConnected()) root[MQTT_TOPIC_TIME] = ntpDateTime();
#endif
root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname");
root[MQTT_TOPIC_IP] = getIP();
String output;
root.printTo(output);
mqttSendRaw(mqttTopic(MQTT_TOPIC_JSON, false).c_str(), output.c_str());
for (unsigned char i = 0; i < _mqtt_queue.size(); i++) {
mqtt_message_t element = _mqtt_queue[i];
free(element.topic);
free(element.message);
}
_mqtt_queue.clear();
}
void _mqttConnect() { void _mqttConnect() {
// Do not connect if disabled // Do not connect if disabled
@ -266,6 +238,7 @@ void _mqttConfigure() {
_mqtt_enabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1; _mqtt_enabled = getSetting("mqttEnabled", MQTT_ENABLED).toInt() == 1;
} }
_mqtt_use_json = (getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1); _mqtt_use_json = (getSetting("mqttUseJson", MQTT_USE_JSON).toInt() == 1);
mqttQueueTopic(MQTT_TOPIC_JSON);
_mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN; _mqtt_reconnect_delay = MQTT_RECONNECT_DELAY_MIN;
@ -406,30 +379,6 @@ void _mqttOnMessage(char* topic, char* payload, unsigned int len) {
// Public API // Public API
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void mqttEnabled(bool status) {
_mqtt_enabled = status;
setSetting("mqttEnabled", status ? 1 : 0);
}
bool mqttEnabled() {
return _mqtt_enabled;
}
bool mqttConnected() {
return _mqtt.connected();
}
void mqttDisconnect() {
if (_mqtt.connected()) {
DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n"));
_mqtt.disconnect();
}
}
bool mqttForward() {
return _mqtt_forward;
}
String mqttTopicKey(char * topic) { String mqttTopicKey(char * topic) {
String pattern = _mqtt_topic + _mqtt_setter; String pattern = _mqtt_topic + _mqtt_setter;
@ -463,6 +412,8 @@ String mqttTopic(const char * topic, unsigned int index, bool is_set) {
return mqttTopic(buffer, is_set); return mqttTopic(buffer, is_set);
} }
// -----------------------------------------------------------------------------
void mqttSendRaw(const char * topic, const char * message) { void mqttSendRaw(const char * topic, const char * message) {
if (_mqtt.connected()) { if (_mqtt.connected()) {
#if MQTT_USE_ASYNC #if MQTT_USE_ASYNC
@ -475,21 +426,91 @@ void mqttSendRaw(const char * topic, const char * message) {
} }
} }
void mqttFlush() {
if (_mqtt_queue.size() == 0) return;
DynamicJsonBuffer jsonBuffer;
JsonObject& root = jsonBuffer.createObject();
// Add enqueued messages
for (unsigned char i=0; i<_mqtt_queue.size(); i++) {
mqtt_message_t element = _mqtt_queue[i];
root[element.topic] = element.message;
}
// Add extra propeties
#if NTP_SUPPORT && MQTT_ENQUEUE_DATETIME
if (ntpConnected()) root[MQTT_TOPIC_TIME] = ntpDateTime();
#endif
#if MQTT_ENQUEUE_MAC
root[MQTT_TOPIC_MAC] = WiFi.macAddress();
#endif
#if MQTT_ENQUEUE_HOSTNAME
root[MQTT_TOPIC_HOSTNAME] = getSetting("hostname");
#endif
#if MQTT_ENQUEUE_IP
root[MQTT_TOPIC_IP] = getIP();
#endif
// Send
String output;
root.printTo(output);
mqttSendRaw(_mqtt_topic_json.c_str(), output.c_str());
// Clear queue
for (unsigned char i = 0; i < _mqtt_queue.size(); i++) {
mqtt_message_t element = _mqtt_queue[i];
free(element.topic);
free(element.message);
}
_mqtt_queue.clear();
}
void mqttQueueTopic(const char * topic) {
String t = mqttTopic(topic, false);
if (!t.equals(_mqtt_topic_json)) {
mqttFlush();
_mqtt_topic_json = t;
}
}
void mqttEnqueue(const char * topic, const char * message) {
// Force flusing the queue if the MQTT_QUEUE_MAX_SIZE has been reached
if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) mqttFlush();
// Enqueue new message
mqtt_message_t element;
element.topic = strdup(topic);
element.message = strdup(message);
_mqtt_queue.push_back(element);
}
void mqttSend(const char * topic, const char * message, bool force) { void mqttSend(const char * topic, const char * message, bool force) {
bool useJson = force ? false : _mqtt_use_json; bool useJson = force ? false : _mqtt_use_json;
// Equeue message
if (useJson) { if (useJson) {
if (_mqtt_queue.size() >= MQTT_QUEUE_MAX_SIZE) _mqttFlush();
// Set default queue topic
mqttQueueTopic(MQTT_TOPIC_JSON);
mqtt_message_t element;
element.topic = strdup(topic);
element.message = strdup(message);
_mqtt_queue.push_back(element);
_mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, _mqttFlush);
// Enqueue new message
mqttEnqueue(topic, message);
// Reset flush timer
_mqtt_flush_ticker.once_ms(MQTT_USE_JSON_DELAY, mqttFlush);
// Send it right away
} else { } else {
mqttSendRaw(mqttTopic(topic, false).c_str(), message); mqttSendRaw(mqttTopic(topic, false).c_str(), message);
} }
} }
void mqttSend(const char * topic, const char * message) { void mqttSend(const char * topic, const char * message) {
@ -506,6 +527,8 @@ void mqttSend(const char * topic, unsigned int index, const char * message) {
mqttSend(topic, index, message, false); mqttSend(topic, index, message, false);
} }
// -----------------------------------------------------------------------------
void mqttSubscribeRaw(const char * topic) { void mqttSubscribeRaw(const char * topic) {
if (_mqtt.connected() && (strlen(topic) > 0)) { if (_mqtt.connected() && (strlen(topic) > 0)) {
#if MQTT_USE_ASYNC #if MQTT_USE_ASYNC
@ -534,6 +557,36 @@ void mqttUnsubscribeRaw(const char * topic) {
} }
} }
void mqttUnsubscribe(const char * topic) {
mqttUnsubscribeRaw(mqttTopic(topic, true).c_str());
}
// -----------------------------------------------------------------------------
void mqttEnabled(bool status) {
_mqtt_enabled = status;
setSetting("mqttEnabled", status ? 1 : 0);
}
bool mqttEnabled() {
return _mqtt_enabled;
}
bool mqttConnected() {
return _mqtt.connected();
}
void mqttDisconnect() {
if (_mqtt.connected()) {
DEBUG_MSG_P(PSTR("[MQTT] Disconnecting\n"));
_mqtt.disconnect();
}
}
bool mqttForward() {
return _mqtt_forward;
}
void mqttRegister(mqtt_callback_f callback) { void mqttRegister(mqtt_callback_f callback) {
_mqtt_callbacks.push_back(callback); _mqtt_callbacks.push_back(callback);
} }


Loading…
Cancel
Save