Browse Source

Thingspeak: async client fixes (#1806)

* Thingspeak: async client fixes

- remove memory management in async client callbacks
- instead of global one, reference asyncclient from callback argument
- transfer temporary data-buffer to the global scope, avoid accidental String copy
- implement strnstr to search onData payload

* fixup format strings

* update retry time each attempt

* start with default attempts value

* trying to work with lwip2-536

* count time from last request

* promptly run flush
pull/1812/head
Max Prokhorov 4 years ago
committed by GitHub
parent
commit
bd00758162
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 164 additions and 65 deletions
  1. +3
    -0
      code/espurna/config/prototypes.h
  2. +145
    -65
      code/espurna/thinkspeak.ino
  3. +16
    -0
      code/espurna/utils.ino

+ 3
- 0
code/espurna/config/prototypes.h View File

@ -183,6 +183,9 @@ bool inline eraseSDKConfig();
#define ARRAYINIT(type, name, ...) type name[] = {__VA_ARGS__};
size_t strnlen(const char*, size_t);
char* strnstr(const char*, const char*, size_t);
// -----------------------------------------------------------------------------
// WebServer
// -----------------------------------------------------------------------------


+ 145
- 65
code/espurna/thinkspeak.ino View File

@ -10,28 +10,35 @@ Copyright (C) 2019 by Xose Pérez <xose dot perez at gmail dot com>
#if THINGSPEAK_USE_ASYNC
#include <ESPAsyncTCP.h>
AsyncClient * _tspk_client;
#else
#include <ESP8266WiFi.h>
#endif
#define THINGSPEAK_DATA_BUFFER_SIZE 256
const char THINGSPEAK_REQUEST_TEMPLATE[] PROGMEM =
"POST %s HTTP/1.1\r\n"
"Host: %s\r\n"
"User-Agent: ESPurna\r\n"
"Connection: close\r\n"
"Content-Type: application/x-www-form-urlencoded\r\n"
"Content-Length: %d\r\n\r\n"
"%s\r\n";
"Content-Length: %d\r\n\r\n";
bool _tspk_enabled = false;
bool _tspk_clear = false;
char * _tspk_queue[THINGSPEAK_FIELDS] = {NULL};
String _tspk_data;
bool _tspk_flush = false;
unsigned long _tspk_last_flush = 0;
unsigned char _tspk_tries = 0;
unsigned char _tspk_tries = THINGSPEAK_TRIES;
#if THINGSPEAK_USE_ASYNC
AsyncClient * _tspk_client;
bool _tspk_connecting = false;
bool _tspk_connected = false;
#endif
// -----------------------------------------------------------------------------
@ -92,50 +99,106 @@ void _tspkConfigure() {
_tspk_enabled = false;
setSetting("tspkEnabled", 0);
}
if (_tspk_enabled && !_tspk_client) _tspkInitClient();
}
#if THINGSPEAK_USE_ASYNC
void _tspkPost(String data) {
if (_tspk_client == NULL) {
_tspk_client = new AsyncClient();
}
_tspk_client->onDisconnect([](void *s, AsyncClient *c) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Disconnected\n"));
_tspk_client->free();
delete _tspk_client;
_tspk_client = NULL;
}, 0);
enum class tspk_state_t : uint8_t {
NONE,
HEADERS,
BODY
};
_tspk_client->onTimeout([](void *s, AsyncClient *c, uint32_t time) {
_tspk_client->close(true);
}, 0);
tspk_state_t _tspk_client_state = tspk_state_t::NONE;
unsigned long _tspk_client_ts = 0;
constexpr const unsigned long THINGSPEAK_CLIENT_TIMEOUT = 5000;
_tspk_client->onData([](void * arg, AsyncClient * c, void * response, size_t len) {
void _tspkInitClient() {
char * b = (char *) response;
b[len] = 0;
char * p = strstr((char *)response, "\r\n\r\n");
unsigned int code = (p != NULL) ? atoi(&p[4]) : 0;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Response value: %d\n"), code);
_tspk_client = new AsyncClient();
_tspk_client->onDisconnect([](void * s, AsyncClient * client) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Disconnected\n"));
_tspk_data = "";
_tspk_client_ts = 0;
_tspk_last_flush = millis();
if ((0 == code) && (--_tspk_tries > 0)) {
_tspk_flush = true;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing\n"));
} else {
_tspkClearQueue();
_tspk_connected = false;
_tspk_connecting = false;
_tspk_client_state = tspk_state_t::NONE;
}, nullptr);
_tspk_client->onTimeout([](void * s, AsyncClient * client, uint32_t time) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Network timeout after %ums\n"), time);
client->close(true);
}, nullptr);
_tspk_client->onPoll([](void * s, AsyncClient * client) {
uint32_t ts = millis() - _tspk_client_ts;
if (ts > THINGSPEAK_CLIENT_TIMEOUT) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] No response after %ums\n"), ts);
client->close(true);
}
}, nullptr);
_tspk_client->onData([](void * arg, AsyncClient * client, void * response, size_t len) {
char * p = nullptr;
do {
p = nullptr;
switch (_tspk_client_state) {
case tspk_state_t::NONE:
{
p = strnstr(reinterpret_cast<const char *>(response), "HTTP/1.1 200 OK", len);
if (!p) {
client->close(true);
return;
}
_tspk_client_state = tspk_state_t::HEADERS;
continue;
}
case tspk_state_t::HEADERS:
{
p = strnstr(reinterpret_cast<const char *>(response), "\r\n\r\n", len);
if (!p) return;
_tspk_client_state = tspk_state_t::BODY;
}
case tspk_state_t::BODY:
{
if (!p) {
p = strnstr(reinterpret_cast<const char *>(response), "\r\n\r\n", len);
if (!p) return;
}
unsigned int code = (p) ? atoi(&p[4]) : 0;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Response value: %u\n"), code);
if ((0 == code) && _tspk_tries) {
_tspk_flush = true;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing %u more time(s)\n"), _tspk_tries);
} else {
_tspkClearQueue();
}
client->close(true);
_tspk_client_state = tspk_state_t::NONE;
}
}
_tspk_client->close(true);
} while (_tspk_client_state != tspk_state_t::NONE);
}, nullptr);
}, NULL);
_tspk_client->onConnect([](void * arg, AsyncClient * client) {
_tspk_client->onConnect([data](void * arg, AsyncClient * client) {
_tspk_connected = true;
_tspk_connecting = false;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Connected to %s:%d\n"), THINGSPEAK_HOST, THINGSPEAK_PORT);
DEBUG_MSG_P(PSTR("[THINGSPEAK] Connected to %s:%u\n"), THINGSPEAK_HOST, THINGSPEAK_PORT);
#if THINGSPEAK_USE_SSL
uint8_t fp[20] = {0};
@ -146,20 +209,27 @@ void _tspkPost(String data) {
}
#endif
DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), THINGSPEAK_URL, data.c_str());
char buffer[strlen_P(THINGSPEAK_REQUEST_TEMPLATE) + strlen(THINGSPEAK_URL) + strlen(THINGSPEAK_HOST) + data.length()];
snprintf_P(buffer, sizeof(buffer),
DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), THINGSPEAK_URL, _tspk_data.c_str());
char headers[strlen_P(THINGSPEAK_REQUEST_TEMPLATE) + strlen(THINGSPEAK_URL) + strlen(THINGSPEAK_HOST) + 1];
snprintf_P(headers, sizeof(headers),
THINGSPEAK_REQUEST_TEMPLATE,
THINGSPEAK_URL,
THINGSPEAK_HOST,
data.length(),
data.c_str()
_tspk_data.length()
);
client->write(buffer);
client->write(headers);
client->write(_tspk_data.c_str());
}, nullptr);
}
void _tspkPost() {
}, NULL);
if (_tspk_connected || _tspk_connecting) return;
_tspk_client_ts = millis();
#if ASYNC_TCP_SSL_ENABLED
bool connected = _tspk_client->connect(THINGSPEAK_HOST, THINGSPEAK_PORT, THINGSPEAK_USE_SSL);
@ -167,6 +237,8 @@ void _tspkPost(String data) {
bool connected = _tspk_client->connect(THINGSPEAK_HOST, THINGSPEAK_PORT);
#endif
_tspk_connecting = connected;
if (!connected) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Connection failed\n"));
_tspk_client->close(true);
@ -176,7 +248,7 @@ void _tspkPost(String data) {
#else // THINGSPEAK_USE_ASYNC
void _tspkPost(String data) {
void _tspkPost() {
#if THINGSPEAK_USE_SSL
WiFiClientSecure _tspk_client;
@ -186,35 +258,36 @@ void _tspkPost(String data) {
if (_tspk_client.connect(THINGSPEAK_HOST, THINGSPEAK_PORT)) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Connected to %s:%d\n"), THINGSPEAK_HOST, THINGSPEAK_PORT);
DEBUG_MSG_P(PSTR("[THINGSPEAK] Connected to %s:%u\n"), THINGSPEAK_HOST, THINGSPEAK_PORT);
if (!_tspk_client.verify(THINGSPEAK_FINGERPRINT, THINGSPEAK_HOST)) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Warning: certificate doesn't match\n"));
}
DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), THINGSPEAK_URL, data.c_str());
char buffer[strlen_P(THINGSPEAK_REQUEST_TEMPLATE) + strlen(THINGSPEAK_URL) + strlen(THINGSPEAK_HOST) + data.length()];
snprintf_P(buffer, sizeof(buffer),
DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), THINGSPEAK_URL, _tspk_data.c_str());
char headers[strlen_P(THINGSPEAK_REQUEST_TEMPLATE) + strlen(THINGSPEAK_URL) + strlen(THINGSPEAK_HOST) + 1];
snprintf_P(headers, sizeof(headers),
THINGSPEAK_REQUEST_TEMPLATE,
THINGSPEAK_URL,
THINGSPEAK_HOST,
data.length(),
data.c_str()
_tspk_data.length()
);
_tspk_client.print(buffer);
_tspk_client.print(headers);
_tspk_client.print(_tspk_data);
nice_delay(100);
String response = _tspk_client.readString();
int pos = response.indexOf("\r\n\r\n");
unsigned int code = (pos > 0) ? response.substring(pos + 4).toInt() : 0;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Response value: %d\n"), code);
DEBUG_MSG_P(PSTR("[THINGSPEAK] Response value: %u\n"), code);
_tspk_client.stop();
_tspk_last_flush = millis();
if ((0 == code) && (--_tspk_tries > 0)) {
if ((0 == code) && _tspk_tries) {
_tspk_flush = true;
DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing\n"));
DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing %u more time(s)\n"), _tspk_tries);
} else {
_tspkClearQueue();
}
@ -230,13 +303,14 @@ void _tspkPost(String data) {
#endif // THINGSPEAK_USE_ASYNC
void _tspkEnqueue(unsigned char index, char * payload) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Enqueuing field #%d with value %s\n"), index, payload);
DEBUG_MSG_P(PSTR("[THINGSPEAK] Enqueuing field #%u with value %s\n"), index, payload);
--index;
if (_tspk_queue[index] != NULL) free(_tspk_queue[index]);
_tspk_queue[index] = strdup(payload);
}
void _tspkClearQueue() {
_tspk_tries = THINGSPEAK_TRIES;
if (_tspk_clear) {
for (unsigned char id=0; id<THINGSPEAK_FIELDS; id++) {
if (_tspk_queue[id] != NULL) {
@ -249,22 +323,30 @@ void _tspkClearQueue() {
void _tspkFlush() {
if (!_tspk_flush) return;
if (millis() - _tspk_last_flush < THINGSPEAK_MIN_INTERVAL) return;
if (_tspk_connected || _tspk_connecting) return;
_tspk_last_flush = millis();
_tspk_flush = false;
_tspk_data.reserve(THINGSPEAK_DATA_BUFFER_SIZE);
// Walk the fields
String data;
// Walk the fields, numbered 1...THINGSPEAK_FIELDS
for (unsigned char id=0; id<THINGSPEAK_FIELDS; id++) {
if (_tspk_queue[id] != NULL) {
if (data.length() > 0) data = data + String("&");
data = data + String("field") + String(id+1) + String("=") + String(_tspk_queue[id]);
if (_tspk_data.length() > 0) _tspk_data.concat("&");
char buf[32] = {0};
snprintf_P(buf, sizeof(buf), PSTR("field%u=%s"), (id + 1), _tspk_queue[id]);
_tspk_data.concat(buf);
}
}
// POST data if any
if (data.length() > 0) {
data = data + String("&api_key=") + getSetting("tspkKey");
_tspk_tries = THINGSPEAK_TRIES;
_tspkPost(data);
if (_tspk_data.length()) {
_tspk_data.concat("&api_key=");
_tspk_data.concat(getSetting("tspkKey"));
--_tspk_tries;
_tspkPost();
}
}
@ -326,9 +408,7 @@ void tspkSetup() {
void tspkLoop() {
if (!_tspk_enabled) return;
if (!wifiConnected() || (WiFi.getMode() != WIFI_STA)) return;
if (_tspk_flush && (millis() - _tspk_last_flush > THINGSPEAK_MIN_INTERVAL)) {
_tspkFlush();
}
_tspkFlush();
}
#endif

+ 16
- 0
code/espurna/utils.ino View File

@ -612,3 +612,19 @@ bool isNumber(const char * s) {
}
return digit;
}
// ref: lwip2 lwip_strnstr with strnlen
char* strnstr(const char* buffer, const char* token, size_t n) {
size_t token_len = strnlen(token, n);
if (token_len == 0) {
return const_cast<char*>(buffer);
}
for (const char* p = buffer; *p && (p + token_len <= buffer + n); p++) {
if ((*p == *token) && (strncmp(p, token, token_len) == 0)) {
return const_cast<char*>(p);
}
}
return nullptr;
}

Loading…
Cancel
Save