From be7ca13d769f809e6475da5d6dbecd2ef8fc4724 Mon Sep 17 00:00:00 2001 From: Max Prokhorov Date: Tue, 18 Feb 2020 12:16:13 +0300 Subject: [PATCH] influxdb: fix http response parsing, refactor module scope (#2153) --- code/espurna/influxdb.ino | 138 +++++++++++++------------ code/espurna/libs/AsyncClientHelpers.h | 5 +- 2 files changed, 77 insertions(+), 66 deletions(-) diff --git a/code/espurna/influxdb.ino b/code/espurna/influxdb.ino index a82f85bc..25f67277 100644 --- a/code/espurna/influxdb.ino +++ b/code/espurna/influxdb.ino @@ -8,44 +8,50 @@ Copyright (C) 2017-2019 by Xose PĂ©rez #if INFLUXDB_SUPPORT -#include #include #include #include "broker.h" +#include "libs/AsyncClientHelpers.h" -const char INFLUXDB_REQUEST_TEMPLATE[] PROGMEM = "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n"; +const char InfluxDb_http_success[] = "HTTP/1.1 204"; +const char InfluxDb_http_template[] PROGMEM = "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n"; -constexpr const unsigned long INFLUXDB_CLIENT_TIMEOUT = 5000; -constexpr const size_t INFLUXDB_DATA_BUFFER_SIZE = 256; +class AsyncInfluxDB : public AsyncClient { + public: -bool _idb_enabled = false; -String _idb_host; -uint16_t _idb_port = 0; + constexpr static const unsigned long ClientTimeout = 5000; + constexpr static const size_t DataBufferSize = 256; + + AsyncClientState state = AsyncClientState::Disconnected; + String host; + uint16_t port = 0; -std::map _idb_values; -String _idb_data; -bool _idb_flush = false; + std::map values; + String payload; -std::unique_ptr _idb_client = nullptr; -bool _idb_connecting = false; -bool _idb_connected = false; + bool flush = false; + uint32_t timestamp = 0; +}; -uint32_t _idb_client_ts = 0; +bool _idb_enabled = false; +std::unique_ptr _idb_client = nullptr; // ----------------------------------------------------------------------------- void _idbInitClient() { - _idb_client = std::make_unique(); + _idb_client = std::make_unique(); - _idb_client->onDisconnect([](void * s, AsyncClient * client) { + _idb_client->payload.reserve(AsyncInfluxDB::DataBufferSize); + + _idb_client->onDisconnect([](void * s, AsyncClient * ptr) { + auto *client = reinterpret_cast(ptr); DEBUG_MSG_P(PSTR("[INFLUXDB] Disconnected\n")); - _idb_flush = false; - _idb_data = ""; - _idb_client_ts = 0; - _idb_connected = false; - _idb_connecting = false; + client->flush = false; + client->payload = ""; + client->timestamp = 0; + client->state = AsyncClientState::Disconnected; }, nullptr); _idb_client->onTimeout([](void * s, AsyncClient * client, uint32_t time) { @@ -53,34 +59,41 @@ void _idbInitClient() { client->close(true); }, nullptr); - _idb_client->onData([](void * arg, AsyncClient * client, void * response, size_t len) { + _idb_client->onData([](void * arg, AsyncClient * ptr, void * response, size_t len) { // ref: https://docs.influxdata.com/influxdb/v1.7/tools/api/#summary-table-1 - const char idb_success[] = "HTTP/1.1 204"; - const bool result = (len > sizeof(idb_success) && (0 == strncmp((char*) response, idb_success, sizeof(idb_success)))); - DEBUG_MSG_P(PSTR("[INFLUXDB] %s response after %ums\n"), result ? "Success" : "Failure", millis() - _idb_client_ts); - _idb_client_ts = millis(); - client->close(); + auto *client = reinterpret_cast(ptr); + if (client->state == AsyncClientState::Connected) { + client->state = AsyncClientState::Disconnecting; + + const bool result = (len > sizeof(InfluxDb_http_success) && (0 == strncmp((char*) response, InfluxDb_http_success, strlen(InfluxDb_http_success)))); + DEBUG_MSG_P(PSTR("[INFLUXDB] %s response after %ums\n"), result ? "Success" : "Failure", millis() - client->timestamp); + + client->timestamp = millis(); + client->close(); + } }, nullptr); - _idb_client->onPoll([](void * arg, AsyncClient * client) { - unsigned long ts = millis() - _idb_client_ts; - if (ts > INFLUXDB_CLIENT_TIMEOUT) { + _idb_client->onPoll([](void * arg, AsyncClient * ptr) { + auto *client = reinterpret_cast(ptr); + unsigned long ts = millis() - client->timestamp; + if (ts > AsyncInfluxDB::ClientTimeout) { DEBUG_MSG_P(PSTR("[INFLUXDB] No response after %ums\n"), ts); client->close(true); return; } - if (_idb_data.length()) { - client->write(_idb_data.c_str(), _idb_data.length()); - _idb_data = ""; + if (client->payload.length()) { + client->write(client->payload.c_str(), client->payload.length()); + client->payload = ""; } }); - _idb_client->onConnect([](void * arg, AsyncClient * client) { + _idb_client->onConnect([](void * arg, AsyncClient * ptr) { + + auto *client = reinterpret_cast(ptr); - _idb_client_ts = millis(); - _idb_connected = true; - _idb_connecting = false; + client->timestamp = millis(); + client->state = AsyncClientState::Connected; DEBUG_MSG_P(PSTR("[INFLUXDB] Connected to %s:%u\n"), IPAddress(client->getRemoteAddress()).toString().c_str(), @@ -89,11 +102,11 @@ void _idbInitClient() { constexpr const int BUFFER_SIZE = 256; char headers[BUFFER_SIZE]; - int len = snprintf_P(headers, sizeof(headers), INFLUXDB_REQUEST_TEMPLATE, + int len = snprintf_P(headers, sizeof(headers), InfluxDb_http_template, getSetting("idbDatabase", INFLUXDB_DATABASE).c_str(), getSetting("idbUsername", INFLUXDB_USERNAME).c_str(), getSetting("idbPassword", INFLUXDB_PASSWORD).c_str(), - _idb_host.c_str(), _idb_port, _idb_data.length() + client->host.c_str(), client->port, client->payload.length() ); if ((len < 0) || (len > BUFFER_SIZE - 1)) { client->close(true); @@ -151,27 +164,28 @@ void _idbBrokerStatus(const String& topic, unsigned char id, unsigned int value) bool idbSend(const char * topic, const char * payload) { if (!_idb_enabled) return false; - if (_idb_connecting || _idb_connected) return false; + if (_idb_client->state != AsyncClientState::Disconnected) return false; - _idb_values[topic] = payload; - _idb_flush = true; + _idb_client->values[topic] = payload; + _idb_client->flush = true; return true; } void _idbSend(const String& host, const uint16_t port) { - if (_idb_connected || _idb_connecting) return; + if (_idb_client->state != AsyncClientState::Disconnected) return; DEBUG_MSG_P(PSTR("[INFLUXDB] Sending to %s:%u\n"), host.c_str(), port); - // TODO: cache `Host: :` instead of storing things separately? - _idb_host = host; - _idb_port = port; - - _idb_client_ts = millis(); - _idb_connecting = _idb_client->connect(host.c_str(), port); + // TODO: cache `Host: :` header instead of storing things separately? + _idb_client->host = host; + _idb_client->port = port; + _idb_client->timestamp = millis(); + _idb_client->state = _idb_client->connect(host.c_str(), port) + ? AsyncClientState::Connecting + : AsyncClientState::Disconnected; - if (!_idb_connecting) { + if (_idb_client->state == AsyncClientState::Disconnected) { DEBUG_MSG_P(PSTR("[INFLUXDB] Connection to %s:%u failed\n"), host.c_str(), port); _idb_client->close(true); } @@ -179,31 +193,27 @@ void _idbSend(const String& host, const uint16_t port) { void _idbFlush() { // Clean-up client object when not in use - if (_idb_client && !_idb_enabled && !_idb_connected && !_idb_connecting) { + if (_idb_client && !_idb_enabled && (_idb_client->state == AsyncClientState::Disconnected)) { _idb_client = nullptr; } // Wait until current connection is finished - if (!_idb_flush) return; - if (_idb_connected || _idb_connecting) return; + if (!_idb_client) return; + if (!_idb_client->flush) return; + if (_idb_client->state != AsyncClientState::Disconnected) return; // Wait until connected if (!wifiConnected()) return; - // TODO: MDNS_CLIENT_SUPPORT is deprecated - String host = getSetting("idbHost", INFLUXDB_HOST); - #if MDNS_CLIENT_SUPPORT - host = mdnsResolve(host); - #endif - + const auto host = getSetting("idbHost", INFLUXDB_HOST); const auto port = getSetting("idbPort", INFLUXDB_PORT); // TODO: should we always store specific pairs like tspk keeps relay / sensor readings? // note that we also send heartbeat data, persistent values should be flagged const String device = getSetting("hostname"); - _idb_data = ""; - for (auto& pair : _idb_values) { + _idb_client->payload = ""; + for (auto& pair : _idb_client->values) { if (!isNumber(pair.second.c_str())) { String quoted; quoted.reserve(pair.second.length() + 2); @@ -218,9 +228,9 @@ void _idbFlush() { PSTR("%s,device=%s value=%s\n"), pair.first.c_str(), device.c_str(), pair.second.c_str() ); - _idb_data += buffer; + _idb_client->payload += buffer; } - _idb_values.clear(); + _idb_client->values.clear(); _idbSend(host, port); } @@ -254,8 +264,6 @@ void idbSetup() { espurnaRegisterReload(_idbConfigure); espurnaRegisterLoop(_idbFlush); - _idb_data.reserve(INFLUXDB_DATA_BUFFER_SIZE); - #if TERMINAL_SUPPORT terminalRegisterCommand(F("IDB.SEND"), [](Embedis* e) { if (e->argc != 4) { diff --git a/code/espurna/libs/AsyncClientHelpers.h b/code/espurna/libs/AsyncClientHelpers.h index 446d59a8..ed732ffc 100644 --- a/code/espurna/libs/AsyncClientHelpers.h +++ b/code/espurna/libs/AsyncClientHelpers.h @@ -4,10 +4,13 @@ #pragma once +#include + enum class AsyncClientState { Disconnected, Connecting, - Connected + Connected, + Disconnecting };