# encoding=utf-8
from argparse import ArgumentParser, Namespace, SUPPRESS
from datetime import datetime, timezone, tzinfo
from getpass import getpass, getuser
import os
import shlex
import sys
import warnings
from dateutil.tz import gettz
from logbook import DEBUG, INFO, WARNING
from logbook import FileHandler, Logger, NullHandler, set_datetime_format, StderrHandler
from logbook.compat import redirected_logging, redirected_warnings
import requests
from hansken import __version__, envvars
from hansken.auth import HanskenAuthBase, resolve
from hansken.remote import Connection, MultiProjectContext, ProjectContext
from hansken.util import MultiContext
try:
# ensure urllib3 is a module-level name for testability
import urllib3
except ImportError:
urllib3 = None
log = Logger(__name__)
def from_envvar(envvar, provided=None):
if provided:
return provided
return os.environ.get(envvar, None)
def prompt_user():
try:
user = getuser()
read = input('username [{}]: '.format(user))
if read.strip():
user = read
except (KeyboardInterrupt, EOFError):
user = None
except Exception: # documentation of getuser: "otherwise, an exception is raised"
user = input('username: ')
return user
def prompt_verify_key():
"""
Prompts for a key twice, verifying the entered values are identical before
returning it. Aborts on keyboard interrupt or EOF.
:return: the key entered by the user
"""
try:
while True:
key = getpass(prompt='Key (base64-encoded): ').strip()
if key == getpass(prompt='Key (again): ').strip():
return key
else:
print("Provided keys don't match, please try again (^C or {eof} to abort)".format(
eof='^Z' if 'win32' in sys.platform else '^D'
))
except (KeyboardInterrupt, EOFError): # user pressed either ^C or ^D (^Z on windows)
raise SystemExit('Aborted')
# define a parent parser to define global options
parent = ArgumentParser(add_help=False)
parent.add_argument('-v', '--verbose', action='count', help='be verbose')
parent.add_argument('-l', '--log', metavar='FILE',
default=from_envvar(envvars.log),
help='log messages to FILE (use - for standard error, '
'log messages are hidden by default)')
parent.add_argument('-z', '--timezone', metavar='TZ', type=gettz, default=timezone.utc,
help='log messages with timestamps in timezone TZ (e.g. Europe/Amsterdam), '
'defaults to UTC')
# default endpoint and project to the value in env
parent.add_argument('-e', '--endpoint', default=from_envvar(envvars.endpoint),
help='connect to the Hansken REST endpoint at ENDPOINT '
'(defaults to environment variable {})'.format(envvars.endpoint))
parent.add_argument('--keystore', metavar='ENDPOINT',
default=from_envvar(envvars.keystore),
help='connect to the Hansken keystore REST endpoint at ENDPOINT '
'(defaults to environment variable {})'.format(envvars.keystore))
parent.add_argument('--preference', metavar='ENDPOINT',
default=from_envvar(envvars.preference),
help='connect to the Hansken preference REST endpoint at ENDPOINT '
'(defaults to environment variable {})'.format(envvars.preference))
parent.add_argument('--idp', metavar='IDP_ID',
default=from_envvar(envvars.idp),
help='select SAML identity provider named IDP_ID if available '
'(defaults to environment variable {})'.format(envvars.idp))
parent.add_argument('--idp-url', metavar='ENDPOINT',
default=from_envvar(envvars.idp_url),
help=SUPPRESS)
parent.add_argument('--idp-realm', metavar='REALM',
default=from_envvar(envvars.idp_realm),
help=SUPPRESS)
parent.add_argument('--sso-url', metavar='ENDPOINT',
default=from_envvar(envvars.sso_url),
help=SUPPRESS)
parent.add_argument('-u', '--user', '--username', dest='username', default=None,
help='specify a username (will otherwise be prompted for if needed)')
parent.add_argument('-U', '--prompt-username', action='store_const', const=prompt_user, dest='username',
help=SUPPRESS)
parent.add_argument('--password', dest='password', default=None,
help=SUPPRESS)
parent.add_argument('--connection-pool-size', type=int, default=None,
help='override default HTTP(S) connection pool size')
# add an argument to disable ssl/tls certificate verification
parent.add_argument('--no-verify', dest='verify', action='store_false', default=True,
help='disable certification verification for secure connections, use with caution')
# NB: add a hidden hdfs configuration option for operators, allowing a single argument file per Hansken instance
# (value for hdfs_url is ignored for most commands, can be overridden by -o/--output for upload command)
parent.add_argument('--hdfs-url', dest='hdfs_url', help=SUPPRESS)
# password and auth are not really arguments, but used in combo with user/idp/sso; make sure they're defined
parent.set_defaults(password=None, auth=None)
# define the actual parser to be used
# (defined as a local class to override result for and repr() in doc as default for arg)
class hansken_parser(ArgumentParser): # noqa: N801
def __repr__(self):
return '<hansken.py default parser>'
hansken_parser = hansken_parser(fromfile_prefix_chars='@')
# define the version switch only on the global parser
hansken_parser.add_argument('--version', action='version', version=__version__)
# parse arguments like a shell when reading args from file
# (the default effectively only splits lines)
hansken_parser.convert_arg_line_to_args = lambda line: shlex.split(line, comments=True)
# enable subcommands, saving the name of the command in argument command
sub_commands = hansken_parser.add_subparsers(dest='command', metavar='COMMAND')
sub_commands.required = True # 'fix' for required=True not being allowed in add_subparsers above
def create_parser(requires_project=False, **kwargs):
warnings.warn('hansken.tool.create_parser has been renamed to hansken.tool.create_argument_parser',
DeprecationWarning)
return create_argument_parser(requires_project=requires_project, **kwargs)
[docs]
def create_argument_parser(requires_project=False, **kwargs):
"""
Creates an `ArgumentParser` to be used with `.run` alongside
``with_context`` or ``with_admin``. This parser's main feature is to *not*
add a ``-h/--help`` argument, which is attached to the main argument
parser used by `.run`.
:param requires_project: whether the parser should be fitted with a
required positional argument ``project``
:param kwargs: any `ArgumentParser` constructor arguments (see `argparse`
documentation)
:return: an `ArgumentParser` that won't clash with ``hansken.py``'s parent
parser
:rtype: `argparse.ArgumentParser`
"""
# don't add help, returned parser will serve as a parent that will itself add help
if kwargs.pop('add_help', False):
raise ValueError('parser cannot specify -h/--help')
parser = ArgumentParser(add_help=False, **kwargs)
if requires_project:
parser.add_argument('project', metavar='PROJECT',
help='attach to project with id PROJECT')
return parser
def add_command(name, command, optional_project=True, **kwargs):
"""
Adds a named command to be run when specified on the command line. Use the
return value to add additional arguments for the newly added command.
:param name: name of the command to add
:param command: callable that runs the actual command, first positional
argument to command is the `argparse.Namespace` object created by
`argparse`.
:param optional_project: whether to include a -p/--project option
:param kwargs: any `ArgumentParser` constructor arguments (see `argparse`
documentation)
:return: an `ArgumentParser` for the newly added command
:rtype: `argparse.ArgumentParser`
"""
# add a new parser to sub commands, pass along the parent (enables command -e http://...) and any kwargs
sub_parser = sub_commands.add_parser(name, parents=[parent], **kwargs)
# set the callable as the actual command to be run
sub_parser.set_defaults(run_command=command)
if optional_project:
# only add the optional -p/--project if sub command needs it
sub_parser.add_argument('-p', '--project', default=from_envvar(envvars.project),
help='attach to project with id PROJECT '
'(defaults to environment variable {})'.format(envvars.project))
return sub_parser
def set_command(command, parents=None, **kwargs):
"""
Overwrite the default functionality of the ``hansken.py`` command line
interface. Note that this method should be called only once, either from
user code, passing the resulting `ArgumentParser` to `.run` or by `.run`
itself when used with one of the *with_\\** keyword arguments.
:param command: a `callable` to be called with a single positional
argument of type `argparse.Namespace` after parsing arguments
:param parents: a sequence of parent parsers to use aside from the always
inserted parent parser defining the global configuration options
:param kwargs: any `ArgumentParser` constructor arguments (see `argparse`
documentation)
:return: an `ArgumentParser` for the new command
:rtype: `argparse.ArgumentParser`
"""
if hasattr(set_command, 'called') and set_command.called:
raise ValueError('cannot set main command more than once')
if not callable(command):
raise ValueError('command must be callable')
# 'copy' behaviour of global parser to the new one
fromfile_chars = kwargs.pop('fromfile_prefix_chars', '@')
fromfile_splitter = kwargs.pop('convert_arg_line_to_args', hansken_parser.convert_arg_line_to_args)
# add the global parent to any parents provided by the caller
parents = list(parents or [])
parents.append(parent)
# define a new default nameless command (the empty name will make --help appear as if there's no sub command)
sub_parser = sub_commands.add_parser('', parents=parents, fromfile_prefix_chars=fromfile_chars, **kwargs)
sub_parser.convert_arg_line_to_args = fromfile_splitter
sub_parser.set_defaults(command='', run_command=command)
# mark this function as called, subsequent calls produce confusing behaviour and should fail
setattr(set_command, 'called', True)
# provide the sub parser to the caller, let them add additional arguments
return sub_parser
def resolve_logging(args):
"""
Creates `logbook.Handler` instances from available parameters.
:param args: an `argparse.Namespace` object with parsed commandline
arguments
:return: a context manager within which logging is configured from the
provided command line arguments
"""
levels = [WARNING, INFO, DEBUG]
verbosity = args.verbose
if verbosity not in levels:
# make sure verbosity is an int (default None)
verbosity = max(0, min(verbosity or 0, len(levels) - 1))
verbosity = levels[verbosity]
if args.log:
tz = args.timezone
if not isinstance(tz, tzinfo):
# should timezone be provided through run, argument won't have been passed to gettz
tz = gettz(tz)
def utc_dt():
return datetime.now(tz=tz)
# force logbook to use tz-aware timestamps, log in requested timezone (default UTC)
set_datetime_format(utc_dt)
# when logging is actively used, also steal both logging.* and warnings.warn calls
# bind the logbook handlers application-wide
if args.log == '-':
return MultiContext(redirected_logging(),
redirected_warnings(),
StderrHandler(level=verbosity).applicationbound())
elif args.log:
return MultiContext(redirected_logging(),
redirected_warnings(),
FileHandler(args.log, level=verbosity).applicationbound())
else:
# no dest, no real handler
return NullHandler().applicationbound()
def resolve_auth(args):
"""
Creates a HanskenAuthBase instance from available parameters.
- endpoint + idp -> select named IDP from REST API, probe for
implementation to use
- idp_url + sso_url -> OpenAM auth with Kerberos SSO
- idp_url + user + pass -> OpenAM auth using REST login
- user + pass -> SAML proxy auth using REST login
- otherwise -> None
:param args: an argparse.Namespace object with parsed commandline arguments
:return: a HanskenAuthBase instance if args contained parameters that
enable one of the options, None otherwise
"""
return resolve(args.username, args.password,
base_url=args.endpoint, idp_id=args.idp,
idp_url=args.idp_url, idp_realm=args.idp_realm, sso_url=args.sso_url,
verify=args.verify)
def _disable_insecure_request_warning():
try:
# attempt to disable repeated verbose warnings when certificate verification is explicitly turned off
# (remove when hansken.py depends on requests>=2.16.0)
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
return True
except Exception: # nosec
pass
try:
# newer installations will fail the above on the 'packages' module being absent, fall back to the 'new way'
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
return True
except Exception: # nosec
pass
# disabling the warning failed
return False
[docs]
def run(args=None, using_parser=hansken_parser, with_connection=None, with_context=None,
with_multi_context=None, error='raise', with_admin=None, **defaults):
"""
Run the command line tool, either as the command line interface defined by
``hansken.py`` itself, or running user code, piggybacking on all the
command line argument parsing and resolving done by ``hansken.py``.
.. note::
`.run` parses the command line by default. If user code does its own
command line parsing, either pass applicable *args* to `.run` or use
`.set_command`, passing the resulting `argparse.ArgumentParser` to
`.run` as *using_parser* after adding additional user arguments.
.. note::
By default, `.run` calls `exit` on the `argparse.ArgumentParser` in
case of an error raised from whatever command is called (this includes
user-provided commands). This call results in a CLI-like error message
containing the exception message of the error that was caught. As the
call to `exit` raises a `SystemExit`, any user code *after* the call
to `.run` will not be executed. When the *error* argument to `.run` is
set to ``'raise'`` or the arguments parser sets verbose mode, `exit`
is *not* called, the exception is reraised instead.
:param args: arguments to the tool, a `list` of `str`, or `None`, in which
case *args* will be read from the command line
:param using_parser: the `argparse.ArgumentParser` to be used, either the
parser that processes the command line interface defined by
``hansken.py`` itself or one that is the result of `.set_command`
:param with_connection: a user-defined `callable` to be run with a
`.Connection` instance, passed as keyword argument ``connection``
:param with_context: a user-defined `callable` to be run with a
`.ProjectContext` instance, passed as keyword argument ``context``
:param with_multi_context: a user-defined `callable` to be run with a
`.MultiProjectContext` instance, passed as keyword argument ``context``
:param error: how to deal with an error raised from the selected command
runner, defaults to propagating errors to the caller of `.run`, set to
``'exit'`` to call ``using_parser.exit()`` with a non-zero return code
and failure message
:param defaults: named arguments to be used as defaults when parsing the
command line, matching the destinations of defined arguments (e.g.
``project='a-project-uuid'``)
:return: the return value of the command that was ultimately called (when
`.run` was called with either *with_context* or *with_admin*, the
return value of that callable will be returned by `.run`)
"""
def has_project(parser):
# try to determine whether parser has already defined either default or argument for a project
if parser.get_default('project'):
return True
return any(action.dest == 'project' for action in parser._actions)
# run with either connection or context, not both
if (with_context, with_connection, with_admin).count(None) < 2:
raise ValueError('use either with_context=<my_func> or with_connection=<my_func>, not both')
# check whether the parser being used is a custom one
custom_parser = using_parser is not hansken_parser
# user requests to be called with a context, overwrite our main command (…)
if with_context:
def run(args):
# (…) with something that creates a ProjectContext from the env/command line (…)
context = ProjectContext(args.endpoint, args.project, args.keystore, args.preference,
auth=args.auth, connection_pool_size=args.connection_pool_size,
verify=args.verify)
kwargs = {'context': context}
if custom_parser:
# (optionally add the parsed args to the passed arguments)
kwargs.update(args=args)
# (…) and calls the user's function with that context
return with_context(**kwargs)
# set the parser to be used to the one resulting in the user's function being called
# add the custom parser as one of the parents of the resulting one to incorporate parsed user args
using_parser = set_command(run, parents=[using_parser] if custom_parser else None)
if not defaults.get('project') and not has_project(using_parser):
# user did not supply an explicit project id to run(), we'll require it form the command line
using_parser.add_argument('project', metavar='PROJECT',
help='attach to project with id PROJECT')
if with_multi_context:
def run(args):
# (…) with something that creates a MultiProjectContext from the env/command line (…)
context = MultiProjectContext(args.endpoint, args.project, args.keystore, args.preference,
auth=args.auth, connection_pool_size=args.connection_pool_size,
verify=args.verify)
kwargs = {'context': context}
if custom_parser:
# (optionally add the parsed args to the passed arguments)
kwargs.update(args=args)
# (…) and calls the user's function with that context
return with_multi_context(**kwargs)
# set the parser to be used to the one resulting in the user's function being called
# add the custom parser as one of the parents of the resulting one to incorporate parsed user args
using_parser = set_command(run, parents=[using_parser] if custom_parser else None)
if not defaults.get('project') and not has_project(using_parser):
# user did not supply an explicit project id to run(), we'll require it form the command line
using_parser.add_argument('project', metavar='PROJECT', nargs='+',
help='attach to project with id PROJECT')
connection_kwarg = 'connection'
if with_admin:
warnings.warn('hansken.tool.run(with_admin=…) has been deprecated, use hansken.tool.run(with_connection=…) '
'instead', DeprecationWarning)
connection_kwarg = 'admin'
connection_callable = with_connection or with_admin
# user requests to be called with connection, overwrite our main command (…)
if connection_callable:
def run(args):
# (…) with something that creates a Connection from the env/command line (…)
connection = Connection(args.endpoint, args.keystore, args.preference,
auth=args.auth, connection_pool_size=args.connection_pool_size,
verify=args.verify)
kwargs = {connection_kwarg: connection}
if custom_parser:
# (optionally add the parsed args to the passed arguments)
kwargs.update(args=args)
# (…) and calls the user's function with that connection
return connection_callable(**kwargs)
# set the parser to be used to the one resulting in the user's function being called
# add the custom parser as one of the parents of the resulting one to incorporate parsed user args
using_parser = set_command(run, parents=[using_parser] if custom_parser else None)
# parse args (defaults to sys.argv[1:]) using hansken_parser or something resulting in a user function being
# called, exiting here if not satisfiable
# pre-create the Namespace where arguments are stored, initialized to user defaults
args = using_parser.parse_args(args, Namespace(**defaults))
# create a log handler from parsed arguments, use it while running command
with resolve_logging(args):
# when certificate validation is disabled, attempt to disable the insecure warning (…)
if not args.verify and not _disable_insecure_request_warning():
# (…) but log when that fails
log.warn('failed to disable InsecureRequestWarning in 3rd party package')
if not isinstance(args.auth, HanskenAuthBase):
# allow auth to be 'turned off' if explicitly set to False
args.auth = None if args.auth is False else resolve_auth(args)
try:
log.info('initial argument handling complete, hansken.py version {}', __version__)
# run action with the parsed arguments
return args.run_command(args)
except Exception as e:
if error == 'exit' and not args.verbose:
# calls to run with_* will likely have an empty command string, fall back to the parser's prog
command = args.command or using_parser.prog
command = command.strip()
using_parser.exit(1, '\n{} failed: {}\n'.format(command, str(e)))
else:
# error == 'raise' or verbose mode, propagate error
raise
# force adding commands from command sub-modules
# (note that a from ... import ... can't be used here as that would cause a circular import)
import hansken.tool.command_backup # noqa: E402,F401,I100,I202
import hansken.tool.command_export # noqa: E402,F401
import hansken.tool.command_extract # noqa: E402,F401
import hansken.tool.command_grant # noqa: E402,F401
import hansken.tool.command_import # noqa: E402,F401
import hansken.tool.command_mount # noqa: E402,F401
import hansken.tool.command_quickstart # noqa: E402,F401
import hansken.tool.command_shell # noqa: E402,F401
import hansken.tool.command_stats # noqa: E402,F401
import hansken.tool.command_tasks # noqa: E402,F401
import hansken.tool.command_tools # noqa: E402,F401
import hansken.tool.command_upload # noqa: E402,F401
import hansken.tool.command_versions # noqa: E402,F401