# -*- coding: utf-8 -*- # vim: set ts=4 sw=4 et ai: """ | This file is part of the web2py Web Framework | Copyrighted by Massimo Di Pierro | License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html) Command line interface ---------------------- The processing of all command line arguments is done using the argparse library in the console function. The basic principle is to process and check for all options in a single place, this place is the parse_args function. Notice that when I say all options I mean really all, options sourced from a configuration file are included. A brief summary of options style follows, for the benefit of code maintainers/developers: - use the underscore to split words in long names (as in '--run_system_tests') - remember to allow the '-' too as word separator (e.g. '--run-system-tests') but do not use this form on help (add the minus version of the option to _omitted_opts to hide it in usage help) - prefer short names on help messages, instead use all options names in warning/error messages (e.g. '-R/--run requires -S/--shell') Notice that options must be included into opt_map dictionary (defined in parse_args function) to be available in configuration file. """ from __future__ import print_function __author__ = "Paolo Pastori" import argparse import ast import copy import logging import os.path import re import socket import sys from collections import OrderedDict from gluon._compat import PY2 from gluon.settings import global_settings from gluon.shell import die from gluon.utils import is_valid_ip_address def warn(msg): print("%s: warning: %s" % (sys.argv[0], msg), file=sys.stderr) def is_appdir(applications_parent, app): return os.path.isdir(os.path.join(applications_parent, "applications", app)) def console(version): """ Load command line options. Trivial -h/--help and --version options are also processed. Returns a namespace object (in the sense of argparse) with all options loaded. """ # replacement hints for deprecated options deprecated_opts = { "--debug": "--log_level", "--nogui": "--no_gui", "--ssl_private_key": "--server_key", "--ssl_certificate": "--server_cert", "--interfaces": None, # dest is 'interfaces', hint is '--interface' "-n": "--min_threads", "--numthreads": "--min_threads", "--minthreads": "--min_threads", "--maxthreads": "--max_threads", "-z": None, "--shutdown_timeout": None, "--profiler": "--profiler_dir", "--run-cron": "--with_cron", "--softcron": "--soft_cron", "--cron": "--cron_run", "--test": "--run_doctests", } class HelpFormatter2(argparse.HelpFormatter): """Hides the options listed in _hidden_options in usage help.""" # NOTE: preferred style for long options name is to use '_' # between words (as in 'no_gui'), also accept the '-' in # most of the options but do not show both versions on help _omitted_opts = ( "--add-options", "--errors-to-console", "--no-banner", "--log-level", "--no-gui", "--import-models", "--force-migrate", "--server-name", "--server-key", "--server-cert", "--ca-cert", "--pid-filename", "--log-filename", "--min-threads", "--max-threads", "--request-queue-size", "--socket-timeout", "--profiler-dir", "--with-scheduler", "--with-cron", "--cron-threads", "--soft-cron", "--cron-run", "--run-doctests", "--run-system-tests", "--with-coverage", ) _hidden_options = _omitted_opts + tuple(deprecated_opts.keys()) def _format_action_invocation(self, action): if not action.option_strings: return super(HelpFormatter2, self)._format_action_invocation(action) parts = [] if action.nargs == 0: parts.extend( filter( lambda o: o not in self._hidden_options, action.option_strings ) ) else: default = action.dest.upper() args_string = self._format_args(action, default) for option_string in action.option_strings: if option_string in self._hidden_options: continue parts.append("%s %s" % (option_string, args_string)) return ", ".join(parts) class ExtendAction(argparse._AppendAction): """Action to accumulate values in a flat list.""" def __call__(self, parser, namespace, values, option_string=None): if isinstance(values, list): # must copy to avoid altering the option default value value = getattr(namespace, self.dest, None) if value is None: value = [] setattr(namespace, self.dest, value) items = value[:] # for options that allows multiple args (i.e. those declared # with add_argument(..., nargs='+', ...)) the values are # always placed into a list while len(values) == 1 and isinstance(values[0], list): values = values[0] items.extend(values) setattr(namespace, self.dest, items) else: super(ExtendAction, self).__call__( parser, namespace, values, option_string ) parser = argparse.ArgumentParser( usage="python %(prog)s [options]", description="web2py Web Framework startup script.", epilog="""NOTE: unless a password is specified (-a 'passwd') web2py will attempt to run a GUI to ask for it when starting the web server (if not disabled with --no_gui).""", formatter_class=HelpFormatter2, add_help=False, ) # do not add -h/--help option # global options g = parser.add_argument_group("global options") g.add_argument( "-h", "--help", action="help", help="show this help message and exit" ) g.add_argument( "--version", action="version", version=version, help="show program's version and exit", ) folder = os.getcwd() g.add_argument( "-f", "--folder", default=folder, metavar="WEB2PY_DIR", help="web2py installation directory (%(default)s)", ) def existing_file(v): if not v: raise argparse.ArgumentTypeError("empty argument") if not os.path.exists(v): raise argparse.ArgumentTypeError("file %r not found" % v) return v g.add_argument( "-L", "--config", type=existing_file, metavar="PYTHON_FILE", help="read all options from PYTHON_FILE", ) g.add_argument( "--add_options", "--add-options", default=False, action="store_true", help="add options to existing ones, useful with -L only", ) g.add_argument( "-a", "--password", default="", help='password to be used for administration (use "" ' "to reuse the last password), when no password is available " "the administrative web interface will be disabled", ) g.add_argument( "-e", "--errors_to_console", "--errors-to-console", default=False, action="store_true", help="log application errors to console", ) g.add_argument( "--no_banner", "--no-banner", default=False, action="store_true", help="do not print header banner", ) g.add_argument( "-Q", "--quiet", default=False, action="store_true", help="disable all output" ) integer_log_level = [] def log_level(v): # try to convert a lgging level name to its numeric value, # could use logging.getLevelName but not with # 3.4 <= Python < 3.4.2, see # https://docs.python.org/3/library/logging.html#logging.getLevelName) try: name2level = logging._levelNames except AttributeError: # logging._levelNames has gone with Python 3.4, see # https://github.com/python/cpython/commit/3b84eae03ebd8122fdbdced3d85999dd9aedfc7e name2level = logging._nameToLevel try: return name2level[v.upper()] except KeyError: pass try: ill = int(v) # value deprecated: integer in range(101) if 0 <= ill <= 100: integer_log_level.append(ill) return ill except ValueError: pass raise argparse.ArgumentTypeError("bad level %r" % v) g.add_argument( "-D", "--log_level", "--log-level", "--debug", # deprecated default="WARNING", type=log_level, metavar="LOG_LEVEL", help="set log level, allowed values are: NOTSET, DEBUG, INFO, WARN, " "WARNING, ERROR, and CRITICAL, also lowercase (default is " "%(default)s)", ) # GUI options g = parser.add_argument_group("GUI options") g.add_argument( "--no_gui", "--no-gui", "--nogui", # deprecated default=False, action="store_true", help="do not run GUI", ) g.add_argument( "-t", "--taskbar", default=False, action="store_true", help="run in taskbar (system tray)", ) # console options g = parser.add_argument_group("console options") g.add_argument( "-S", "--shell", metavar="APP_ENV", help="run web2py in Python interactive shell or IPython (if installed) " "with specified application environment (if application does not " "exist it will be created). APP_ENV like a/c/f?x=y (c, f and vars " "optional), if APP_ENV include the action f then after the " "action execution the interpreter is exited", ) g.add_argument( "-B", "--bpython", default=False, action="store_true", help="use bpython (if installed) when running in interactive shell, " "see -S above", ) g.add_argument( "-P", "--plain", default=False, action="store_true", help="use plain Python shell when running in interactive shell, " "see -S above", ) g.add_argument( "-M", "--import_models", "--import-models", default=False, action="store_true", help="auto import model files when running in interactive shell " "(default is %(default)s), see -S above. NOTE: when the APP_ENV " "argument of -S include a controller c automatic import of " "models is always enabled", ) g.add_argument( "--fake_migrate", default=False, action="store_true", help="force DAL to fake migrate all tables; " "monkeypatch in the DAL class to force _fake_migrate=True", ) g.add_argument( "--force_migrate", "--force-migrate", default=False, action="store_true", help="force DAL to migrate all tables that should be migrated when enabled; " "monkeypatch in the DAL class to force _migrate_enabled=True", ) g.add_argument( "-R", "--run", type=existing_file, metavar="PYTHON_FILE", help="run PYTHON_FILE in web2py environment; require -S", ) g.add_argument( "-A", "--args", default=[], nargs=argparse.REMAINDER, help="use this to pass arguments to the PYTHON_FILE above; require " "-R. NOTE: must be the last option because eat all remaining " "arguments", ) # web server options g = parser.add_argument_group("web server options") g.add_argument( "-s", "--server_name", "--server-name", default=socket.gethostname(), help="web server name (%(default)s)", ) def ip_addr(v): if not is_valid_ip_address(v): raise argparse.ArgumentTypeError("bad IP address %s" % v) return v g.add_argument( "-i", "--ip", default="127.0.0.1", type=ip_addr, metavar="IP_ADDR", help="IP address of the server (%(default)s), accept either IPv4 or " "IPv6 (e.g. ::1) addresses. NOTE: this option is ignored if " "--interface is specified", ) def not_negative_int(v, err_label="value"): try: iv = int(v) if iv < 0: raise ValueError() return iv except ValueError: pass raise argparse.ArgumentTypeError("bad %s %s" % (err_label, v)) def port(v): return not_negative_int(v, err_label="port") g.add_argument( "-p", "--port", default=8000, type=port, metavar="NUM", help="port of server (%(default)d). " "NOTE: this option is ignored if --interface is specified", ) g.add_argument( "-k", "--server_key", "--server-key", "--ssl_private_key", # deprecated type=existing_file, metavar="FILE", help="server private key", ) g.add_argument( "-c", "--server_cert", "--server-cert", "--ssl_certificate", # deprecated type=existing_file, metavar="FILE", help="server certificate", ) g.add_argument( "--ca_cert", "--ca-cert", type=existing_file, metavar="FILE", help="CA certificate", ) def iface(v, sep=","): if not v: raise argparse.ArgumentTypeError("empty argument") if sep == ":": # deprecated --interfaces ip:port:key:cert:ca_cert # IPv6 addresses in square brackets if v.startswith("["): # IPv6 ip, v_remainder = v.split("]", 1) ip = ip[1:] ifp = v_remainder[1:].split(":") ifp.insert(0, ip) else: # IPv4 ifp = v.split(":") else: # --interface ifp = v.split(sep, 5) if not len(ifp) in (2, 4, 5): raise argparse.ArgumentTypeError("bad interface %r" % v) try: ip_addr(ifp[0]) ifp[1] = port(ifp[1]) for fv in ifp[2:]: existing_file(fv) except argparse.ArgumentTypeError as ex: raise argparse.ArgumentTypeError("bad interface %r (%s)" % (v, ex)) return tuple(ifp) g.add_argument( "--interface", dest="interfaces", default=[], action=ExtendAction, type=iface, nargs="+", metavar="IF_INFO", help="listen on specified interface, IF_INFO = " "IP_ADDR,PORT[,KEY_FILE,CERT_FILE[,CA_CERT_FILE]]." " NOTE: this option can be used multiple times to provide additional " "interfaces to choose from but you can choose which one to listen to " "only using the GUI otherwise the first interface specified is used", ) def ifaces(v): # deprecated --interfaces 'if1;if2;...' if not v: raise argparse.ArgumentTypeError("empty argument") return [iface(i, ":") for i in v.split(";")] g.add_argument( "--interfaces", # deprecated default=argparse.SUPPRESS, # do not set if absent action=ExtendAction, type=ifaces, help=argparse.SUPPRESS, ) # do not show on help g.add_argument( "-d", "--pid_filename", "--pid-filename", default="httpserver.pid", metavar="FILE", help="server pid file (%(default)s)", ) g.add_argument( "-l", "--log_filename", "--log-filename", default="httpserver.log", metavar="FILE", help="server log file (%(default)s)", ) g.add_argument( "--min_threads", "--min-threads", "--minthreads", "-n", "--numthreads", # deprecated type=not_negative_int, metavar="NUM", help="minimum number of server threads", ) g.add_argument( "--max_threads", "--max-threads", "--maxthreads", # deprecated type=not_negative_int, metavar="NUM", help="maximum number of server threads", ) g.add_argument( "-q", "--request_queue_size", "--request-queue-size", default=5, type=not_negative_int, metavar="NUM", help="max number of queued requests when server busy (%(default)d)", ) g.add_argument( "-o", "--timeout", default=10, type=not_negative_int, metavar="SECONDS", help="timeout for individual request (%(default)d seconds)", ) g.add_argument( "--socket_timeout", "--socket-timeout", default=5, type=not_negative_int, metavar="SECONDS", help="timeout for socket (%(default)d seconds)", ) g.add_argument( "-z", "--shutdown_timeout", # deprecated type=not_negative_int, help=argparse.SUPPRESS, ) # do not show on help g.add_argument( "-F", "--profiler_dir", "--profiler-dir", "--profiler", # deprecated help="profiler directory", ) # scheduler options g = parser.add_argument_group("scheduler options") g.add_argument( "-X", "--with_scheduler", "--with-scheduler", default=False, action="store_true", help="run schedulers alongside web server; require --K", ) def is_app(app): return is_appdir(folder, app) def scheduler(v): if not v: raise argparse.ArgumentTypeError("empty argument") if "," in v: # legacy "app1,..." vl = [n.strip() for n in v.split(",")] return [scheduler(iv) for iv in vl] vp = [n.strip() for n in v.split(":")] app = vp[0] if not app: raise argparse.ArgumentTypeError("empty application") if not is_app(app): warn("argument -K/--scheduler: bad application %r, skipped" % app) return None return ":".join(filter(None, vp)) g.add_argument( "-K", "--scheduler", dest="schedulers", default=[], action=ExtendAction, type=scheduler, nargs="+", metavar="APP_INFO", help="run scheduler for the specified application(s), APP_INFO = " "APP_NAME[:GROUPS], that is an optional list of groups can follow " "the application name (e.g. app:group1:group2); require a scheduler " "to be defined in the application's models. NOTE: this option can " "be used multiple times to add schedulers", ) # cron options g = parser.add_argument_group("cron options") g.add_argument( "-Y", "--with_cron", "--with-cron", "--run-cron", # deprecated default=False, action="store_true", help="run cron service alongside web server", ) def crontab(v): if not v: raise argparse.ArgumentTypeError("empty argument") if not is_app(v): warn("argument --crontab: bad application %r, skipped" % v) return None return v g.add_argument( "--crontab", dest="crontabs", default=[], action=ExtendAction, type=crontab, nargs="+", metavar="APP_NAME", help="tell cron to read the crontab for the specified application(s) " "only, the default behaviour is to read the crontab for all of the " "installed applications. NOTE: this option can be used multiple " "times to build the list of crontabs to be processed by cron", ) def positive_int(v, err_label="value"): try: iv = int(v) if iv <= 0: raise ValueError() return iv except ValueError: pass raise argparse.ArgumentTypeError("bad %s %s" % (err_label, v)) def cron_threads(v): return positive_int(v, err_label="cron_threads") g.add_argument( "--cron_threads", "--cron-threads", type=cron_threads, metavar="NUM", help="maximum number of cron threads (5)", ) g.add_argument( "--soft_cron", "--soft-cron", "--softcron", # deprecated default=False, action="store_true", help="use cron software emulation instead of separate cron process; " "require -Y. NOTE: use of cron software emulation is strongly " "discouraged", ) g.add_argument( "-C", "--cron_run", "--cron-run", "--cron", # deprecated default=False, action="store_true", help="trigger a cron run and exit; usually used when invoked " "from a system (external) crontab", ) g.add_argument( "--cron_job", # NOTE: this is intended for internal use only default=False, action="store_true", help=argparse.SUPPRESS, ) # do not show on help # test options g = parser.add_argument_group("test options") g.add_argument( "-v", "--verbose", default=False, action="store_true", help="increase verbosity" ) g.add_argument( "-T", "--run_doctests", "--run-doctests", "--test", # deprecated metavar="APP_ENV", help="run doctests in application environment. APP_ENV like a/c/f (c, f " "optional)", ) g.add_argument( "--run_system_tests", "--run-system-tests", default=False, action="store_true", help="run web2py test suite", ) g.add_argument( "--with_coverage", "--with-coverage", default=False, action="store_true", help="collect coverage data when used with --run_system_tests; " "require Python 2.7+ and the coverage module installed", ) # other options g = parser.add_argument_group("other options") g.add_argument( "-G", "--GAE", dest="gae", metavar="APP_NAME", help="will create app.yaml and gaehandler.py and exit", ) options = parse_args(parser, sys.argv[1:], deprecated_opts, integer_log_level) # make a copy of all options for global_settings copy_options = copy.deepcopy(options) copy_options.password = "******" global_settings.cmd_options = copy_options return options REGEX_PEP263 = r"^[ \t\f]*#.*?coding[:=][ \t]*([-_.a-zA-Z0-9]+)" def get_pep263_encoding(source): """ Read python source file encoding, according to PEP 263, see https://www.python.org/dev/peps/pep-0263/ """ with open(source, "r") as sf: l12 = (sf.readline(), sf.readline()) m12 = re.match(REGEX_PEP263, l12[0]) or re.match(REGEX_PEP263, l12[1]) return m12 and m12.group(1) IGNORE = lambda: None def load_config(config_file, opt_map): """ Load options from config file (a Python script). config_file(str): file name opt_map(dict): mapping fom option name (key) to callable (val), used to post-process parsed value for the option Notice that the configuring Python script is never executed/imported, instead the ast library is used to evaluate each option assignment, provided that it is written on a single line. Returns an OrderedDict with sourced options. """ REGEX_ASSIGN_EXP = re.compile(r"\s*=\s*(.+)") map_items = opt_map.items() # preserve the order of loaded options even though this is not needed pl = OrderedDict() config_encoding = get_pep263_encoding(config_file) # NOTE: assume 'ascii' encoding when not explicitly stated (Python 2), # this is not correct for Python 3 where the default is 'utf-8' open_kwargs = dict() if PY2 else dict(encoding=config_encoding or "ascii") with open(config_file, "r", **open_kwargs) as cfil: for linenum, clin in enumerate(cfil, start=1): if PY2 and config_encoding: clin = unicode(clin, config_encoding) clin = clin.strip() for opt, mapr in map_items: if clin.startswith(opt): m = REGEX_ASSIGN_EXP.match(clin[len(opt) :]) if m is None: continue try: val = opt_map[opt](ast.literal_eval(m.group(1))) except: die( "cannot parse config file %r at line %d" % (config_file, linenum) ) if val is not IGNORE: pl[opt] = val return pl def parse_args(parser, cli_args, deprecated_opts, integer_log_level, namespace=None): # print('PARSING ARGS:', cli_args) del integer_log_level[:] options = parser.parse_args(cli_args, namespace) # print('PARSED OPTIONS:', options) # warn for deprecated options deprecated_args = [a for a in cli_args if a in deprecated_opts] for da in deprecated_args: # verify if it was a real option by looking into # parsed values for the actual destination hint = deprecated_opts[da] dest = (hint or da).lstrip("-") default = parser.get_default(dest) if da == "--interfaces": hint = "--interface" if getattr(options, dest) is not default: # the option has been specified msg = "%s is deprecated" % da if hint: msg += ", use %s instead" % hint warn(msg) # warn for deprecated values if integer_log_level and "--debug" not in deprecated_args: warn("integer argument for -D/--log_level is deprecated, " "use label instead") # fix schedulers and die if all were skipped if None in options.schedulers: options.schedulers = [i for i in options.schedulers if i is not None] if not options.schedulers: die("no scheduler left") # fix crontabs and die if all were skipped if None in options.crontabs: options.crontabs = [i for i in options.crontabs if i is not None] if not options.crontabs: die("no crontab left") # taskbar if options.taskbar and os.name != "nt": warn("--taskbar not supported on this platform, skipped") options.taskbar = False # options consistency checkings if options.run and not options.shell: die("-R/--run requires -S/--shell", exit_status=2) if options.args and not options.run: die("-A/--args requires -R/--run", exit_status=2) if options.with_scheduler and not options.schedulers: die("-X/--with_scheduler requires -K/--scheduler", exit_status=2) if options.soft_cron and not options.with_cron: die("--soft_cron requires -Y/--with_cron", exit_status=2) if options.shell: for o, os in dict( with_scheduler="-X/--with_scheduler", schedulers="-K/--scheduler", with_cron="-Y/--with_cron", cron_run="-C/--cron_run", run_doctests="-T/--run_doctests", run_system_tests="--run_system_tests", ).items(): if getattr(options, o): die("-S/--shell and %s are conflicting options" % os, exit_status=2) if options.bpython and options.plain: die("-B/--bpython and -P/--plain are conflicting options", exit_status=2) if options.cron_run: for o, os in dict( with_cron="-Y/--with_cron", run_doctests="-T/--run_doctests", run_system_tests="--run_system_tests", ).items(): if getattr(options, o): die("-C/--cron_run and %s are conflicting options" % os, exit_status=2) if options.run_doctests and options.run_system_tests: die( "-T/--run_doctests and --run_system_tests are conflicting options", exit_status=2, ) if options.config: # load options from file, # all options sourced from file that evaluates to False # are skipped, the special IGNORE value is used for this store_true = lambda v: True if v else IGNORE str_or_default = lambda v: str(v) if v else IGNORE list_or_default = ( lambda v: ([str(i) for i in v] if isinstance(v, list) else [str(v)]) if v else IGNORE ) # NOTE: 'help', 'version', 'folder', 'cron_job' and 'GAE' are not # sourced from file, the same applies to deprecated options opt_map = { # global options "config": str_or_default, "add_options": store_true, "password": str_or_default, "errors_to_console": store_true, "no_banner": store_true, "quiet": store_true, "log_level": str_or_default, # GUI options "no_gui": store_true, "taskbar": store_true, # console options "shell": str_or_default, "bpython": store_true, "plain": store_true, "import_models": store_true, "force_migrate": store_true, "run": str_or_default, "args": list_or_default, # web server options "server_name": str_or_default, "ip": str_or_default, "port": str_or_default, "server_key": str_or_default, "server_cert": str_or_default, "ca_cert": str_or_default, "interface": list_or_default, "pid_filename": str_or_default, "log_filename": str_or_default, "min_threads": str_or_default, "max_threads": str_or_default, "request_queue_size": str_or_default, "timeout": str_or_default, "socket_timeout": str_or_default, "profiler_dir": str_or_default, # scheduler options "with_scheduler": store_true, "scheduler": list_or_default, # cron options "with_cron": store_true, "crontab": list_or_default, "cron_threads": str_or_default, "soft_cron": store_true, "cron_run": store_true, # test options "verbose": store_true, "run_doctests": str_or_default, "run_system_tests": store_true, "with_coverage": store_true, } od = load_config(options.config, opt_map) # print("LOADED FROM %s:" % options.config, od) # convert loaded options dict as retuned by load_config # into a list of arguments for further parsing by parse_args file_args = [] args_args = [] # '--args' must be the last for key, val in od.items(): if key != "args": file_args.append("--" + key) if isinstance(val, list): file_args.extend(val) elif not isinstance(val, bool): file_args.append(val) else: args_args = ["--args"] + val file_args += args_args if options.add_options: # add options to existing ones, # must clear config to avoid infinite recursion options.config = options.add_options = None return parse_args( parser, file_args, deprecated_opts, integer_log_level, options ) return parse_args(parser, file_args, deprecated_opts, integer_log_level) return options