- #!/usr/bin/env python3
- # pylint: disable=C0330
- # coding=utf-8
- # -------------------------------------------------------------------------------
- # ESPurna OTA manager
- # xose.perez@gmail.com
- #
- # Requires PlatformIO Core
- # -------------------------------------------------------------------------------
-
- import argparse
- import os
- import re
- import shutil
- import socket
- import subprocess
- import functools
- import sys
- import time
-
- import zeroconf
-
- # -------------------------------------------------------------------------------
-
- __version__ = (0, 4, 2)
-
- DESCRIPTION = "ESPurna OTA Manager v{}".format(".".join(str(x) for x in __version__))
- DISCOVERY_TIMEOUT = 10
-
- # -------------------------------------------------------------------------------
-
-
- class Printer:
-
- OUTPUT_FORMAT = "{:>3} {:<14} {:<15} {:<17} {:<12} {:<12} {:<20} {:<25} {:<8} {:<8} {:<10}"
-
- def header(self):
- print(
- self.OUTPUT_FORMAT.format(
- "#",
- "HOSTNAME",
- "IP",
- "MAC",
- "APP",
- "VERSION",
- "BUILD_DATE",
- "DEVICE",
- "MEM_SIZE",
- "SDK_SIZE",
- "FREE_SPACE",
- )
- )
- print("-" * 164)
-
- def device(self, index, device):
- print(
- self.OUTPUT_FORMAT.format(
- index,
- device.get("hostname", ""),
- device.get("ip", ""),
- device.get("mac", ""),
- device.get("app_name", ""),
- device.get("app_version", ""),
- device.get("build_date", ""),
- device.get("target_board", ""),
- device.get("mem_size", 0),
- device.get("sdk_size", 0),
- device.get("free_space", 0),
- )
- )
-
- def devices(self, devices):
- """
- Shows the list of discovered devices
- """
- for index, device in enumerate(devices, 1):
- self.device(index, device)
-
- print()
-
-
- # Based on:
- # https://github.com/balloob/pychromecast/blob/master/pychromecast/discovery.py
- class Listener:
- def __init__(self, print_when_discovered=True):
- self.devices = []
- self.printer = Printer()
- self.print_when_discovered = print_when_discovered
-
- @property
- def count(self):
- return len(self.devices)
-
- def add_service(self, browser, service_type, name):
- """
- Callback that adds discovered devices to "devices" list
- """
-
- info = None
- tries = 0
- while info is None and tries < 4:
- try:
- info = browser.get_service_info(service_type, name)
- except IOError:
- break
- tries += 1
-
- if not info:
- print(
- "! error getting service info {} {}".format(service_type, name),
- file=sys.stderr,
- )
- return
-
- hostname = info.server.split(".")[0]
- addresses = info.parsed_addresses()
-
- device = {
- "hostname": hostname.upper(),
- "ip": addresses[0] if addresses else info.host,
- "mac": "",
- "app_name": "",
- "app_version": "",
- "build_date": "",
- "target_board": "",
- "mem_size": 0,
- "sdk_size": 0,
- "free_space": 0,
- }
-
- for key, item in info.properties.items():
- device[key.decode("UTF-8")] = item.decode("UTF-8")
-
- # rename fields (needed for sorting by name)
- device["app"] = device["app_name"]
- device["device"] = device["target_board"]
- device["version"] = device["app_version"]
-
- self.devices.append(device)
-
- if self.print_when_discovered:
- self.printer.device(self.count, device)
-
- def print_devices(self, devices=None):
- if not devices:
- devices = self.devices
- self.printer.devices(devices)
-
-
- def get_boards():
- """
- Grabs board types from hardware.h file
- """
- boards = []
- for line in open("espurna/config/hardware.h"):
- match = re.search(r"^#elif defined\((\w*)\)", line)
- if match:
- boards.append(match.group(1))
- return sorted(boards)
-
-
- def get_device_size(device):
- if device.get("mem_size", 0) == device.get("sdk_size", 0):
- return int(device.get("mem_size", 0)) // 1024
- return 0
-
-
- def get_empty_board():
- """
- Returns the empty structure of a board to flash
- """
- board = {"board": "", "ip": "", "size": 0, "auth": "", "flags": ""}
- return board
-
-
- def get_board_by_index(devices, index):
- """
- Returns the required data to flash a given board
- """
- board = {}
- if 1 <= index <= len(devices):
- device = devices[index - 1]
- board["hostname"] = device.get("hostname")
- board["board"] = device.get("target_board", "")
- board["ip"] = device.get("ip", "")
- board["size"] = get_device_size(device)
- return board
-
-
- def get_board_by_mac(devices, mac):
- """
- Returns the required data to flash a given board
- """
- for device in devices:
- if device.get("mac", "").lower() == mac:
- board = {}
- board["hostname"] = device.get("hostname")
- board["board"] = device.get("device")
- board["ip"] = device.get("ip")
- board["size"] = get_device_size(device)
- if not board["board"] or not board["ip"] or board["size"] == 0:
- return None
- return board
- return None
-
-
- def get_board_by_hostname(devices, hostname):
- """
- Returns the required data to flash a given board
- """
- hostname = hostname.lower()
- for device in devices:
- if device.get("hostname", "").lower() == hostname:
- board = {}
- board["hostname"] = device.get("hostname")
- board["board"] = device.get("target_board")
- board["ip"] = device.get("ip")
- board["size"] = get_device_size(device)
- if not board["board"] or not board["ip"] or board["size"] == 0:
- return None
- return board
- return None
-
-
- def input_board(devices):
- """
- Grabs info from the user about what device to flash
- """
-
- # Choose the board
- try:
- index = int(
- input("Choose the board you want to flash (empty if none of these): ")
- )
- except ValueError:
- index = 0
- if index < 0 or len(devices) < index:
- print("Board number must be between 1 and {}\n".format(str(len(devices))))
- return None
-
- board = get_board_by_index(devices, index)
-
- # Choose board type if none before
- if not board.get("board"):
-
- print()
- count = 1
- boards = get_boards()
- for name in boards:
- print("{:3d}\t{}".format(count, name))
- count += 1
- print()
- try:
- index = int(input("Choose the board type you want to flash: "))
- except ValueError:
- index = 0
- if index < 1 or len(boards) < index:
- print("Board number must be between 1 and {}\n".format(str(len(boards))))
- return None
- board["board"] = boards[index - 1]
-
- # Choose board size of none before
- if not board.get("size"):
- try:
- board["size"] = int(
- input("Board memory size (1 for 1M, 2 for 2M, 4 for 4M): ")
- )
- except ValueError:
- print("Wrong memory size")
- return None
-
- # Choose IP of none before
- if not board.get("ip"):
- board["ip"] = (
- input("IP of the device to flash (empty for 192.168.4.1): ")
- or "192.168.4.1"
- )
-
- return board
-
-
- def boardname(board):
- return board.get("hostname", board["ip"])
-
-
- def store(device, env):
- source = ".pio/build/{}/firmware.elf".format(env)
- destination = ".pio/build/elfs/{}.elf".format(boardname(device).lower())
-
- dst_dir = os.path.dirname(destination)
- if not os.path.exists(dst_dir):
- os.mkdir(dst_dir)
-
- shutil.move(source, destination)
-
-
- def run(device, env):
- print("Building and flashing image over-the-air...")
- environ = os.environ.copy()
- environ["ESPURNA_IP"] = device["ip"]
- environ["ESPURNA_BOARD"] = device["board"]
- environ["ESPURNA_AUTH"] = device["auth"]
- environ["ESPURNA_FLAGS"] = device["flags"]
- environ["ESPURNA_PIO_SHARED_LIBRARIES"] = "y"
-
- command = ("platformio", "run", "--silent", "--environment", env, "-t", "upload")
- subprocess.check_call(command, env=environ)
-
- store(device, env)
-
-
- # -------------------------------------------------------------------------------
-
-
- def parse_commandline_args():
- parser = argparse.ArgumentParser(description=DESCRIPTION)
- parser.add_argument(
- "-c",
- "--core",
- help="Use ESPurna core configuration",
- action="store_true",
- default=False,
- )
- parser.add_argument(
- "-f", "--flash", help="Flash device", action="store_true", default=False
- )
- parser.add_argument(
- "-a",
- "--arduino-core",
- help="Arduino ESP8266 Core version",
- default="2_3_0",
- choices=["2_3_0", "latest", "git"],
- )
- parser.add_argument("-o", "--flags", help="extra flags", default="")
- parser.add_argument("-p", "--password", help="auth password", default="")
- parser.add_argument("-s", "--sort", help="sort devices list by field", default="")
- parser.add_argument(
- "-y",
- "--yes",
- help="do not ask for confirmation",
- action="store_true",
- default=False,
- )
- parser.add_argument(
- "-t",
- "--timeout",
- type=int,
- help="how long to wait for mDNS discovery",
- default=DISCOVERY_TIMEOUT,
- )
- parser.add_argument("hostnames", nargs="*", help="Hostnames to update")
- return parser.parse_args()
-
-
- def discover_devices(args):
- # Look for services and try to immediatly print the device when it is discovered
- # (unless --sort <field> is specified, then we will wait until discovery finishes
- listener = Listener(print_when_discovered=not args.sort)
-
- try:
- browser = zeroconf.ServiceBrowser(
- zeroconf.Zeroconf(), "_arduino._tcp.local.", listener
- )
- except (
- zeroconf.BadTypeInNameException,
- NotImplementedError,
- OSError,
- socket.error,
- zeroconf.NonUniqueNameException,
- ) as exc:
- print("! error when creating service discovery browser: {}".format(exc))
- sys.exit(1)
-
- try:
- time.sleep(args.timeout)
- except KeyboardInterrupt:
- sys.exit(1)
-
- try:
- browser.zc.close()
- except KeyboardInterrupt:
- sys.exit(1)
-
- if not listener.devices:
- print("Nothing found!\n")
- sys.exit(1)
-
- devices = listener.devices
-
- # Sort list by specified field name and# show devices overview
- if args.sort:
- field = args.sort.lower()
- if field not in devices[0]:
- print("Unknown field '{}'\n".format(field))
- sys.exit(1)
- devices.sort(key=lambda dev: dev.get(field, ""))
-
- listener.print_devices(devices)
-
- return devices
-
-
- @functools.lru_cache(maxsize=None)
- def get_platformio_env(arduino_core, size):
- # todo: eventually 2_3_0 is dropped
- # todo: naming
- prefix = "esp8266"
- if not size in [1, 2, 4]:
- raise ValueError(
- "Board memory size can only be one of: 1 for 1M, 2 for 2M, 4 for 4M"
- )
- core = ""
- if arduino_core != "2_3_0":
- core = "-{}".format(arduino_core)
- return "{prefix}-{size:d}m{core}-base".format(prefix=prefix, core=core, size=size)
-
-
- def main(args):
-
- print()
- print(DESCRIPTION)
- print()
-
- devices = discover_devices(args)
-
- # Flash device only when --flash arg is provided
- if args.flash:
-
- # Board(s) to flash
- queue = []
-
- # Check if hostnames
- for hostname in args.hostnames:
- board = get_board_by_hostname(devices, hostname)
- if board:
- board["auth"] = args.password
- board["flags"] = args.flags
- queue.append(board)
-
- # If no boards ask the user
- if not len(queue):
- board = input_board(devices)
- if board:
- board["auth"] = args.password or input(
- "Authorization key of the device to flash: "
- )
- board["flags"] = args.flags or input("Extra flags for the build: ")
- queue.append(board)
-
- # If still no boards quit
- if not len(queue):
- sys.exit(0)
-
- queue.sort(key=lambda dev: dev.get("board", ""))
-
- # Flash each board
- for board in queue:
-
- # Flash core version?
- if args.core:
- board["flags"] = "-DESPURNA_CORE " + board["flags"]
-
- env = get_platformio_env(args.arduino_core, board["size"])
-
- # Summary
- print()
- print("HOST = {}".format(boardname(board)))
- print("IP = {}".format(board["ip"]))
- print("BOARD = {}".format(board["board"]))
- print("AUTH = {}".format(board["auth"]))
- print("FLAGS = {}".format(board["flags"]))
- print("ENV = {}".format(env))
-
- response = True
- if not args.yes:
- response = input("\nAre these values right [y/N]: ") == "y"
- if response:
- print()
- run(board, env)
-
-
- if __name__ == "__main__":
- main(parse_commandline_args())
|