diff options
Diffstat (limited to 'lib/python/milc.py')
-rw-r--r-- | lib/python/milc.py | 778 |
1 files changed, 0 insertions, 778 deletions
diff --git a/lib/python/milc.py b/lib/python/milc.py deleted file mode 100644 index eb18984eb3..0000000000 --- a/lib/python/milc.py +++ /dev/null @@ -1,778 +0,0 @@ -#!/usr/bin/env python3 -# coding=utf-8 -"""MILC - A CLI Framework - -PYTHON_ARGCOMPLETE_OK - -MILC is an opinionated framework for writing CLI apps. It optimizes for the -most common unix tool pattern- small tools that are run from the command -line but generally do not feature any user interaction while they run. - -For more details see the MILC documentation: - - <https://github.com/clueboard/milc/tree/master/docs> -""" -from __future__ import division, print_function, unicode_literals -import argparse -import logging -import os -import re -import shlex -import sys -from decimal import Decimal -from pathlib import Path -from tempfile import NamedTemporaryFile -from time import sleep - -try: - from ConfigParser import RawConfigParser -except ImportError: - from configparser import RawConfigParser - -try: - import thread - import threading -except ImportError: - thread = None - -import argcomplete -import colorama -from appdirs import user_config_dir - -# Disable logging until we can configure it how the user wants -logging.basicConfig(stream=os.devnull) - -# Log Level Representations -EMOJI_LOGLEVELS = { - 'CRITICAL': '{bg_red}{fg_white}¬_¬{style_reset_all}', - 'ERROR': '{fg_red}☒{style_reset_all}', - 'WARNING': '{fg_yellow}⚠{style_reset_all}', - 'INFO': '{fg_blue}ℹ{style_reset_all}', - 'DEBUG': '{fg_cyan}☐{style_reset_all}', - 'NOTSET': '{style_reset_all}¯\\_(o_o)_/¯' -} -EMOJI_LOGLEVELS['FATAL'] = EMOJI_LOGLEVELS['CRITICAL'] -EMOJI_LOGLEVELS['WARN'] = EMOJI_LOGLEVELS['WARNING'] -UNICODE_SUPPORT = sys.stdout.encoding.lower().startswith('utf') - -# ANSI Color setup -# Regex was gratefully borrowed from kfir on stackoverflow: -# https://stackoverflow.com/a/45448194 -ansi_regex = r'\x1b(' \ - r'(\[\??\d+[hl])|' \ - r'([=<>a-kzNM78])|' \ - r'([\(\)][a-b0-2])|' \ - r'(\[\d{0,2}[ma-dgkjqi])|' \ - r'(\[\d+;\d+[hfy]?)|' \ - r'(\[;?[hf])|' \ - r'(#[3-68])|' \ - r'([01356]n)|' \ - r'(O[mlnp-z]?)|' \ - r'(/Z)|' \ - r'(\d+)|' \ - r'(\[\?\d;\d0c)|' \ - r'(\d;\dR))' -ansi_escape = re.compile(ansi_regex, flags=re.IGNORECASE) -ansi_styles = ( - ('fg', colorama.ansi.AnsiFore()), - ('bg', colorama.ansi.AnsiBack()), - ('style', colorama.ansi.AnsiStyle()), -) -ansi_colors = {} - -for prefix, obj in ansi_styles: - for color in [x for x in obj.__dict__ if not x.startswith('_')]: - ansi_colors[prefix + '_' + color.lower()] = getattr(obj, color) - - -def format_ansi(text): - """Return a copy of text with certain strings replaced with ansi. - """ - # Avoid .format() so we don't have to worry about the log content - for color in ansi_colors: - text = text.replace('{%s}' % color, ansi_colors[color]) - return text + ansi_colors['style_reset_all'] - - -class ANSIFormatter(logging.Formatter): - """A log formatter that inserts ANSI color. - """ - def format(self, record): - msg = super(ANSIFormatter, self).format(record) - return format_ansi(msg) - - -class ANSIEmojiLoglevelFormatter(ANSIFormatter): - """A log formatter that makes the loglevel an emoji on UTF capable terminals. - """ - def format(self, record): - if UNICODE_SUPPORT: - record.levelname = EMOJI_LOGLEVELS[record.levelname].format(**ansi_colors) - return super(ANSIEmojiLoglevelFormatter, self).format(record) - - -class ANSIStrippingFormatter(ANSIFormatter): - """A log formatter that strips ANSI. - """ - def format(self, record): - msg = super(ANSIStrippingFormatter, self).format(record) - return ansi_escape.sub('', msg) - - -class Configuration(object): - """Represents the running configuration. - - This class never raises IndexError, instead it will return None if a - section or option does not yet exist. - """ - def __contains__(self, key): - return self._config.__contains__(key) - - def __iter__(self): - return self._config.__iter__() - - def __len__(self): - return self._config.__len__() - - def __repr__(self): - return self._config.__repr__() - - def keys(self): - return self._config.keys() - - def items(self): - return self._config.items() - - def values(self): - return self._config.values() - - def __init__(self, *args, **kwargs): - self._config = {} - - def __getattr__(self, key): - return self.__getitem__(key) - - def __getitem__(self, key): - """Returns a config section, creating it if it doesn't exist yet. - """ - if key not in self._config: - self.__dict__[key] = self._config[key] = ConfigurationSection(self) - - return self._config[key] - - def __setitem__(self, key, value): - self.__dict__[key] = value - self._config[key] = value - - def __delitem__(self, key): - if key in self.__dict__ and key[0] != '_': - del self.__dict__[key] - if key in self._config: - del self._config[key] - - -class ConfigurationSection(Configuration): - def __init__(self, parent, *args, **kwargs): - super(ConfigurationSection, self).__init__(*args, **kwargs) - self.parent = parent - - def __getitem__(self, key): - """Returns a config value, pulling from the `user` section as a fallback. - This is called when the attribute is accessed either via the get method or through [ ] index. - """ - if key in self._config and self._config.get(key) is not None: - return self._config[key] - - elif key in self.parent.user: - return self.parent.user[key] - - return None - - def __getattr__(self, key): - """Returns the config value from the `user` section. - This is called when the attribute is accessed via dot notation but does not exists. - """ - if key in self.parent.user: - return self.parent.user[key] - - return None - - -def handle_store_boolean(self, *args, **kwargs): - """Does the add_argument for action='store_boolean'. - """ - disabled_args = None - disabled_kwargs = kwargs.copy() - disabled_kwargs['action'] = 'store_false' - disabled_kwargs['dest'] = self.get_argument_name(*args, **kwargs) - disabled_kwargs['help'] = 'Disable ' + kwargs['help'] - kwargs['action'] = 'store_true' - kwargs['help'] = 'Enable ' + kwargs['help'] - - for flag in args: - if flag[:2] == '--': - disabled_args = ('--no-' + flag[2:],) - break - - self.add_argument(*args, **kwargs) - self.add_argument(*disabled_args, **disabled_kwargs) - - return (args, kwargs, disabled_args, disabled_kwargs) - - -class SubparserWrapper(object): - """Wrap subparsers so we can track what options the user passed. - """ - def __init__(self, cli, submodule, subparser): - self.cli = cli - self.submodule = submodule - self.subparser = subparser - - for attr in dir(subparser): - if not hasattr(self, attr): - setattr(self, attr, getattr(subparser, attr)) - - def completer(self, completer): - """Add an arpcomplete completer to this subcommand. - """ - self.subparser.completer = completer - - def add_argument(self, *args, **kwargs): - """Add an argument for this subcommand. - - This also stores the default for the argument in `self.cli.default_arguments`. - """ - if kwargs.get('action') == 'store_boolean': - # Store boolean will call us again with the enable/disable flag arguments - return handle_store_boolean(self, *args, **kwargs) - - self.cli.acquire_lock() - argument_name = self.cli.get_argument_name(*args, **kwargs) - - self.subparser.add_argument(*args, **kwargs) - - if kwargs.get('action') == 'store_false': - self.cli._config_store_false.append(argument_name) - - if kwargs.get('action') == 'store_true': - self.cli._config_store_true.append(argument_name) - - if self.submodule not in self.cli.default_arguments: - self.cli.default_arguments[self.submodule] = {} - self.cli.default_arguments[self.submodule][argument_name] = kwargs.get('default') - self.cli.release_lock() - - -class MILC(object): - """MILC - An Opinionated Batteries Included Framework - """ - def __init__(self): - """Initialize the MILC object. - - version - The version string to associate with your CLI program - """ - # Setup a lock for thread safety - self._lock = threading.RLock() if thread else None - - # Define some basic info - self.acquire_lock() - self._config_store_true = [] - self._config_store_false = [] - self._description = None - self._entrypoint = None - self._inside_context_manager = False - self.ansi = ansi_colors - self.arg_only = {} - self.config = self.config_source = None - self.config_file = None - self.default_arguments = {} - self.version = 'unknown' - self.release_lock() - - # Figure out our program name - self.prog_name = sys.argv[0][:-3] if sys.argv[0].endswith('.py') else sys.argv[0] - self.prog_name = self.prog_name.split('/')[-1] - - # Initialize all the things - self.read_config_file() - self.initialize_argparse() - self.initialize_logging() - - @property - def description(self): - return self._description - - @description.setter - def description(self, value): - self._description = self._arg_parser.description = value - - def echo(self, text, *args, **kwargs): - """Print colorized text to stdout. - - ANSI color strings (such as {fg-blue}) will be converted into ANSI - escape sequences, and the ANSI reset sequence will be added to all - strings. - - If *args or **kwargs are passed they will be used to %-format the strings. - """ - if args and kwargs: - raise RuntimeError('You can only specify *args or **kwargs, not both!') - - args = args or kwargs - text = format_ansi(text) - - print(text % args) - - def initialize_argparse(self): - """Prepare to process arguments from sys.argv. - """ - kwargs = { - 'fromfile_prefix_chars': '@', - 'conflict_handler': 'resolve', - } - - self.acquire_lock() - self.subcommands = {} - self._subparsers = None - self.argwarn = argcomplete.warn - self.args = None - self._arg_parser = argparse.ArgumentParser(**kwargs) - self.set_defaults = self._arg_parser.set_defaults - self.print_usage = self._arg_parser.print_usage - self.print_help = self._arg_parser.print_help - self.release_lock() - - def completer(self, completer): - """Add an argcomplete completer to this subcommand. - """ - self._arg_parser.completer = completer - - def add_argument(self, *args, **kwargs): - """Wrapper to add arguments and track whether they were passed on the command line. - """ - if 'action' in kwargs and kwargs['action'] == 'store_boolean': - return handle_store_boolean(self, *args, **kwargs) - - self.acquire_lock() - - self._arg_parser.add_argument(*args, **kwargs) - if 'general' not in self.default_arguments: - self.default_arguments['general'] = {} - self.default_arguments['general'][self.get_argument_name(*args, **kwargs)] = kwargs.get('default') - - self.release_lock() - - def initialize_logging(self): - """Prepare the defaults for the logging infrastructure. - """ - self.acquire_lock() - self.log_file = None - self.log_file_mode = 'a' - self.log_file_handler = None - self.log_print = True - self.log_print_to = sys.stderr - self.log_print_level = logging.INFO - self.log_file_level = logging.DEBUG - self.log_level = logging.INFO - self.log = logging.getLogger(self.__class__.__name__) - self.log.setLevel(logging.DEBUG) - logging.root.setLevel(logging.DEBUG) - self.release_lock() - - self.add_argument('-V', '--version', version=self.version, action='version', help='Display the version and exit') - self.add_argument('-v', '--verbose', action='store_true', help='Make the logging more verbose') - self.add_argument('--datetime-fmt', default='%Y-%m-%d %H:%M:%S', help='Format string for datetimes') - self.add_argument('--log-fmt', default='%(levelname)s %(message)s', help='Format string for printed log output') - self.add_argument('--log-file-fmt', default='[%(levelname)s] [%(asctime)s] [file:%(pathname)s] [line:%(lineno)d] %(message)s', help='Format string for log file.') - self.add_argument('--log-file', help='File to write log messages to') - self.add_argument('--color', action='store_boolean', default=True, help='color in output') - self.add_argument('--config-file', help='The location for the configuration file') - self.arg_only['config_file'] = ['general'] - - def add_subparsers(self, title='Sub-commands', **kwargs): - if self._inside_context_manager: - raise RuntimeError('You must run this before the with statement!') - - self.acquire_lock() - self._subparsers = self._arg_parser.add_subparsers(title=title, dest='subparsers', **kwargs) - self.release_lock() - - def acquire_lock(self): - """Acquire the MILC lock for exclusive access to properties. - """ - if self._lock: - self._lock.acquire() - - def release_lock(self): - """Release the MILC lock. - """ - if self._lock: - self._lock.release() - - def find_config_file(self): - """Locate the config file. - """ - if self.config_file: - return self.config_file - - if '--config-file' in sys.argv: - return Path(sys.argv[sys.argv.index('--config-file') + 1]).expanduser().resolve() - - filedir = user_config_dir(appname='qmk', appauthor='QMK') - filename = '%s.ini' % self.prog_name - return Path(filedir) / filename - - def get_argument_name(self, *args, **kwargs): - """Takes argparse arguments and returns the dest name. - """ - try: - return self._arg_parser._get_optional_kwargs(*args, **kwargs)['dest'] - except ValueError: - return self._arg_parser._get_positional_kwargs(*args, **kwargs)['dest'] - - def argument(self, *args, **kwargs): - """Decorator to call self.add_argument or self.<subcommand>.add_argument. - """ - if self._inside_context_manager: - raise RuntimeError('You must run this before the with statement!') - - def argument_function(handler): - subcommand_name = handler.__name__.replace("_", "-") - - if kwargs.get('arg_only'): - arg_name = self.get_argument_name(*args, **kwargs) - if arg_name not in self.arg_only: - self.arg_only[arg_name] = [] - self.arg_only[arg_name].append(subcommand_name) - del kwargs['arg_only'] - - if handler is self._entrypoint: - self.add_argument(*args, **kwargs) - - elif subcommand_name in self.subcommands: - self.subcommands[subcommand_name].add_argument(*args, **kwargs) - - else: - raise RuntimeError('Decorated function is not entrypoint or subcommand!') - - return handler - - return argument_function - - def arg_passed(self, arg): - """Returns True if arg was passed on the command line. - """ - return self.default_arguments.get(arg) != self.args[arg] - - def parse_args(self): - """Parse the CLI args. - """ - if self.args: - self.log.debug('Warning: Arguments have already been parsed, ignoring duplicate attempt!') - return - - argcomplete.autocomplete(self._arg_parser) - - self.acquire_lock() - self.args = self._arg_parser.parse_args() - - if 'entrypoint' in self.args: - self._entrypoint = self.args.entrypoint - - self.release_lock() - - def read_config_file(self): - """Read in the configuration file and store it in self.config. - """ - self.acquire_lock() - self.config = Configuration() - self.config_source = Configuration() - self.config_file = self.find_config_file() - - if self.config_file and self.config_file.exists(): - config = RawConfigParser(self.config) - config.read(str(self.config_file)) - - # Iterate over the config file options and write them into self.config - for section in config.sections(): - for option in config.options(section): - value = config.get(section, option) - - # Coerce values into useful datatypes - if value.lower() in ['1', 'yes', 'true', 'on']: - value = True - elif value.lower() in ['0', 'no', 'false', 'off']: - value = False - elif value.lower() in ['none']: - continue - elif value.replace('.', '').isdigit(): - if '.' in value: - value = Decimal(value) - else: - value = int(value) - - self.config[section][option] = value - self.config_source[section][option] = 'config_file' - - self.release_lock() - - def merge_args_into_config(self): - """Merge CLI arguments into self.config to create the runtime configuration. - """ - self.acquire_lock() - for argument in vars(self.args): - if argument in ('subparsers', 'entrypoint'): - continue - - # Find the argument's section - # Underscores in command's names are converted to dashes during initialization. - # TODO(Erovia) Find a better solution - entrypoint_name = self._entrypoint.__name__.replace("_", "-") - if entrypoint_name in self.default_arguments and argument in self.default_arguments[entrypoint_name]: - argument_found = True - section = self._entrypoint.__name__ - if argument in self.default_arguments['general']: - argument_found = True - section = 'general' - - if not argument_found: - raise RuntimeError('Could not find argument in `self.default_arguments`. This should be impossible!') - exit(1) - - if argument not in self.arg_only or section not in self.arg_only[argument]: - # Determine the arg value and source - arg_value = getattr(self.args, argument) - if argument in self._config_store_true and arg_value: - passed_on_cmdline = True - elif argument in self._config_store_false and not arg_value: - passed_on_cmdline = True - elif arg_value is not None: - passed_on_cmdline = True - else: - passed_on_cmdline = False - - # Merge this argument into self.config - if passed_on_cmdline and (argument in self.default_arguments['general'] or argument in self.default_arguments[entrypoint_name] or argument not in self.config[entrypoint_name]): - self.config[section][argument] = arg_value - self.config_source[section][argument] = 'argument' - - self.release_lock() - - def save_config(self): - """Save the current configuration to the config file. - """ - self.log.debug("Saving config file to '%s'", str(self.config_file)) - - if not self.config_file: - self.log.warning('%s.config_file file not set, not saving config!', self.__class__.__name__) - return - - self.acquire_lock() - - # Generate a sanitized version of our running configuration - config = RawConfigParser() - for section_name, section in self.config._config.items(): - config.add_section(section_name) - for option_name, value in section.items(): - if section_name == 'general': - if option_name in ['config_file']: - continue - if value is not None: - config.set(section_name, option_name, str(value)) - - # Write out the config file - config_dir = self.config_file.parent - if not config_dir.exists(): - config_dir.mkdir(parents=True, exist_ok=True) - - with NamedTemporaryFile(mode='w', dir=str(config_dir), delete=False) as tmpfile: - config.write(tmpfile) - - # Move the new config file into place atomically - if os.path.getsize(tmpfile.name) > 0: - os.replace(tmpfile.name, str(self.config_file)) - else: - self.log.warning('Config file saving failed, not replacing %s with %s.', str(self.config_file), tmpfile.name) - - # Housekeeping - self.release_lock() - cli.log.info('Wrote configuration to %s', shlex.quote(str(self.config_file))) - - def __call__(self): - """Execute the entrypoint function. - """ - if not self._inside_context_manager: - # If they didn't use the context manager use it ourselves - with self: - return self.__call__() - - if not self._entrypoint: - raise RuntimeError('No entrypoint provided!') - - return self._entrypoint(self) - - def entrypoint(self, description): - """Set the entrypoint for when no subcommand is provided. - """ - if self._inside_context_manager: - raise RuntimeError('You must run this before cli()!') - - self.acquire_lock() - self.description = description - self.release_lock() - - def entrypoint_func(handler): - self.acquire_lock() - self._entrypoint = handler - self.release_lock() - - return handler - - return entrypoint_func - - def add_subcommand(self, handler, description, name=None, hidden=False, **kwargs): - """Register a subcommand. - - If name is not provided we use `handler.__name__`. - """ - - if self._inside_context_manager: - raise RuntimeError('You must run this before the with statement!') - - if self._subparsers is None: - self.add_subparsers(metavar="") - - if not name: - name = handler.__name__.replace("_", "-") - - self.acquire_lock() - if not hidden: - self._subparsers.metavar = "{%s,%s}" % (self._subparsers.metavar[1:-1], name) if self._subparsers.metavar else "{%s%s}" % (self._subparsers.metavar[1:-1], name) - kwargs['help'] = description - self.subcommands[name] = SubparserWrapper(self, name, self._subparsers.add_parser(name, **kwargs)) - self.subcommands[name].set_defaults(entrypoint=handler) - - self.release_lock() - - return handler - - def subcommand(self, description, hidden=False, **kwargs): - """Decorator to register a subcommand. - """ - def subcommand_function(handler): - return self.add_subcommand(handler, description, hidden=hidden, **kwargs) - - return subcommand_function - - def setup_logging(self): - """Called by __enter__() to setup the logging configuration. - """ - if len(logging.root.handlers) != 0: - # MILC is the only thing that should have root log handlers - logging.root.handlers = [] - - self.acquire_lock() - - if self.config['general']['verbose']: - self.log_print_level = logging.DEBUG - - self.log_file = self.config['general']['log_file'] or self.log_file - self.log_file_format = self.config['general']['log_file_fmt'] - self.log_file_format = ANSIStrippingFormatter(self.config['general']['log_file_fmt'], self.config['general']['datetime_fmt']) - self.log_format = self.config['general']['log_fmt'] - - if self.config.general.color: - self.log_format = ANSIEmojiLoglevelFormatter(self.args.log_fmt, self.config.general.datetime_fmt) - else: - self.log_format = ANSIStrippingFormatter(self.args.log_fmt, self.config.general.datetime_fmt) - - if self.log_file: - self.log_file_handler = logging.FileHandler(self.log_file, self.log_file_mode) - self.log_file_handler.setLevel(self.log_file_level) - self.log_file_handler.setFormatter(self.log_file_format) - logging.root.addHandler(self.log_file_handler) - - if self.log_print: - self.log_print_handler = logging.StreamHandler(self.log_print_to) - self.log_print_handler.setLevel(self.log_print_level) - self.log_print_handler.setFormatter(self.log_format) - logging.root.addHandler(self.log_print_handler) - - self.release_lock() - - def __enter__(self): - if self._inside_context_manager: - self.log.debug('Warning: context manager was entered again. This usually means that self.__call__() was called before the with statement. You probably do not want to do that.') - return - - self.acquire_lock() - self._inside_context_manager = True - self.release_lock() - - colorama.init() - self.parse_args() - self.merge_args_into_config() - self.setup_logging() - - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.acquire_lock() - self._inside_context_manager = False - self.release_lock() - - if exc_type is not None and not isinstance(SystemExit(), exc_type): - print(exc_type) - logging.exception(exc_val) - exit(255) - - -cli = MILC() - -if __name__ == '__main__': - - @cli.argument('-c', '--comma', help='comma in output', default=True, action='store_boolean') - @cli.entrypoint('My useful CLI tool with subcommands.') - def main(cli): - comma = ',' if cli.config.general.comma else '' - cli.log.info('{bg_green}{fg_red}Hello%s World!', comma) - - @cli.argument('-n', '--name', help='Name to greet', default='World') - @cli.subcommand('Description of hello subcommand here.') - def hello(cli): - comma = ',' if cli.config.general.comma else '' - cli.log.info('{fg_blue}Hello%s %s!', comma, cli.config.hello.name) - - def goodbye(cli): - comma = ',' if cli.config.general.comma else '' - cli.log.info('{bg_red}Goodbye%s %s!', comma, cli.config.goodbye.name) - - @cli.argument('-n', '--name', help='Name to greet', default='World') - @cli.subcommand('Think a bit before greeting the user.') - def thinking(cli): - comma = ',' if cli.config.general.comma else '' - spinner = cli.spinner(text='Just a moment...', spinner='earth') - spinner.start() - sleep(2) - spinner.stop() - - with cli.spinner(text='Almost there!', spinner='moon'): - sleep(2) - - cli.log.info('{fg_cyan}Hello%s %s!', comma, cli.config.thinking.name) - - @cli.subcommand('Show off our ANSI colors.') - def pride(cli): - cli.echo('{bg_red} ') - cli.echo('{bg_lightred_ex} ') - cli.echo('{bg_lightyellow_ex} ') - cli.echo('{bg_green} ') - cli.echo('{bg_blue} ') - cli.echo('{bg_magenta} ') - - # You can register subcommands using decorators as seen above, or using functions like like this: - cli.add_subcommand(goodbye, 'This will show up in --help output.') - cli.goodbye.add_argument('-n', '--name', help='Name to bid farewell to', default='World') - - cli() # Automatically picks between main(), hello() and goodbye() |