#!/usr/bin/env python3 # coding=utf-8 """MILC - A CLI Framework PYTHON_ARGCOMPLETE_OK MILC is an opinionated framework for writing CLI apps. It optimizes for the most common unix tool pattern- small tools that are run from the command line but generally do not feature any user interaction while they run. For more details see the MILC documentation: <https://github.com/clueboard/milc/tree/master/docs> """ from __future__ import division, print_function, unicode_literals import argparse import logging import os import re import shlex import sys from decimal import Decimal from pathlib import Path from tempfile import NamedTemporaryFile from time import sleep try: from ConfigParser import RawConfigParser except ImportError: from configparser import RawConfigParser try: import thread import threading except ImportError: thread = None import argcomplete import colorama from appdirs import user_config_dir # Disable logging until we can configure it how the user wants logging.basicConfig(stream=os.devnull) # Log Level Representations EMOJI_LOGLEVELS = { 'CRITICAL': '{bg_red}{fg_white}¬_¬{style_reset_all}', 'ERROR': '{fg_red}☒{style_reset_all}', 'WARNING': '{fg_yellow}⚠{style_reset_all}', 'INFO': '{fg_blue}ℹ{style_reset_all}', 'DEBUG': '{fg_cyan}☐{style_reset_all}', 'NOTSET': '{style_reset_all}¯\\_(o_o)_/¯' } EMOJI_LOGLEVELS['FATAL'] = EMOJI_LOGLEVELS['CRITICAL'] EMOJI_LOGLEVELS['WARN'] = EMOJI_LOGLEVELS['WARNING'] UNICODE_SUPPORT = sys.stdout.encoding.lower().startswith('utf') # ANSI Color setup # Regex was gratefully borrowed from kfir on stackoverflow: # https://stackoverflow.com/a/45448194 ansi_regex = r'\x1b(' \ r'(\[\??\d+[hl])|' \ r'([=<>a-kzNM78])|' \ r'([\(\)][a-b0-2])|' \ r'(\[\d{0,2}[ma-dgkjqi])|' \ r'(\[\d+;\d+[hfy]?)|' \ r'(\[;?[hf])|' \ r'(#[3-68])|' \ r'([01356]n)|' \ r'(O[mlnp-z]?)|' \ r'(/Z)|' \ r'(\d+)|' \ r'(\[\?\d;\d0c)|' \ r'(\d;\dR))' ansi_escape = re.compile(ansi_regex, flags=re.IGNORECASE) ansi_styles = ( ('fg', colorama.ansi.AnsiFore()), ('bg', colorama.ansi.AnsiBack()), ('style', colorama.ansi.AnsiStyle()), ) ansi_colors = {} for prefix, obj in ansi_styles: for color in [x for x in obj.__dict__ if not x.startswith('_')]: ansi_colors[prefix + '_' + color.lower()] = getattr(obj, color) def format_ansi(text): """Return a copy of text with certain strings replaced with ansi. """ # Avoid .format() so we don't have to worry about the log content for color in ansi_colors: text = text.replace('{%s}' % color, ansi_colors[color]) return text + ansi_colors['style_reset_all'] class ANSIFormatter(logging.Formatter): """A log formatter that inserts ANSI color. """ def format(self, record): msg = super(ANSIFormatter, self).format(record) return format_ansi(msg) class ANSIEmojiLoglevelFormatter(ANSIFormatter): """A log formatter that makes the loglevel an emoji on UTF capable terminals. """ def format(self, record): if UNICODE_SUPPORT: record.levelname = EMOJI_LOGLEVELS[record.levelname].format(**ansi_colors) return super(ANSIEmojiLoglevelFormatter, self).format(record) class ANSIStrippingFormatter(ANSIFormatter): """A log formatter that strips ANSI. """ def format(self, record): msg = super(ANSIStrippingFormatter, self).format(record) return ansi_escape.sub('', msg) class Configuration(object): """Represents the running configuration. This class never raises IndexError, instead it will return None if a section or option does not yet exist. """ def __contains__(self, key): return self._config.__contains__(key) def __iter__(self): return self._config.__iter__() def __len__(self): return self._config.__len__() def __repr__(self): return self._config.__repr__() def keys(self): return self._config.keys() def items(self): return self._config.items() def values(self): return self._config.values() def __init__(self, *args, **kwargs): self._config = {} def __getattr__(self, key): return self.__getitem__(key) def __getitem__(self, key): """Returns a config section, creating it if it doesn't exist yet. """ if key not in self._config: self.__dict__[key] = self._config[key] = ConfigurationSection(self) return self._config[key] def __setitem__(self, key, value): self.__dict__[key] = value self._config[key] = value def __delitem__(self, key): if key in self.__dict__ and key[0] != '_': del self.__dict__[key] if key in self._config: del self._config[key] class ConfigurationSection(Configuration): def __init__(self, parent, *args, **kwargs): super(ConfigurationSection, self).__init__(*args, **kwargs) self.parent = parent def __getitem__(self, key): """Returns a config value, pulling from the `user` section as a fallback. This is called when the attribute is accessed either via the get method or through [ ] index. """ if key in self._config and self._config.get(key) is not None: return self._config[key] elif key in self.parent.user: return self.parent.user[key] return None def __getattr__(self, key): """Returns the config value from the `user` section. This is called when the attribute is accessed via dot notation but does not exists. """ if key in self.parent.user: return self.parent.user[key] return None def handle_store_boolean(self, *args, **kwargs): """Does the add_argument for action='store_boolean'. """ disabled_args = None disabled_kwargs = kwargs.copy() disabled_kwargs['action'] = 'store_false' disabled_kwargs['dest'] = self.get_argument_name(*args, **kwargs) disabled_kwargs['help'] = 'Disable ' + kwargs['help'] kwargs['action'] = 'store_true' kwargs['help'] = 'Enable ' + kwargs['help'] for flag in args: if flag[:2] == '--': disabled_args = ('--no-' + flag[2:],) break self.add_argument(*args, **kwargs) self.add_argument(*disabled_args, **disabled_kwargs) return (args, kwargs, disabled_args, disabled_kwargs) class SubparserWrapper(object): """Wrap subparsers so we can track what options the user passed. """ def __init__(self, cli, submodule, subparser): self.cli = cli self.submodule = submodule self.subparser = subparser for attr in dir(subparser): if not hasattr(self, attr): setattr(self, attr, getattr(subparser, attr)) def completer(self, completer): """Add an arpcomplete completer to this subcommand. """ self.subparser.completer = completer def add_argument(self, *args, **kwargs): """Add an argument for this subcommand. This also stores the default for the argument in `self.cli.default_arguments`. """ if kwargs.get('action') == 'store_boolean': # Store boolean will call us again with the enable/disable flag arguments return handle_store_boolean(self, *args, **kwargs) self.cli.acquire_lock() argument_name = self.cli.get_argument_name(*args, **kwargs) self.subparser.add_argument(*args, **kwargs) if kwargs.get('action') == 'store_false': self.cli._config_store_false.append(argument_name) if kwargs.get('action') == 'store_true': self.cli._config_store_true.append(argument_name) if self.submodule not in self.cli.default_arguments: self.cli.default_arguments[self.submodule] = {} self.cli.default_arguments[self.submodule][argument_name] = kwargs.get('default') self.cli.release_lock() class MILC(object): """MILC - An Opinionated Batteries Included Framework """ def __init__(self): """Initialize the MILC object. version The version string to associate with your CLI program """ # Setup a lock for thread safety self._lock = threading.RLock() if thread else None # Define some basic info self.acquire_lock() self._config_store_true = [] self._config_store_false = [] self._description = None self._entrypoint = None self._inside_context_manager = False self.ansi = ansi_colors self.arg_only = {} self.config = self.config_source = None self.config_file = None self.default_arguments = {} self.version = 'unknown' self.release_lock() # Figure out our program name self.prog_name = sys.argv[0][:-3] if sys.argv[0].endswith('.py') else sys.argv[0] self.prog_name = self.prog_name.split('/')[-1] # Initialize all the things self.read_config_file() self.initialize_argparse() self.initialize_logging() @property def description(self): return self._description @description.setter def description(self, value): self._description = self._arg_parser.description = value def echo(self, text, *args, **kwargs): """Print colorized text to stdout. ANSI color strings (such as {fg-blue}) will be converted into ANSI escape sequences, and the ANSI reset sequence will be added to all strings. If *args or **kwargs are passed they will be used to %-format the strings. """ if args and kwargs: raise RuntimeError('You can only specify *args or **kwargs, not both!') args = args or kwargs text = format_ansi(text) print(text % args) def initialize_argparse(self): """Prepare to process arguments from sys.argv. """ kwargs = { 'fromfile_prefix_chars': '@', 'conflict_handler': 'resolve', } self.acquire_lock() self.subcommands = {} self._subparsers = None self.argwarn = argcomplete.warn self.args = None self._arg_parser = argparse.ArgumentParser(**kwargs) self.set_defaults = self._arg_parser.set_defaults self.print_usage = self._arg_parser.print_usage self.print_help = self._arg_parser.print_help self.release_lock() def completer(self, completer): """Add an argcomplete completer to this subcommand. """ self._arg_parser.completer = completer def add_argument(self, *args, **kwargs): """Wrapper to add arguments and track whether they were passed on the command line. """ if 'action' in kwargs and kwargs['action'] == 'store_boolean': return handle_store_boolean(self, *args, **kwargs) self.acquire_lock() self._arg_parser.add_argument(*args, **kwargs) if 'general' not in self.default_arguments: self.default_arguments['general'] = {} self.default_arguments['general'][self.get_argument_name(*args, **kwargs)] = kwargs.get('default') self.release_lock() def initialize_logging(self): """Prepare the defaults for the logging infrastructure. """ self.acquire_lock() self.log_file = None self.log_file_mode = 'a' self.log_file_handler = None self.log_print = True self.log_print_to = sys.stderr self.log_print_level = logging.INFO self.log_file_level = logging.DEBUG self.log_level = logging.INFO self.log = logging.getLogger(self.__class__.__name__) self.log.setLevel(logging.DEBUG) logging.root.setLevel(logging.DEBUG) self.release_lock() self.add_argument('-V', '--version', version=self.version, action='version', help='Display the version and exit') self.add_argument('-v', '--verbose', action='store_true', help='Make the logging more verbose') self.add_argument('--datetime-fmt', default='%Y-%m-%d %H:%M:%S', help='Format string for datetimes') self.add_argument('--log-fmt', default='%(levelname)s %(message)s', help='Format string for printed log output') self.add_argument('--log-file-fmt', default='[%(levelname)s] [%(asctime)s] [file:%(pathname)s] [line:%(lineno)d] %(message)s', help='Format string for log file.') self.add_argument('--log-file', help='File to write log messages to') self.add_argument('--color', action='store_boolean', default=True, help='color in output') self.add_argument('--config-file', help='The location for the configuration file') self.arg_only['config_file'] = ['general'] def add_subparsers(self, title='Sub-commands', **kwargs): if self._inside_context_manager: raise RuntimeError('You must run this before the with statement!') self.acquire_lock() self._subparsers = self._arg_parser.add_subparsers(title=title, dest='subparsers', **kwargs) self.release_lock() def acquire_lock(self): """Acquire the MILC lock for exclusive access to properties. """ if self._lock: self._lock.acquire() def release_lock(self): """Release the MILC lock. """ if self._lock: self._lock.release() def find_config_file(self): """Locate the config file. """ if self.config_file: return self.config_file if '--config-file' in sys.argv: return Path(sys.argv[sys.argv.index('--config-file') + 1]).expanduser().resolve() filedir = user_config_dir(appname='qmk', appauthor='QMK') filename = '%s.ini' % self.prog_name return Path(filedir) / filename def get_argument_name(self, *args, **kwargs): """Takes argparse arguments and returns the dest name. """ try: return self._arg_parser._get_optional_kwargs(*args, **kwargs)['dest'] except ValueError: return self._arg_parser._get_positional_kwargs(*args, **kwargs)['dest'] def argument(self, *args, **kwargs): """Decorator to call self.add_argument or self.<subcommand>.add_argument. """ if self._inside_context_manager: raise RuntimeError('You must run this before the with statement!') def argument_function(handler): subcommand_name = handler.__name__.replace("_", "-") if kwargs.get('arg_only'): arg_name = self.get_argument_name(*args, **kwargs) if arg_name not in self.arg_only: self.arg_only[arg_name] = [] self.arg_only[arg_name].append(subcommand_name) del kwargs['arg_only'] if handler is self._entrypoint: self.add_argument(*args, **kwargs) elif subcommand_name in self.subcommands: self.subcommands[subcommand_name].add_argument(*args, **kwargs) else: raise RuntimeError('Decorated function is not entrypoint or subcommand!') return handler return argument_function def arg_passed(self, arg): """Returns True if arg was passed on the command line. """ return self.default_arguments.get(arg) != self.args[arg] def parse_args(self): """Parse the CLI args. """ if self.args: self.log.debug('Warning: Arguments have already been parsed, ignoring duplicate attempt!') return argcomplete.autocomplete(self._arg_parser) self.acquire_lock() self.args = self._arg_parser.parse_args() if 'entrypoint' in self.args: self._entrypoint = self.args.entrypoint self.release_lock() def read_config_file(self): """Read in the configuration file and store it in self.config. """ self.acquire_lock() self.config = Configuration() self.config_source = Configuration() self.config_file = self.find_config_file() if self.config_file and self.config_file.exists(): config = RawConfigParser(self.config) config.read(str(self.config_file)) # Iterate over the config file options and write them into self.config for section in config.sections(): for option in config.options(section): value = config.get(section, option) # Coerce values into useful datatypes if value.lower() in ['1', 'yes', 'true', 'on']: value = True elif value.lower() in ['0', 'no', 'false', 'off']: value = False elif value.lower() in ['none']: continue elif value.replace('.', '').isdigit(): if '.' in value: value = Decimal(value) else: value = int(value) self.config[section][option] = value self.config_source[section][option] = 'config_file' self.release_lock() def merge_args_into_config(self): """Merge CLI arguments into self.config to create the runtime configuration. """ self.acquire_lock() for argument in vars(self.args): if argument in ('subparsers', 'entrypoint'): continue # Find the argument's section # Underscores in command's names are converted to dashes during initialization. # TODO(Erovia) Find a better solution entrypoint_name = self._entrypoint.__name__.replace("_", "-") if entrypoint_name in self.default_arguments and argument in self.default_arguments[entrypoint_name]: argument_found = True section = self._entrypoint.__name__ if argument in self.default_arguments['general']: argument_found = True section = 'general' if not argument_found: raise RuntimeError('Could not find argument in `self.default_arguments`. This should be impossible!') exit(1) if argument not in self.arg_only or section not in self.arg_only[argument]: # Determine the arg value and source arg_value = getattr(self.args, argument) if argument in self._config_store_true and arg_value: passed_on_cmdline = True elif argument in self._config_store_false and not arg_value: passed_on_cmdline = True elif arg_value is not None: passed_on_cmdline = True else: passed_on_cmdline = False # Merge this argument into self.config if passed_on_cmdline and (argument in self.default_arguments['general'] or argument in self.default_arguments[entrypoint_name] or argument not in self.config[entrypoint_name]): self.config[section][argument] = arg_value self.config_source[section][argument] = 'argument' self.release_lock() def save_config(self): """Save the current configuration to the config file. """ self.log.debug("Saving config file to '%s'", str(self.config_file)) if not self.config_file: self.log.warning('%s.config_file file not set, not saving config!', self.__class__.__name__) return self.acquire_lock() # Generate a sanitized version of our running configuration config = RawConfigParser() for section_name, section in self.config._config.items(): config.add_section(section_name) for option_name, value in section.items(): if section_name == 'general': if option_name in ['config_file']: continue if value is not None: config.set(section_name, option_name, str(value)) # Write out the config file config_dir = self.config_file.parent if not config_dir.exists(): config_dir.mkdir(parents=True, exist_ok=True) with NamedTemporaryFile(mode='w', dir=str(config_dir), delete=False) as tmpfile: config.write(tmpfile) # Move the new config file into place atomically if os.path.getsize(tmpfile.name) > 0: os.replace(tmpfile.name, str(self.config_file)) else: self.log.warning('Config file saving failed, not replacing %s with %s.', str(self.config_file), tmpfile.name) # Housekeeping self.release_lock() cli.log.info('Wrote configuration to %s', shlex.quote(str(self.config_file))) def __call__(self): """Execute the entrypoint function. """ if not self._inside_context_manager: # If they didn't use the context manager use it ourselves with self: return self.__call__() if not self._entrypoint: raise RuntimeError('No entrypoint provided!') return self._entrypoint(self) def entrypoint(self, description): """Set the entrypoint for when no subcommand is provided. """ if self._inside_context_manager: raise RuntimeError('You must run this before cli()!') self.acquire_lock() self.description = description self.release_lock() def entrypoint_func(handler): self.acquire_lock() self._entrypoint = handler self.release_lock() return handler return entrypoint_func def add_subcommand(self, handler, description, name=None, hidden=False, **kwargs): """Register a subcommand. If name is not provided we use `handler.__name__`. """ if self._inside_context_manager: raise RuntimeError('You must run this before the with statement!') if self._subparsers is None: self.add_subparsers(metavar="") if not name: name = handler.__name__.replace("_", "-") self.acquire_lock() if not hidden: self._subparsers.metavar = "{%s,%s}" % (self._subparsers.metavar[1:-1], name) if self._subparsers.metavar else "{%s%s}" % (self._subparsers.metavar[1:-1], name) kwargs['help'] = description self.subcommands[name] = SubparserWrapper(self, name, self._subparsers.add_parser(name, **kwargs)) self.subcommands[name].set_defaults(entrypoint=handler) self.release_lock() return handler def subcommand(self, description, hidden=False, **kwargs): """Decorator to register a subcommand. """ def subcommand_function(handler): return self.add_subcommand(handler, description, hidden=hidden, **kwargs) return subcommand_function def setup_logging(self): """Called by __enter__() to setup the logging configuration. """ if len(logging.root.handlers) != 0: # MILC is the only thing that should have root log handlers logging.root.handlers = [] self.acquire_lock() if self.config['general']['verbose']: self.log_print_level = logging.DEBUG self.log_file = self.config['general']['log_file'] or self.log_file self.log_file_format = self.config['general']['log_file_fmt'] self.log_file_format = ANSIStrippingFormatter(self.config['general']['log_file_fmt'], self.config['general']['datetime_fmt']) self.log_format = self.config['general']['log_fmt'] if self.config.general.color: self.log_format = ANSIEmojiLoglevelFormatter(self.args.log_fmt, self.config.general.datetime_fmt) else: self.log_format = ANSIStrippingFormatter(self.args.log_fmt, self.config.general.datetime_fmt) if self.log_file: self.log_file_handler = logging.FileHandler(self.log_file, self.log_file_mode) self.log_file_handler.setLevel(self.log_file_level) self.log_file_handler.setFormatter(self.log_file_format) logging.root.addHandler(self.log_file_handler) if self.log_print: self.log_print_handler = logging.StreamHandler(self.log_print_to) self.log_print_handler.setLevel(self.log_print_level) self.log_print_handler.setFormatter(self.log_format) logging.root.addHandler(self.log_print_handler) self.release_lock() def __enter__(self): if self._inside_context_manager: self.log.debug('Warning: context manager was entered again. This usually means that self.__call__() was called before the with statement. You probably do not want to do that.') return self.acquire_lock() self._inside_context_manager = True self.release_lock() colorama.init() self.parse_args() self.merge_args_into_config() self.setup_logging() return self def __exit__(self, exc_type, exc_val, exc_tb): self.acquire_lock() self._inside_context_manager = False self.release_lock() if exc_type is not None and not isinstance(SystemExit(), exc_type): print(exc_type) logging.exception(exc_val) exit(255) cli = MILC() if __name__ == '__main__': @cli.argument('-c', '--comma', help='comma in output', default=True, action='store_boolean') @cli.entrypoint('My useful CLI tool with subcommands.') def main(cli): comma = ',' if cli.config.general.comma else '' cli.log.info('{bg_green}{fg_red}Hello%s World!', comma) @cli.argument('-n', '--name', help='Name to greet', default='World') @cli.subcommand('Description of hello subcommand here.') def hello(cli): comma = ',' if cli.config.general.comma else '' cli.log.info('{fg_blue}Hello%s %s!', comma, cli.config.hello.name) def goodbye(cli): comma = ',' if cli.config.general.comma else '' cli.log.info('{bg_red}Goodbye%s %s!', comma, cli.config.goodbye.name) @cli.argument('-n', '--name', help='Name to greet', default='World') @cli.subcommand('Think a bit before greeting the user.') def thinking(cli): comma = ',' if cli.config.general.comma else '' spinner = cli.spinner(text='Just a moment...', spinner='earth') spinner.start() sleep(2) spinner.stop() with cli.spinner(text='Almost there!', spinner='moon'): sleep(2) cli.log.info('{fg_cyan}Hello%s %s!', comma, cli.config.thinking.name) @cli.subcommand('Show off our ANSI colors.') def pride(cli): cli.echo('{bg_red} ') cli.echo('{bg_lightred_ex} ') cli.echo('{bg_lightyellow_ex} ') cli.echo('{bg_green} ') cli.echo('{bg_blue} ') cli.echo('{bg_magenta} ') # You can register subcommands using decorators as seen above, or using functions like like this: cli.add_subcommand(goodbye, 'This will show up in --help output.') cli.goodbye.add_argument('-n', '--name', help='Name to bid farewell to', default='World') cli() # Automatically picks between main(), hello() and goodbye()