Fork of the espurna firmware for `mhsw` switches
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

284 lines
8.8 KiB

  1. /*
  2. INFLUXDB MODULE
  3. Copyright (C) 2017-2019 by Xose Pérez <xose dot perez at gmail dot com>
  4. */
  5. #if INFLUXDB_SUPPORT
  6. #include <map>
  7. #include <memory>
  8. #include "broker.h"
  9. #include "libs/AsyncClientHelpers.h"
  10. const char InfluxDb_http_success[] = "HTTP/1.1 204";
  11. const char InfluxDb_http_template[] PROGMEM = "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n";
  12. class AsyncInfluxDB : public AsyncClient {
  13. public:
  14. constexpr static const unsigned long ClientTimeout = 5000;
  15. constexpr static const size_t DataBufferSize = 256;
  16. AsyncClientState state = AsyncClientState::Disconnected;
  17. String host;
  18. uint16_t port = 0;
  19. std::map<String, String> values;
  20. String payload;
  21. bool flush = false;
  22. uint32_t timestamp = 0;
  23. };
  24. bool _idb_enabled = false;
  25. std::unique_ptr<AsyncInfluxDB> _idb_client = nullptr;
  26. // -----------------------------------------------------------------------------
  27. void _idbInitClient() {
  28. _idb_client = std::make_unique<AsyncInfluxDB>();
  29. _idb_client->payload.reserve(AsyncInfluxDB::DataBufferSize);
  30. _idb_client->onDisconnect([](void * s, AsyncClient * ptr) {
  31. auto *client = reinterpret_cast<AsyncInfluxDB*>(ptr);
  32. DEBUG_MSG_P(PSTR("[INFLUXDB] Disconnected\n"));
  33. client->flush = false;
  34. client->payload = "";
  35. client->timestamp = 0;
  36. client->state = AsyncClientState::Disconnected;
  37. }, nullptr);
  38. _idb_client->onTimeout([](void * s, AsyncClient * client, uint32_t time) {
  39. DEBUG_MSG_P(PSTR("[INFLUXDB] Network timeout after %ums\n"), time);
  40. client->close(true);
  41. }, nullptr);
  42. _idb_client->onData([](void * arg, AsyncClient * ptr, void * response, size_t len) {
  43. // ref: https://docs.influxdata.com/influxdb/v1.7/tools/api/#summary-table-1
  44. auto *client = reinterpret_cast<AsyncInfluxDB*>(ptr);
  45. if (client->state == AsyncClientState::Connected) {
  46. client->state = AsyncClientState::Disconnecting;
  47. const bool result = (len > sizeof(InfluxDb_http_success) && (0 == strncmp((char*) response, InfluxDb_http_success, strlen(InfluxDb_http_success))));
  48. DEBUG_MSG_P(PSTR("[INFLUXDB] %s response after %ums\n"), result ? "Success" : "Failure", millis() - client->timestamp);
  49. client->timestamp = millis();
  50. client->close();
  51. }
  52. }, nullptr);
  53. _idb_client->onPoll([](void * arg, AsyncClient * ptr) {
  54. auto *client = reinterpret_cast<AsyncInfluxDB*>(ptr);
  55. unsigned long ts = millis() - client->timestamp;
  56. if (ts > AsyncInfluxDB::ClientTimeout) {
  57. DEBUG_MSG_P(PSTR("[INFLUXDB] No response after %ums\n"), ts);
  58. client->close(true);
  59. return;
  60. }
  61. if (client->payload.length()) {
  62. client->write(client->payload.c_str(), client->payload.length());
  63. client->payload = "";
  64. }
  65. });
  66. _idb_client->onConnect([](void * arg, AsyncClient * ptr) {
  67. auto *client = reinterpret_cast<AsyncInfluxDB*>(ptr);
  68. client->timestamp = millis();
  69. client->state = AsyncClientState::Connected;
  70. DEBUG_MSG_P(PSTR("[INFLUXDB] Connected to %s:%u\n"),
  71. IPAddress(client->getRemoteAddress()).toString().c_str(),
  72. client->getRemotePort()
  73. );
  74. constexpr const int BUFFER_SIZE = 256;
  75. char headers[BUFFER_SIZE];
  76. int len = snprintf_P(headers, sizeof(headers), InfluxDb_http_template,
  77. getSetting("idbDatabase", INFLUXDB_DATABASE).c_str(),
  78. getSetting("idbUsername", INFLUXDB_USERNAME).c_str(),
  79. getSetting("idbPassword", INFLUXDB_PASSWORD).c_str(),
  80. client->host.c_str(), client->port, client->payload.length()
  81. );
  82. if ((len < 0) || (len > BUFFER_SIZE - 1)) {
  83. client->close(true);
  84. return;
  85. }
  86. client->write(headers, len);
  87. });
  88. }
  89. // -----------------------------------------------------------------------------
  90. bool _idbWebSocketOnKeyCheck(const char * key, JsonVariant& value) {
  91. return (strncmp(key, "idb", 3) == 0);
  92. }
  93. void _idbWebSocketOnVisible(JsonObject& root) {
  94. root["idbVisible"] = 1;
  95. }
  96. void _idbWebSocketOnConnected(JsonObject& root) {
  97. root["idbEnabled"] = getSetting("idbEnabled", 1 == INFLUXDB_ENABLED);
  98. root["idbHost"] = getSetting("idbHost", INFLUXDB_HOST);
  99. root["idbPort"] = getSetting("idbPort", INFLUXDB_PORT);
  100. root["idbDatabase"] = getSetting("idbDatabase", INFLUXDB_DATABASE);
  101. root["idbUsername"] = getSetting("idbUsername", INFLUXDB_USERNAME);
  102. root["idbPassword"] = getSetting("idbPassword", INFLUXDB_PASSWORD);
  103. }
  104. void _idbConfigure() {
  105. _idb_enabled = getSetting("idbEnabled", 1 == INFLUXDB_ENABLED);
  106. if (_idb_enabled && (getSetting("idbHost", INFLUXDB_HOST).length() == 0)) {
  107. _idb_enabled = false;
  108. setSetting("idbEnabled", 0);
  109. }
  110. if (_idb_enabled && !_idb_client) _idbInitClient();
  111. }
  112. #if BROKER_SUPPORT
  113. void _idbBrokerSensor(const String& topic, unsigned char id, double, const char* value) {
  114. idbSend(topic.c_str(), id, value);
  115. }
  116. void _idbBrokerStatus(const String& topic, unsigned char id, unsigned int value) {
  117. idbSend(topic.c_str(), id, String(int(value)).c_str());
  118. }
  119. #endif // BROKER_SUPPORT
  120. // -----------------------------------------------------------------------------
  121. bool idbSend(const char * topic, const char * payload) {
  122. if (!_idb_enabled) return false;
  123. if (_idb_client->state != AsyncClientState::Disconnected) return false;
  124. _idb_client->values[topic] = payload;
  125. _idb_client->flush = true;
  126. return true;
  127. }
  128. void _idbSend(const String& host, const uint16_t port) {
  129. if (_idb_client->state != AsyncClientState::Disconnected) return;
  130. DEBUG_MSG_P(PSTR("[INFLUXDB] Sending to %s:%u\n"), host.c_str(), port);
  131. // TODO: cache `Host: <host>:<port>` header instead of storing things separately?
  132. _idb_client->host = host;
  133. _idb_client->port = port;
  134. _idb_client->timestamp = millis();
  135. _idb_client->state = _idb_client->connect(host.c_str(), port)
  136. ? AsyncClientState::Connecting
  137. : AsyncClientState::Disconnected;
  138. if (_idb_client->state == AsyncClientState::Disconnected) {
  139. DEBUG_MSG_P(PSTR("[INFLUXDB] Connection to %s:%u failed\n"), host.c_str(), port);
  140. _idb_client->close(true);
  141. }
  142. }
  143. void _idbFlush() {
  144. // Clean-up client object when not in use
  145. if (_idb_client && !_idb_enabled && (_idb_client->state == AsyncClientState::Disconnected)) {
  146. _idb_client = nullptr;
  147. }
  148. // Wait until current connection is finished
  149. if (!_idb_client) return;
  150. if (!_idb_client->flush) return;
  151. if (_idb_client->state != AsyncClientState::Disconnected) return;
  152. // Wait until connected
  153. if (!wifiConnected()) return;
  154. const auto host = getSetting("idbHost", INFLUXDB_HOST);
  155. const auto port = getSetting<uint16_t>("idbPort", INFLUXDB_PORT);
  156. // TODO: should we always store specific pairs like tspk keeps relay / sensor readings?
  157. // note that we also send heartbeat data, persistent values should be flagged
  158. const String device = getSetting("hostname");
  159. _idb_client->payload = "";
  160. for (auto& pair : _idb_client->values) {
  161. if (!isNumber(pair.second.c_str())) {
  162. String quoted;
  163. quoted.reserve(pair.second.length() + 2);
  164. quoted += '"';
  165. quoted += pair.second;
  166. quoted += '"';
  167. pair.second = quoted;
  168. }
  169. char buffer[128] = {0};
  170. snprintf_P(buffer, sizeof(buffer),
  171. PSTR("%s,device=%s value=%s\n"),
  172. pair.first.c_str(), device.c_str(), pair.second.c_str()
  173. );
  174. _idb_client->payload += buffer;
  175. }
  176. _idb_client->values.clear();
  177. _idbSend(host, port);
  178. }
  179. bool idbSend(const char * topic, unsigned char id, const char * payload) {
  180. char measurement[64];
  181. snprintf(measurement, sizeof(measurement), "%s,id=%d", topic, id);
  182. return idbSend(measurement, payload);
  183. }
  184. bool idbEnabled() {
  185. return _idb_enabled;
  186. }
  187. void idbSetup() {
  188. _idbConfigure();
  189. #if WEB_SUPPORT
  190. wsRegister()
  191. .onVisible(_idbWebSocketOnVisible)
  192. .onConnected(_idbWebSocketOnConnected)
  193. .onKeyCheck(_idbWebSocketOnKeyCheck);
  194. #endif
  195. #if BROKER_SUPPORT
  196. StatusBroker::Register(_idbBrokerStatus);
  197. SensorReportBroker::Register(_idbBrokerSensor);
  198. #endif
  199. espurnaRegisterReload(_idbConfigure);
  200. espurnaRegisterLoop(_idbFlush);
  201. #if TERMINAL_SUPPORT
  202. terminalRegisterCommand(F("IDB.SEND"), [](Embedis* e) {
  203. if (e->argc != 4) {
  204. terminalError(F("idb.send <topic> <id> <value>"));
  205. return;
  206. }
  207. const String topic = e->argv[1];
  208. const auto id = atoi(e->argv[2]);
  209. const String value = e->argv[3];
  210. idbSend(topic.c_str(), id, value.c_str());
  211. });
  212. #endif
  213. }
  214. #endif