@ -18,19 +18,102 @@ Copyright (C) 2016-2019 by Xose Pérez <xose dot perez at gmail dot com>
AsyncWebSocket _ws ( " /ws " ) ;
Ticker _web_defer ;
std : : vector < ws_on_send_callback_f > _ws_on_send_callbacks ;
std : : vector < ws_on_action_callback_f > _ws_on_action_callbacks ;
std : : vector < ws_on_receive_callback_f > _ws_on_receive_callbacks ;
// -----------------------------------------------------------------------------
// WS callbacks
// -----------------------------------------------------------------------------
ws_callbacks_t _ws_callbacks ;
struct ws_counter_t {
ws_counter_t ( ) : current ( 0 ) , start ( 0 ) , stop ( 0 ) { }
ws_counter_t ( uint32_t start , uint32_t stop ) :
current ( start ) , start ( start ) , stop ( stop ) { }
void reset ( ) {
current = start ;
}
void next ( ) {
if ( current < stop ) {
+ + current ;
}
}
bool done ( ) {
return ( current > = stop ) ;
}
uint32_t current ;
uint32_t start ;
uint32_t stop ;
} ;
struct ws_data_t {
enum mode_t {
SEQUENCE ,
ALL
} ;
ws_data_t ( const ws_on_send_callback_f & cb ) :
storage ( new ws_on_send_callback_list_t { cb } ) ,
client_id ( 0 ) ,
mode ( ALL ) ,
callbacks ( * storage . get ( ) ) ,
counter ( 0 , 1 )
{ }
ws_data_t ( const uint32_t client_id , const ws_on_send_callback_list_t & callbacks , mode_t mode = SEQUENCE ) :
client_id ( client_id ) ,
mode ( mode ) ,
callbacks ( callbacks ) ,
counter ( 0 , callbacks . size ( ) )
{ }
bool done ( ) {
return counter . done ( ) ;
}
void sendAll ( JsonObject & root ) {
while ( ! counter . done ( ) ) counter . next ( ) ;
for ( auto & callback : callbacks ) {
callback ( root ) ;
}
}
void sendCurrent ( JsonObject & root ) {
callbacks [ counter . current ] ( root ) ;
counter . next ( ) ;
}
void send ( JsonObject & root ) {
switch ( mode ) {
case SEQUENCE : sendCurrent ( root ) ; break ;
case ALL : sendAll ( root ) ; break ;
}
}
std : : unique_ptr < ws_on_send_callback_list_t > storage ;
const uint32_t client_id ;
const mode_t mode ;
const ws_on_send_callback_list_t & callbacks ;
ws_counter_t counter ;
} ;
std : : queue < ws_data_t > _ws_client_data ;
// -----------------------------------------------------------------------------
// Private methods
// WS authentication
// -----------------------------------------------------------------------------
typedef struct {
struct ws_ticket_ t {
IPAddress ip ;
unsigned long timestamp = 0 ;
} ws_ticket_t ;
ws_ticket_t _ticket [ WS_BUFFER_SIZE ] ;
} ;
ws_ticket_t _ws_ tickets [ WS_BUFFER_SIZE ] ;
void _onAuth ( AsyncWebServerRequest * request ) {
@ -41,15 +124,15 @@ void _onAuth(AsyncWebServerRequest *request) {
unsigned long now = millis ( ) ;
unsigned short index ;
for ( index = 0 ; index < WS_BUFFER_SIZE ; index + + ) {
if ( _ticket [ index ] . ip = = ip ) break ;
if ( _ticket [ index ] . timestamp = = 0 ) break ;
if ( now - _ticket [ index ] . timestamp > WS_TIMEOUT ) break ;
if ( _ws_ tickets [ index ] . ip = = ip ) break ;
if ( _ws_ tickets [ index ] . timestamp = = 0 ) break ;
if ( now - _ws_ tickets [ index ] . timestamp > WS_TIMEOUT ) break ;
}
if ( index = = WS_BUFFER_SIZE ) {
request - > send ( 429 ) ;
} else {
_ticket [ index ] . ip = ip ;
_ticket [ index ] . timestamp = now ;
_ws_ tickets [ index ] . ip = ip ;
_ws_ tickets [ index ] . timestamp = now ;
request - > send ( 200 , " text/plain " , " OK " ) ;
}
@ -62,7 +145,7 @@ bool _wsAuth(AsyncWebSocketClient * client) {
unsigned short index = 0 ;
for ( index = 0 ; index < WS_BUFFER_SIZE ; index + + ) {
if ( ( _ticket [ index ] . ip = = ip ) & & ( now - _ticket [ index ] . timestamp < WS_TIMEOUT ) ) break ;
if ( ( _ws_ tickets [ index ] . ip = = ip ) & & ( now - _ws_ tickets [ index ] . timestamp < WS_TIMEOUT ) ) break ;
}
if ( index = = WS_BUFFER_SIZE ) {
@ -73,39 +156,43 @@ bool _wsAuth(AsyncWebSocketClient * client) {
}
// -----------------------------------------------------------------------------
// Debug
// -----------------------------------------------------------------------------
# if DEBUG_WEB_SUPPORT
bool wsDebugSend ( const char * prefix , const char * message ) {
if ( ! wsConnected ( ) ) return false ;
if ( getFreeHeap ( ) < ( strlen ( message ) * 3 ) ) return false ;
DynamicJsonBuffer jsonBuffer ;
JsonObject & root = jsonBuffer . createObject ( ) ;
JsonObject & weblog = root . createNestedObject ( " weblog " ) ;
// via: https://arduinojson.org/v6/assistant/
// we use 1 object for "weblog", 2nd one for "message". "prefix", optional
StaticJsonBuffer < JSON_OBJECT_SIZE ( 1 ) + JSON_OBJECT_SIZE ( 2 ) > jsonBuffer ;
JsonObject & root = jsonBuffer . createObject ( ) ;
JsonObject & weblog = root . createNestedObject ( " weblog " ) ;
weblog . set ( " message " , message ) ;
weblog [ " message " ] = message ;
if ( prefix & & ( prefix [ 0 ] ! = ' \0 ' ) ) {
weblog . set ( " prefix " , prefix ) ;
weblog [ " prefix " ] = prefix ;
}
// TODO: avoid serializing twice and just measure json ourselves?
//const size_t len = strlen(message) + strlen(prefix)
// + strlen("{\"weblog\":}")
// + strlen("{\"message\":\"\"}")
// + (strlen(prefix) ? strlen("\",\"prefix\":\"\"") : 0);
//wsSend(root, len);
wsSend ( root ) ;
return true ;
}
# endif
// -----------------------------------------------------------------------------
# if MQTT_SUPPORT
void _wsMQTTCallback ( unsigned int type , const char * topic , const char * payload ) {
if ( type = = MQTT_CONNECT_EVENT ) wsSend_P ( PSTR ( " { \" mqttStatus \" : true} " ) ) ;
if ( type = = MQTT_DISCONNECT_EVENT ) wsSend_P ( PSTR ( " { \" mqttStatus \" : false} " ) ) ;
}
# endif
bool _wsStore ( String key , String value ) {
// Check the existing setting before saving it
// TODO: this should know of the default values, somehow?
// TODO: move webPort handling somewhere else?
bool _wsStore ( const String & key , const String & value ) {
// HTTP port
if ( key = = " webPort " ) {
if ( ( value . toInt ( ) = = 0 ) | | ( value . toInt ( ) = = 80 ) ) {
return delSetting ( key ) ;
@ -120,7 +207,11 @@ bool _wsStore(String key, String value) {
}
bool _wsStore ( String key , JsonArray & value ) {
// -----------------------------------------------------------------------------
// Store indexed key (key0, key1, etc.) from array
// -----------------------------------------------------------------------------
bool _wsStore ( const String & key , JsonArray & value ) {
bool changed = false ;
@ -140,6 +231,15 @@ bool _wsStore(String key, JsonArray& value) {
}
bool _wsCheckKey ( const String & key , JsonVariant & value ) {
for ( auto & callback : _ws_callbacks . on_keycheck ) {
if ( callback ( key . c_str ( ) , value ) ) return true ;
// TODO: remove this to call all OnKeyCheckCallbacks with the
// current key/value
}
return false ;
}
void _wsParse ( AsyncWebSocketClient * client , uint8_t * payload , size_t length ) {
//DEBUG_MSG_P(PSTR("[WEBSOCKET] Parsing: %s\n"), length ? (char*) payload : "");
@ -147,11 +247,22 @@ void _wsParse(AsyncWebSocketClient *client, uint8_t * payload, size_t length) {
// Get client ID
uint32_t client_id = client - > id ( ) ;
// Check early for empty object / nothing
if ( ( length = = 0 ) | | ( length = = 1 ) ) {
return ;
}
if ( ( length = = 3 ) & & ( strcmp ( ( char * ) payload , " {} " ) = = 0 ) ) {
return ;
}
// Parse JSON input
DynamicJsonBuffer jsonBuffer ;
// TODO: json buffer should be pretty efficient with the non-const payload,
// most of the space is taken by the object key references
DynamicJsonBuffer jsonBuffer ( 512 ) ;
JsonObject & root = jsonBuffer . parseObject ( ( char * ) payload ) ;
if ( ! root . success ( ) ) {
DEBUG_MSG_P ( PSTR ( " [WEBSOCKET] Error parsing data \n " ) ) ;
DEBUG_MSG_P ( PSTR ( " [WEBSOCKET] JSON parsing error \n " ) ) ;
wsSend_P ( client_id , PSTR ( " { \" message \" : 3} " ) ) ;
return ;
}
@ -184,8 +295,8 @@ void _wsParse(AsyncWebSocketClient *client, uint8_t * payload, size_t length) {
if ( data . success ( ) ) {
// Callbacks
for ( unsigned char i = 0 ; i < _ws_on_action_callbacks . size ( ) ; i + + ) {
( _ws_on_action_ callbacks [ i ] ) ( client_id , action , data ) ;
for ( auto & callback : _ws_callbacks . on_action ) {
callback ( client_id , action , data ) ;
}
// Restore configuration via websockets
@ -237,15 +348,7 @@ void _wsParse(AsyncWebSocketClient *client, uint8_t * payload, size_t length) {
continue ;
}
// Check if key has to be processed
bool found = false ;
for ( unsigned char i = 0 ; i < _ws_on_receive_callbacks . size ( ) ; i + + ) {
found | = ( _ws_on_receive_callbacks [ i ] ) ( key . c_str ( ) , value ) ;
// TODO: remove this to call all OnReceiveCallbacks with the
// current key/value
if ( found ) break ;
}
if ( ! found ) {
if ( ! _wsCheckKey ( key , value ) ) {
delSetting ( key ) ;
continue ;
}
@ -312,7 +415,7 @@ void _wsDoUpdate(bool reset = false) {
}
bool _wsOnReceive ( const char * key , JsonVariant & value ) {
bool _wsOnKeyCheck ( const char * key , JsonVariant & value ) {
if ( strncmp ( key , " ws " , 2 ) = = 0 ) return true ;
if ( strncmp ( key , " admin " , 5 ) = = 0 ) return true ;
if ( strncmp ( key , " hostname " , 8 ) = = 0 ) return true ;
@ -321,15 +424,9 @@ bool _wsOnReceive(const char * key, JsonVariant& value) {
return false ;
}
void _wsOnStart ( JsonObject & root ) {
void _wsOnConnected ( JsonObject & root ) {
char chipid [ 7 ] ;
snprintf_P ( chipid , sizeof ( chipid ) , PSTR ( " %06X " ) , ESP . getChipId ( ) ) ;
uint8_t * bssid = WiFi . BSSID ( ) ;
char bssid_str [ 20 ] ;
snprintf_P ( bssid_str , sizeof ( bssid_str ) ,
PSTR ( " %02X:%02X:%02X:%02X:%02X:%02X " ) ,
bssid [ 0 ] , bssid [ 1 ] , bssid [ 2 ] , bssid [ 3 ] , bssid [ 4 ] , bssid [ 5 ]
) ;
root [ " webMode " ] = WEB_MODE_NORMAL ;
@ -342,7 +439,7 @@ void _wsOnStart(JsonObject& root) {
root [ " manufacturer " ] = MANUFACTURER ;
root [ " chipid " ] = String ( chipid ) ;
root [ " mac " ] = WiFi . macAddress ( ) ;
root [ " bssid " ] = String ( bssid_str ) ;
root [ " bssid " ] = WiFi . BSSIDstr ( ) ;
root [ " channel " ] = WiFi . channel ( ) ;
root [ " device " ] = DEVICE ;
root [ " hostname " ] = getSetting ( " hostname " ) ;
@ -368,6 +465,7 @@ void _wsOnStart(JsonObject& root) {
}
void wsSend ( JsonObject & root ) {
// TODO: avoid serializing twice?
size_t len = root . measureLength ( ) ;
AsyncWebSocketMessageBuffer * buffer = _ws . makeBuffer ( len ) ;
@ -381,6 +479,7 @@ void wsSend(uint32_t client_id, JsonObject& root) {
AsyncWebSocketClient * client = _ws . client ( client_id ) ;
if ( client = = nullptr ) return ;
// TODO: avoid serializing twice?
size_t len = root . measureLength ( ) ;
AsyncWebSocketMessageBuffer * buffer = _ws . makeBuffer ( len ) ;
@ -390,27 +489,24 @@ void wsSend(uint32_t client_id, JsonObject& root) {
}
}
void _wsStart ( uint32_t client_id ) {
# if USE_PASSWORD && WEB_FORCE_PASS_CHANGE
bool changePassword = getAdminPass ( ) . equals ( ADMIN_PASS ) ;
# else
bool changePassword = false ;
# endif
void _wsConnected ( uint32_t client_id ) {
DynamicJsonBuffer jsonBuffer ;
JsonObject & root = jsonBuffer . createObject ( ) ;
const bool changePassword = ( USE_PASSWORD & & WEB_FORCE_PASS_CHANGE )
? getAdminPass ( ) . equals ( ADMIN_PASS )
: false ;
if ( changePassword ) {
StaticJsonBuffer < JSON_OBJECT_SIZE ( 1 ) > jsonBuffer ;
JsonObject & root = jsonBuffer . createObject ( ) ;
root [ " webMode " ] = WEB_MODE_PASSWORD ;
wsSend ( root ) ;
wsSend ( client_id , root ) ;
return ;
}
for ( auto & callback : _ws_on_send_callbacks ) {
callback ( root ) ;
}
wsPostAll ( client_id , _ws_callbacks . on_visible ) ;
wsPostSequence ( client_id , _ws_callbacks . on_connected ) ;
wsPostSequence ( client_id , _ws_callbacks . on_data ) ;
wsSend ( client_id , root ) ;
}
void _wsEvent ( AsyncWebSocket * server , AsyncWebSocketClient * client , AwsEventType type , void * arg , uint8_t * data , size_t len ) {
@ -430,8 +526,7 @@ void _wsEvent(AsyncWebSocket * server, AsyncWebSocketClient * client, AwsEventTy
IPAddress ip = client - > remoteIP ( ) ;
DEBUG_MSG_P ( PSTR ( " [WEBSOCKET] #%u connected, ip: %d.%d.%d.%d, url: %s \n " ) , client - > id ( ) , ip [ 0 ] , ip [ 1 ] , ip [ 2 ] , ip [ 3 ] , server - > url ( ) ) ;
_wsStart ( client - > id ( ) ) ;
client - > _tempObject = new WebSocketIncommingBuffer ( & _wsParse , true ) ;
_wsConnected ( client - > id ( ) ) ;
wifiReconnectCheck ( ) ;
} else if ( type = = WS_EVT_DISCONNECT ) {
@ -449,6 +544,7 @@ void _wsEvent(AsyncWebSocket * server, AsyncWebSocketClient * client, AwsEventTy
} else if ( type = = WS_EVT_DATA ) {
//DEBUG_MSG_P(PSTR("[WEBSOCKET] #%u data(%u): %s\n"), client->id(), len, len ? (char*) data : "");
if ( ! client - > _tempObject ) return ;
WebSocketIncommingBuffer * buffer = ( WebSocketIncommingBuffer * ) client - > _tempObject ;
AwsFrameInfo * info = ( AwsFrameInfo * ) arg ;
buffer - > data_event ( client , info , data , len ) ;
@ -457,9 +553,52 @@ void _wsEvent(AsyncWebSocket * server, AsyncWebSocketClient * client, AwsEventTy
}
// TODO: make this generic loop method to queue important ws messages?
// or, if something uses ticker / async ctx to send messages,
// it needs a retry mechanism built into the callback object
void _wsHandleClientData ( ) {
if ( _ws_client_data . empty ( ) ) return ;
auto & data = _ws_client_data . front ( ) ;
AsyncWebSocketClient * ws_client = _ws . client ( data . client_id ) ;
if ( ! ws_client ) {
_ws_client_data . pop ( ) ;
return ;
}
// wait until we can send the next batch of messages
// XXX: enforce that callbacks send only one message per iteration
if ( ws_client - > queueIsFull ( ) ) {
return ;
}
// XXX: block allocation will try to create *2 next time,
// likely failing and causing wsSend to reference empty objects
// XXX: arduinojson6 will not do this, but we may need to use per-callback buffers
constexpr const size_t BUFFER_SIZE = 3192 ;
DynamicJsonBuffer jsonBuffer ( BUFFER_SIZE ) ;
JsonObject & root = jsonBuffer . createObject ( ) ;
data . send ( root ) ;
if ( data . client_id ) {
wsSend ( data . client_id , root ) ;
} else {
wsSend ( root ) ;
}
yield ( ) ;
if ( data . done ( ) ) {
// push the queue and finally allow incoming messages
_ws_client_data . pop ( ) ;
ws_client - > _tempObject = new WebSocketIncommingBuffer ( _wsParse , true ) ;
}
}
void _wsLoop ( ) {
if ( ! wsConnected ( ) ) return ;
_wsDoUpdate ( ) ;
_wsHandleClientData ( ) ;
}
// -----------------------------------------------------------------------------
@ -474,21 +613,13 @@ bool wsConnected(uint32_t client_id) {
return _ws . hasClient ( client_id ) ;
}
void wsOnSendRegister ( ws_on_send_callback_f callback ) {
_ws_on_send_callbacks . push_back ( callback ) ;
}
void wsOnReceiveRegister ( ws_on_receive_callback_f callback ) {
_ws_on_receive_callbacks . push_back ( callback ) ;
}
void wsOnActionRegister ( ws_on_action_callback_f callback ) {
_ws_on_action_callbacks . push_back ( callback ) ;
ws_callbacks_t & wsRegister ( ) {
return _ws_callbacks ;
}
void wsSend ( ws_on_send_callback_f callback ) {
if ( _ws . count ( ) > 0 ) {
DynamicJsonBuffer jsonBuffer ;
DynamicJsonBuffer jsonBuffer ( 512 ) ;
JsonObject & root = jsonBuffer . createObject ( ) ;
callback ( root ) ;
@ -514,17 +645,10 @@ void wsSend(uint32_t client_id, ws_on_send_callback_f callback) {
AsyncWebSocketClient * client = _ws . client ( client_id ) ;
if ( client = = nullptr ) return ;
DynamicJsonBuffer jsonBuffer ;
DynamicJsonBuffer jsonBuffer ( 512 ) ;
JsonObject & root = jsonBuffer . createObject ( ) ;
callback ( root ) ;
size_t len = root . measureLength ( ) ;
AsyncWebSocketMessageBuffer * buffer = _ws . makeBuffer ( len ) ;
if ( buffer ) {
root . printTo ( reinterpret_cast < char * > ( buffer - > get ( ) ) , len + 1 ) ;
client - > text ( buffer ) ;
}
wsSend ( client_id , root ) ;
}
void wsSend ( uint32_t client_id , const char * payload ) {
@ -537,6 +661,26 @@ void wsSend_P(uint32_t client_id, PGM_P payload) {
_ws . text ( client_id , buffer ) ;
}
void wsPost ( const ws_on_send_callback_f & cb ) {
_ws_client_data . emplace ( cb ) ;
}
void wsPostAll ( uint32_t client_id , const ws_on_send_callback_list_t & cbs ) {
_ws_client_data . emplace ( client_id , cbs , ws_data_t : : ALL ) ;
}
void wsPostAll ( const ws_on_send_callback_list_t & cbs ) {
_ws_client_data . emplace ( 0 , cbs , ws_data_t : : ALL ) ;
}
void wsPostSequence ( uint32_t client_id , const ws_on_send_callback_list_t & cbs ) {
_ws_client_data . emplace ( client_id , cbs , ws_data_t : : SEQUENCE ) ;
}
void wsPostSequence ( const ws_on_send_callback_list_t & cbs ) {
_ws_client_data . emplace ( 0 , cbs , ws_data_t : : SEQUENCE ) ;
}
void wsSetup ( ) {
_ws . onEvent ( _wsEvent ) ;
@ -551,12 +695,10 @@ void wsSetup() {
webServer ( ) - > on ( " /auth " , HTTP_GET , _onAuth ) ;
# if MQTT_SUPPORT
mqttRegister ( _wsMQTTCallback ) ;
# endif
wsRegister ( )
. onConnected ( _wsOnConnected )
. onKeyCheck ( _wsOnKeyCheck ) ;
wsOnSendRegister ( _wsOnStart ) ;
wsOnReceiveRegister ( _wsOnReceive ) ;
espurnaRegisterLoop ( _wsLoop ) ;
}