@ -1,4 +1,5 @@
#!/usr/bin/env python3
# pylint: disable=C0330
# coding=utf-8
# -------------------------------------------------------------------------------
# ESPurna OTA manager
@ -6,7 +7,6 @@
#
# Requires PlatformIO Core
# -------------------------------------------------------------------------------
from __future__ import print_function
import argparse
import os
@ -17,117 +17,148 @@ import subprocess
import sys
import time
from zeroconf import ServiceBrowser , ServiceStateChange , Zeroconf
try :
# noinspection PyUnresolvedReferences
input = raw_input # Python2 *!! redefining build-in input.
except NameError :
pass # Python3
import zeroconf
# -------------------------------------------------------------------------------
DISCOVER_TIMEOUT = 2
description = " ESPurna OTA Manager v0.3 "
devices = [ ]
discover_last = 0
__version__ = ( 0 , 4 )
DESCRIPTION = " ESPurna OTA Manager v{} " . format ( " . " . join ( str ( x ) for x in __version__ ) )
DISCOVERY_TIMEOUT = 10
# -------------------------------------------------------------------------------
def on_service_state_change ( zeroconf , service_type , name , state_change ) :
"""
Callback that adds discovered devices to " devices " list
"""
global discover_last
if state_change is ServiceStateChange . Added :
discover_last = time . time ( )
info = zeroconf . get_service_info ( service_type , name )
if info :
hostname = info . server . split ( " . " ) [ 0 ]
device = {
' hostname ' : hostname . upper ( ) ,
' ip ' : socket . inet_ntoa ( info . address ) ,
' mac ' : ' ' ,
' app_name ' : ' ' ,
' app_version ' : ' ' ,
' build_date ' : ' ' ,
' target_board ' : ' ' ,
' mem_size ' : 0 ,
' sdk_size ' : 0 ,
' free_space ' : 0 ,
}
for key , item in info . properties . items ( ) :
device [ key . decode ( ' UTF-8 ' ) ] = item . decode ( ' UTF-8 ' )
# rename fields (needed for sorting by name)
device [ ' app ' ] = device [ ' app_name ' ]
device [ ' device ' ] = device [ ' target_board ' ]
device [ ' version ' ] = device [ ' app_version ' ]
devices . append ( device )
class Printer :
OUTPUT_FORMAT = " {:>3} {:<14} {:<15} {:<17} {:<12} {:<12} {:<20} {:<25} {:<8} {:<8} {:<10} "
def header ( self ) :
print (
self . OUTPUT_FORMAT . format (
" # " ,
" HOSTNAME " ,
" IP " ,
" MAC " ,
" APP " ,
" VERSION " ,
" BUILD_DATE " ,
" DEVICE " ,
" MEM_SIZE " ,
" SDK_SIZE " ,
" FREE_SPACE " ,
)
)
print ( " - " * 164 )
def device ( self , index , device ) :
print (
self . OUTPUT_FORMAT . format (
index ,
device . get ( " hostname " , " " ) ,
device . get ( " ip " , " " ) ,
device . get ( " mac " , " " ) ,
device . get ( " app_name " , " " ) ,
device . get ( " app_version " , " " ) ,
device . get ( " build_date " , " " ) ,
device . get ( " target_board " , " " ) ,
device . get ( " mem_size " , 0 ) ,
device . get ( " sdk_size " , 0 ) ,
device . get ( " free_space " , 0 ) ,
)
)
def devices ( self , devices ) :
"""
Shows the list of discovered devices
"""
for index , device in enumerate ( devices , 1 ) :
self . device ( index , device )
print ( )
def list_devices ( ) :
"""
Shows the list of discovered devices
"""
output_format = " {:>3} {:<14} {:<15} {:<17} {:<12} {:<12} {:<20} {:<25} {:<8} {:<8} {:<10} "
print ( output_format . format (
" # " ,
" HOSTNAME " ,
" IP " ,
" MAC " ,
" APP " ,
" VERSION " ,
" BUILD_DATE " ,
" DEVICE " ,
" MEM_SIZE " ,
" SDK_SIZE " ,
" FREE_SPACE "
) )
print ( " - " * 164 )
index = 0
for device in devices :
index + = 1
print ( output_format . format (
index ,
device . get ( ' hostname ' , ' ' ) ,
device . get ( ' ip ' , ' ' ) ,
device . get ( ' mac ' , ' ' ) ,
device . get ( ' app_name ' , ' ' ) ,
device . get ( ' app_version ' , ' ' ) ,
device . get ( ' build_date ' , ' ' ) ,
device . get ( ' target_board ' , ' ' ) ,
device . get ( ' mem_size ' , 0 ) ,
device . get ( ' sdk_size ' , 0 ) ,
device . get ( ' free_space ' , 0 ) ,
) )
print ( )
# Based on:
# https://github.com/balloob/pychromecast/blob/master/pychromecast/discovery.py
class Listener :
def __init__ ( self , print_when_discovered = True ) :
self . devices = [ ]
self . printer = Printer ( )
self . print_when_discovered = print_when_discovered
@property
def count ( self ) :
return len ( self . devices )
def add_service ( self , browser , service_type , name ) :
"""
Callback that adds discovered devices to " devices " list
"""
info = None
tries = 0
while info is None and tries < 4 :
try :
info = browser . get_service_info ( service_type , name )
except IOError :
break
tries + = 1
if not info :
print (
" ! error getting service info {} {} " . format ( service_type , name ) ,
file = sys . stderr ,
)
return
hostname = info . server . split ( " . " ) [ 0 ]
device = {
" hostname " : hostname . upper ( ) ,
" ip " : socket . inet_ntoa ( info . address ) ,
" mac " : " " ,
" app_name " : " " ,
" app_version " : " " ,
" build_date " : " " ,
" target_board " : " " ,
" mem_size " : 0 ,
" sdk_size " : 0 ,
" free_space " : 0 ,
}
for key , item in info . properties . items ( ) :
device [ key . decode ( " UTF-8 " ) ] = item . decode ( " UTF-8 " )
# rename fields (needed for sorting by name)
device [ " app " ] = device [ " app_name " ]
device [ " device " ] = device [ " target_board " ]
device [ " version " ] = device [ " app_version " ]
self . devices . append ( device )
if self . print_when_discovered :
self . printer . device ( self . count , device )
def print_devices ( self , devices = None ) :
if not devices :
devices = self . devices
self . printer . devices ( devices )
def get_boards ( ) :
"""
Grabs board types fro hardware . h file
Grabs board types from hardware.h file
"""
boards = [ ]
for line in open ( " espurna/config/hardware.h " ) :
m = re . search ( r ' defined \ (( \ w*) \ ) ' , line )
if m :
boards . append ( m . group ( 1 ) )
match = re . search ( r " ^#elif defined \ (( \ w*) \ ) ", line )
if match :
boards . append ( match . group ( 1 ) )
return sorted ( boards )
def get_device_size ( device ) :
if device . get ( ' mem_size ' , 0 ) == device . get ( ' sdk_size ' , 0 ) :
return int ( int ( device . get ( ' mem_size ' , 0 ) ) / 1024 )
if device . get ( " mem_size ", 0 ) == device . get ( " sdk_size ", 0 ) :
return int ( device . get ( " mem_size ", 0 ) ) / / 1024
return 0
@ -135,77 +166,79 @@ def get_empty_board():
"""
Returns the empty structure of a board to flash
"""
board = { ' board ' : ' ' , ' ip ' : ' ' , ' size ': 0 , ' auth ' : ' ' , ' flags ' : ' '}
board = { " board " : " " , " ip " : " " , " size ": 0 , " auth " : " " , " flags " : " "}
return board
def get_board_by_index ( index ) :
def get_board_by_index ( devices , index ) :
"""
Returns the required data to flash a given board
"""
board = { }
if 1 < = index < = len ( devices ) :
device = devices [ index - 1 ]
board [ ' hostname '] = device . get ( ' hostname ')
board [ ' board '] = device . get ( ' target_board ' , ' ')
board [ ' ip '] = device . get ( ' ip ' , ' ')
board [ ' size '] = get_device_size ( device )
board [ " hostname "] = device . get ( " hostname ")
board [ " board "] = device . get ( " target_board " , " ")
board [ " ip "] = device . get ( " ip " , " ")
board [ " size "] = get_device_size ( device )
return board
def get_board_by_mac ( mac ) :
def get_board_by_mac ( devices , mac ) :
"""
Returns the required data to flash a given board
"""
for device in devices :
if device . get ( ' mac ' , ' ') . lower ( ) == mac :
if device . get ( " mac " , " ") . lower ( ) == mac :
board = { }
board [ ' hostname '] = device . get ( ' hostname ')
board [ ' board '] = device . get ( ' device ')
board [ ' ip '] = device . get ( ' ip ')
board [ ' size '] = get_device_size ( device )
if not board [ ' board '] or not board [ ' ip '] or board [ ' size '] == 0 :
board [ " hostname "] = device . get ( " hostname ")
board [ " board "] = device . get ( " device ")
board [ " ip "] = device . get ( " ip ")
board [ " size "] = get_device_size ( device )
if not board [ " board "] or not board [ " ip "] or board [ " size "] == 0 :
return None
return board
return None
def get_board_by_hostname ( hostname ) :
def get_board_by_hostname ( devices , hostname ) :
"""
Returns the required data to flash a given board
"""
hostname = hostname . lower ( )
for device in devices :
if device . get ( ' hostname ' , ' ') . lower ( ) == hostname :
if device . get ( " hostname " , " ") . lower ( ) == hostname :
board = { }
board [ ' hostname '] = device . get ( ' hostname ')
board [ ' board '] = device . get ( ' target_board ')
board [ ' ip '] = device . get ( ' ip ')
board [ ' size '] = get_device_size ( device )
if not board [ ' board '] or not board [ ' ip '] or board [ ' size '] == 0 :
board [ " hostname "] = device . get ( " hostname ")
board [ " board "] = device . get ( " target_board ")
board [ " ip "] = device . get ( " ip ")
board [ " size "] = get_device_size ( device )
if not board [ " board "] or not board [ " ip "] or board [ " size "] == 0 :
return None
return board
return None
def input_board ( ) :
def input_board ( devices ) :
"""
Grabs info from the user about what device to flash
"""
# Choose the board
try :
index = int ( input ( " Choose the board you want to flash (empty if none of these): " ) )
index = int (
input ( " Choose the board you want to flash (empty if none of these): " )
)
except ValueError :
index = 0
if index < 0 or len ( devices ) < index :
print ( " Board number must be between 1 and {} \n " . format ( str ( len ( devices ) ) ) )
return None
board = get_board_by_index ( index )
board = get_board_by_index ( devices , index )
# Choose board type if none before
if len ( board . get ( ' board ' , ' ' ) ) == 0 :
if not board . get ( " board " ) :
print ( )
count = 1
@ -221,25 +254,30 @@ def input_board():
if index < 1 or len ( boards ) < index :
print ( " Board number must be between 1 and {} \n " . format ( str ( len ( boards ) ) ) )
return None
board [ ' board '] = boards [ index - 1 ]
board [ " board "] = boards [ index - 1 ]
# Choose board size of none before
if board . get ( ' size ' , 0 ) == 0 :
if not board . get ( " size " ) :
try :
board [ ' size ' ] = int ( input ( " Board memory size (1 for 1M, 2 for 2M, 4 for 4M): " ) )
board [ " size " ] = int (
input ( " Board memory size (1 for 1M, 2 for 2M, 4 for 4M): " )
)
except ValueError :
print ( " Wrong memory size " )
return None
# Choose IP of none before
if len ( board . get ( ' ip ' , ' ' ) ) == 0 :
board [ ' ip ' ] = input ( " IP of the device to flash (empty for 192.168.4.1): " ) or " 192.168.4.1 "
if not board . get ( " ip " ) :
board [ " ip " ] = (
input ( " IP of the device to flash (empty for 192.168.4.1): " )
or " 192.168.4.1 "
)
return board
def boardname ( board ) :
return board . get ( ' hostname ', board [ ' ip '] )
return board . get ( " hostname ", board [ " ip "] )
def store ( device , env ) :
@ -270,94 +308,147 @@ def run(device, env):
# -------------------------------------------------------------------------------
if __name__ == ' __main__ ' :
# Parse command line options
parser = argparse . ArgumentParser ( description = description )
parser . add_argument ( " -c " , " --core " , help = " flash ESPurna core " , default = 0 , action = ' count ' )
parser . add_argument ( " -f " , " --flash " , help = " flash device " , default = 0 , action = ' count ' )
parser . add_argument ( " -o " , " --flags " , help = " extra flags " , default = ' ' )
parser . add_argument ( " -p " , " --password " , help = " auth password " , default = ' ' )
parser . add_argument ( " -s " , " --sort " , help = " sort devices list by field " , default = ' hostname ' )
parser . add_argument ( " -y " , " --yes " , help = " do not ask for confirmation " , default = 0 , action = ' count ' )
parser . add_argument ( " hostnames " , nargs = ' * ' , help = " Hostnames to update " )
args = parser . parse_args ( )
def parse_commandline_args ( ) :
parser = argparse . ArgumentParser ( description = DESCRIPTION )
parser . add_argument (
" -c " , " --core " , help = " flash ESPurna core " , action = " store_true " , default = False
)
parser . add_argument (
" -f " , " --flash " , help = " flash device " , action = " store_true " , default = False
)
parser . add_argument ( " -o " , " --flags " , help = " extra flags " , default = " " )
parser . add_argument ( " -p " , " --password " , help = " auth password " , default = " " )
parser . add_argument ( " -s " , " --sort " , help = " sort devices list by field " , default = " " )
parser . add_argument (
" -y " ,
" --yes " ,
help = " do not ask for confirmation " ,
action = " store_true " ,
default = False ,
)
parser . add_argument (
" -t " ,
" --timeout " ,
type = int ,
help = " how long to wait for mDNS discovery " ,
default = DISCOVERY_TIMEOUT ,
)
parser . add_argument ( " hostnames " , nargs = " * " , help = " Hostnames to update " )
return parser . parse_args ( )
def discover_devices ( args ) :
# Look for services and try to immediatly print the device when it is discovered
# (unless --sort <field> is specified, then we will wait until discovery finishes
listener = Listener ( print_when_discovered = not args . sort )
print ( )
print ( description )
print ( )
try :
browser = zeroconf . ServiceBrowser (
zeroconf . Zeroconf ( ) , " _arduino._tcp.local. " , listener
)
except (
zeroconf . BadTypeInNameException ,
NotImplementedError ,
OSError ,
socket . error ,
zeroconf . NonUniqueNameException ,
) as exc :
print ( " ! error when creating service discovery browser: {} " . format ( exc ) )
sys . exit ( 1 )
# Look for services
zeroconf = Zeroconf ( )
browser = ServiceBrowser ( zeroconf , " _arduino._tcp.local. " , handlers = [ on_service_state_change ] )
discover_last = time . time ( )
while time . time ( ) < discover_last + DISCOVER_TIMEOUT :
pass
# zeroconf.close()
try :
time . sleep ( args . timeout )
except KeyboardInterrupt :
sys . exit ( 1 )
if len ( devices ) == 0 :
print ( " Nothing found! \n " )
sys . exit ( 0 )
try :
browser . zc . close ( )
except KeyboardInterrupt :
sys . exit ( 1 )
# Sort list
field = args . sort . lower ( )
if field not in devices [ 0 ] :
print ( " Unknown field ' {} ' \n " . format ( field ) )
if not listener . devices :
print ( " Nothing found! \n " )
sys . exit ( 1 )
devices = sorted ( devices , key = lambda device : device . get ( field , ' ' ) )
# List devices
list_devices ( )
devices = listener . devices
# Sort list by specified field name and# show devices overview
if args . sort :
field = args . sort . lower ( )
if field not in devices [ 0 ] :
print ( " Unknown field ' {} ' \n " . format ( field ) )
sys . exit ( 1 )
devices . sort ( key = lambda dev : dev . get ( field , " " ) )
listener . print_devices ( devices )
# Flash device
if args . flash > 0 :
return devices
def main ( args ) :
print ( )
print ( DESCRIPTION )
print ( )
devices = discover_devices ( args )
# Flash device only when --flash arg is provided
if args . flash :
# Board(s) to flash
queue = [ ]
# Check if hostnames
for hostname in args . hostnames :
board = get_board_by_hostname ( hostname )
board = get_board_by_hostname ( devices , hostname )
if board :
board [ ' auth ' ] = args . password
board [ ' flags ' ] = args . flags
board [ " auth "] = args . password
board [ " flags "] = args . flags
queue . append ( board )
# If no boards ask the user
if len ( queue ) == 0 :
board = input_board ( )
if not len ( queue ) :
board = input_board ( devices )
if board :
board [ ' auth ' ] = args . password or input ( " Authorization key of the device to flash: " )
board [ ' flags ' ] = args . flags or input ( " Extra flags for the build: " )
board [ " auth " ] = args . password or input (
" Authorization key of the device to flash: "
)
board [ " flags " ] = args . flags or input ( " Extra flags for the build: " )
queue . append ( board )
# If still no boards quit
if len ( queue ) == 0 :
if not len ( queue ) :
sys . exit ( 0 )
queue = sorted ( queue , key = lambda device : device . get ( ' board ' , ' ') )
queue . sort ( key = lambda dev : dev . get ( " board " , " ") )
# Flash each board
for board in queue :
# Flash core version?
if args . core > 0 :
board [ ' flags '] = " -DESPURNA_CORE " + board [ ' flags ']
if args . core :
board [ " flags "] = " -DESPURNA_CORE " + board [ " flags "]
env = " esp8266-{:d}m-ota " . format ( board [ ' size '] )
env = " esp8266-{:d}m-ota " . format ( board [ " size "] )
# Summary
print ( )
print ( " HOST = {} " . format ( boardname ( board ) ) )
print ( " IP = {} " . format ( board [ ' ip '] ) )
print ( " BOARD = {} " . format ( board [ ' board '] ) )
print ( " AUTH = {} " . format ( board [ ' auth '] ) )
print ( " FLAGS = {} " . format ( board [ ' flags '] ) )
print ( " IP = {} " . format ( board [ " ip "] ) )
print ( " BOARD = {} " . format ( board [ " board "] ) )
print ( " AUTH = {} " . format ( board [ " auth "] ) )
print ( " FLAGS = {} " . format ( board [ " flags "] ) )
print ( " ENV = {} " . format ( env ) )
response = True
if args . yes == 0 :
response = ( input ( " \n Are these values right [y/N]: " ) == " y " )
if not args . yes :
response = input ( " \n Are these values right [y/N]: " ) == " y "
if response :
print ( )
run ( board , env )
if __name__ == " __main__ " :
main ( parse_commandline_args ( ) )