From bd0075816202960ea032b53f44864692fb1834ed Mon Sep 17 00:00:00 2001 From: Max Prokhorov Date: Sun, 7 Jul 2019 11:21:15 +0300 Subject: [PATCH] Thingspeak: async client fixes (#1806) * Thingspeak: async client fixes - remove memory management in async client callbacks - instead of global one, reference asyncclient from callback argument - transfer temporary data-buffer to the global scope, avoid accidental String copy - implement strnstr to search onData payload * fixup format strings * update retry time each attempt * start with default attempts value * trying to work with lwip2-536 * count time from last request * promptly run flush --- code/espurna/config/prototypes.h | 3 + code/espurna/thinkspeak.ino | 210 +++++++++++++++++++++---------- code/espurna/utils.ino | 16 +++ 3 files changed, 164 insertions(+), 65 deletions(-) diff --git a/code/espurna/config/prototypes.h b/code/espurna/config/prototypes.h index 9c7dc20e..ab6598da 100644 --- a/code/espurna/config/prototypes.h +++ b/code/espurna/config/prototypes.h @@ -183,6 +183,9 @@ bool inline eraseSDKConfig(); #define ARRAYINIT(type, name, ...) type name[] = {__VA_ARGS__}; +size_t strnlen(const char*, size_t); +char* strnstr(const char*, const char*, size_t); + // ----------------------------------------------------------------------------- // WebServer // ----------------------------------------------------------------------------- diff --git a/code/espurna/thinkspeak.ino b/code/espurna/thinkspeak.ino index f6cb70cc..e70a99f9 100644 --- a/code/espurna/thinkspeak.ino +++ b/code/espurna/thinkspeak.ino @@ -10,28 +10,35 @@ Copyright (C) 2019 by Xose PĂ©rez #if THINGSPEAK_USE_ASYNC #include -AsyncClient * _tspk_client; #else #include #endif +#define THINGSPEAK_DATA_BUFFER_SIZE 256 + const char THINGSPEAK_REQUEST_TEMPLATE[] PROGMEM = "POST %s HTTP/1.1\r\n" "Host: %s\r\n" "User-Agent: ESPurna\r\n" "Connection: close\r\n" "Content-Type: application/x-www-form-urlencoded\r\n" - "Content-Length: %d\r\n\r\n" - "%s\r\n"; + "Content-Length: %d\r\n\r\n"; bool _tspk_enabled = false; bool _tspk_clear = false; char * _tspk_queue[THINGSPEAK_FIELDS] = {NULL}; +String _tspk_data; bool _tspk_flush = false; unsigned long _tspk_last_flush = 0; -unsigned char _tspk_tries = 0; +unsigned char _tspk_tries = THINGSPEAK_TRIES; + +#if THINGSPEAK_USE_ASYNC +AsyncClient * _tspk_client; +bool _tspk_connecting = false; +bool _tspk_connected = false; +#endif // ----------------------------------------------------------------------------- @@ -92,50 +99,106 @@ void _tspkConfigure() { _tspk_enabled = false; setSetting("tspkEnabled", 0); } + if (_tspk_enabled && !_tspk_client) _tspkInitClient(); } #if THINGSPEAK_USE_ASYNC -void _tspkPost(String data) { - - if (_tspk_client == NULL) { - _tspk_client = new AsyncClient(); - } - - _tspk_client->onDisconnect([](void *s, AsyncClient *c) { - DEBUG_MSG_P(PSTR("[THINGSPEAK] Disconnected\n")); - _tspk_client->free(); - delete _tspk_client; - _tspk_client = NULL; - }, 0); +enum class tspk_state_t : uint8_t { + NONE, + HEADERS, + BODY +}; - _tspk_client->onTimeout([](void *s, AsyncClient *c, uint32_t time) { - _tspk_client->close(true); - }, 0); +tspk_state_t _tspk_client_state = tspk_state_t::NONE; +unsigned long _tspk_client_ts = 0; +constexpr const unsigned long THINGSPEAK_CLIENT_TIMEOUT = 5000; - _tspk_client->onData([](void * arg, AsyncClient * c, void * response, size_t len) { +void _tspkInitClient() { - char * b = (char *) response; - b[len] = 0; - char * p = strstr((char *)response, "\r\n\r\n"); - unsigned int code = (p != NULL) ? atoi(&p[4]) : 0; - DEBUG_MSG_P(PSTR("[THINGSPEAK] Response value: %d\n"), code); + _tspk_client = new AsyncClient(); + _tspk_client->onDisconnect([](void * s, AsyncClient * client) { + DEBUG_MSG_P(PSTR("[THINGSPEAK] Disconnected\n")); + _tspk_data = ""; + _tspk_client_ts = 0; _tspk_last_flush = millis(); - if ((0 == code) && (--_tspk_tries > 0)) { - _tspk_flush = true; - DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing\n")); - } else { - _tspkClearQueue(); + _tspk_connected = false; + _tspk_connecting = false; + _tspk_client_state = tspk_state_t::NONE; + }, nullptr); + + _tspk_client->onTimeout([](void * s, AsyncClient * client, uint32_t time) { + DEBUG_MSG_P(PSTR("[THINGSPEAK] Network timeout after %ums\n"), time); + client->close(true); + }, nullptr); + + _tspk_client->onPoll([](void * s, AsyncClient * client) { + uint32_t ts = millis() - _tspk_client_ts; + if (ts > THINGSPEAK_CLIENT_TIMEOUT) { + DEBUG_MSG_P(PSTR("[THINGSPEAK] No response after %ums\n"), ts); + client->close(true); } + }, nullptr); + + _tspk_client->onData([](void * arg, AsyncClient * client, void * response, size_t len) { + + char * p = nullptr; + + do { + + p = nullptr; + + switch (_tspk_client_state) { + case tspk_state_t::NONE: + { + p = strnstr(reinterpret_cast(response), "HTTP/1.1 200 OK", len); + if (!p) { + client->close(true); + return; + } + _tspk_client_state = tspk_state_t::HEADERS; + continue; + } + case tspk_state_t::HEADERS: + { + p = strnstr(reinterpret_cast(response), "\r\n\r\n", len); + if (!p) return; + _tspk_client_state = tspk_state_t::BODY; + } + case tspk_state_t::BODY: + { + if (!p) { + p = strnstr(reinterpret_cast(response), "\r\n\r\n", len); + if (!p) return; + } + + unsigned int code = (p) ? atoi(&p[4]) : 0; + DEBUG_MSG_P(PSTR("[THINGSPEAK] Response value: %u\n"), code); + + if ((0 == code) && _tspk_tries) { + _tspk_flush = true; + DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing %u more time(s)\n"), _tspk_tries); + } else { + _tspkClearQueue(); + } + + client->close(true); + + _tspk_client_state = tspk_state_t::NONE; + } + } - _tspk_client->close(true); + } while (_tspk_client_state != tspk_state_t::NONE); + + }, nullptr); - }, NULL); + _tspk_client->onConnect([](void * arg, AsyncClient * client) { - _tspk_client->onConnect([data](void * arg, AsyncClient * client) { + _tspk_connected = true; + _tspk_connecting = false; - DEBUG_MSG_P(PSTR("[THINGSPEAK] Connected to %s:%d\n"), THINGSPEAK_HOST, THINGSPEAK_PORT); + DEBUG_MSG_P(PSTR("[THINGSPEAK] Connected to %s:%u\n"), THINGSPEAK_HOST, THINGSPEAK_PORT); #if THINGSPEAK_USE_SSL uint8_t fp[20] = {0}; @@ -146,20 +209,27 @@ void _tspkPost(String data) { } #endif - DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), THINGSPEAK_URL, data.c_str()); - - char buffer[strlen_P(THINGSPEAK_REQUEST_TEMPLATE) + strlen(THINGSPEAK_URL) + strlen(THINGSPEAK_HOST) + data.length()]; - snprintf_P(buffer, sizeof(buffer), + DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), THINGSPEAK_URL, _tspk_data.c_str()); + char headers[strlen_P(THINGSPEAK_REQUEST_TEMPLATE) + strlen(THINGSPEAK_URL) + strlen(THINGSPEAK_HOST) + 1]; + snprintf_P(headers, sizeof(headers), THINGSPEAK_REQUEST_TEMPLATE, THINGSPEAK_URL, THINGSPEAK_HOST, - data.length(), - data.c_str() + _tspk_data.length() ); - client->write(buffer); + client->write(headers); + client->write(_tspk_data.c_str()); + + }, nullptr); + +} + +void _tspkPost() { - }, NULL); + if (_tspk_connected || _tspk_connecting) return; + + _tspk_client_ts = millis(); #if ASYNC_TCP_SSL_ENABLED bool connected = _tspk_client->connect(THINGSPEAK_HOST, THINGSPEAK_PORT, THINGSPEAK_USE_SSL); @@ -167,6 +237,8 @@ void _tspkPost(String data) { bool connected = _tspk_client->connect(THINGSPEAK_HOST, THINGSPEAK_PORT); #endif + _tspk_connecting = connected; + if (!connected) { DEBUG_MSG_P(PSTR("[THINGSPEAK] Connection failed\n")); _tspk_client->close(true); @@ -176,7 +248,7 @@ void _tspkPost(String data) { #else // THINGSPEAK_USE_ASYNC -void _tspkPost(String data) { +void _tspkPost() { #if THINGSPEAK_USE_SSL WiFiClientSecure _tspk_client; @@ -186,35 +258,36 @@ void _tspkPost(String data) { if (_tspk_client.connect(THINGSPEAK_HOST, THINGSPEAK_PORT)) { - DEBUG_MSG_P(PSTR("[THINGSPEAK] Connected to %s:%d\n"), THINGSPEAK_HOST, THINGSPEAK_PORT); + DEBUG_MSG_P(PSTR("[THINGSPEAK] Connected to %s:%u\n"), THINGSPEAK_HOST, THINGSPEAK_PORT); if (!_tspk_client.verify(THINGSPEAK_FINGERPRINT, THINGSPEAK_HOST)) { DEBUG_MSG_P(PSTR("[THINGSPEAK] Warning: certificate doesn't match\n")); } - DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), THINGSPEAK_URL, data.c_str()); - char buffer[strlen_P(THINGSPEAK_REQUEST_TEMPLATE) + strlen(THINGSPEAK_URL) + strlen(THINGSPEAK_HOST) + data.length()]; - snprintf_P(buffer, sizeof(buffer), + DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), THINGSPEAK_URL, _tspk_data.c_str()); + char headers[strlen_P(THINGSPEAK_REQUEST_TEMPLATE) + strlen(THINGSPEAK_URL) + strlen(THINGSPEAK_HOST) + 1]; + snprintf_P(headers, sizeof(headers), THINGSPEAK_REQUEST_TEMPLATE, THINGSPEAK_URL, THINGSPEAK_HOST, - data.length(), - data.c_str() + _tspk_data.length() ); - _tspk_client.print(buffer); + + _tspk_client.print(headers); + _tspk_client.print(_tspk_data); nice_delay(100); String response = _tspk_client.readString(); int pos = response.indexOf("\r\n\r\n"); unsigned int code = (pos > 0) ? response.substring(pos + 4).toInt() : 0; - DEBUG_MSG_P(PSTR("[THINGSPEAK] Response value: %d\n"), code); + DEBUG_MSG_P(PSTR("[THINGSPEAK] Response value: %u\n"), code); _tspk_client.stop(); _tspk_last_flush = millis(); - if ((0 == code) && (--_tspk_tries > 0)) { + if ((0 == code) && _tspk_tries) { _tspk_flush = true; - DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing\n")); + DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing %u more time(s)\n"), _tspk_tries); } else { _tspkClearQueue(); } @@ -230,13 +303,14 @@ void _tspkPost(String data) { #endif // THINGSPEAK_USE_ASYNC void _tspkEnqueue(unsigned char index, char * payload) { - DEBUG_MSG_P(PSTR("[THINGSPEAK] Enqueuing field #%d with value %s\n"), index, payload); + DEBUG_MSG_P(PSTR("[THINGSPEAK] Enqueuing field #%u with value %s\n"), index, payload); --index; if (_tspk_queue[index] != NULL) free(_tspk_queue[index]); _tspk_queue[index] = strdup(payload); } void _tspkClearQueue() { + _tspk_tries = THINGSPEAK_TRIES; if (_tspk_clear) { for (unsigned char id=0; id 0) data = data + String("&"); - data = data + String("field") + String(id+1) + String("=") + String(_tspk_queue[id]); + if (_tspk_data.length() > 0) _tspk_data.concat("&"); + char buf[32] = {0}; + snprintf_P(buf, sizeof(buf), PSTR("field%u=%s"), (id + 1), _tspk_queue[id]); + _tspk_data.concat(buf); } } // POST data if any - if (data.length() > 0) { - data = data + String("&api_key=") + getSetting("tspkKey"); - _tspk_tries = THINGSPEAK_TRIES; - _tspkPost(data); + if (_tspk_data.length()) { + _tspk_data.concat("&api_key="); + _tspk_data.concat(getSetting("tspkKey")); + --_tspk_tries; + _tspkPost(); } } @@ -326,9 +408,7 @@ void tspkSetup() { void tspkLoop() { if (!_tspk_enabled) return; if (!wifiConnected() || (WiFi.getMode() != WIFI_STA)) return; - if (_tspk_flush && (millis() - _tspk_last_flush > THINGSPEAK_MIN_INTERVAL)) { - _tspkFlush(); - } + _tspkFlush(); } #endif diff --git a/code/espurna/utils.ino b/code/espurna/utils.ino index f74d1bdf..aefac90a 100644 --- a/code/espurna/utils.ino +++ b/code/espurna/utils.ino @@ -612,3 +612,19 @@ bool isNumber(const char * s) { } return digit; } + +// ref: lwip2 lwip_strnstr with strnlen +char* strnstr(const char* buffer, const char* token, size_t n) { + size_t token_len = strnlen(token, n); + if (token_len == 0) { + return const_cast(buffer); + } + + for (const char* p = buffer; *p && (p + token_len <= buffer + n); p++) { + if ((*p == *token) && (strncmp(p, token, token_len) == 0)) { + return const_cast(p); + } + } + + return nullptr; +}