/* INFLUXDB MODULE Copyright (C) 2017-2019 by Xose PĂ©rez */ #include "influxdb.h" #if INFLUXDB_SUPPORT #include #include #include "broker.h" #include "mqtt.h" #include "rpc.h" #include "sensor.h" #include "terminal.h" #include "ws.h" #include #include "libs/AsyncClientHelpers.h" const char InfluxDb_http_success[] = "HTTP/1.1 204"; const char InfluxDb_http_template[] PROGMEM = "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n"; class AsyncInfluxDB : public AsyncClient { public: constexpr static const unsigned long ClientTimeout = 5000; constexpr static const size_t DataBufferSize = 256; AsyncClientState state = AsyncClientState::Disconnected; String host; uint16_t port = 0; std::map values; String payload; bool flush = false; uint32_t timestamp = 0; }; bool _idb_enabled = false; std::unique_ptr _idb_client = nullptr; // ----------------------------------------------------------------------------- void _idbInitClient() { _idb_client = std::make_unique(); _idb_client->payload.reserve(AsyncInfluxDB::DataBufferSize); _idb_client->onDisconnect([](void * s, AsyncClient * ptr) { auto *client = reinterpret_cast(ptr); DEBUG_MSG_P(PSTR("[INFLUXDB] Disconnected\n")); client->flush = false; client->payload = ""; client->timestamp = 0; client->state = AsyncClientState::Disconnected; }, nullptr); _idb_client->onTimeout([](void * s, AsyncClient * client, uint32_t time) { DEBUG_MSG_P(PSTR("[INFLUXDB] Network timeout after %ums\n"), time); client->close(true); }, nullptr); _idb_client->onData([](void * arg, AsyncClient * ptr, void * response, size_t len) { // ref: https://docs.influxdata.com/influxdb/v1.7/tools/api/#summary-table-1 auto *client = reinterpret_cast(ptr); if (client->state == AsyncClientState::Connected) { client->state = AsyncClientState::Disconnecting; const bool result = (len > sizeof(InfluxDb_http_success) && (0 == strncmp((char*) response, InfluxDb_http_success, strlen(InfluxDb_http_success)))); DEBUG_MSG_P(PSTR("[INFLUXDB] %s response after %ums\n"), result ? "Success" : "Failure", millis() - client->timestamp); client->timestamp = millis(); client->close(); } }, nullptr); _idb_client->onPoll([](void * arg, AsyncClient * ptr) { auto *client = reinterpret_cast(ptr); unsigned long ts = millis() - client->timestamp; if (ts > AsyncInfluxDB::ClientTimeout) { DEBUG_MSG_P(PSTR("[INFLUXDB] No response after %ums\n"), ts); client->close(true); return; } if (client->payload.length()) { client->write(client->payload.c_str(), client->payload.length()); client->payload = ""; } }); _idb_client->onConnect([](void * arg, AsyncClient * ptr) { auto *client = reinterpret_cast(ptr); client->timestamp = millis(); client->state = AsyncClientState::Connected; DEBUG_MSG_P(PSTR("[INFLUXDB] Connected to %s:%u\n"), IPAddress(client->getRemoteAddress()).toString().c_str(), client->getRemotePort() ); constexpr const int BUFFER_SIZE = 256; char headers[BUFFER_SIZE]; int len = snprintf_P(headers, sizeof(headers), InfluxDb_http_template, getSetting("idbDatabase", INFLUXDB_DATABASE).c_str(), getSetting("idbUsername", INFLUXDB_USERNAME).c_str(), getSetting("idbPassword", INFLUXDB_PASSWORD).c_str(), client->host.c_str(), client->port, client->payload.length() ); if ((len < 0) || (len > BUFFER_SIZE - 1)) { client->close(true); return; } client->write(headers, len); }); } // ----------------------------------------------------------------------------- bool _idbWebSocketOnKeyCheck(const char * key, JsonVariant& value) { return (strncmp(key, "idb", 3) == 0); } void _idbWebSocketOnVisible(JsonObject& root) { root["idbVisible"] = 1; } void _idbWebSocketOnConnected(JsonObject& root) { root["idbEnabled"] = getSetting("idbEnabled", 1 == INFLUXDB_ENABLED); root["idbHost"] = getSetting("idbHost", INFLUXDB_HOST); root["idbPort"] = getSetting("idbPort", INFLUXDB_PORT); root["idbDatabase"] = getSetting("idbDatabase", INFLUXDB_DATABASE); root["idbUsername"] = getSetting("idbUsername", INFLUXDB_USERNAME); root["idbPassword"] = getSetting("idbPassword", INFLUXDB_PASSWORD); } void _idbConfigure() { _idb_enabled = getSetting("idbEnabled", 1 == INFLUXDB_ENABLED); if (_idb_enabled && (getSetting("idbHost", INFLUXDB_HOST).length() == 0)) { _idb_enabled = false; setSetting("idbEnabled", 0); } if (_idb_enabled && !_idb_client) _idbInitClient(); } void _idbBrokerSensor(const String& topic, unsigned char id, double, const char* value) { idbSend(topic.c_str(), id, value); } void _idbSendStatus(size_t id, bool status) { idbSend(MQTT_TOPIC_RELAY, id, status ? "1" : "0"); // "status" ? } // ----------------------------------------------------------------------------- bool idbSend(const char * topic, const char * payload) { if (!_idb_enabled) return false; if (_idb_client->state != AsyncClientState::Disconnected) return false; _idb_client->values[topic] = payload; _idb_client->flush = true; return true; } void _idbSend(const String& host, const uint16_t port) { if (_idb_client->state != AsyncClientState::Disconnected) return; DEBUG_MSG_P(PSTR("[INFLUXDB] Sending to %s:%u\n"), host.c_str(), port); // TODO: cache `Host: :` header instead of storing things separately? _idb_client->host = host; _idb_client->port = port; _idb_client->timestamp = millis(); _idb_client->state = _idb_client->connect(host.c_str(), port) ? AsyncClientState::Connecting : AsyncClientState::Disconnected; if (_idb_client->state == AsyncClientState::Disconnected) { DEBUG_MSG_P(PSTR("[INFLUXDB] Connection to %s:%u failed\n"), host.c_str(), port); _idb_client->close(true); } } void _idbFlush() { // Clean-up client object when not in use if (_idb_client && !_idb_enabled && (_idb_client->state == AsyncClientState::Disconnected)) { _idb_client = nullptr; } // Wait until current connection is finished if (!_idb_client) return; if (!_idb_client->flush) return; if (_idb_client->state != AsyncClientState::Disconnected) return; // Wait until connected if (!wifiConnected()) return; const auto host = getSetting("idbHost", INFLUXDB_HOST); const auto port = getSetting("idbPort", static_cast(INFLUXDB_PORT)); // TODO: should we always store specific pairs like tspk keeps relay / sensor readings? // note that we also send heartbeat data, persistent values should be flagged const String device = getSetting("hostname"); _idb_client->payload = ""; for (auto& pair : _idb_client->values) { if (!isNumber(pair.second)) { String quoted; quoted.reserve(pair.second.length() + 2); quoted += '"'; quoted += pair.second; quoted += '"'; pair.second = quoted; } char buffer[128] = {0}; snprintf_P(buffer, sizeof(buffer), PSTR("%s,device=%s value=%s\n"), pair.first.c_str(), device.c_str(), pair.second.c_str() ); _idb_client->payload += buffer; } _idb_client->values.clear(); _idbSend(host, port); } bool idbSend(const char * topic, unsigned char id, const char * payload) { char measurement[64]; snprintf(measurement, sizeof(measurement), "%s,id=%d", topic, id); return idbSend(measurement, payload); } bool idbEnabled() { return _idb_enabled; } bool _idbHeartbeat(heartbeat::Mask mask) { if (mask & heartbeat::Report::Uptime) idbSend(MQTT_TOPIC_UPTIME, String(systemUptime()).c_str()); if (mask & heartbeat::Report::Freeheap) { auto stats = systemHeapStats(); idbSend(MQTT_TOPIC_FREEHEAP, String(stats.available).c_str()); } if (mask & heartbeat::Report::Rssi) idbSend(MQTT_TOPIC_RSSI, String(WiFi.RSSI()).c_str()); if ((mask & heartbeat::Report::Vcc) && (ADC_MODE_VALUE == ADC_VCC)) idbSend(MQTT_TOPIC_VCC, String(ESP.getVcc()).c_str()); if (mask & heartbeat::Report::Loadavg) idbSend(MQTT_TOPIC_LOADAVG, String(systemLoadAverage()).c_str()); if (mask & heartbeat::Report::Ssid) idbSend(MQTT_TOPIC_SSID, WiFi.SSID().c_str()); if (mask & heartbeat::Report::Bssid) idbSend(MQTT_TOPIC_BSSID, WiFi.BSSIDstr().c_str()); return true; } void idbSetup() { systemHeartbeat(_idbHeartbeat); systemHeartbeat(_idbHeartbeat, getSetting("idbHbMode", heartbeat::currentMode()), getSetting("idbHbIntvl", heartbeat::currentInterval())); _idbConfigure(); #if WEB_SUPPORT wsRegister() .onVisible(_idbWebSocketOnVisible) .onConnected(_idbWebSocketOnConnected) .onKeyCheck(_idbWebSocketOnKeyCheck); #endif #if RELAY_SUPPORT relaySetStatusChange(_idbSendStatus); #endif #if SENSOR_SUPPORT SensorReportBroker::Register(_idbBrokerSensor); #endif espurnaRegisterReload(_idbConfigure); espurnaRegisterLoop(_idbFlush); #if TERMINAL_SUPPORT terminalRegisterCommand(F("IDB.SEND"), [](const terminal::CommandContext& ctx) { if (ctx.argc != 4) { terminalError(F("idb.send ")); return; } idbSend(ctx.argv[1].c_str(), ctx.argv[2].toInt(), ctx.argv[3].c_str()); }); #endif } #endif