Mirror of espurna firmware for wireless switches and more
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

477 lines
13 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. #!/usr/bin/env python3
  2. # pylint: disable=C0330
  3. # coding=utf-8
  4. # -------------------------------------------------------------------------------
  5. # ESPurna OTA manager
  6. # xose.perez@gmail.com
  7. #
  8. # Requires PlatformIO Core
  9. # -------------------------------------------------------------------------------
  10. import argparse
  11. import os
  12. import re
  13. import shutil
  14. import socket
  15. import subprocess
  16. import functools
  17. import sys
  18. import time
  19. import zeroconf
  20. # -------------------------------------------------------------------------------
  21. __version__ = (0, 4, 3)
  22. DESCRIPTION = "ESPurna OTA Manager v{}".format(".".join(str(x) for x in __version__))
  23. DISCOVERY_TIMEOUT = 10
  24. # -------------------------------------------------------------------------------
  25. class Printer:
  26. OUTPUT_FORMAT = "{:>3} {:<14} {:<15} {:<17} {:<12} {:<12} {:<20} {:<25} {:<8} {:<8} {:<10}"
  27. def header(self):
  28. print(
  29. self.OUTPUT_FORMAT.format(
  30. "#",
  31. "HOSTNAME",
  32. "IP",
  33. "MAC",
  34. "APP",
  35. "VERSION",
  36. "BUILD_DATE",
  37. "DEVICE",
  38. "MEM_SIZE",
  39. "SDK_SIZE",
  40. "FREE_SPACE",
  41. )
  42. )
  43. print("-" * 164)
  44. def device(self, index, device):
  45. print(
  46. self.OUTPUT_FORMAT.format(
  47. index,
  48. device.get("hostname", ""),
  49. device.get("ip", ""),
  50. device.get("mac", ""),
  51. device.get("app_name", ""),
  52. device.get("app_version", ""),
  53. device.get("build_date", ""),
  54. device.get("target_board", ""),
  55. device.get("mem_size", 0),
  56. device.get("sdk_size", 0),
  57. device.get("free_space", 0),
  58. )
  59. )
  60. def devices(self, devices):
  61. """
  62. Shows the list of discovered devices
  63. """
  64. for index, device in enumerate(devices, 1):
  65. self.device(index, device)
  66. print()
  67. # Based on:
  68. # https://github.com/balloob/pychromecast/blob/master/pychromecast/discovery.py
  69. class Listener:
  70. def __init__(self, print_when_discovered=True):
  71. self.devices = []
  72. self.printer = Printer()
  73. self.print_when_discovered = print_when_discovered
  74. @property
  75. def count(self):
  76. return len(self.devices)
  77. def add_service(self, browser, service_type, name):
  78. """
  79. Callback that adds discovered devices to "devices" list
  80. """
  81. info = None
  82. tries = 0
  83. while info is None and tries < 4:
  84. try:
  85. info = browser.get_service_info(service_type, name)
  86. except IOError:
  87. break
  88. tries += 1
  89. if not info:
  90. print(
  91. "! error getting service info {} {}".format(service_type, name),
  92. file=sys.stderr,
  93. )
  94. return
  95. hostname = info.server.split(".")[0]
  96. addresses = info.parsed_addresses()
  97. device = {
  98. "hostname": hostname.upper(),
  99. "ip": addresses[0] if addresses else info.host,
  100. "mac": "",
  101. "app_name": "",
  102. "app_version": "",
  103. "build_date": "",
  104. "target_board": "",
  105. "mem_size": 0,
  106. "sdk_size": 0,
  107. "free_space": 0,
  108. }
  109. for key, item in info.properties.items():
  110. device[key.decode("UTF-8")] = item.decode("UTF-8")
  111. # rename fields (needed for sorting by name)
  112. device["app"] = device["app_name"]
  113. device["device"] = device["target_board"]
  114. device["version"] = device["app_version"]
  115. self.devices.append(device)
  116. if self.print_when_discovered:
  117. self.printer.device(self.count, device)
  118. def print_devices(self, devices=None):
  119. if not devices:
  120. devices = self.devices
  121. self.printer.devices(devices)
  122. def get_boards():
  123. """
  124. Grabs board types from hardware.h file
  125. """
  126. boards = []
  127. for line in open("espurna/config/hardware.h"):
  128. match = re.search(r"^#elif defined\((\w*)\)", line)
  129. if match:
  130. boards.append(match.group(1))
  131. return sorted(boards)
  132. def get_device_size(device):
  133. if device.get("mem_size", 0) == device.get("sdk_size", 0):
  134. return int(device.get("mem_size", 0)) // 1024
  135. return 0
  136. def get_empty_board():
  137. """
  138. Returns the empty structure of a board to flash
  139. """
  140. board = {"board": "", "ip": "", "size": 0, "auth": "", "flags": ""}
  141. return board
  142. def get_board_by_index(devices, index):
  143. """
  144. Returns the required data to flash a given board
  145. """
  146. board = {}
  147. if 1 <= index <= len(devices):
  148. device = devices[index - 1]
  149. board["hostname"] = device.get("hostname")
  150. board["board"] = device.get("target_board", "")
  151. board["ip"] = device.get("ip", "")
  152. board["size"] = get_device_size(device)
  153. return board
  154. def get_board_by_mac(devices, mac):
  155. """
  156. Returns the required data to flash a given board
  157. """
  158. for device in devices:
  159. if device.get("mac", "").lower() == mac:
  160. board = {}
  161. board["hostname"] = device.get("hostname")
  162. board["board"] = device.get("device")
  163. board["ip"] = device.get("ip")
  164. board["size"] = get_device_size(device)
  165. if not board["board"] or not board["ip"] or board["size"] == 0:
  166. return None
  167. return board
  168. return None
  169. def get_board_by_hostname(devices, hostname):
  170. """
  171. Returns the required data to flash a given board
  172. """
  173. hostname = hostname.lower()
  174. for device in devices:
  175. if device.get("hostname", "").lower() == hostname:
  176. board = {}
  177. board["hostname"] = device.get("hostname")
  178. board["board"] = device.get("target_board")
  179. board["ip"] = device.get("ip")
  180. board["size"] = get_device_size(device)
  181. if not board["board"] or not board["ip"] or board["size"] == 0:
  182. return None
  183. return board
  184. return None
  185. def input_board(devices):
  186. """
  187. Grabs info from the user about what device to flash
  188. """
  189. # Choose the board
  190. try:
  191. index = int(
  192. input("Choose the board you want to flash (empty if none of these): ")
  193. )
  194. except ValueError:
  195. index = 0
  196. if index < 0 or len(devices) < index:
  197. print("Board number must be between 1 and {}\n".format(str(len(devices))))
  198. return None
  199. board = get_board_by_index(devices, index)
  200. # Choose board type if none before
  201. if not board.get("board"):
  202. print()
  203. count = 1
  204. boards = get_boards()
  205. for name in boards:
  206. print("{:3d}\t{}".format(count, name))
  207. count += 1
  208. print()
  209. try:
  210. index = int(input("Choose the board type you want to flash: "))
  211. except ValueError:
  212. index = 0
  213. if index < 1 or len(boards) < index:
  214. print("Board number must be between 1 and {}\n".format(str(len(boards))))
  215. return None
  216. board["board"] = boards[index - 1]
  217. # Choose board size of none before
  218. if not board.get("size"):
  219. try:
  220. board["size"] = int(
  221. input("Board memory size (1 for 1M, 2 for 2M, 4 for 4M): ")
  222. )
  223. except ValueError:
  224. print("Wrong memory size")
  225. return None
  226. # Choose IP of none before
  227. if not board.get("ip"):
  228. board["ip"] = (
  229. input("IP of the device to flash (empty for 192.168.4.1): ")
  230. or "192.168.4.1"
  231. )
  232. return board
  233. def boardname(board):
  234. return board.get("hostname", board["ip"])
  235. def store(device, env):
  236. source = ".pio/build/{}/firmware.elf".format(env)
  237. destination = ".pio/build/elfs/{}.elf".format(boardname(device).lower())
  238. dst_dir = os.path.dirname(destination)
  239. if not os.path.exists(dst_dir):
  240. os.mkdir(dst_dir)
  241. shutil.move(source, destination)
  242. def run(device, env):
  243. print("Building and flashing image over-the-air...")
  244. environ = os.environ.copy()
  245. environ["ESPURNA_IP"] = device["ip"]
  246. environ["ESPURNA_BOARD"] = device["board"]
  247. environ["ESPURNA_AUTH"] = device["auth"]
  248. environ["ESPURNA_FLAGS"] = device["flags"]
  249. command = ("platformio", "run", "--silent", "--environment", env, "-t", "upload")
  250. subprocess.check_call(command, env=environ)
  251. store(device, env)
  252. # -------------------------------------------------------------------------------
  253. def parse_commandline_args():
  254. parser = argparse.ArgumentParser(description=DESCRIPTION)
  255. parser.add_argument(
  256. "--minimal",
  257. help="Use ESPurna minimal configuration",
  258. action="choice",
  259. choices=["arduino-ota", "webui"],
  260. default="arduino-ota",
  261. )
  262. parser.add_argument(
  263. "--flash", help="Flash device", action="store_true", default=False
  264. )
  265. parser.add_argument(
  266. "--arduino-core",
  267. help="Arduino ESP8266 Core version",
  268. default="current",
  269. choices=["current", "latest", "git"],
  270. )
  271. parser.add_argument("--flags", help="extra flags", default="")
  272. parser.add_argument("--password", help="auth password", default="")
  273. parser.add_argument("--sort", help="sort devices list by field", default="")
  274. parser.add_argument(
  275. "--yes",
  276. help="do not ask for confirmation",
  277. action="store_true",
  278. default=False,
  279. )
  280. parser.add_argument(
  281. "--timeout",
  282. type=int,
  283. help="how long to wait for mDNS discovery",
  284. default=DISCOVERY_TIMEOUT,
  285. )
  286. parser.add_argument("hostnames", nargs="*", help="Hostnames to update")
  287. return parser.parse_args()
  288. def discover_devices(args):
  289. # Look for services and try to immediatly print the device when it is discovered
  290. # (unless --sort <field> is specified, then we will wait until discovery finishes
  291. listener = Listener(print_when_discovered=not args.sort)
  292. try:
  293. browser = zeroconf.ServiceBrowser(
  294. zeroconf.Zeroconf(), "_arduino._tcp.local.", listener
  295. )
  296. except (
  297. zeroconf.BadTypeInNameException,
  298. NotImplementedError,
  299. OSError,
  300. socket.error,
  301. zeroconf.NonUniqueNameException,
  302. ) as exc:
  303. print("! error when creating service discovery browser: {}".format(exc))
  304. sys.exit(1)
  305. try:
  306. time.sleep(args.timeout)
  307. except KeyboardInterrupt:
  308. sys.exit(1)
  309. try:
  310. browser.zc.close()
  311. except KeyboardInterrupt:
  312. sys.exit(1)
  313. if not listener.devices:
  314. print("Nothing found!\n")
  315. sys.exit(1)
  316. devices = listener.devices
  317. # Sort list by specified field name and# show devices overview
  318. if args.sort:
  319. field = args.sort.lower()
  320. if field not in devices[0]:
  321. print("Unknown field '{}'\n".format(field))
  322. sys.exit(1)
  323. devices.sort(key=lambda dev: dev.get(field, ""))
  324. listener.print_devices(devices)
  325. return devices
  326. @functools.lru_cache(maxsize=None)
  327. def get_platformio_env(arduino_core, size):
  328. prefix = "esp8266"
  329. if not size in [1, 2, 4]:
  330. raise ValueError(
  331. "Board memory size can only be one of: 1 for 1M, 2 for 2M, 4 for 4M"
  332. )
  333. core = ""
  334. if arduino_core != "current":
  335. core = "-{}".format(arduino_core)
  336. return "{prefix}-{size:d}m{core}-base".format(prefix=prefix, core=core, size=size)
  337. def main(args):
  338. print()
  339. print(DESCRIPTION)
  340. print()
  341. devices = discover_devices(args)
  342. # Flash device only when --flash arg is provided
  343. if args.flash:
  344. # Board(s) to flash
  345. queue = []
  346. # Check if hostnames
  347. for hostname in args.hostnames:
  348. board = get_board_by_hostname(devices, hostname)
  349. if board:
  350. board["auth"] = args.password
  351. board["flags"] = args.flags
  352. queue.append(board)
  353. # If no boards ask the user
  354. if not len(queue):
  355. board = input_board(devices)
  356. if board:
  357. board["auth"] = args.password or input(
  358. "Authorization key of the device to flash: "
  359. )
  360. board["flags"] = args.flags or input("Extra flags for the build: ")
  361. queue.append(board)
  362. # If still no boards quit
  363. if not len(queue):
  364. sys.exit(0)
  365. queue.sort(key=lambda dev: dev.get("board", ""))
  366. # Flash each board
  367. for board in queue:
  368. if args.minimal:
  369. flag = args.minimal.replace("-", "_").upper()
  370. flag = f"ESPURNA_MINIMAL_{flag}"
  371. board["flags"] = "-D${flag} " + board["flags"]
  372. env = get_platformio_env(args.arduino_core, board["size"])
  373. # Summary
  374. print()
  375. print("HOST = {}".format(boardname(board)))
  376. print("IP = {}".format(board["ip"]))
  377. print("BOARD = {}".format(board["board"]))
  378. print("AUTH = {}".format(board["auth"]))
  379. print("FLAGS = {}".format(board["flags"]))
  380. print("ENV = {}".format(env))
  381. response = True
  382. if not args.yes:
  383. response = input("\nAre these values right [y/N]: ") == "y"
  384. if response:
  385. print()
  386. run(board, env)
  387. if __name__ == "__main__":
  388. main(parse_commandline_args())