Fork of the espurna firmware for `mhsw` switches
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

285 lines
8.8 KiB

Terminal: change command-line parser (#2247) Change the underlying command line handling: - switch to a custom parser, inspired by redis / sds - update terminalRegisterCommand signature, pass only bare minimum - clean-up `help` & `commands`. update settings `set`, `get` and `del` - allow our custom test suite to run command-line tests - clean-up Stream IO to allow us to print large things into debug stream (for example, `eeprom.dump`) - send parsing errors to the debug log As a proof of concept, introduce `TERMINAL_MQTT_SUPPORT` and `TERMINAL_WEB_API_SUPPORT` - MQTT subscribes to the `<root>/cmd/set` and sends response to the `<root>/cmd`. We can't output too much, as we don't have any large-send API. - Web API listens to the `/api/cmd?apikey=...&line=...` (or PUT, params inside the body). This one is intended as a possible replacement of the `API_SUPPORT`. Internals introduce a 'task' around the AsyncWebServerRequest object that will simulate what WiFiClient does and push data into it continuously, switching between CONT and SYS. Both are experimental. We only accept a single command and not every command is updated to use Print `ctx.output` object. We are also somewhat limited by the Print / Stream overall, perhaps I am overestimating the usefulness of Arduino compatibility to such an extent :) Web API handler can also sometimes show only part of the result, whenever the command tries to yield() by itself waiting for something. Perhaps we would need to create a custom request handler for that specific use-case.
4 years ago
Terminal: change command-line parser (#2247) Change the underlying command line handling: - switch to a custom parser, inspired by redis / sds - update terminalRegisterCommand signature, pass only bare minimum - clean-up `help` & `commands`. update settings `set`, `get` and `del` - allow our custom test suite to run command-line tests - clean-up Stream IO to allow us to print large things into debug stream (for example, `eeprom.dump`) - send parsing errors to the debug log As a proof of concept, introduce `TERMINAL_MQTT_SUPPORT` and `TERMINAL_WEB_API_SUPPORT` - MQTT subscribes to the `<root>/cmd/set` and sends response to the `<root>/cmd`. We can't output too much, as we don't have any large-send API. - Web API listens to the `/api/cmd?apikey=...&line=...` (or PUT, params inside the body). This one is intended as a possible replacement of the `API_SUPPORT`. Internals introduce a 'task' around the AsyncWebServerRequest object that will simulate what WiFiClient does and push data into it continuously, switching between CONT and SYS. Both are experimental. We only accept a single command and not every command is updated to use Print `ctx.output` object. We are also somewhat limited by the Print / Stream overall, perhaps I am overestimating the usefulness of Arduino compatibility to such an extent :) Web API handler can also sometimes show only part of the result, whenever the command tries to yield() by itself waiting for something. Perhaps we would need to create a custom request handler for that specific use-case.
4 years ago
  1. /*
  2. INFLUXDB MODULE
  3. Copyright (C) 2017-2019 by Xose Pérez <xose dot perez at gmail dot com>
  4. */
  5. #include "influxdb.h"
  6. #if INFLUXDB_SUPPORT
  7. #include <map>
  8. #include <memory>
  9. #include "broker.h"
  10. #include "rpc.h"
  11. #include "sensor.h"
  12. #include "terminal.h"
  13. #include "ws.h"
  14. #include "libs/AsyncClientHelpers.h"
  15. #include <ESPAsyncTCP.h>
  16. const char InfluxDb_http_success[] = "HTTP/1.1 204";
  17. const char InfluxDb_http_template[] PROGMEM = "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n";
  18. class AsyncInfluxDB : public AsyncClient {
  19. public:
  20. constexpr static const unsigned long ClientTimeout = 5000;
  21. constexpr static const size_t DataBufferSize = 256;
  22. AsyncClientState state = AsyncClientState::Disconnected;
  23. String host;
  24. uint16_t port = 0;
  25. std::map<String, String> values;
  26. String payload;
  27. bool flush = false;
  28. uint32_t timestamp = 0;
  29. };
  30. bool _idb_enabled = false;
  31. std::unique_ptr<AsyncInfluxDB> _idb_client = nullptr;
  32. // -----------------------------------------------------------------------------
  33. void _idbInitClient() {
  34. _idb_client = std::make_unique<AsyncInfluxDB>();
  35. _idb_client->payload.reserve(AsyncInfluxDB::DataBufferSize);
  36. _idb_client->onDisconnect([](void * s, AsyncClient * ptr) {
  37. auto *client = reinterpret_cast<AsyncInfluxDB*>(ptr);
  38. DEBUG_MSG_P(PSTR("[INFLUXDB] Disconnected\n"));
  39. client->flush = false;
  40. client->payload = "";
  41. client->timestamp = 0;
  42. client->state = AsyncClientState::Disconnected;
  43. }, nullptr);
  44. _idb_client->onTimeout([](void * s, AsyncClient * client, uint32_t time) {
  45. DEBUG_MSG_P(PSTR("[INFLUXDB] Network timeout after %ums\n"), time);
  46. client->close(true);
  47. }, nullptr);
  48. _idb_client->onData([](void * arg, AsyncClient * ptr, void * response, size_t len) {
  49. // ref: https://docs.influxdata.com/influxdb/v1.7/tools/api/#summary-table-1
  50. auto *client = reinterpret_cast<AsyncInfluxDB*>(ptr);
  51. if (client->state == AsyncClientState::Connected) {
  52. client->state = AsyncClientState::Disconnecting;
  53. const bool result = (len > sizeof(InfluxDb_http_success) && (0 == strncmp((char*) response, InfluxDb_http_success, strlen(InfluxDb_http_success))));
  54. DEBUG_MSG_P(PSTR("[INFLUXDB] %s response after %ums\n"), result ? "Success" : "Failure", millis() - client->timestamp);
  55. client->timestamp = millis();
  56. client->close();
  57. }
  58. }, nullptr);
  59. _idb_client->onPoll([](void * arg, AsyncClient * ptr) {
  60. auto *client = reinterpret_cast<AsyncInfluxDB*>(ptr);
  61. unsigned long ts = millis() - client->timestamp;
  62. if (ts > AsyncInfluxDB::ClientTimeout) {
  63. DEBUG_MSG_P(PSTR("[INFLUXDB] No response after %ums\n"), ts);
  64. client->close(true);
  65. return;
  66. }
  67. if (client->payload.length()) {
  68. client->write(client->payload.c_str(), client->payload.length());
  69. client->payload = "";
  70. }
  71. });
  72. _idb_client->onConnect([](void * arg, AsyncClient * ptr) {
  73. auto *client = reinterpret_cast<AsyncInfluxDB*>(ptr);
  74. client->timestamp = millis();
  75. client->state = AsyncClientState::Connected;
  76. DEBUG_MSG_P(PSTR("[INFLUXDB] Connected to %s:%u\n"),
  77. IPAddress(client->getRemoteAddress()).toString().c_str(),
  78. client->getRemotePort()
  79. );
  80. constexpr const int BUFFER_SIZE = 256;
  81. char headers[BUFFER_SIZE];
  82. int len = snprintf_P(headers, sizeof(headers), InfluxDb_http_template,
  83. getSetting("idbDatabase", INFLUXDB_DATABASE).c_str(),
  84. getSetting("idbUsername", INFLUXDB_USERNAME).c_str(),
  85. getSetting("idbPassword", INFLUXDB_PASSWORD).c_str(),
  86. client->host.c_str(), client->port, client->payload.length()
  87. );
  88. if ((len < 0) || (len > BUFFER_SIZE - 1)) {
  89. client->close(true);
  90. return;
  91. }
  92. client->write(headers, len);
  93. });
  94. }
  95. // -----------------------------------------------------------------------------
  96. bool _idbWebSocketOnKeyCheck(const char * key, JsonVariant& value) {
  97. return (strncmp(key, "idb", 3) == 0);
  98. }
  99. void _idbWebSocketOnVisible(JsonObject& root) {
  100. root["idbVisible"] = 1;
  101. }
  102. void _idbWebSocketOnConnected(JsonObject& root) {
  103. root["idbEnabled"] = getSetting("idbEnabled", 1 == INFLUXDB_ENABLED);
  104. root["idbHost"] = getSetting("idbHost", INFLUXDB_HOST);
  105. root["idbPort"] = getSetting("idbPort", INFLUXDB_PORT);
  106. root["idbDatabase"] = getSetting("idbDatabase", INFLUXDB_DATABASE);
  107. root["idbUsername"] = getSetting("idbUsername", INFLUXDB_USERNAME);
  108. root["idbPassword"] = getSetting("idbPassword", INFLUXDB_PASSWORD);
  109. }
  110. void _idbConfigure() {
  111. _idb_enabled = getSetting("idbEnabled", 1 == INFLUXDB_ENABLED);
  112. if (_idb_enabled && (getSetting("idbHost", INFLUXDB_HOST).length() == 0)) {
  113. _idb_enabled = false;
  114. setSetting("idbEnabled", 0);
  115. }
  116. if (_idb_enabled && !_idb_client) _idbInitClient();
  117. }
  118. void _idbBrokerSensor(const String& topic, unsigned char id, double, const char* value) {
  119. idbSend(topic.c_str(), id, value);
  120. }
  121. void _idbBrokerStatus(const String& topic, unsigned char id, unsigned int value) {
  122. idbSend(topic.c_str(), id, String(int(value)).c_str());
  123. }
  124. // -----------------------------------------------------------------------------
  125. bool idbSend(const char * topic, const char * payload) {
  126. if (!_idb_enabled) return false;
  127. if (_idb_client->state != AsyncClientState::Disconnected) return false;
  128. _idb_client->values[topic] = payload;
  129. _idb_client->flush = true;
  130. return true;
  131. }
  132. void _idbSend(const String& host, const uint16_t port) {
  133. if (_idb_client->state != AsyncClientState::Disconnected) return;
  134. DEBUG_MSG_P(PSTR("[INFLUXDB] Sending to %s:%u\n"), host.c_str(), port);
  135. // TODO: cache `Host: <host>:<port>` header instead of storing things separately?
  136. _idb_client->host = host;
  137. _idb_client->port = port;
  138. _idb_client->timestamp = millis();
  139. _idb_client->state = _idb_client->connect(host.c_str(), port)
  140. ? AsyncClientState::Connecting
  141. : AsyncClientState::Disconnected;
  142. if (_idb_client->state == AsyncClientState::Disconnected) {
  143. DEBUG_MSG_P(PSTR("[INFLUXDB] Connection to %s:%u failed\n"), host.c_str(), port);
  144. _idb_client->close(true);
  145. }
  146. }
  147. void _idbFlush() {
  148. // Clean-up client object when not in use
  149. if (_idb_client && !_idb_enabled && (_idb_client->state == AsyncClientState::Disconnected)) {
  150. _idb_client = nullptr;
  151. }
  152. // Wait until current connection is finished
  153. if (!_idb_client) return;
  154. if (!_idb_client->flush) return;
  155. if (_idb_client->state != AsyncClientState::Disconnected) return;
  156. // Wait until connected
  157. if (!wifiConnected()) return;
  158. const auto host = getSetting("idbHost", INFLUXDB_HOST);
  159. const auto port = getSetting("idbPort", static_cast<uint16_t>(INFLUXDB_PORT));
  160. // TODO: should we always store specific pairs like tspk keeps relay / sensor readings?
  161. // note that we also send heartbeat data, persistent values should be flagged
  162. const String device = getSetting("hostname");
  163. _idb_client->payload = "";
  164. for (auto& pair : _idb_client->values) {
  165. if (!isNumber(pair.second.c_str())) {
  166. String quoted;
  167. quoted.reserve(pair.second.length() + 2);
  168. quoted += '"';
  169. quoted += pair.second;
  170. quoted += '"';
  171. pair.second = quoted;
  172. }
  173. char buffer[128] = {0};
  174. snprintf_P(buffer, sizeof(buffer),
  175. PSTR("%s,device=%s value=%s\n"),
  176. pair.first.c_str(), device.c_str(), pair.second.c_str()
  177. );
  178. _idb_client->payload += buffer;
  179. }
  180. _idb_client->values.clear();
  181. _idbSend(host, port);
  182. }
  183. bool idbSend(const char * topic, unsigned char id, const char * payload) {
  184. char measurement[64];
  185. snprintf(measurement, sizeof(measurement), "%s,id=%d", topic, id);
  186. return idbSend(measurement, payload);
  187. }
  188. bool idbEnabled() {
  189. return _idb_enabled;
  190. }
  191. void idbSetup() {
  192. _idbConfigure();
  193. #if WEB_SUPPORT
  194. wsRegister()
  195. .onVisible(_idbWebSocketOnVisible)
  196. .onConnected(_idbWebSocketOnConnected)
  197. .onKeyCheck(_idbWebSocketOnKeyCheck);
  198. #endif
  199. StatusBroker::Register(_idbBrokerStatus);
  200. #if SENSOR_SUPPORT
  201. SensorReportBroker::Register(_idbBrokerSensor);
  202. #endif
  203. espurnaRegisterReload(_idbConfigure);
  204. espurnaRegisterLoop(_idbFlush);
  205. #if TERMINAL_SUPPORT
  206. terminalRegisterCommand(F("IDB.SEND"), [](const terminal::CommandContext& ctx) {
  207. if (ctx.argc != 4) {
  208. terminalError(F("idb.send <topic> <id> <value>"));
  209. return;
  210. }
  211. idbSend(ctx.argv[1].c_str(), ctx.argv[2].toInt(), ctx.argv[3].c_str());
  212. });
  213. #endif
  214. }
  215. #endif