Fork of the espurna firmware for `mhsw` switches
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

454 lines
13 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. #!/usr/bin/env python3
  2. # pylint: disable=C0330
  3. # coding=utf-8
  4. # -------------------------------------------------------------------------------
  5. # ESPurna OTA manager
  6. # xose.perez@gmail.com
  7. #
  8. # Requires PlatformIO Core
  9. # -------------------------------------------------------------------------------
  10. import argparse
  11. import os
  12. import re
  13. import shutil
  14. import socket
  15. import subprocess
  16. import sys
  17. import time
  18. import zeroconf
  19. # -------------------------------------------------------------------------------
  # Tool version as a tuple; its parts are joined with dots for display.
  __version__ = (0, 4)
  DESCRIPTION = "ESPurna OTA Manager v{}".format(".".join(map(str, __version__)))
  # Default for the --timeout option (how long to wait for mDNS discovery).
  DISCOVERY_TIMEOUT = 10
  23. # -------------------------------------------------------------------------------
  class Printer:
      """Writes discovered devices to stdout as fixed-width table rows."""

      OUTPUT_FORMAT = "{:>3} {:<14} {:<15} {:<17} {:<12} {:<12} {:<20} {:<25} {:<8} {:<8} {:<10}"

      def header(self):
          """Print the column titles followed by a dashed separator line."""
          titles = (
              "#",
              "HOSTNAME",
              "IP",
              "MAC",
              "APP",
              "VERSION",
              "BUILD_DATE",
              "DEVICE",
              "MEM_SIZE",
              "SDK_SIZE",
              "FREE_SPACE",
          )
          print(self.OUTPUT_FORMAT.format(*titles))
          print("-" * 164)

      def device(self, index, device):
          """Print one table row for `device` (a dict), numbered `index`.

          Missing text fields are shown as empty strings and missing size
          fields as 0.
          """
          text_keys = (
              "hostname",
              "ip",
              "mac",
              "app_name",
              "app_version",
              "build_date",
              "target_board",
          )
          size_keys = ("mem_size", "sdk_size", "free_space")
          text_values = [device.get(key, "") for key in text_keys]
          size_values = [device.get(key, 0) for key in size_keys]
          print(self.OUTPUT_FORMAT.format(index, *text_values, *size_values))

      def devices(self, devices):
          """
          Shows the list of discovered devices, numbered from 1, followed by
          an empty line
          """
          for position, entry in enumerate(devices, start=1):
              self.device(position, entry)
          print()
  # Based on:
  # https://github.com/balloob/pychromecast/blob/master/pychromecast/discovery.py
  class Listener:
      """Service-browser listener that collects discovered devices.

      Each discovered service is turned into a dict and appended to
      `self.devices`; when `print_when_discovered` is true the device is
      also printed right away through a `Printer`.
      """

      def __init__(self, print_when_discovered=True):
          self.devices = []
          self.printer = Printer()
          self.print_when_discovered = print_when_discovered

      @property
      def count(self):
          """Number of devices collected so far."""
          return len(self.devices)

      def add_service(self, browser, service_type, name):
          """
          Callback that adds discovered devices to "devices" list

          The service info is requested up to 4 times while the browser
          returns None; an IOError aborts the retries. When no info could
          be obtained an error is written to stderr and nothing is added.
          """
          info = None
          tries = 0
          while info is None and tries < 4:
              try:
                  info = browser.get_service_info(service_type, name)
              except IOError:
                  break
              tries += 1

          if not info:
              print(
                  "! error getting service info {} {}".format(service_type, name),
                  file=sys.stderr,
              )
              return

          hostname = info.server.split(".")[0]

          device = {
              "hostname": hostname.upper(),
              "ip": socket.inet_ntoa(info.address),
              "mac": "",
              "app_name": "",
              "app_version": "",
              "build_date": "",
              "target_board": "",
              "mem_size": 0,
              "sdk_size": 0,
              "free_space": 0,
          }

          for key, item in info.properties.items():
              # A property without a value has nothing to decode; keep the
              # default instead of crashing on `None.decode`.
              if item is None:
                  continue
              if isinstance(key, bytes):
                  key = key.decode("UTF-8")
              if isinstance(item, bytes):
                  item = item.decode("UTF-8")
              else:
                  # Non-bytes values (e.g. booleans) are stored as text.
                  item = str(item)
              device[key] = item

          # rename fields (needed for sorting by name)
          device["app"] = device["app_name"]
          device["device"] = device["target_board"]
          device["version"] = device["app_version"]

          self.devices.append(device)

          if self.print_when_discovered:
              self.printer.device(self.count, device)

      def print_devices(self, devices=None):
          """Print `devices`, or every collected device when none are given."""
          if not devices:
              devices = self.devices
          self.printer.devices(devices)
  def get_boards():
      """
      Grabs board types from hardware.h file

      Reads "espurna/config/hardware.h" (relative to the current working
      directory) and returns the sorted list of names found in lines that
      start with `#elif defined(NAME)`. Other lines are ignored.
      """
      boards = []
      # The context manager closes the file even if reading fails.
      with open("espurna/config/hardware.h") as header:
          for line in header:
              match = re.search(r"^#elif defined\((\w*)\)", line)
              if match:
                  boards.append(match.group(1))
      return sorted(boards)
  def get_device_size(device):
      """
      Returns the device "mem_size" integer-divided by 1024, or 0 when
      "mem_size" and "sdk_size" differ (both default to 0 when missing)
      """
      mem_size = device.get("mem_size", 0)
      if mem_size != device.get("sdk_size", 0):
          return 0
      return int(mem_size) // 1024
  def get_empty_board():
      """
      Returns a new, empty board structure: blank "board", "ip", "auth" and
      "flags" strings plus a "size" of 0
      """
      return dict(board="", ip="", size=0, auth="", flags="")
  def get_board_by_index(devices, index):
      """
      Returns the required data to flash the board at the 1-based position
      `index` of `devices`, or an empty dict when the index is out of range
      """
      if not 1 <= index <= len(devices):
          return {}
      device = devices[index - 1]
      return {
          "hostname": device.get("hostname"),
          "board": device.get("target_board", ""),
          "ip": device.get("ip", ""),
          "size": get_device_size(device),
      }
  def get_board_by_mac(devices, mac):
      """
      Returns the required data to flash a given board

      The first device whose "mac" equals `mac` (compared case-insensitively)
      is used. Returns None when no device matches, or when the matching
      device lacks a board name, an IP or has a size of 0.
      """
      # Normalise the argument too: device MACs are lowercased before the
      # comparison, so an upper-case `mac` would otherwise never match.
      mac = mac.lower()
      for device in devices:
          if device.get("mac", "").lower() == mac:
              board = {}
              board["hostname"] = device.get("hostname")
              board["board"] = device.get("device")
              board["ip"] = device.get("ip")
              board["size"] = get_device_size(device)
              if not board["board"] or not board["ip"] or board["size"] == 0:
                  return None
              return board
      return None
  def get_board_by_hostname(devices, hostname):
      """
      Returns the required data to flash the first device whose "hostname"
      matches `hostname` (case-insensitive); None when there is no match or
      the match lacks a board name, an IP or has a size of 0
      """
      wanted = hostname.lower()
      for device in devices:
          if device.get("hostname", "").lower() != wanted:
              continue
          board = {
              "hostname": device.get("hostname"),
              "board": device.get("target_board"),
              "ip": device.get("ip"),
              "size": get_device_size(device),
          }
          usable = board["board"] and board["ip"] and board["size"] != 0
          return board if usable else None
      return None
  def input_board(devices):
      """
      Grabs info from the user about what device to flash

      Asks for a device number (empty or non-numeric input counts as 0,
      i.e. none of the listed devices), then for whatever is still unknown:
      board type, memory size and IP. Returns the board dict, or None when
      an answer is invalid.
      """
      # Choose the board
      try:
          index = int(
              input("Choose the board you want to flash (empty if none of these): ")
          )
      except ValueError:
          index = 0
      if not 0 <= index <= len(devices):
          print("Board number must be between 1 and {}\n".format(len(devices)))
          return None
      board = get_board_by_index(devices, index)

      # Choose board type if none before
      if not board.get("board"):
          print()
          boards = get_boards()
          for position, name in enumerate(boards, 1):
              print("{:3d}\t{}".format(position, name))
          print()
          try:
              index = int(input("Choose the board type you want to flash: "))
          except ValueError:
              index = 0
          if not 1 <= index <= len(boards):
              print("Board number must be between 1 and {}\n".format(len(boards)))
              return None
          board["board"] = boards[index - 1]

      # Choose board size if none before
      if not board.get("size"):
          try:
              board["size"] = int(
                  input("Board memory size (1 for 1M, 2 for 2M, 4 for 4M): ")
              )
          except ValueError:
              print("Wrong memory size")
              return None

      # Choose IP if none before
      if not board.get("ip"):
          answer = input("IP of the device to flash (empty for 192.168.4.1): ")
          board["ip"] = answer or "192.168.4.1"

      return board
  def boardname(board):
      """Returns the board "hostname", falling back to its "ip" when unset."""
      # The IP is looked up unconditionally, so a board without an "ip" key
      # raises KeyError even when a hostname is present.
      fallback = board["ip"]
      return board.get("hostname", fallback)
  def store(device, env):
      """
      Moves the firmware.elf built for `env` to
      ".pio/build/elfs/<board name>.elf" (board name lowercased), creating
      the destination directory when needed
      """
      source = ".pio/build/{}/firmware.elf".format(env)
      destination = ".pio/build/elfs/{}.elf".format(boardname(device).lower())
      # exist_ok avoids the check-then-create race and also creates any
      # missing parent directory.
      os.makedirs(os.path.dirname(destination), exist_ok=True)
      shutil.move(source, destination)
  def run(device, env):
      """
      Builds and uploads the firmware for `device` with PlatformIO using the
      environment `env`, then stores the resulting ELF file

      The device settings are handed to the build through ESPURNA_*
      environment variables added to a copy of the current environment.
      """
      print("Building and flashing image over-the-air...")
      environ = dict(
          os.environ,
          ESPURNA_IP=device["ip"],
          ESPURNA_BOARD=device["board"],
          ESPURNA_AUTH=device["auth"],
          ESPURNA_FLAGS=device["flags"],
          ESPURNA_PIO_SHARED_LIBRARIES="y",
      )
      command = ("platformio", "run", "--silent") + ("--environment", env) + ("-t", "upload")
      subprocess.check_call(command, env=environ)
      store(device, env)
  251. # -------------------------------------------------------------------------------
  def parse_commandline_args():
      """Builds the command line parser and returns the parsed arguments."""
      parser = argparse.ArgumentParser(description=DESCRIPTION)
      add = parser.add_argument

      # Boolean switches
      add("-c", "--core", action="store_true", default=False, help="flash ESPurna core")
      add("-f", "--flash", action="store_true", default=False, help="flash device")

      # Free-form string options
      add("-o", "--flags", default="", help="extra flags")
      add("-p", "--password", default="", help="auth password")
      add("-s", "--sort", default="", help="sort devices list by field")

      add(
          "-y",
          "--yes",
          action="store_true",
          default=False,
          help="do not ask for confirmation",
      )
      add(
          "-t",
          "--timeout",
          type=int,
          default=DISCOVERY_TIMEOUT,
          help="how long to wait for mDNS discovery",
      )

      # Positional list of devices
      add("hostnames", nargs="*", help="Hostnames to update")

      return parser.parse_args()
  def discover_devices(args):
      """
      Browses "_arduino._tcp.local." services for `args.timeout` seconds and
      returns the list of discovered devices

      Exits the process with status 1 when the browser cannot be created,
      on Ctrl-C, when nothing is found or when `args.sort` names an unknown
      field.
      """
      # Look for services and try to immediately print the device when it is
      # discovered (unless --sort <field> is specified, in which case we wait
      # until discovery finishes)
      listener = Listener(print_when_discovered=not args.sort)

      try:
          browser = zeroconf.ServiceBrowser(
              zeroconf.Zeroconf(), "_arduino._tcp.local.", listener
          )
      except (
          zeroconf.BadTypeInNameException,
          NotImplementedError,
          OSError,
          socket.error,
          zeroconf.NonUniqueNameException,
      ) as exc:
          print("! error when creating service discovery browser: {}".format(exc))
          sys.exit(1)

      # Give the devices time to answer, then stop the discovery
      try:
          time.sleep(args.timeout)
          browser.zc.close()
      except KeyboardInterrupt:
          sys.exit(1)

      devices = listener.devices
      if not devices:
          print("Nothing found!\n")
          sys.exit(1)

      # Sort list by specified field name and show devices overview
      sort_field = args.sort.lower() if args.sort else None
      if sort_field is not None:
          if sort_field not in devices[0]:
              print("Unknown field '{}'\n".format(sort_field))
              sys.exit(1)
          devices.sort(key=lambda dev: dev.get(sort_field, ""))
          listener.print_devices(devices)

      return devices
  def main(args):
      """
      Discovers the devices and, when --flash is given, builds and flashes
      the selected ones

      Boards are taken from the hostnames on the command line; when none of
      them matches a discovered device the user is asked interactively.
      """
      print()
      print(DESCRIPTION)
      print()

      devices = discover_devices(args)

      # Flash device only when --flash arg is provided
      if not args.flash:
          return

      # Board(s) to flash
      queue = []

      # Check if hostnames
      for hostname in args.hostnames:
          found = get_board_by_hostname(devices, hostname)
          if not found:
              continue
          found["auth"] = args.password
          found["flags"] = args.flags
          queue.append(found)

      # If no boards ask the user
      if not queue:
          chosen = input_board(devices)
          if chosen:
              chosen["auth"] = args.password or input(
                  "Authorization key of the device to flash: "
              )
              chosen["flags"] = args.flags or input("Extra flags for the build: ")
              queue.append(chosen)

      # If still no boards quit
      if not queue:
          sys.exit(0)

      queue.sort(key=lambda dev: dev.get("board", ""))

      # Flash each board
      for board in queue:

          # Flash core version?
          if args.core:
              board["flags"] = "-DESPURNA_CORE " + board["flags"]

          env = "esp8266-{:d}m-ota".format(board["size"])

          # Summary
          print()
          print("HOST = {}".format(boardname(board)))
          print("IP = {}".format(board["ip"]))
          print("BOARD = {}".format(board["board"]))
          print("AUTH = {}".format(board["auth"]))
          print("FLAGS = {}".format(board["flags"]))
          print("ENV = {}".format(env))

          # Ask for confirmation unless --yes was given
          confirmed = args.yes or input("\nAre these values right [y/N]: ") == "y"
          if confirmed:
              print()
              run(board, env)
  # Script entry point: parse the command line, then run the manager.
  if __name__ == "__main__":
      cli_args = parse_commandline_args()
      main(cli_args)