Fork of the espurna firmware for `mhsw` switches
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

302 lines
9.1 KiB

6 years ago
  1. #!/usr/bin/env python
  2. # coding=utf-8
  3. # -------------------------------------------------------------------------------
  4. # ESPurna OTA manager
  5. # xose.perez@gmail.com
  6. #
  7. # Requires PlatformIO Core
  8. # -------------------------------------------------------------------------------
  9. from __future__ import print_function
  10. import argparse
  11. import re
  12. import socket
  13. import subprocess
  14. import sys
  15. import time
  16. from zeroconf import ServiceBrowser, ServiceStateChange, Zeroconf
  # Python 2/3 compatibility: on Python 2 rebind ``input`` to ``raw_input`` so
  # every prompt in this script returns the typed text as a plain string.
  try:
      # noinspection PyUnresolvedReferences
      input = raw_input  # Python 2 (deliberately redefines the built-in input)
  except NameError:
      # Python 3: ``raw_input`` does not exist and ``input`` already behaves
      # the way this script needs.
      pass

  # -------------------------------------------------------------------------------

  # Seconds added to ``discover_last`` to decide when discovery stops waiting
  DISCOVER_TIMEOUT = 2

  # Banner text, also used as the argparse description
  description = "ESPurna OTA Manager v0.2"

  # One dict per discovered device, appended by the zeroconf callback
  devices = []

  # Timestamp the discovery wait is measured from
  discover_last = 0
  27. # -------------------------------------------------------------------------------
  def on_service_state_change(zeroconf, service_type, name, state_change):
      """
      Callback that adds discovered devices to the "devices" list

      Only ``ServiceStateChange.Added`` events are handled. Each of them
      refreshes the module-level ``discover_last`` timestamp and, when the
      service info can be fetched, appends one dict describing the device to
      ``devices``.

      :param zeroconf: object used to query the service info
      :param service_type: service type being browsed
      :param name: name of the service that changed
      :param state_change: kind of change that is being reported
      """
      # Without this declaration the assignment below only creates a local
      # variable and the timestamp read by the discovery loop never changes.
      global discover_last

      if state_change is not ServiceStateChange.Added:
          return

      discover_last = time.time()

      info = zeroconf.get_service_info(service_type, name)
      if not info:
          return

      # Keep only the first label of the advertised server name
      hostname = info.server.split(".")[0]

      # Defaults for every column the listing shows; the advertised
      # properties below overwrite the ones the device actually publishes.
      device = {
          'hostname': hostname.upper(),
          'ip': socket.inet_ntoa(info.address),
          'mac': '',
          'app_name': '',
          'app_version': '',
          'target_board': '',
          'mem_size': '',
          'sdk_size': '',
          'free_space': '',
      }
      for key, item in info.properties.items():
          device[key.decode('UTF-8')] = item.decode('UTF-8')

      devices.append(device)
  def list_devices():
      """
      Prints the discovered devices as a numbered table

      A header row and a ruler are printed first, then one row per entry of
      the module-level "devices" list (numbered from 1) and finally an empty
      line. Fields missing from a device are shown as empty strings.
      """
      # (column title, key in the device dict), in display order
      columns = (
          ("HOSTNAME", 'hostname'),
          ("IP", 'ip'),
          ("MAC", 'mac'),
          ("APP", 'app_name'),
          ("VERSION", 'app_version'),
          ("DEVICE", 'target_board'),
          ("MEM_SIZE", 'mem_size'),
          ("SDK_SIZE", 'sdk_size'),
          ("FREE_SPACE", 'free_space'),
      )
      row_format = "{:>3} {:<14} {:<15} {:<17} {:<12} {:<8} {:<25} {:<8} {:<8} {:<10}"

      titles = [title for title, _ in columns]
      print(row_format.format("#", *titles))
      print("-" * 139)

      for number, device in enumerate(devices, 1):
          values = [device.get(key, '') for _, key in columns]
          print(row_format.format(number, *values))

      print()
  def get_boards(path="espurna/config/hardware.h"):
      """
      Grabs board types from the hardware.h file

      Every line of the file is searched for a ``defined(NAME)`` expression
      and the first NAME found on each matching line is collected.

      :param path: header to scan; defaults to the location relative to the
          current working directory that this script has always used
      :return: alphabetically sorted list of the collected names
      :raises IOError: (OSError on Python 3) when the file cannot be opened
      """
      pattern = re.compile(r'defined\((\w*)\)')
      # The context manager closes the header even if reading fails; the
      # previous version left the handle open until garbage collection.
      with open(path) as header:
          matches = (pattern.search(line) for line in header)
          boards = [match.group(1) for match in matches if match]
      return sorted(boards)
  def get_empty_board():
      """
      Builds a blank board description

      :return: a new dict on every call, with empty 'board', 'ip', 'auth'
          and 'flags' strings and a 'size' of 0
      """
      return {
          'board': '',
          'ip': '',
          'size': 0,
          'auth': '',
          'flags': '',
      }
  def get_board_by_index(index):
      """
      Returns the required data to flash a given board

      :param index: 1-based position of the device in the "devices" list
      :return: dict with 'hostname', 'board', 'ip' and 'size' keys, or an
          empty dict when index is out of range. 'size' is the reported
          memory size divided by 1024 as a whole number, and 0 when the
          device's 'mem_size' and 'sdk_size' disagree or are not numeric.
      """
      board = {}
      if 1 <= index <= len(devices):
          device = devices[index - 1]
          board['hostname'] = device.get('hostname')
          board['board'] = device.get('target_board', '')
          board['ip'] = device.get('ip', '')

          # The reported size is only used when both fields agree
          mem_size = device.get('mem_size', 0)
          reported = mem_size if mem_size == device.get('sdk_size', 0) else 0
          try:
              # Floor division keeps the size an integer on Python 3 as well;
              # true division would produce e.g. 4.0 and end up in the name
              # of the build environment.
              board['size'] = int(reported) // 1024
          except (TypeError, ValueError):
              # Devices that do not publish their sizes hold '' here; treat
              # that as "unknown" (0) instead of failing.
              board['size'] = 0
      return board
  def get_board_by_hostname(hostname):
      """
      Returns the required data to flash a given board

      The first entry of the "devices" list whose hostname matches
      (case-insensitively) decides the result.

      :param hostname: hostname of the device to look for
      :return: dict with 'hostname', 'board', 'ip' and 'size' keys, or None
          when no device matches or the matching device lacks a board type,
          an IP or a usable 'sdk_size'
      """
      wanted = hostname.lower()
      for device in devices:
          if device.get('hostname', '').lower() != wanted:
              continue

          board = {
              'hostname': device.get('hostname'),
              'board': device.get('target_board'),
              'ip': device.get('ip'),
          }
          if not board['board'] or not board['ip']:
              return None

          try:
              # Floor division keeps the size an integer on Python 3 too, so
              # the "== 0" check below also rejects sizes smaller than 1024.
              board['size'] = int(device.get('sdk_size', 0)) // 1024
          except (TypeError, ValueError):
              # '' (size not published) or any other non-numeric value
              return None
          if board['size'] == 0:
              return None

          return board

      return None
  def input_board():
      """
      Asks the user which device to flash and completes any missing data

      The user first picks a row of the device listing (an empty or
      non-numeric answer counts as 0, meaning "none of these"). Whatever is
      still unknown afterwards - board type, memory size, IP - is asked for
      in turn.

      :return: dict describing the board to flash, or None when an answer
          is out of range or the memory size is not a number
      """
      def ask_number(prompt):
          # Anything that cannot be parsed as an integer counts as 0
          try:
              return int(input(prompt))
          except ValueError:
              return 0

      # Choose the board
      choice = ask_number("Choose the board you want to flash (empty if none of these): ")
      if not (0 <= choice <= len(devices)):
          print("Board number must be between 1 and %s\n" % str(len(devices)))
          return None
      board = get_board_by_index(choice)

      # Choose the board type if it is not known yet
      if len(board.get('board', '')) == 0:
          print()
          boards = get_boards()
          for position, name in enumerate(boards, 1):
              print("%3d\t%s" % (position, name))
          print()

          choice = ask_number("Choose the board type you want to flash: ")
          if not (1 <= choice <= len(boards)):
              print("Board number must be between 1 and %s\n" % str(len(boards)))
              return None
          board['board'] = boards[choice - 1]

      # Choose the memory size if it is not known yet
      if board.get('size', 0) == 0:
          try:
              board['size'] = int(input("Board memory size (1 for 1M, 4 for 4M): "))
          except ValueError:
              print("Wrong memory size")
              return None

      # Choose the IP if it is not known yet
      if len(board.get('ip', '')) == 0:
          answer = input("IP of the device to flash (empty for 192.168.4.1): ")
          board['ip'] = answer or "192.168.4.1"

      return board
  def run(device, env):
      """
      Builds the firmware and uploads it over-the-air with PlatformIO

      The device data is handed to the build through the ESPURNA_IP,
      ESPURNA_BOARD, ESPURNA_AUTH and ESPURNA_FLAGS environment variables.

      :param device: dict with 'ip', 'board', 'auth' and 'flags' keys
      :param env: name of the PlatformIO environment to build and upload
      :raises subprocess.CalledProcessError: when platformio exits with a
          non-zero status
      """
      # Local import so this function does not depend on the file's import list
      import os

      print("Building and flashing image over-the-air...")

      # Pass the values through the child's environment instead of splicing
      # them into a shell command line: user-supplied text (password, flags,
      # IP) can then never be interpreted by a shell, and no POSIX-only
      # "export" is needed.
      environment = os.environ.copy()
      environment.update({
          'ESPURNA_IP': str(device['ip']),
          'ESPURNA_BOARD': str(device['board']),
          'ESPURNA_AUTH': str(device['auth']),
          'ESPURNA_FLAGS': str(device['flags']),
      })

      command = [
          "platformio", "run",
          "--silent",
          "--environment", str(env),
          "-t", "upload",
      ]
      subprocess.check_call(command, env=environment)
  179. # -------------------------------------------------------------------------------
  if __name__ == '__main__':
      # Script entry point: parse the options, discover devices, list them
      # and, when --flash is given, build and upload firmware to the chosen
      # boards.

      # Parse command line options
      parser = argparse.ArgumentParser(description=description)
      parser.add_argument("-c", "--core", help="flash ESPurna core", default=0, action='count')
      parser.add_argument("-f", "--flash", help="flash device", default=0, action='count')
      parser.add_argument("-o", "--flags", help="extra flags", default='')
      parser.add_argument("-p", "--password", help="auth password", default='')
      parser.add_argument("-s", "--sort", help="sort devices list by field", default='hostname')
      parser.add_argument("-y", "--yes", help="do not ask for confirmation", default=0, action='count')
      parser.add_argument("hostnames", nargs='*', help="Hostnames to update")
      args = parser.parse_args()

      print()
      print(description)
      print()

      # Look for services
      zeroconf = Zeroconf()
      browser = ServiceBrowser(zeroconf, "_arduino._tcp.local.", handlers=[on_service_state_change])
      discover_last = time.time()
      while time.time() < discover_last + DISCOVER_TIMEOUT:
          # Sleep between checks; the previous no-op loop body kept a CPU
          # core fully busy for the whole discovery window.
          time.sleep(0.1)
      # NOTE(review): the close call below was already disabled; the reason
      # is not visible in this file, so it is left as found.
      #zeroconf.close()

      if not devices:
          print("Nothing found!\n")
          sys.exit(0)

      # Sort list
      field = args.sort.lower()
      if field not in devices[0]:
          print("Unknown field '%s'\n" % field)
          sys.exit(1)
      devices = sorted(devices, key=lambda device: device.get(field, ''))

      # List devices
      list_devices()

      # Flash device
      if args.flash > 0:

          # Board(s) to flash
          queue = []

          # Boards named on the command line
          for hostname in args.hostnames:
              board = get_board_by_hostname(hostname)
              if board:
                  board['auth'] = args.password
                  board['flags'] = args.flags
                  queue.append(board)

          # If no boards ask the user
          if not queue:
              board = input_board()
              if board:
                  board['auth'] = args.password or input("Authorization key of the device to flash: ")
                  board['flags'] = args.flags or input("Extra flags for the build: ")
                  queue.append(board)

          # If still no boards quit
          if not queue:
              sys.exit(0)

          # Flash each board
          for board in queue:

              # Flash core version?
              if args.core > 0:
                  board['flags'] = "-DESPURNA_CORE " + board['flags']

              # The size can arrive as a float (e.g. 4.0) when it was
              # computed with true division; normalise it so the name is
              # always of the form "esp8266-4m-ota".
              env = "esp8266-%dm-ota" % int(board['size'])

              # Summary
              print()
              print("HOST = %s" % board.get('hostname', board['ip']))
              print("IP = %s" % board['ip'])
              print("BOARD = %s" % board['board'])
              print("AUTH = %s" % board['auth'])
              print("FLAGS = %s" % board['flags'])
              print("ENV = %s" % env)

              # Only an explicit "y" confirms; --yes skips the question
              response = True
              if args.yes == 0:
                  response = (input("\nAre these values right [y/N]: ") == "y")
              if response:
                  print()
                  run(board, env)