Browse Source

influxdb: fix http response parsing, refactor module scope (#2153)

mcspr-patch-1
Max Prokhorov 4 years ago
committed by GitHub
parent
commit
be7ca13d76
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 66 deletions
  1. +73
    -65
      code/espurna/influxdb.ino
  2. +4
    -1
      code/espurna/libs/AsyncClientHelpers.h

+ 73
- 65
code/espurna/influxdb.ino View File

@ -8,44 +8,50 @@ Copyright (C) 2017-2019 by Xose Pérez <xose dot perez at gmail dot com>
#if INFLUXDB_SUPPORT #if INFLUXDB_SUPPORT
#include <ESPAsyncTCP.h>
#include <map> #include <map>
#include <memory> #include <memory>
#include "broker.h" #include "broker.h"
#include "libs/AsyncClientHelpers.h"
const char INFLUXDB_REQUEST_TEMPLATE[] PROGMEM = "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n";
const char InfluxDb_http_success[] = "HTTP/1.1 204";
const char InfluxDb_http_template[] PROGMEM = "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n";
constexpr const unsigned long INFLUXDB_CLIENT_TIMEOUT = 5000;
constexpr const size_t INFLUXDB_DATA_BUFFER_SIZE = 256;
class AsyncInfluxDB : public AsyncClient {
public:
bool _idb_enabled = false;
String _idb_host;
uint16_t _idb_port = 0;
constexpr static const unsigned long ClientTimeout = 5000;
constexpr static const size_t DataBufferSize = 256;
AsyncClientState state = AsyncClientState::Disconnected;
String host;
uint16_t port = 0;
std::map<String, String> _idb_values;
String _idb_data;
bool _idb_flush = false;
std::map<String, String> values;
String payload;
std::unique_ptr<AsyncClient> _idb_client = nullptr;
bool _idb_connecting = false;
bool _idb_connected = false;
bool flush = false;
uint32_t timestamp = 0;
};
uint32_t _idb_client_ts = 0;
bool _idb_enabled = false;
std::unique_ptr<AsyncInfluxDB> _idb_client = nullptr;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
void _idbInitClient() { void _idbInitClient() {
_idb_client = std::make_unique<AsyncClient>();
_idb_client = std::make_unique<AsyncInfluxDB>();
_idb_client->onDisconnect([](void * s, AsyncClient * client) {
_idb_client->payload.reserve(AsyncInfluxDB::DataBufferSize);
_idb_client->onDisconnect([](void * s, AsyncClient * ptr) {
auto *client = reinterpret_cast<AsyncInfluxDB*>(ptr);
DEBUG_MSG_P(PSTR("[INFLUXDB] Disconnected\n")); DEBUG_MSG_P(PSTR("[INFLUXDB] Disconnected\n"));
_idb_flush = false;
_idb_data = "";
_idb_client_ts = 0;
_idb_connected = false;
_idb_connecting = false;
client->flush = false;
client->payload = "";
client->timestamp = 0;
client->state = AsyncClientState::Disconnected;
}, nullptr); }, nullptr);
_idb_client->onTimeout([](void * s, AsyncClient * client, uint32_t time) { _idb_client->onTimeout([](void * s, AsyncClient * client, uint32_t time) {
@ -53,34 +59,41 @@ void _idbInitClient() {
client->close(true); client->close(true);
}, nullptr); }, nullptr);
_idb_client->onData([](void * arg, AsyncClient * client, void * response, size_t len) {
_idb_client->onData([](void * arg, AsyncClient * ptr, void * response, size_t len) {
// ref: https://docs.influxdata.com/influxdb/v1.7/tools/api/#summary-table-1 // ref: https://docs.influxdata.com/influxdb/v1.7/tools/api/#summary-table-1
const char idb_success[] = "HTTP/1.1 204";
const bool result = (len > sizeof(idb_success) && (0 == strncmp((char*) response, idb_success, sizeof(idb_success))));
DEBUG_MSG_P(PSTR("[INFLUXDB] %s response after %ums\n"), result ? "Success" : "Failure", millis() - _idb_client_ts);
_idb_client_ts = millis();
client->close();
auto *client = reinterpret_cast<AsyncInfluxDB*>(ptr);
if (client->state == AsyncClientState::Connected) {
client->state = AsyncClientState::Disconnecting;
const bool result = (len > sizeof(InfluxDb_http_success) && (0 == strncmp((char*) response, InfluxDb_http_success, strlen(InfluxDb_http_success))));
DEBUG_MSG_P(PSTR("[INFLUXDB] %s response after %ums\n"), result ? "Success" : "Failure", millis() - client->timestamp);
client->timestamp = millis();
client->close();
}
}, nullptr); }, nullptr);
_idb_client->onPoll([](void * arg, AsyncClient * client) {
unsigned long ts = millis() - _idb_client_ts;
if (ts > INFLUXDB_CLIENT_TIMEOUT) {
_idb_client->onPoll([](void * arg, AsyncClient * ptr) {
auto *client = reinterpret_cast<AsyncInfluxDB*>(ptr);
unsigned long ts = millis() - client->timestamp;
if (ts > AsyncInfluxDB::ClientTimeout) {
DEBUG_MSG_P(PSTR("[INFLUXDB] No response after %ums\n"), ts); DEBUG_MSG_P(PSTR("[INFLUXDB] No response after %ums\n"), ts);
client->close(true); client->close(true);
return; return;
} }
if (_idb_data.length()) {
client->write(_idb_data.c_str(), _idb_data.length());
_idb_data = "";
if (client->payload.length()) {
client->write(client->payload.c_str(), client->payload.length());
client->payload = "";
} }
}); });
_idb_client->onConnect([](void * arg, AsyncClient * client) {
_idb_client->onConnect([](void * arg, AsyncClient * ptr) {
auto *client = reinterpret_cast<AsyncInfluxDB*>(ptr);
_idb_client_ts = millis();
_idb_connected = true;
_idb_connecting = false;
client->timestamp = millis();
client->state = AsyncClientState::Connected;
DEBUG_MSG_P(PSTR("[INFLUXDB] Connected to %s:%u\n"), DEBUG_MSG_P(PSTR("[INFLUXDB] Connected to %s:%u\n"),
IPAddress(client->getRemoteAddress()).toString().c_str(), IPAddress(client->getRemoteAddress()).toString().c_str(),
@ -89,11 +102,11 @@ void _idbInitClient() {
constexpr const int BUFFER_SIZE = 256; constexpr const int BUFFER_SIZE = 256;
char headers[BUFFER_SIZE]; char headers[BUFFER_SIZE];
int len = snprintf_P(headers, sizeof(headers), INFLUXDB_REQUEST_TEMPLATE,
int len = snprintf_P(headers, sizeof(headers), InfluxDb_http_template,
getSetting("idbDatabase", INFLUXDB_DATABASE).c_str(), getSetting("idbDatabase", INFLUXDB_DATABASE).c_str(),
getSetting("idbUsername", INFLUXDB_USERNAME).c_str(), getSetting("idbUsername", INFLUXDB_USERNAME).c_str(),
getSetting("idbPassword", INFLUXDB_PASSWORD).c_str(), getSetting("idbPassword", INFLUXDB_PASSWORD).c_str(),
_idb_host.c_str(), _idb_port, _idb_data.length()
client->host.c_str(), client->port, client->payload.length()
); );
if ((len < 0) || (len > BUFFER_SIZE - 1)) { if ((len < 0) || (len > BUFFER_SIZE - 1)) {
client->close(true); client->close(true);
@ -151,27 +164,28 @@ void _idbBrokerStatus(const String& topic, unsigned char id, unsigned int value)
bool idbSend(const char * topic, const char * payload) { bool idbSend(const char * topic, const char * payload) {
if (!_idb_enabled) return false; if (!_idb_enabled) return false;
if (_idb_connecting || _idb_connected) return false;
if (_idb_client->state != AsyncClientState::Disconnected) return false;
_idb_values[topic] = payload;
_idb_flush = true;
_idb_client->values[topic] = payload;
_idb_client->flush = true;
return true; return true;
} }
void _idbSend(const String& host, const uint16_t port) { void _idbSend(const String& host, const uint16_t port) {
if (_idb_connected || _idb_connecting) return;
if (_idb_client->state != AsyncClientState::Disconnected) return;
DEBUG_MSG_P(PSTR("[INFLUXDB] Sending to %s:%u\n"), host.c_str(), port); DEBUG_MSG_P(PSTR("[INFLUXDB] Sending to %s:%u\n"), host.c_str(), port);
// TODO: cache `Host: <host>:<port>` instead of storing things separately?
_idb_host = host;
_idb_port = port;
_idb_client_ts = millis();
_idb_connecting = _idb_client->connect(host.c_str(), port);
// TODO: cache `Host: <host>:<port>` header instead of storing things separately?
_idb_client->host = host;
_idb_client->port = port;
_idb_client->timestamp = millis();
_idb_client->state = _idb_client->connect(host.c_str(), port)
? AsyncClientState::Connecting
: AsyncClientState::Disconnected;
if (!_idb_connecting) {
if (_idb_client->state == AsyncClientState::Disconnected) {
DEBUG_MSG_P(PSTR("[INFLUXDB] Connection to %s:%u failed\n"), host.c_str(), port); DEBUG_MSG_P(PSTR("[INFLUXDB] Connection to %s:%u failed\n"), host.c_str(), port);
_idb_client->close(true); _idb_client->close(true);
} }
@ -179,31 +193,27 @@ void _idbSend(const String& host, const uint16_t port) {
void _idbFlush() { void _idbFlush() {
// Clean-up client object when not in use // Clean-up client object when not in use
if (_idb_client && !_idb_enabled && !_idb_connected && !_idb_connecting) {
if (_idb_client && !_idb_enabled && (_idb_client->state == AsyncClientState::Disconnected)) {
_idb_client = nullptr; _idb_client = nullptr;
} }
// Wait until current connection is finished // Wait until current connection is finished
if (!_idb_flush) return;
if (_idb_connected || _idb_connecting) return;
if (!_idb_client) return;
if (!_idb_client->flush) return;
if (_idb_client->state != AsyncClientState::Disconnected) return;
// Wait until connected // Wait until connected
if (!wifiConnected()) return; if (!wifiConnected()) return;
// TODO: MDNS_CLIENT_SUPPORT is deprecated
String host = getSetting("idbHost", INFLUXDB_HOST);
#if MDNS_CLIENT_SUPPORT
host = mdnsResolve(host);
#endif
const auto host = getSetting("idbHost", INFLUXDB_HOST);
const auto port = getSetting<uint16_t>("idbPort", INFLUXDB_PORT); const auto port = getSetting<uint16_t>("idbPort", INFLUXDB_PORT);
// TODO: should we always store specific pairs like tspk keeps relay / sensor readings? // TODO: should we always store specific pairs like tspk keeps relay / sensor readings?
// note that we also send heartbeat data, persistent values should be flagged // note that we also send heartbeat data, persistent values should be flagged
const String device = getSetting("hostname"); const String device = getSetting("hostname");
_idb_data = "";
for (auto& pair : _idb_values) {
_idb_client->payload = "";
for (auto& pair : _idb_client->values) {
if (!isNumber(pair.second.c_str())) { if (!isNumber(pair.second.c_str())) {
String quoted; String quoted;
quoted.reserve(pair.second.length() + 2); quoted.reserve(pair.second.length() + 2);
@ -218,9 +228,9 @@ void _idbFlush() {
PSTR("%s,device=%s value=%s\n"), PSTR("%s,device=%s value=%s\n"),
pair.first.c_str(), device.c_str(), pair.second.c_str() pair.first.c_str(), device.c_str(), pair.second.c_str()
); );
_idb_data += buffer;
_idb_client->payload += buffer;
} }
_idb_values.clear();
_idb_client->values.clear();
_idbSend(host, port); _idbSend(host, port);
} }
@ -254,8 +264,6 @@ void idbSetup() {
espurnaRegisterReload(_idbConfigure); espurnaRegisterReload(_idbConfigure);
espurnaRegisterLoop(_idbFlush); espurnaRegisterLoop(_idbFlush);
_idb_data.reserve(INFLUXDB_DATA_BUFFER_SIZE);
#if TERMINAL_SUPPORT #if TERMINAL_SUPPORT
terminalRegisterCommand(F("IDB.SEND"), [](Embedis* e) { terminalRegisterCommand(F("IDB.SEND"), [](Embedis* e) {
if (e->argc != 4) { if (e->argc != 4) {


+ 4
- 1
code/espurna/libs/AsyncClientHelpers.h View File

@ -4,10 +4,13 @@
#pragma once #pragma once
#include <ESPAsyncTCP.h>
enum class AsyncClientState { enum class AsyncClientState {
Disconnected, Disconnected,
Connecting, Connecting,
Connected
Connected,
Disconnecting
}; };

Loading…
Cancel
Save