Fork of the espurna firmware for `mhsw` switches
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

483 lines
13 KiB

#!/usr/bin/env python3
# pylint: disable=C0330
# coding=utf-8
# -------------------------------------------------------------------------------
# ESPurna OTA manager
# xose.perez@gmail.com
#
# Requires PlatformIO Core
# -------------------------------------------------------------------------------
import argparse
import os
import re
import shutil
import socket
import subprocess
import functools
import sys
import time
import zeroconf
# -------------------------------------------------------------------------------
__version__ = (0, 4, 2)
DESCRIPTION = "ESPurna OTA Manager v{}".format(".".join(str(x) for x in __version__))
DISCOVERY_TIMEOUT = 10
# -------------------------------------------------------------------------------
class Printer:
OUTPUT_FORMAT = "{:>3} {:<14} {:<15} {:<17} {:<12} {:<12} {:<20} {:<25} {:<8} {:<8} {:<10}"
def header(self):
print(
self.OUTPUT_FORMAT.format(
"#",
"HOSTNAME",
"IP",
"MAC",
"APP",
"VERSION",
"BUILD_DATE",
"DEVICE",
"MEM_SIZE",
"SDK_SIZE",
"FREE_SPACE",
)
)
print("-" * 164)
def device(self, index, device):
print(
self.OUTPUT_FORMAT.format(
index,
device.get("hostname", ""),
device.get("ip", ""),
device.get("mac", ""),
device.get("app_name", ""),
device.get("app_version", ""),
device.get("build_date", ""),
device.get("target_board", ""),
device.get("mem_size", 0),
device.get("sdk_size", 0),
device.get("free_space", 0),
)
)
def devices(self, devices):
"""
Shows the list of discovered devices
"""
for index, device in enumerate(devices, 1):
self.device(index, device)
print()
# Based on:
# https://github.com/balloob/pychromecast/blob/master/pychromecast/discovery.py
class Listener:
def __init__(self, print_when_discovered=True):
self.devices = []
self.printer = Printer()
self.print_when_discovered = print_when_discovered
@property
def count(self):
return len(self.devices)
def add_service(self, browser, service_type, name):
"""
Callback that adds discovered devices to "devices" list
"""
info = None
tries = 0
while info is None and tries < 4:
try:
info = browser.get_service_info(service_type, name)
except IOError:
break
tries += 1
if not info:
print(
"! error getting service info {} {}".format(service_type, name),
file=sys.stderr,
)
return
hostname = info.server.split(".")[0]
addresses = info.parsed_addresses()
device = {
"hostname": hostname.upper(),
"ip": addresses[0] if addresses else info.host,
"mac": "",
"app_name": "",
"app_version": "",
"build_date": "",
"target_board": "",
"mem_size": 0,
"sdk_size": 0,
"free_space": 0,
}
for key, item in info.properties.items():
device[key.decode("UTF-8")] = item.decode("UTF-8")
# rename fields (needed for sorting by name)
device["app"] = device["app_name"]
device["device"] = device["target_board"]
device["version"] = device["app_version"]
self.devices.append(device)
if self.print_when_discovered:
self.printer.device(self.count, device)
def print_devices(self, devices=None):
if not devices:
devices = self.devices
self.printer.devices(devices)
def get_boards():
"""
Grabs board types from hardware.h file
"""
boards = []
for line in open("espurna/config/hardware.h"):
match = re.search(r"^#elif defined\((\w*)\)", line)
if match:
boards.append(match.group(1))
return sorted(boards)
def get_device_size(device):
if device.get("mem_size", 0) == device.get("sdk_size", 0):
return int(device.get("mem_size", 0)) // 1024
return 0
def get_empty_board():
"""
Returns the empty structure of a board to flash
"""
board = {"board": "", "ip": "", "size": 0, "auth": "", "flags": ""}
return board
def get_board_by_index(devices, index):
"""
Returns the required data to flash a given board
"""
board = {}
if 1 <= index <= len(devices):
device = devices[index - 1]
board["hostname"] = device.get("hostname")
board["board"] = device.get("target_board", "")
board["ip"] = device.get("ip", "")
board["size"] = get_device_size(device)
return board
def get_board_by_mac(devices, mac):
"""
Returns the required data to flash a given board
"""
for device in devices:
if device.get("mac", "").lower() == mac:
board = {}
board["hostname"] = device.get("hostname")
board["board"] = device.get("device")
board["ip"] = device.get("ip")
board["size"] = get_device_size(device)
if not board["board"] or not board["ip"] or board["size"] == 0:
return None
return board
return None
def get_board_by_hostname(devices, hostname):
"""
Returns the required data to flash a given board
"""
hostname = hostname.lower()
for device in devices:
if device.get("hostname", "").lower() == hostname:
board = {}
board["hostname"] = device.get("hostname")
board["board"] = device.get("target_board")
board["ip"] = device.get("ip")
board["size"] = get_device_size(device)
if not board["board"] or not board["ip"] or board["size"] == 0:
return None
return board
return None
def input_board(devices):
"""
Grabs info from the user about what device to flash
"""
# Choose the board
try:
index = int(
input("Choose the board you want to flash (empty if none of these): ")
)
except ValueError:
index = 0
if index < 0 or len(devices) < index:
print("Board number must be between 1 and {}\n".format(str(len(devices))))
return None
board = get_board_by_index(devices, index)
# Choose board type if none before
if not board.get("board"):
print()
count = 1
boards = get_boards()
for name in boards:
print("{:3d}\t{}".format(count, name))
count += 1
print()
try:
index = int(input("Choose the board type you want to flash: "))
except ValueError:
index = 0
if index < 1 or len(boards) < index:
print("Board number must be between 1 and {}\n".format(str(len(boards))))
return None
board["board"] = boards[index - 1]
# Choose board size of none before
if not board.get("size"):
try:
board["size"] = int(
input("Board memory size (1 for 1M, 2 for 2M, 4 for 4M): ")
)
except ValueError:
print("Wrong memory size")
return None
# Choose IP of none before
if not board.get("ip"):
board["ip"] = (
input("IP of the device to flash (empty for 192.168.4.1): ")
or "192.168.4.1"
)
return board
def boardname(board):
return board.get("hostname", board["ip"])
def store(device, env):
source = ".pio/build/{}/firmware.elf".format(env)
destination = ".pio/build/elfs/{}.elf".format(boardname(device).lower())
dst_dir = os.path.dirname(destination)
if not os.path.exists(dst_dir):
os.mkdir(dst_dir)
shutil.move(source, destination)
def run(device, env):
print("Building and flashing image over-the-air...")
environ = os.environ.copy()
environ["ESPURNA_IP"] = device["ip"]
environ["ESPURNA_BOARD"] = device["board"]
environ["ESPURNA_AUTH"] = device["auth"]
environ["ESPURNA_FLAGS"] = device["flags"]
environ["ESPURNA_PIO_SHARED_LIBRARIES"] = "y"
command = ("platformio", "run", "--silent", "--environment", env, "-t", "upload")
subprocess.check_call(command, env=environ)
store(device, env)
# -------------------------------------------------------------------------------
def parse_commandline_args():
parser = argparse.ArgumentParser(description=DESCRIPTION)
parser.add_argument(
"-c",
"--core",
help="Use ESPurna core configuration",
action="store_true",
default=False,
)
parser.add_argument(
"-f", "--flash", help="Flash device", action="store_true", default=False
)
parser.add_argument(
"-a",
"--arduino-core",
help="Arduino ESP8266 Core version",
default="2_3_0",
choices=["2_3_0", "latest", "git"],
)
parser.add_argument("-o", "--flags", help="extra flags", default="")
parser.add_argument("-p", "--password", help="auth password", default="")
parser.add_argument("-s", "--sort", help="sort devices list by field", default="")
parser.add_argument(
"-y",
"--yes",
help="do not ask for confirmation",
action="store_true",
default=False,
)
parser.add_argument(
"-t",
"--timeout",
type=int,
help="how long to wait for mDNS discovery",
default=DISCOVERY_TIMEOUT,
)
parser.add_argument("hostnames", nargs="*", help="Hostnames to update")
return parser.parse_args()
def discover_devices(args):
# Look for services and try to immediatly print the device when it is discovered
# (unless --sort <field> is specified, then we will wait until discovery finishes
listener = Listener(print_when_discovered=not args.sort)
try:
browser = zeroconf.ServiceBrowser(
zeroconf.Zeroconf(), "_arduino._tcp.local.", listener
)
except (
zeroconf.BadTypeInNameException,
NotImplementedError,
OSError,
socket.error,
zeroconf.NonUniqueNameException,
) as exc:
print("! error when creating service discovery browser: {}".format(exc))
sys.exit(1)
try:
time.sleep(args.timeout)
except KeyboardInterrupt:
sys.exit(1)
try:
browser.zc.close()
except KeyboardInterrupt:
sys.exit(1)
if not listener.devices:
print("Nothing found!\n")
sys.exit(1)
devices = listener.devices
# Sort list by specified field name and# show devices overview
if args.sort:
field = args.sort.lower()
if field not in devices[0]:
print("Unknown field '{}'\n".format(field))
sys.exit(1)
devices.sort(key=lambda dev: dev.get(field, ""))
listener.print_devices(devices)
return devices
@functools.lru_cache(maxsize=None)
def get_platformio_env(arduino_core, size):
# todo: eventually 2_3_0 is dropped
# todo: naming
prefix = "esp8266"
if not size in [1, 2, 4]:
raise ValueError(
"Board memory size can only be one of: 1 for 1M, 2 for 2M, 4 for 4M"
)
core = ""
if arduino_core != "2_3_0":
core = "-{}".format(arduino_core)
return "{prefix}-{size:d}m{core}-base".format(prefix=prefix, core=core, size=size)
def main(args):
print()
print(DESCRIPTION)
print()
devices = discover_devices(args)
# Flash device only when --flash arg is provided
if args.flash:
# Board(s) to flash
queue = []
# Check if hostnames
for hostname in args.hostnames:
board = get_board_by_hostname(devices, hostname)
if board:
board["auth"] = args.password
board["flags"] = args.flags
queue.append(board)
# If no boards ask the user
if not len(queue):
board = input_board(devices)
if board:
board["auth"] = args.password or input(
"Authorization key of the device to flash: "
)
board["flags"] = args.flags or input("Extra flags for the build: ")
queue.append(board)
# If still no boards quit
if not len(queue):
sys.exit(0)
queue.sort(key=lambda dev: dev.get("board", ""))
# Flash each board
for board in queue:
# Flash core version?
if args.core:
board["flags"] = "-DESPURNA_CORE " + board["flags"]
env = get_platformio_env(args.arduino_core, board["size"])
# Summary
print()
print("HOST = {}".format(boardname(board)))
print("IP = {}".format(board["ip"]))
print("BOARD = {}".format(board["board"]))
print("AUTH = {}".format(board["auth"]))
print("FLAGS = {}".format(board["flags"]))
print("ENV = {}".format(env))
response = True
if not args.yes:
response = input("\nAre these values right [y/N]: ") == "y"
if response:
print()
run(board, env)
if __name__ == "__main__":
main(parse_commandline_args())