From 0ae6e3b48eb6109efa6e86bf122f272f12343ffc Mon Sep 17 00:00:00 2001 From: Max Prokhorov Date: Mon, 16 Dec 2019 14:09:09 +0300 Subject: [PATCH] Influxdb: fix sensor send frequency, use async client, send data in batches (#2061) * broker: drop REAL_TIME flag, use separate Broker entities * influxdb: use async client, send data in batches --- code/espurna/broker.h | 8 +- code/espurna/config/general.h | 4 - code/espurna/influxdb.ino | 219 +++++++++++++++++++++++------ code/espurna/libs/SyncClientWrap.h | 37 ----- code/espurna/rpnrules.ino | 2 +- code/espurna/sensor.ino | 8 +- 6 files changed, 188 insertions(+), 90 deletions(-) delete mode 100644 code/espurna/libs/SyncClientWrap.h diff --git a/code/espurna/broker.h b/code/espurna/broker.h index 11b987a3..445b246b 100644 --- a/code/espurna/broker.h +++ b/code/espurna/broker.h @@ -17,7 +17,8 @@ Copyright (C) 2017-2019 by Xose Pérez enum class TBrokerType { SYSTEM, STATUS, - SENSOR, + SENSOR_READ, + SENSOR_REPORT, DATETIME, CONFIG }; @@ -49,7 +50,10 @@ TBrokerCallbacks TBroker::callbacks; // --- Some known types. Bind them here to avoid .ino screwing with order --- using StatusBroker = TBroker; -using SensorBroker = TBroker; + +using SensorReadBroker = TBroker; +using SensorReportBroker = TBroker; + using TimeBroker = TBroker; using ConfigBroker = TBroker; diff --git a/code/espurna/config/general.h b/code/espurna/config/general.h index f57d50c6..e1850a7a 100644 --- a/code/espurna/config/general.h +++ b/code/espurna/config/general.h @@ -1129,10 +1129,6 @@ #define BROKER_SUPPORT 1 // The broker is a poor-man's pubsub manager #endif -#ifndef BROKER_REAL_TIME -#define BROKER_REAL_TIME 1 // Report real time data -#endif - // ----------------------------------------------------------------------------- // SETTINGS // ----------------------------------------------------------------------------- diff --git a/code/espurna/influxdb.ino b/code/espurna/influxdb.ino index eee2e068..8662efe4 100644 --- a/code/espurna/influxdb.ino +++ b/code/espurna/influxdb.ino @@ -8,13 +8,104 @@ Copyright (C) 2017-2019 by Xose Pérez #if INFLUXDB_SUPPORT -#include "ESPAsyncTCP.h" +#include +#include +#include #include "broker.h" -#include "libs/SyncClientWrap.h" + +const char INFLUXDB_REQUEST_TEMPLATE[] PROGMEM = "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n"; + +constexpr const unsigned long INFLUXDB_CLIENT_TIMEOUT = 5000; +constexpr const size_t INFLUXDB_DATA_BUFFER_SIZE = 256; bool _idb_enabled = false; -SyncClientWrap * _idb_client; +String _idb_host; +uint16_t _idb_port = 0; + +std::map _idb_values; +String _idb_data; +bool _idb_flush = false; + +std::unique_ptr _idb_client = nullptr; +bool _idb_connecting = false; +bool _idb_connected = false; + +uint32_t _idb_client_ts = 0; + +// ----------------------------------------------------------------------------- + +void _idbInitClient() { + + _idb_client = std::make_unique(); + + _idb_client->onDisconnect([](void * s, AsyncClient * client) { + DEBUG_MSG_P(PSTR("[INFLUXDB] Disconnected\n")); + _idb_flush = false; + _idb_data = ""; + _idb_client_ts = 0; + _idb_connected = false; + _idb_connecting = false; + }, nullptr); + + _idb_client->onTimeout([](void * s, AsyncClient * client, uint32_t time) { + DEBUG_MSG_P(PSTR("[INFLUXDB] Network timeout after %ums\n"), time); + client->close(true); + }, nullptr); + + _idb_client->onData([](void * arg, AsyncClient * client, void * response, size_t len) { + // ref: https://docs.influxdata.com/influxdb/v1.7/tools/api/#summary-table-1 + const char idb_success[] = "HTTP/1.1 204"; + const bool result = (len > sizeof(idb_success) && (0 == strncmp((char*) response, idb_success, sizeof(idb_success)))); + DEBUG_MSG_P(PSTR("[INFLUXDB] %s response after %ums\n"), result ? "Success" : "Failure", millis() - _idb_client_ts); + _idb_client_ts = millis(); + client->close(); + }, nullptr); + + _idb_client->onPoll([](void * arg, AsyncClient * client) { + unsigned long ts = millis() - _idb_client_ts; + if (ts > INFLUXDB_CLIENT_TIMEOUT) { + DEBUG_MSG_P(PSTR("[INFLUXDB] No response after %ums\n"), ts); + client->close(true); + return; + } + + if (_idb_data.length()) { + client->write(_idb_data.c_str(), _idb_data.length()); + _idb_data = ""; + } + }); + + _idb_client->onConnect([](void * arg, AsyncClient * client) { + + _idb_client_ts = millis(); + _idb_connected = true; + _idb_connecting = false; + + DEBUG_MSG_P(PSTR("[INFLUXDB] Connected to %s:%u\n"), + IPAddress(client->getRemoteAddress()).toString().c_str(), + client->getRemotePort() + ); + + constexpr const int BUFFER_SIZE = 256; + char headers[BUFFER_SIZE]; + int len = snprintf_P(headers, sizeof(headers), INFLUXDB_REQUEST_TEMPLATE, + getSetting("idbDatabase", INFLUXDB_DATABASE).c_str(), + getSetting("idbUsername", INFLUXDB_USERNAME).c_str(), + getSetting("idbPassword", INFLUXDB_PASSWORD).c_str(), + _idb_host.c_str(), _idb_port, _idb_data.length() + ); + if ((len < 0) || (len > BUFFER_SIZE - 1)) { + client->close(true); + return; + } + + client->write(headers, len); + + }); + +} + // ----------------------------------------------------------------------------- @@ -41,6 +132,7 @@ void _idbConfigure() { _idb_enabled = false; setSetting("idbEnabled", 0); } + if (_idb_enabled && !_idb_client) _idbInitClient(); } #if BROKER_SUPPORT @@ -59,51 +151,79 @@ void _idbBrokerStatus(const String& topic, unsigned char id, unsigned int value) bool idbSend(const char * topic, const char * payload) { - if (!_idb_enabled) return true; - if (!wifiConnected() || (WiFi.getMode() != WIFI_STA)) return true; + if (!_idb_enabled) return false; + if (_idb_connected) return false; - String h = getSetting("idbHost", INFLUXDB_HOST); - #if MDNS_CLIENT_SUPPORT - h = mdnsResolve(h); - #endif - char * host = strdup(h.c_str()); - unsigned int port = getSetting("idbPort", INFLUXDB_PORT).toInt(); - DEBUG_MSG_P(PSTR("[INFLUXDB] Sending to %s:%u\n"), host, port); + _idb_values[topic] = payload; + _idb_flush = true; - bool success = false; + return true; +} - _idb_client->setTimeout(2); - if (_idb_client->connect((const char *) host, (unsigned int) port)) { +void _idbSend(const String& host, const uint16_t port) { + if (_idb_connected || _idb_connecting) return; - char data[128]; - snprintf(data, sizeof(data), "%s,device=%s value=%s", topic, getSetting("hostname").c_str(), String(payload).c_str()); - DEBUG_MSG_P(PSTR("[INFLUXDB] Data: %s\n"), data); + DEBUG_MSG_P(PSTR("[INFLUXDB] Sending to %s:%u\n"), host.c_str(), port); - char request[256]; - snprintf(request, sizeof(request), "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n%s", - getSetting("idbDatabase", INFLUXDB_DATABASE).c_str(), - getSetting("idbUsername", INFLUXDB_USERNAME).c_str(), getSetting("idbPassword", INFLUXDB_PASSWORD).c_str(), - host, port, strlen(data), data); - - if (_idb_client->printf(request) > 0) { - while (_idb_client->connected() && _idb_client->available() == 0) delay(1); - while (_idb_client->available()) _idb_client->read(); - if (_idb_client->connected()) _idb_client->stop(); - success = true; - } else { - DEBUG_MSG_P(PSTR("[INFLUXDB] Sent failed\n")); - } + // TODO: cache `Host: :` instead of storing things separately? + _idb_host = host; + _idb_port = port; - _idb_client->stop(); - while (_idb_client->connected()) yield(); + _idb_client_ts = millis(); + _idb_connecting = _idb_client->connect(host.c_str(), port); - } else { - DEBUG_MSG_P(PSTR("[INFLUXDB] Connection failed\n")); + if (!_idb_connecting) { + DEBUG_MSG_P(PSTR("[INFLUXDB] Connection to %s:%u failed\n"), host.c_str(), port); + _idb_client->close(true); } +} + +void _idbFlush() { + // Clean-up client object when not in use + if (_idb_client && !_idb_enabled && !_idb_connected && !_idb_connecting) { + _idb_client = nullptr; + } + + // Wait until current connection is finished + if (!_idb_flush) return; + if (_idb_connected || _idb_connecting) return; + + // Wait until connected + if (!wifiConnected()) return; + + // TODO: MDNS_CLIENT_SUPPORT is deprecated + String host = getSetting("idbHost", INFLUXDB_HOST); + #if MDNS_CLIENT_SUPPORT + host = mdnsResolve(host); + #endif + + const uint16_t port = getSetting("idbPort", INFLUXDB_PORT).toInt(); + + // TODO: should we always store specific pairs like tspk keeps relay / sensor readings? + // note that we also send heartbeat data, persistent values should be flagged + const String device = getSetting("hostname"); + + _idb_data = ""; + for (auto& pair : _idb_values) { + if (!isNumber(pair.second.c_str())) { + String quoted; + quoted.reserve(pair.second.length() + 2); + quoted += '"'; + quoted += pair.second; + quoted += '"'; + pair.second = quoted; + } - free(host); - return success; + char buffer[128] = {0}; + snprintf_P(buffer, sizeof(buffer), + PSTR("%s,device=%s value=%s\n"), + pair.first.c_str(), device.c_str(), pair.second.c_str() + ); + _idb_data += buffer; + } + _idb_values.clear(); + _idbSend(host, port); } bool idbSend(const char * topic, unsigned char id, const char * payload) { @@ -118,8 +238,6 @@ bool idbEnabled() { void idbSetup() { - _idb_client = new SyncClientWrap(); - _idbConfigure(); #if WEB_SUPPORT @@ -131,11 +249,28 @@ void idbSetup() { #if BROKER_SUPPORT StatusBroker::Register(_idbBrokerStatus); - SensorBroker::Register(_idbBrokerSensor); + SensorReportBroker::Register(_idbBrokerSensor); #endif - // Main callbacks espurnaRegisterReload(_idbConfigure); + espurnaRegisterLoop(_idbFlush); + + _idb_data.reserve(INFLUXDB_DATA_BUFFER_SIZE); + + #if TERMINAL_SUPPORT + terminalRegisterCommand(F("IDB.SEND"), [](Embedis* e) { + if (e->argc != 4) { + terminalError(F("idb.send ")); + return; + } + + const String topic = e->argv[1]; + const auto id = atoi(e->argv[2]); + const String value = e->argv[3]; + + idbSend(topic.c_str(), id, value.c_str()); + }); + #endif } diff --git a/code/espurna/libs/SyncClientWrap.h b/code/espurna/libs/SyncClientWrap.h deleted file mode 100644 index b252eaf5..00000000 --- a/code/espurna/libs/SyncClientWrap.h +++ /dev/null @@ -1,37 +0,0 @@ -/* - -SyncClientWrap - -Temporary wrap to fix https://github.com/me-no-dev/ESPAsyncTCP/issues/109 -*/ - -#pragma once - -#include - -// ref Core 2.5.0: cores/esp8266/IPAddress.h -#ifndef CONST -#include - -#if LWIP_VERSION_MAJOR == 1 -#define CONST -#else -#define CONST const -#endif - -#endif - -class SyncClientWrap: public SyncClient { - - public: - SyncClientWrap() {} - ~SyncClientWrap() {} - - // int connect(const char*, uint16_t); - using SyncClient::connect; - - int connect(CONST IPAddress& ip, uint16_t port) { IPAddress _ip(ip); return SyncClient::connect(_ip, port); } - bool flush(unsigned int maxWaitMs = 0) { SyncClient::flush(); return true; } - bool stop(unsigned int maxWaitMs = 0) { SyncClient::stop(); return true; } - -}; diff --git a/code/espurna/rpnrules.ino b/code/espurna/rpnrules.ino index 5e275f44..7e5907bc 100644 --- a/code/espurna/rpnrules.ino +++ b/code/espurna/rpnrules.ino @@ -307,7 +307,7 @@ void rpnSetup() { #endif StatusBroker::Register(_rpnBrokerStatus); - SensorBroker::Register(_rpnBrokerCallback); + SensorReadBroker::Register(_rpnBrokerCallback); espurnaRegisterReload(_rpnConfigure); espurnaRegisterLoop(_rpnLoop); diff --git a/code/espurna/sensor.ino b/code/espurna/sensor.ino index 9858968f..9301cb21 100644 --- a/code/espurna/sensor.ino +++ b/code/espurna/sensor.ino @@ -1543,8 +1543,8 @@ void _sensorReport(unsigned char index, double value) { char buffer[64]; dtostrf(value, 1, decimals, buffer); - #if BROKER_SUPPORT && (not BROKER_REAL_TIME) - SensorBroker::Publish(magnitudeTopic(magnitude.type), magnitude.global, value, buffer); + #if BROKER_SUPPORT + SensorReportBroker::Publish(magnitudeTopic(magnitude.type), magnitude.global, value, buffer); #endif #if MQTT_SUPPORT @@ -1801,12 +1801,12 @@ void sensorLoop() { // ------------------------------------------------------------- value_show = _magnitudeProcess(magnitude.type, magnitude.decimals, value_raw); - #if BROKER_SUPPORT && BROKER_REAL_TIME + #if BROKER_SUPPORT { char buffer[64]; dtostrf(value_show, 1-sizeof(buffer), magnitude.decimals, buffer); - SensorBroker::Publish(magnitudeTopic(magnitude.type), magnitude.global, value_show, buffer); + SensorReadBroker::Publish(magnitudeTopic(magnitude.type), magnitude.global, value_show, buffer); } #endif