You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

716 lines
24 KiB

  1. #!/usr/bin/env python3
  2. # coding=utf-8
  3. """MILC - A CLI Framework
  5. MILC is an opinionated framework for writing CLI apps. It optimizes for the
  6. most common unix tool pattern- small tools that are run from the command
  7. line but generally do not feature any user interaction while they run.
  8. For more details see the MILC documentation:
  9. <>
  10. """
  11. from __future__ import division, print_function, unicode_literals
  12. import argparse
  13. import logging
  14. import os
  15. import re
  16. import sys
  17. from decimal import Decimal
  18. from tempfile import NamedTemporaryFile
  19. from time import sleep
  20. try:
  21. from ConfigParser import RawConfigParser
  22. except ImportError:
  23. from configparser import RawConfigParser
  24. try:
  25. import thread
  26. import threading
  27. except ImportError:
  28. thread = None
  29. import argcomplete
  30. import colorama
  31. # Log Level Representations
  33. 'CRITICAL': '{bg_red}{fg_white}¬_¬{style_reset_all}',
  34. 'ERROR': '{fg_red}☒{style_reset_all}',
  35. 'WARNING': '{fg_yellow}⚠{style_reset_all}',
  36. 'INFO': '{fg_blue}ℹ{style_reset_all}',
  37. 'DEBUG': '{fg_cyan}☐{style_reset_all}',
  38. 'NOTSET': '{style_reset_all}¯\\_(o_o)_/¯'
  39. }
  42. # ANSI Color setup
  43. # Regex was gratefully borrowed from kfir on stackoverflow:
  44. #
  45. ansi_regex = r'\x1b(' \
  46. r'(\[\??\d+[hl])|' \
  47. r'([=<>a-kzNM78])|' \
  48. r'([\(\)][a-b0-2])|' \
  49. r'(\[\d{0,2}[ma-dgkjqi])|' \
  50. r'(\[\d+;\d+[hfy]?)|' \
  51. r'(\[;?[hf])|' \
  52. r'(#[3-68])|' \
  53. r'([01356]n)|' \
  54. r'(O[mlnp-z]?)|' \
  55. r'(/Z)|' \
  56. r'(\d+)|' \
  57. r'(\[\?\d;\d0c)|' \
  58. r'(\d;\dR))'
  59. ansi_escape = re.compile(ansi_regex, flags=re.IGNORECASE)
  60. ansi_styles = (
  61. ('fg', colorama.ansi.AnsiFore()),
  62. ('bg', colorama.ansi.AnsiBack()),
  63. ('style', colorama.ansi.AnsiStyle()),
  64. )
  65. ansi_colors = {}
  66. for prefix, obj in ansi_styles:
  67. for color in [x for x in obj.__dict__ if not x.startswith('_')]:
  68. ansi_colors[prefix + '_' + color.lower()] = getattr(obj, color)
  69. def format_ansi(text):
  70. """Return a copy of text with certain strings replaced with ansi.
  71. """
  72. # Avoid .format() so we don't have to worry about the log content
  73. for color in ansi_colors:
  74. text = text.replace('{%s}' % color, ansi_colors[color])
  75. return text + ansi_colors['style_reset_all']
  76. class ANSIFormatter(logging.Formatter):
  77. """A log formatter that inserts ANSI color.
  78. """
  79. def format(self, record):
  80. msg = super(ANSIFormatter, self).format(record)
  81. return format_ansi(msg)
  82. class ANSIEmojiLoglevelFormatter(ANSIFormatter):
  83. """A log formatter that makes the loglevel an emoji.
  84. """
  85. def format(self, record):
  86. record.levelname = EMOJI_LOGLEVELS[record.levelname].format(**ansi_colors)
  87. return super(ANSIEmojiLoglevelFormatter, self).format(record)
  88. class ANSIStrippingFormatter(ANSIFormatter):
  89. """A log formatter that strips ANSI.
  90. """
  91. def format(self, record):
  92. msg = super(ANSIStrippingFormatter, self).format(record)
  93. return ansi_escape.sub('', msg)
  94. class Configuration(object):
  95. """Represents the running configuration.
  96. This class never raises IndexError, instead it will return None if a
  97. section or option does not yet exist.
  98. """
  99. def __contains__(self, key):
  100. return self._config.__contains__(key)
  101. def __iter__(self):
  102. return self._config.__iter__()
  103. def __len__(self):
  104. return self._config.__len__()
  105. def __repr__(self):
  106. return self._config.__repr__()
  107. def keys(self):
  108. return self._config.keys()
  109. def items(self):
  110. return self._config.items()
  111. def values(self):
  112. return self._config.values()
  113. def __init__(self, *args, **kwargs):
  114. self._config = {}
  115. self.default_container = ConfigurationOption
  116. def __getitem__(self, key):
  117. """Returns a config section, creating it if it doesn't exist yet.
  118. """
  119. if key not in self._config:
  120. self.__dict__[key] = self._config[key] = ConfigurationOption()
  121. return self._config[key]
  122. def __setitem__(self, key, value):
  123. self.__dict__[key] = value
  124. self._config[key] = value
  125. def __delitem__(self, key):
  126. if key in self.__dict__ and key[0] != '_':
  127. del self.__dict__[key]
  128. del self._config[key]
  129. class ConfigurationOption(Configuration):
  130. def __init__(self, *args, **kwargs):
  131. super(ConfigurationOption, self).__init__(*args, **kwargs)
  132. self.default_container = dict
  133. def __getitem__(self, key):
  134. """Returns a config section, creating it if it doesn't exist yet.
  135. """
  136. if key not in self._config:
  137. self.__dict__[key] = self._config[key] = None
  138. return self._config[key]
  139. def handle_store_boolean(self, *args, **kwargs):
  140. """Does the add_argument for action='store_boolean'.
  141. """
  142. kwargs['add_dest'] = False
  143. disabled_args = None
  144. disabled_kwargs = kwargs.copy()
  145. disabled_kwargs['action'] = 'store_false'
  146. disabled_kwargs['help'] = 'Disable ' + kwargs['help']
  147. kwargs['action'] = 'store_true'
  148. kwargs['help'] = 'Enable ' + kwargs['help']
  149. for flag in args:
  150. if flag[:2] == '--':
  151. disabled_args = ('--no-' + flag[2:],)
  152. break
  153. self.add_argument(*args, **kwargs)
  154. self.add_argument(*disabled_args, **disabled_kwargs)
  155. return (args, kwargs, disabled_args, disabled_kwargs)
  156. class SubparserWrapper(object):
  157. """Wrap subparsers so we can populate the normal and the shadow parser.
  158. """
  159. def __init__(self, cli, submodule, subparser):
  160. self.cli = cli
  161. self.submodule = submodule
  162. self.subparser = subparser
  163. for attr in dir(subparser):
  164. if not hasattr(self, attr):
  165. setattr(self, attr, getattr(subparser, attr))
  166. def completer(self, completer):
  167. """Add an arpcomplete completer to this subcommand.
  168. """
  169. self.subparser.completer = completer
  170. def add_argument(self, *args, **kwargs):
  171. if kwargs.get('add_dest', True):
  172. kwargs['dest'] = self.submodule + '_' + self.cli.get_argument_name(*args, **kwargs)
  173. if 'add_dest' in kwargs:
  174. del kwargs['add_dest']
  175. if 'action' in kwargs and kwargs['action'] == 'store_boolean':
  176. return handle_store_boolean(self, *args, **kwargs)
  177. self.cli.acquire_lock()
  178. self.subparser.add_argument(*args, **kwargs)
  179. if 'default' in kwargs:
  180. del kwargs['default']
  181. if 'action' in kwargs and kwargs['action'] == 'store_false':
  182. kwargs['action'] == 'store_true'
  183. self.cli.subcommands_default[self.submodule].add_argument(*args, **kwargs)
  184. self.cli.release_lock()
  185. class MILC(object):
  186. """MILC - An Opinionated Batteries Included Framework
  187. """
  188. def __init__(self):
  189. """Initialize the MILC object.
  190. """
  191. # Setup a lock for thread safety
  192. self._lock = threading.RLock() if thread else None
  193. # Define some basic info
  194. self.acquire_lock()
  195. self._description = None
  196. self._entrypoint = None
  197. self._inside_context_manager = False
  198. self.ansi = ansi_colors
  199. self.config = Configuration()
  200. self.config_file = None
  201. self.prog_name = sys.argv[0][:-3] if sys.argv[0].endswith('.py') else sys.argv[0]
  202. self.version = os.environ.get('QMK_VERSION', 'unknown')
  203. self.release_lock()
  204. # Initialize all the things
  205. self.initialize_argparse()
  206. self.initialize_logging()
  207. @property
  208. def description(self):
  209. return self._description
  210. @description.setter
  211. def description(self, value):
  212. self._description = self._arg_parser.description = self._arg_defaults.description = value
  213. def echo(self, text, *args, **kwargs):
  214. """Print colorized text to stdout, as long as stdout is a tty.
  215. ANSI color strings (such as {fg-blue}) will be converted into ANSI
  216. escape sequences, and the ANSI reset sequence will be added to all
  217. strings.
  218. If *args or **kwargs are passed they will be used to %-format the strings.
  219. """
  220. if args and kwargs:
  221. raise RuntimeError('You can only specify *args or **kwargs, not both!')
  222. if sys.stdout.isatty():
  223. args = args or kwargs
  224. text = format_ansi(text)
  225. print(text % args)
  226. def initialize_argparse(self):
  227. """Prepare to process arguments from sys.argv.
  228. """
  229. kwargs = {
  230. 'fromfile_prefix_chars': '@',
  231. 'conflict_handler': 'resolve',
  232. }
  233. self.acquire_lock()
  234. self.subcommands = {}
  235. self.subcommands_default = {}
  236. self._subparsers = None
  237. self._subparsers_default = None
  238. self.argwarn = argcomplete.warn
  239. self.args = None
  240. self._arg_defaults = argparse.ArgumentParser(**kwargs)
  241. self._arg_parser = argparse.ArgumentParser(**kwargs)
  242. self.set_defaults = self._arg_parser.set_defaults
  243. self.print_usage = self._arg_parser.print_usage
  244. self.print_help = self._arg_parser.print_help
  245. self.release_lock()
  246. def completer(self, completer):
  247. """Add an arpcomplete completer to this subcommand.
  248. """
  249. self._arg_parser.completer = completer
  250. def add_argument(self, *args, **kwargs):
  251. """Wrapper to add arguments to both the main and the shadow argparser.
  252. """
  253. if kwargs.get('add_dest', True) and args[0][0] == '-':
  254. kwargs['dest'] = 'general_' + self.get_argument_name(*args, **kwargs)
  255. if 'add_dest' in kwargs:
  256. del kwargs['add_dest']
  257. if 'action' in kwargs and kwargs['action'] == 'store_boolean':
  258. return handle_store_boolean(self, *args, **kwargs)
  259. self.acquire_lock()
  260. self._arg_parser.add_argument(*args, **kwargs)
  261. # Populate the shadow parser
  262. if 'default' in kwargs:
  263. del kwargs['default']
  264. if 'action' in kwargs and kwargs['action'] == 'store_false':
  265. kwargs['action'] == 'store_true'
  266. self._arg_defaults.add_argument(*args, **kwargs)
  267. self.release_lock()
  268. def initialize_logging(self):
  269. """Prepare the defaults for the logging infrastructure.
  270. """
  271. self.acquire_lock()
  272. self.log_file = None
  273. self.log_file_mode = 'a'
  274. self.log_file_handler = None
  275. self.log_print = True
  276. self.log_print_to = sys.stderr
  277. self.log_print_level = logging.INFO
  278. self.log_file_level = logging.DEBUG
  279. self.log_level = logging.INFO
  280. self.log = logging.getLogger(self.__class__.__name__)
  281. self.log.setLevel(logging.DEBUG)
  282. logging.root.setLevel(logging.DEBUG)
  283. self.release_lock()
  284. self.add_argument('-V', '--version', version=self.version, action='version', help='Display the version and exit')
  285. self.add_argument('-v', '--verbose', action='store_true', help='Make the logging more verbose')
  286. self.add_argument('--datetime-fmt', default='%Y-%m-%d %H:%M:%S', help='Format string for datetimes')
  287. self.add_argument('--log-fmt', default='%(levelname)s %(message)s', help='Format string for printed log output')
  288. self.add_argument('--log-file-fmt', default='[%(levelname)s] [%(asctime)s] [file:%(pathname)s] [line:%(lineno)d] %(message)s', help='Format string for log file.')
  289. self.add_argument('--log-file', help='File to write log messages to')
  290. self.add_argument('--color', action='store_boolean', default=True, help='color in output')
  291. self.add_argument('-c', '--config-file', help='The config file to read and/or write')
  292. self.add_argument('--save-config', action='store_true', help='Save the running configuration to the config file')
  293. def add_subparsers(self, title='Sub-commands', **kwargs):
  294. if self._inside_context_manager:
  295. raise RuntimeError('You must run this before the with statement!')
  296. self.acquire_lock()
  297. self._subparsers_default = self._arg_defaults.add_subparsers(title=title, dest='subparsers', **kwargs)
  298. self._subparsers = self._arg_parser.add_subparsers(title=title, dest='subparsers', **kwargs)
  299. self.release_lock()
  300. def acquire_lock(self):
  301. """Acquire the MILC lock for exclusive access to properties.
  302. """
  303. if self._lock:
  304. self._lock.acquire()
  305. def release_lock(self):
  306. """Release the MILC lock.
  307. """
  308. if self._lock:
  309. self._lock.release()
  310. def find_config_file(self):
  311. """Locate the config file.
  312. """
  313. if self.config_file:
  314. return self.config_file
  315. if self.args and self.args.general_config_file:
  316. return self.args.general_config_file
  317. return os.path.abspath(os.path.expanduser('~/.%s.ini' % self.prog_name))
  318. def get_argument_name(self, *args, **kwargs):
  319. """Takes argparse arguments and returns the dest name.
  320. """
  321. try:
  322. return self._arg_parser._get_optional_kwargs(*args, **kwargs)['dest']
  323. except ValueError:
  324. return self._arg_parser._get_positional_kwargs(*args, **kwargs)['dest']
  325. def argument(self, *args, **kwargs):
  326. """Decorator to call self.add_argument or self.<subcommand>.add_argument.
  327. """
  328. if self._inside_context_manager:
  329. raise RuntimeError('You must run this before the with statement!')
  330. def argument_function(handler):
  331. if handler is self._entrypoint:
  332. self.add_argument(*args, **kwargs)
  333. elif handler.__name__ in self.subcommands:
  334. self.subcommands[handler.__name__].add_argument(*args, **kwargs)
  335. else:
  336. raise RuntimeError('Decorated function is not entrypoint or subcommand!')
  337. return handler
  338. return argument_function
  339. def arg_passed(self, arg):
  340. """Returns True if arg was passed on the command line.
  341. """
  342. return self.args_passed[arg] in (None, False)
  343. def parse_args(self):
  344. """Parse the CLI args.
  345. """
  346. if self.args:
  347. self.log.debug('Warning: Arguments have already been parsed, ignoring duplicate attempt!')
  348. return
  349. argcomplete.autocomplete(self._arg_parser)
  350. self.acquire_lock()
  351. self.args = self._arg_parser.parse_args()
  352. self.args_passed = self._arg_defaults.parse_args()
  353. if 'entrypoint' in self.args:
  354. self._entrypoint = self.args.entrypoint
  355. if self.args.general_config_file:
  356. self.config_file = self.args.general_config_file
  357. self.release_lock()
  358. def read_config(self):
  359. """Parse the configuration file and determine the runtime configuration.
  360. """
  361. self.acquire_lock()
  362. self.config_file = self.find_config_file()
  363. if self.config_file and os.path.exists(self.config_file):
  364. config = RawConfigParser(self.config)
  366. # Iterate over the config file options and write them into self.config
  367. for section in config.sections():
  368. for option in config.options(section):
  369. value = config.get(section, option)
  370. # Coerce values into useful datatypes
  371. if value.lower() in ['1', 'yes', 'true', 'on']:
  372. value = True
  373. elif value.lower() in ['0', 'no', 'false', 'none', 'off']:
  374. value = False
  375. elif value.replace('.', '').isdigit():
  376. if '.' in value:
  377. value = Decimal(value)
  378. else:
  379. value = int(value)
  380. self.config[section][option] = value
  381. # Fold the CLI args into self.config
  382. for argument in vars(self.args):
  383. if argument in ('subparsers', 'entrypoint'):
  384. continue
  385. if '_' not in argument:
  386. continue
  387. section, option = argument.split('_', 1)
  388. if hasattr(self.args_passed, argument):
  389. self.config[section][option] = getattr(self.args, argument)
  390. else:
  391. if option not in self.config[section]:
  392. self.config[section][option] = getattr(self.args, argument)
  393. self.release_lock()
  394. def save_config(self):
  395. """Save the current configuration to the config file.
  396. """
  397. self.log.debug("Saving config file to '%s'", self.config_file)
  398. if not self.config_file:
  399. self.log.warning('%s.config_file file not set, not saving config!', self.__class__.__name__)
  400. return
  401. self.acquire_lock()
  402. config = RawConfigParser()
  403. for section_name, section in self.config._config.items():
  404. config.add_section(section_name)
  405. for option_name, value in section.items():
  406. if section_name == 'general':
  407. if option_name in ['save_config']:
  408. continue
  409. config.set(section_name, option_name, str(value))
  410. with NamedTemporaryFile(mode='w', dir=os.path.dirname(self.config_file), delete=False) as tmpfile:
  411. config.write(tmpfile)
  412. # Move the new config file into place atomically
  413. if os.path.getsize( > 0:
  414. os.rename(, self.config_file)
  415. else:
  416. self.log.warning('Config file saving failed, not replacing %s with %s.', self.config_file,
  417. self.release_lock()
  418. def __call__(self):
  419. """Execute the entrypoint function.
  420. """
  421. if not self._inside_context_manager:
  422. # If they didn't use the context manager use it ourselves
  423. with self:
  424. self.__call__()
  425. return
  426. if not self._entrypoint:
  427. raise RuntimeError('No entrypoint provided!')
  428. return self._entrypoint(self)
  429. def entrypoint(self, description):
  430. """Set the entrypoint for when no subcommand is provided.
  431. """
  432. if self._inside_context_manager:
  433. raise RuntimeError('You must run this before cli()!')
  434. self.acquire_lock()
  435. self.description = description
  436. self.release_lock()
  437. def entrypoint_func(handler):
  438. self.acquire_lock()
  439. self._entrypoint = handler
  440. self.release_lock()
  441. return handler
  442. return entrypoint_func
  443. def add_subcommand(self, handler, description, name=None, **kwargs):
  444. """Register a subcommand.
  445. If name is not provided we use `handler.__name__`.
  446. """
  447. if self._inside_context_manager:
  448. raise RuntimeError('You must run this before the with statement!')
  449. if self._subparsers is None:
  450. self.add_subparsers()
  451. if not name:
  452. name = handler.__name__
  453. self.acquire_lock()
  454. kwargs['help'] = description
  455. self.subcommands_default[name] = self._subparsers_default.add_parser(name, **kwargs)
  456. self.subcommands[name] = SubparserWrapper(self, name, self._subparsers.add_parser(name, **kwargs))
  457. self.subcommands[name].set_defaults(entrypoint=handler)
  458. if name not in self.__dict__:
  459. self.__dict__[name] = self.subcommands[name]
  460. else:
  461. self.log.debug("Could not add subcommand '%s' to attributes, key already exists!", name)
  462. self.release_lock()
  463. return handler
  464. def subcommand(self, description, **kwargs):
  465. """Decorator to register a subcommand.
  466. """
  467. def subcommand_function(handler):
  468. return self.add_subcommand(handler, description, **kwargs)
  469. return subcommand_function
  470. def setup_logging(self):
  471. """Called by __enter__() to setup the logging configuration.
  472. """
  473. if len(logging.root.handlers) != 0:
  474. # This is not a design decision. This is what I'm doing for now until I can examine and think about this situation in more detail.
  475. raise RuntimeError('MILC should be the only system installing root log handlers!')
  476. self.acquire_lock()
  477. if self.config['general']['verbose']:
  478. self.log_print_level = logging.DEBUG
  479. self.log_file = self.config['general']['log_file'] or self.log_file
  480. self.log_file_format = self.config['general']['log_file_fmt']
  481. self.log_file_format = ANSIStrippingFormatter(self.config['general']['log_file_fmt'], self.config['general']['datetime_fmt'])
  482. self.log_format = self.config['general']['log_fmt']
  483. if self.config.general.color:
  484. self.log_format = ANSIEmojiLoglevelFormatter(self.args.general_log_fmt, self.config.general.datetime_fmt)
  485. else:
  486. self.log_format = ANSIStrippingFormatter(self.args.general_log_fmt, self.config.general.datetime_fmt)
  487. if self.log_file:
  488. self.log_file_handler = logging.FileHandler(self.log_file, self.log_file_mode)
  489. self.log_file_handler.setLevel(self.log_file_level)
  490. self.log_file_handler.setFormatter(self.log_file_format)
  491. logging.root.addHandler(self.log_file_handler)
  492. if self.log_print:
  493. self.log_print_handler = logging.StreamHandler(self.log_print_to)
  494. self.log_print_handler.setLevel(self.log_print_level)
  495. self.log_print_handler.setFormatter(self.log_format)
  496. logging.root.addHandler(self.log_print_handler)
  497. self.release_lock()
  498. def __enter__(self):
  499. if self._inside_context_manager:
  500. self.log.debug('Warning: context manager was entered again. This usually means that self.__call__() was called before the with statement. You probably do not want to do that.')
  501. return
  502. self.acquire_lock()
  503. self._inside_context_manager = True
  504. self.release_lock()
  505. colorama.init()
  506. self.parse_args()
  507. self.read_config()
  508. self.setup_logging()
  509. if self.config.general.save_config:
  510. self.save_config()
  511. return self
  512. def __exit__(self, exc_type, exc_val, exc_tb):
  513. self.acquire_lock()
  514. self._inside_context_manager = False
  515. self.release_lock()
  516. if exc_type is not None and not isinstance(SystemExit(), exc_type):
  517. print(exc_type)
  518. logging.exception(exc_val)
  519. exit(255)
  520. cli = MILC()
  521. if __name__ == '__main__':
  522. @cli.argument('-c', '--comma', help='comma in output', default=True, action='store_boolean')
  523. @cli.entrypoint('My useful CLI tool with subcommands.')
  524. def main(cli):
  525. comma = ',' if cli.config.general.comma else ''
  526.'{bg_green}{fg_red}Hello%s World!', comma)
  527. @cli.argument('-n', '--name', help='Name to greet', default='World')
  528. @cli.subcommand('Description of hello subcommand here.')
  529. def hello(cli):
  530. comma = ',' if cli.config.general.comma else ''
  531.'{fg_blue}Hello%s %s!', comma,
  532. def goodbye(cli):
  533. comma = ',' if cli.config.general.comma else ''
  534.'{bg_red}Goodbye%s %s!', comma,
  535. @cli.argument('-n', '--name', help='Name to greet', default='World')
  536. @cli.subcommand('Think a bit before greeting the user.')
  537. def thinking(cli):
  538. comma = ',' if cli.config.general.comma else ''
  539. spinner = cli.spinner(text='Just a moment...', spinner='earth')
  540. spinner.start()
  541. sleep(2)
  542. spinner.stop()
  543. with cli.spinner(text='Almost there!', spinner='moon'):
  544. sleep(2)
  545.'{fg_cyan}Hello%s %s!', comma,
  546. @cli.subcommand('Show off our ANSI colors.')
  547. def pride(cli):
  548. cli.echo('{bg_red} ')
  549. cli.echo('{bg_lightred_ex} ')
  550. cli.echo('{bg_lightyellow_ex} ')
  551. cli.echo('{bg_green} ')
  552. cli.echo('{bg_blue} ')
  553. cli.echo('{bg_magenta} ')
  554. # You can register subcommands using decorators as seen above, or using functions like like this:
  555. cli.add_subcommand(goodbye, 'This will show up in --help output.')
  556. cli.goodbye.add_argument('-n', '--name', help='Name to bid farewell to', default='World')
  557. cli() # Automatically picks between main(), hello() and goodbye()
  558. print(sorted(ansi_colors.keys()))