Fork of the espurna firmware for `mhsw` switches
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

480 lines
13 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. #!/usr/bin/env python3
  2. # pylint: disable=C0330
  3. # coding=utf-8
  4. # -------------------------------------------------------------------------------
  5. # ESPurna OTA manager
  6. # xose.perez@gmail.com
  7. #
  8. # Requires PlatformIO Core
  9. # -------------------------------------------------------------------------------
  10. import argparse
  11. import os
  12. import re
  13. import shutil
  14. import socket
  15. import subprocess
  16. import functools
  17. import sys
  18. import time
  19. import zeroconf
  20. # -------------------------------------------------------------------------------
  21. __version__ = (0, 4, 1)
  22. DESCRIPTION = "ESPurna OTA Manager v{}".format(".".join(str(x) for x in __version__))
  23. DISCOVERY_TIMEOUT = 10
  24. # -------------------------------------------------------------------------------
  25. class Printer:
  26. OUTPUT_FORMAT = "{:>3} {:<14} {:<15} {:<17} {:<12} {:<12} {:<20} {:<25} {:<8} {:<8} {:<10}"
  27. def header(self):
  28. print(
  29. self.OUTPUT_FORMAT.format(
  30. "#",
  31. "HOSTNAME",
  32. "IP",
  33. "MAC",
  34. "APP",
  35. "VERSION",
  36. "BUILD_DATE",
  37. "DEVICE",
  38. "MEM_SIZE",
  39. "SDK_SIZE",
  40. "FREE_SPACE",
  41. )
  42. )
  43. print("-" * 164)
  44. def device(self, index, device):
  45. print(
  46. self.OUTPUT_FORMAT.format(
  47. index,
  48. device.get("hostname", ""),
  49. device.get("ip", ""),
  50. device.get("mac", ""),
  51. device.get("app_name", ""),
  52. device.get("app_version", ""),
  53. device.get("build_date", ""),
  54. device.get("target_board", ""),
  55. device.get("mem_size", 0),
  56. device.get("sdk_size", 0),
  57. device.get("free_space", 0),
  58. )
  59. )
  60. def devices(self, devices):
  61. """
  62. Shows the list of discovered devices
  63. """
  64. for index, device in enumerate(devices, 1):
  65. self.device(index, device)
  66. print()
  67. # Based on:
  68. # https://github.com/balloob/pychromecast/blob/master/pychromecast/discovery.py
  69. class Listener:
  70. def __init__(self, print_when_discovered=True):
  71. self.devices = []
  72. self.printer = Printer()
  73. self.print_when_discovered = print_when_discovered
  74. @property
  75. def count(self):
  76. return len(self.devices)
  77. def add_service(self, browser, service_type, name):
  78. """
  79. Callback that adds discovered devices to "devices" list
  80. """
  81. info = None
  82. tries = 0
  83. while info is None and tries < 4:
  84. try:
  85. info = browser.get_service_info(service_type, name)
  86. except IOError:
  87. break
  88. tries += 1
  89. if not info:
  90. print(
  91. "! error getting service info {} {}".format(service_type, name),
  92. file=sys.stderr,
  93. )
  94. return
  95. hostname = info.server.split(".")[0]
  96. device = {
  97. "hostname": hostname.upper(),
  98. "ip": socket.inet_ntoa(info.address),
  99. "mac": "",
  100. "app_name": "",
  101. "app_version": "",
  102. "build_date": "",
  103. "target_board": "",
  104. "mem_size": 0,
  105. "sdk_size": 0,
  106. "free_space": 0,
  107. }
  108. for key, item in info.properties.items():
  109. device[key.decode("UTF-8")] = item.decode("UTF-8")
  110. # rename fields (needed for sorting by name)
  111. device["app"] = device["app_name"]
  112. device["device"] = device["target_board"]
  113. device["version"] = device["app_version"]
  114. self.devices.append(device)
  115. if self.print_when_discovered:
  116. self.printer.device(self.count, device)
  117. def print_devices(self, devices=None):
  118. if not devices:
  119. devices = self.devices
  120. self.printer.devices(devices)
  121. def get_boards():
  122. """
  123. Grabs board types from hardware.h file
  124. """
  125. boards = []
  126. for line in open("espurna/config/hardware.h"):
  127. match = re.search(r"^#elif defined\((\w*)\)", line)
  128. if match:
  129. boards.append(match.group(1))
  130. return sorted(boards)
  131. def get_device_size(device):
  132. if device.get("mem_size", 0) == device.get("sdk_size", 0):
  133. return int(device.get("mem_size", 0)) // 1024
  134. return 0
  135. def get_empty_board():
  136. """
  137. Returns the empty structure of a board to flash
  138. """
  139. board = {"board": "", "ip": "", "size": 0, "auth": "", "flags": ""}
  140. return board
  141. def get_board_by_index(devices, index):
  142. """
  143. Returns the required data to flash a given board
  144. """
  145. board = {}
  146. if 1 <= index <= len(devices):
  147. device = devices[index - 1]
  148. board["hostname"] = device.get("hostname")
  149. board["board"] = device.get("target_board", "")
  150. board["ip"] = device.get("ip", "")
  151. board["size"] = get_device_size(device)
  152. return board
  153. def get_board_by_mac(devices, mac):
  154. """
  155. Returns the required data to flash a given board
  156. """
  157. for device in devices:
  158. if device.get("mac", "").lower() == mac:
  159. board = {}
  160. board["hostname"] = device.get("hostname")
  161. board["board"] = device.get("device")
  162. board["ip"] = device.get("ip")
  163. board["size"] = get_device_size(device)
  164. if not board["board"] or not board["ip"] or board["size"] == 0:
  165. return None
  166. return board
  167. return None
  168. def get_board_by_hostname(devices, hostname):
  169. """
  170. Returns the required data to flash a given board
  171. """
  172. hostname = hostname.lower()
  173. for device in devices:
  174. if device.get("hostname", "").lower() == hostname:
  175. board = {}
  176. board["hostname"] = device.get("hostname")
  177. board["board"] = device.get("target_board")
  178. board["ip"] = device.get("ip")
  179. board["size"] = get_device_size(device)
  180. if not board["board"] or not board["ip"] or board["size"] == 0:
  181. return None
  182. return board
  183. return None
  184. def input_board(devices):
  185. """
  186. Grabs info from the user about what device to flash
  187. """
  188. # Choose the board
  189. try:
  190. index = int(
  191. input("Choose the board you want to flash (empty if none of these): ")
  192. )
  193. except ValueError:
  194. index = 0
  195. if index < 0 or len(devices) < index:
  196. print("Board number must be between 1 and {}\n".format(str(len(devices))))
  197. return None
  198. board = get_board_by_index(devices, index)
  199. # Choose board type if none before
  200. if not board.get("board"):
  201. print()
  202. count = 1
  203. boards = get_boards()
  204. for name in boards:
  205. print("{:3d}\t{}".format(count, name))
  206. count += 1
  207. print()
  208. try:
  209. index = int(input("Choose the board type you want to flash: "))
  210. except ValueError:
  211. index = 0
  212. if index < 1 or len(boards) < index:
  213. print("Board number must be between 1 and {}\n".format(str(len(boards))))
  214. return None
  215. board["board"] = boards[index - 1]
  216. # Choose board size of none before
  217. if not board.get("size"):
  218. try:
  219. board["size"] = int(
  220. input("Board memory size (1 for 1M, 2 for 2M, 4 for 4M): ")
  221. )
  222. except ValueError:
  223. print("Wrong memory size")
  224. return None
  225. # Choose IP of none before
  226. if not board.get("ip"):
  227. board["ip"] = (
  228. input("IP of the device to flash (empty for 192.168.4.1): ")
  229. or "192.168.4.1"
  230. )
  231. return board
  232. def boardname(board):
  233. return board.get("hostname", board["ip"])
  234. def store(device, env):
  235. source = ".pio/build/{}/firmware.elf".format(env)
  236. destination = ".pio/build/elfs/{}.elf".format(boardname(device).lower())
  237. dst_dir = os.path.dirname(destination)
  238. if not os.path.exists(dst_dir):
  239. os.mkdir(dst_dir)
  240. shutil.move(source, destination)
  241. def run(device, env):
  242. print("Building and flashing image over-the-air...")
  243. environ = os.environ.copy()
  244. environ["ESPURNA_IP"] = device["ip"]
  245. environ["ESPURNA_BOARD"] = device["board"]
  246. environ["ESPURNA_AUTH"] = device["auth"]
  247. environ["ESPURNA_FLAGS"] = device["flags"]
  248. environ["ESPURNA_PIO_SHARED_LIBRARIES"] = "y"
  249. command = ("platformio", "run", "--silent", "--environment", env, "-t", "upload")
  250. subprocess.check_call(command, env=environ)
  251. store(device, env)
  252. # -------------------------------------------------------------------------------
  253. def parse_commandline_args():
  254. parser = argparse.ArgumentParser(description=DESCRIPTION)
  255. parser.add_argument(
  256. "-c",
  257. "--core",
  258. help="Use ESPurna core configuration",
  259. action="store_true",
  260. default=False,
  261. )
  262. parser.add_argument(
  263. "-f", "--flash", help="Flash device", action="store_true", default=False
  264. )
  265. parser.add_argument(
  266. "-a",
  267. "--arduino-core",
  268. help="Arduino ESP8266 Core version",
  269. default="2_3_0",
  270. choices=["2_3_0", "latest", "git"],
  271. )
  272. parser.add_argument("-o", "--flags", help="extra flags", default="")
  273. parser.add_argument("-p", "--password", help="auth password", default="")
  274. parser.add_argument("-s", "--sort", help="sort devices list by field", default="")
  275. parser.add_argument(
  276. "-y",
  277. "--yes",
  278. help="do not ask for confirmation",
  279. action="store_true",
  280. default=False,
  281. )
  282. parser.add_argument(
  283. "-t",
  284. "--timeout",
  285. type=int,
  286. help="how long to wait for mDNS discovery",
  287. default=DISCOVERY_TIMEOUT,
  288. )
  289. parser.add_argument("hostnames", nargs="*", help="Hostnames to update")
  290. return parser.parse_args()
  291. def discover_devices(args):
  292. # Look for services and try to immediatly print the device when it is discovered
  293. # (unless --sort <field> is specified, then we will wait until discovery finishes
  294. listener = Listener(print_when_discovered=not args.sort)
  295. try:
  296. browser = zeroconf.ServiceBrowser(
  297. zeroconf.Zeroconf(), "_arduino._tcp.local.", listener
  298. )
  299. except (
  300. zeroconf.BadTypeInNameException,
  301. NotImplementedError,
  302. OSError,
  303. socket.error,
  304. zeroconf.NonUniqueNameException,
  305. ) as exc:
  306. print("! error when creating service discovery browser: {}".format(exc))
  307. sys.exit(1)
  308. try:
  309. time.sleep(args.timeout)
  310. except KeyboardInterrupt:
  311. sys.exit(1)
  312. try:
  313. browser.zc.close()
  314. except KeyboardInterrupt:
  315. sys.exit(1)
  316. if not listener.devices:
  317. print("Nothing found!\n")
  318. sys.exit(1)
  319. devices = listener.devices
  320. # Sort list by specified field name and# show devices overview
  321. if args.sort:
  322. field = args.sort.lower()
  323. if field not in devices[0]:
  324. print("Unknown field '{}'\n".format(field))
  325. sys.exit(1)
  326. devices.sort(key=lambda dev: dev.get(field, ""))
  327. listener.print_devices(devices)
  328. return devices
  329. @functools.lru_cache(maxsize=None)
  330. def get_platformio_env(arduino_core, size):
  331. # todo: eventually 2_3_0 is dropped
  332. # todo: naming
  333. env_prefix = "esp8266"
  334. if not size in [1, 2, 4]:
  335. raise ValueError(
  336. "Board memory size can only be one of: 1 for 1M, 2 for 2M, 4 for 4M"
  337. )
  338. if arduino_core != "2_3_0":
  339. env_prefix = "{}-{}".format(env_prefix, arduino_core)
  340. return "{env_prefix}-{size:d}m-ota".format(env_prefix=env_prefix, size=size)
  341. def main(args):
  342. print()
  343. print(DESCRIPTION)
  344. print()
  345. devices = discover_devices(args)
  346. # Flash device only when --flash arg is provided
  347. if args.flash:
  348. # Board(s) to flash
  349. queue = []
  350. # Check if hostnames
  351. for hostname in args.hostnames:
  352. board = get_board_by_hostname(devices, hostname)
  353. if board:
  354. board["auth"] = args.password
  355. board["flags"] = args.flags
  356. queue.append(board)
  357. # If no boards ask the user
  358. if not len(queue):
  359. board = input_board(devices)
  360. if board:
  361. board["auth"] = args.password or input(
  362. "Authorization key of the device to flash: "
  363. )
  364. board["flags"] = args.flags or input("Extra flags for the build: ")
  365. queue.append(board)
  366. # If still no boards quit
  367. if not len(queue):
  368. sys.exit(0)
  369. queue.sort(key=lambda dev: dev.get("board", ""))
  370. # Flash each board
  371. for board in queue:
  372. # Flash core version?
  373. if args.core:
  374. board["flags"] = "-DESPURNA_CORE " + board["flags"]
  375. env = get_platformio_env(args.arduino_core, board["size"])
  376. # Summary
  377. print()
  378. print("HOST = {}".format(boardname(board)))
  379. print("IP = {}".format(board["ip"]))
  380. print("BOARD = {}".format(board["board"]))
  381. print("AUTH = {}".format(board["auth"]))
  382. print("FLAGS = {}".format(board["flags"]))
  383. print("ENV = {}".format(env))
  384. response = True
  385. if not args.yes:
  386. response = input("\nAre these values right [y/N]: ") == "y"
  387. if response:
  388. print()
  389. run(board, env)
  390. if __name__ == "__main__":
  391. main(parse_commandline_args())