# Programmer friendly subprocess wrapper.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: May 14, 2020
# URL: https://executor.readthedocs.io
r"""
Dependency injection for command execution contexts.
The :mod:`~executor.contexts` module defines the :class:`LocalContext`,
:class:`RemoteContext` and :class:`SecureChangeRootContext` classes. All of
these classes support the same API for executing external commands, they are
simple wrappers for :class:`.ExternalCommand`, :class:`.RemoteCommand` and
:class:`.SecureChangeRootCommand`.
This allows you to script interaction with external commands in Python and
perform that interaction on your local system, on remote systems over SSH_ or
inside chroots_ using the exact same Python code. `Dependency injection`_ on
steroids anyone? :-)
Here's a simple example:
.. code-block:: python

   from executor.contexts import LocalContext, RemoteContext
   from humanfriendly import format_timespan

   def details_about_system(context):
       return "\n".join([
           "Information about %s:" % context,
           " - Host name: %s" % context.capture('hostname', '--fqdn'),
           " - Uptime: %s" % format_timespan(float(context.capture('cat', '/proc/uptime').split()[0])),
       ])

   print(details_about_system(LocalContext()))

   # Information about local system (peter-macbook):
   # - Host name: peter-macbook
   # - Uptime: 1 week, 3 days and 10 hours

   print(details_about_system(RemoteContext('file-server')))

   # Information about remote system (file-server):
   # - Host name: file-server
   # - Uptime: 18 weeks, 3 days and 4 hours

Whether this functionality looks exciting or horrible I'll leave up to your
judgment. I created it because I'm always building "tools that help me build
tools" and this functionality enables me to *very rapidly* prototype system
integration tools developed using Python:
**During development:**
I *write* code on my workstation which I prefer because of the "rich editing
environment" but I *run* the code against a remote system over SSH (a backup
server, database server, hypervisor, mail server, etc.).
**In production:**
I change one line of code to inject a :class:`LocalContext` object instead of
a :class:`RemoteContext` object, I install the `executor` package and the code
I wrote on the remote system and I'm done!
.. _SSH: https://en.wikipedia.org/wiki/Secure_Shell
.. _chroots: http://en.wikipedia.org/wiki/Chroot
.. _Dependency injection: http://en.wikipedia.org/wiki/Dependency_injection
"""
# Standard library modules.
import contextlib
import glob
import logging
import multiprocessing
import os
import random
import shlex
import socket
# External dependencies.
from humanfriendly.text import dedent, pluralize, split
from property_manager import (
PropertyManager,
lazy_property,
mutable_property,
required_property,
writable_property,
)
from six import PY2
# Modules included in our package.
from executor import DEFAULT_SHELL, ExternalCommand, quote
from executor.chroot import ChangeRootCommand
from executor.schroot import DEFAULT_NAMESPACE, SCHROOT_PROGRAM_NAME, SecureChangeRootCommand
from executor.ssh.client import RemoteAccount, RemoteCommand
# Lookup table from canonical package mirror URL (written without a trailing
# slash) to distributor ID. AbstractContext.apt_sources_info consults it with
# the normalized mirror URL taken from a 'deb' directive.
MIRROR_TO_DISTRIB_MAPPING = {
    u'http://deb.debian.org/debian': u'debian',
    u'http://archive.ubuntu.com/ubuntu': u'ubuntu',
}
"""
Mapping of canonical package mirror URLs to "distributor ID" strings.
Each key in this dictionary is the canonical package mirror URL of a Debian
based Linux distribution and each value is the corresponding distributor ID.
The following canonical mirror URLs are currently supported:
================================ ==========
Mirror URL                       Value
================================ ==========
http://deb.debian.org/debian     ``debian``
http://archive.ubuntu.com/ubuntu ``ubuntu``
================================ ==========
For more details refer to the :attr:`AbstractContext.apt_sources_info` property.
"""
# Module-level logger named after this module; all debug messages emitted by
# the context classes below go through it.
logger = logging.getLogger(__name__)
def create_context(**options):
    """
    Construct the execution context that matches the given keyword arguments.

    :param options: Keyword arguments for the initializer of the selected
                    context class. The ``chroot_name`` and ``ssh_alias``
                    arguments are consumed here to pick the class.
    :returns: A :class:`SecureChangeRootContext` when ``chroot_name`` is
              given (and not :data:`None`), otherwise a
              :class:`RemoteContext` when ``ssh_alias`` is given (and not
              :data:`None`), otherwise a :class:`LocalContext`.
    """
    # Both selectors are always stripped from the options so that neither of
    # them can reach an initializer that doesn't know about it.
    selected_chroot = options.pop('chroot_name', None)
    selected_alias = options.pop('ssh_alias', None)
    if selected_chroot is not None:
        return SecureChangeRootContext(selected_chroot, **options)
    if selected_alias is not None:
        return RemoteContext(selected_alias, **options)
    return LocalContext(**options)
[docs]class AbstractContext(PropertyManager):
"""Abstract base class for shared logic of all context classes."""
def __init__(self, *args, **options):
    """
    Initialize an :class:`AbstractContext` object.

    :param args: Positional arguments, handed to the initializer of the
                 :class:`~property_manager.PropertyManager` superclass.
    :param options: Keyword arguments, routed by name:

                    - A name for which :func:`have_property()` reports a
                      property is passed to the superclass initializer
                      (which sets that property).
                    - Every other name is stored in the :attr:`options`
                      dictionary, i.e. it becomes a command option.
    """
    # An explicitly given `options' dictionary is the starting point for
    # the command options; unrecognized keyword arguments are added to it.
    command_opts = options.pop('options', {})
    context_opts = {}
    for name in options:
        destination = context_opts if self.have_property(name) else command_opts
        destination[name] = options[name]
    # The command options travel to the superclass as the `options' property.
    context_opts['options'] = command_opts
    super(AbstractContext, self).__init__(*args, **context_opts)
    # One list per active `with' block, see __enter__() and cleanup().
    self.undo_stack = []
@lazy_property
def apt_sources_info(self):
    """
    A tuple with two strings (the distributor ID and distribution codename).

    The values of the :attr:`distributor_id` and :attr:`distribution_codename`
    properties are determined by one of the following three methods (in
    decreasing order of preference):

    1. If :attr:`lsb_release_variables` is available it's used.
    2. If the :man:`lsb_release` program is available it's used.
    3. Finally ``/etc/apt/sources.list`` is parsed for hints.

    The :attr:`apt_sources_info` property concerns the third step which
    works as follows:

    - The ``deb`` directives in ``/etc/apt/sources.list`` are parsed to
      determine the primary package mirror URL (it's fine if this file
      doesn't exist, no error will be reported).

    - An optional bracketed options group directly after ``deb`` (for
      example ``deb [arch=amd64] http://...``) is skipped, so that the
      mirror URL and codename are taken from the correct tokens.

    - The :data:`MIRROR_TO_DISTRIB_MAPPING` dictionary is used to look up
      the distributor ID corresponding to the package mirror URL that was
      found in ``/etc/apt/sources.list``.

    - If the mirror URL is successfully translated to a distributor ID, the
      token following the mirror URL is taken to be the distribution
      codename.

    Both strings are empty when no ``deb`` directive with a known mirror
    URL is found.

    The :attr:`apt_sources_info` property was added in response to `issue
    #17`_ where it was reported that official Debian Docker images don't
    contain the ``/etc/lsb-release`` file nor the :man:`lsb_release`
    program.

    This is only used as a last resort because of its specificness to
    Debian based Linux distributions and because I have concerns about how
    robust this new functionality will turn out to be.

    .. _issue #17: https://github.com/xolox/python-executor/issues/17
    """
    distributor_id = u''
    distribution_codename = u''
    listing = self.read_file('/etc/apt/sources.list', check=False, silent=True)
    for line in listing.decode('UTF-8').splitlines():
        tokens = line.split()
        if not tokens or tokens[0] != u'deb':
            continue
        # A 'deb' directive may carry options between square brackets,
        # written either as one token ("[arch=amd64]") or spread over
        # several ("[ arch=amd64 trusted=yes ]"). Drop the whole group so
        # that the mirror URL ends up in tokens[1] again.
        if len(tokens) > 1 and tokens[1].startswith(u'['):
            closing = next(
                (index for index, token in enumerate(tokens) if index >= 1 and token.endswith(u']')),
                None,
            )
            if closing is None:
                # Unterminated options group: the line is malformed.
                continue
            tokens = tokens[:1] + tokens[closing + 1:]
        # We check for at least four whitespace separated tokens even
        # though we only use the first three because a well formed 'deb'
        # directive is supposed to contain at least four tokens.
        if len(tokens) >= 4:
            logger.debug("Parsing /etc/apt/sources 'deb' directive: %s", tokens)
            mirror_url = normalize_mirror_url(tokens[1])
            if mirror_url in MIRROR_TO_DISTRIB_MAPPING:
                distributor_id = MIRROR_TO_DISTRIB_MAPPING[mirror_url]
                distribution_codename = tokens[2]
                logger.debug(
                    "Determined distributor ID (%s) and codename (%s) from /etc/apt/sources.list.",
                    distributor_id,
                    distribution_codename,
                )
                # The first recognized mirror wins.
                break
            else:
                logger.debug("Unrecognized mirror URL.")
    return distributor_id, distribution_codename
@required_property
def command_type(self):
    """
    The class used to create command objects in this context.

    This is :class:`.ExternalCommand` or one of its subclasses.
    """
@property
def cpu_count(self):
    """
    The number of CPUs in the system (an integer).

    .. note:: This property is abstract: the base class implementation
              raises :exc:`~exceptions.NotImplementedError` and subclasses
              are expected to override it.
    """
    raise NotImplementedError()
@lazy_property
def distribution_codename(self):
    """
    The code name of the system's distribution (a lowercased string like ``precise`` or ``trusty``).

    The first source that yields a non-empty value wins:

    1. The ``DISTRIB_CODENAME`` variable from ``/etc/lsb-release`` (see
       :attr:`lsb_release_variables`).
    2. The output of the command ``lsb_release --short --codename``.
    3. The codename found by :attr:`apt_sources_info`.

    The returned string is always lowercased, to enable reliable string
    comparison.
    """
    logger.debug("Trying to discover distribution codename using /etc/lsb-release ..")
    codename = self.lsb_release_variables.get('DISTRIB_CODENAME')
    if codename:
        return codename.lower()
    logger.debug("Falling back to 'lsb_release --short --codename' ..")
    codename = self.capture('lsb_release', '--short', '--codename', check=False, silent=True)
    if codename:
        return codename.lower()
    logger.debug("Falling back to parsing /etc/apt/sources.list ..")
    return self.apt_sources_info[1].lower()
@lazy_property
def distributor_id(self):
    """
    The distributor ID of the system (a lowercased string like ``debian`` or ``ubuntu``).

    The first source that yields a non-empty value wins:

    1. The ``DISTRIB_ID`` variable from ``/etc/lsb-release`` (see
       :attr:`lsb_release_variables`).
    2. The output of the command ``lsb_release --short --id``.
    3. The distributor ID found by :attr:`apt_sources_info`.

    The returned string is always lowercased, to enable reliable string
    comparison.
    """
    logger.debug("Trying to discover distributor ID using /etc/lsb-release ..")
    identifier = self.lsb_release_variables.get('DISTRIB_ID')
    if identifier:
        return identifier.lower()
    logger.debug("Falling back to 'lsb_release --short --id' ..")
    identifier = self.capture('lsb_release', '--short', '--id', check=False, silent=True)
    if identifier:
        return identifier.lower()
    logger.debug("Falling back to parsing /etc/apt/sources.list ..")
    return self.apt_sources_info[0].lower()
@lazy_property
def have_ionice(self):
    """:data:`True` when :func:`find_program()` locates ``ionice``, :data:`False` otherwise."""
    matches = self.find_program('ionice')
    return bool(matches)
@property
def have_superuser_privileges(self):
    """
    :data:`True` if the context has superuser privileges, :data:`False` otherwise.

    The answer is taken from a prepared (never started) ``true`` command:
    its ``have_superuser_privileges`` value is used when that is true,
    otherwise its ``sudo`` value is used.
    """
    probe = self.prepare('true')
    # Read the attribute only once, it may be a computed property.
    already_privileged = probe.have_superuser_privileges
    return already_privileged if already_privileged else probe.sudo
@lazy_property
def lsb_release_variables(self):
    """
    The contents of ``/etc/lsb-release`` as a dictionary.

    The values of :attr:`distributor_id` and :attr:`distribution_codename`
    are based on the information provided by :attr:`lsb_release_variables`.
    If ``/etc/lsb-release`` doesn't exist or can't be parsed a debug
    message is logged and an empty dictionary is returned. A line that
    can't be parsed (including a line with unbalanced quotes) is skipped
    with a debug message; it never raises an exception. Here's an example:

    >>> from executor.contexts import LocalContext
    >>> context = LocalContext()
    >>> context.lsb_release_variables
    {'DISTRIB_CODENAME': 'bionic',
     'DISTRIB_DESCRIPTION': 'Ubuntu 18.04.1 LTS',
     'DISTRIB_ID': 'Ubuntu',
     'DISTRIB_RELEASE': '18.04'}

    The :attr:`lsb_release_variables` property was added in response to
    `issue #10`_ where it was reported that the :man:`lsb_release` program
    wasn't available in vanilla Ubuntu 18.04 Docker images.

    .. _issue #10: https://github.com/xolox/python-executor/issues/10
    """
    variables = dict()
    # We proceed under the assumption that the file exists, but avoid
    # raising an exception when it doesn't and we don't leak error messages
    # to the standard error stream. We could have used is_file() and
    # is_readable() to "ask for permission instead of forgiveness" (so to
    # speak) but that requires the execution of three external commands
    # instead of one to accomplish the exact same thing :-P.
    logger.debug("Trying to read /etc/lsb-release ..")
    contents = self.capture('cat', '/etc/lsb-release', check=False, silent=True)
    logger.debug("Parsing /etc/lsb-release contents: %r", contents)
    for lnum, line in enumerate(contents.splitlines(), start=1):
        name, _, value = line.partition(u'=')
        try:
            # The following encode/decode trick works around shlex.split()
            # not properly supporting Unicode strings on Python 2.7, for
            # details refer to https://stackoverflow.com/a/14219159/788200.
            if PY2:
                tokens = shlex.split(value.encode('UTF-8'))
                parsed_value = [t.decode('UTF-8') for t in tokens]
            else:
                parsed_value = shlex.split(value)
        except ValueError:
            # shlex.split() raises ValueError on unbalanced quotes. Treat
            # such a line like any other line that can't be parsed instead
            # of letting the exception escape from the property.
            parsed_value = []
        # The null byte check below guards against a weird edge case
        # that has so far only manifested in the Python 2.6 environment
        # of Travis CI: The parsing of /etc/lsb-release results in the
        # expected variable names but values containing binary
        # data including nul bytes, for details refer to
        # https://github.com/xolox/python-executor/issues/15.
        if len(parsed_value) == 1 and u'\0' not in parsed_value[0]:
            variables[name.strip()] = parsed_value[0]
        else:
            logger.debug("Failed to parse line %i: %r", lnum, line)
    if variables:
        logger.debug("Extracted %s from /etc/lsb-release: %r", pluralize(len(variables), "variable"), variables)
    else:
        logger.debug("Failed to read /etc/lsb-release ..")
    return variables
@writable_property
def options(self):
    """
    The default command options of this context (a dictionary).

    These options are passed to the commands created by the context, see
    :func:`get_options()` and :func:`merge_options()`.
    """
@mutable_property
def parent(self):
    """
    The parent context (a context object or :data:`None`).

    Setting :attr:`parent` creates a "nested context": when a parent is
    set, :func:`prepare_command()` wraps the command line of the nested
    command in a command created by the parent context.

    For example :func:`find_chroots()` creates :class:`SecureChangeRootContext`
    objects whose :attr:`parent` is set to the context that found the
    chroots. Because of this the :class:`SecureChangeRootContext` objects can be
    used to create commands without knowing or caring whether the chroots
    reside on the local system or on a remote system accessed via SSH.

    .. warning:: Support for parent contexts was introduced in `executor`
                 version 15 and for now this feature is considered
                 experimental and subject to change. While I'm definitely
                 convinced of the usefulness of nested contexts I'm not
                 happy with the current implementation at all. The most
                 important reason for this is that it's *very surprising*
                 (and not in a good way) that a context with a
                 :attr:`parent` will create commands with the parent's
                 :attr:`command_type` instead of the expected type.
    """
def __enter__(self):
    """
    Open a new scope on the "undo stack" (refer to :func:`cleanup()`).

    :returns: The context object itself.
    """
    fresh_scope = []
    self.undo_stack.append(fresh_scope)
    return self
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
    """
    Close the innermost "undo stack" scope and run its actions (refer to :func:`cleanup()`).

    The actions run in reverse order of registration. An action whose first
    positional argument is callable is invoked with the remaining
    arguments, any other action is passed to :func:`execute()`.
    """
    pending = self.undo_stack.pop()
    while pending:
        # Most recently registered action first.
        args, kw = pending.pop()
        if args and callable(args[0]):
            function, arguments = args[0], args[1:]
            function(*arguments, **kw)
        else:
            self.execute(*args, **kw)
@contextlib.contextmanager
def atomic_write(self, filename):
    """
    Create or update the contents of a file atomically.

    :param filename: The pathname of the file to create/update (a string).
    :returns: A context manager (see the :keyword:`with` keyword) that
              returns a single string which is the pathname of the
              temporary file where the contents should be written to
              initially.

    When the :keyword:`with` block completes normally the temporary file is
    moved over the real file using ``mv``.

    If an exception is raised from the :keyword:`with` block an attempt is
    made to remove the temporary file using ``rm -f`` (a failing ``rm``
    command is not reported as an error, to avoid obscuring the original
    exception) and then the original exception is propagated to the
    caller.

    The temporary file is created in the same directory as the real file,
    but a dot is prefixed to the name (making it a hidden file) and the
    suffix '.tmp-' followed by a random integer number is used.
    """
    directory, entry = os.path.split(filename)
    temporary_file = os.path.join(directory, '.%s.tmp-%i' % (entry, random.randint(1, 100000)))
    try:
        yield temporary_file
    except Exception:
        self.execute('rm', '-f', temporary_file, check=False)
        # Re-raise: swallowing the exception here would make the `with'
        # statement suppress it, hiding the failed write from the caller.
        raise
    else:
        self.execute('mv', temporary_file, filename)
def capture(self, *command, **options):
    """
    Run an external command in the current context and return its output.

    :param command: Positional arguments for the initializer of the
                    :attr:`command_type` class.
    :param options: Keyword arguments for the initializer of the
                    :attr:`command_type` class (``capture`` is always
                    forced to :data:`True`).
    :returns: The value of :attr:`.ExternalCommand.output`.
    """
    options.update(capture=True)
    started = self.prepare_command(command, options)
    started.start()
    return started.output
def cleanup(self, *args, **kw):
    """
    Register an action to be performed before the context ends.

    :param args: The external command to execute or callable to invoke.
    :param kw: Options to the command or keyword arguments to the callable.
    :raises: :exc:`~exceptions.ValueError` when :func:`cleanup()` is called
             outside a :keyword:`with` statement.

    Calling this method only records *the intent* to perform an action.
    The recorded actions are performed when the :keyword:`with` statement
    that uses (the subclass of) the :class:`AbstractContext` object as a
    context manager ends.

    Actions are performed last-in, first-out, which gives the equivalent
    of a deeply nested :keyword:`try` / :keyword:`finally` structure
    without having to write one.

    How an action is performed depends on its first positional argument:

    - If it's a callable, it is called with the remaining positional
      arguments and the keyword arguments.

    - Otherwise the positional and keyword arguments are passed to
      :func:`execute()` to run an external command.

    .. warning:: If a cleanup command fails and raises an exception no
                 further cleanup commands are executed. If you don't care
                 if a specific cleanup command reports an error, set its
                 :attr:`~.ExternalCommand.check` property to
                 :data:`False`.
    """
    scopes = self.undo_stack
    if len(scopes) == 0:
        raise ValueError("Cleanup stack can only be used inside with statements!")
    innermost_scope = scopes[-1]
    innermost_scope.append((args, kw))
def execute(self, *command, **options):
    """
    Run an external command in the current context.

    :param command: Positional arguments for the initializer of the
                    :attr:`command_type` class.
    :param options: Keyword arguments for the initializer of the
                    :attr:`command_type` class.
    :returns: The :attr:`command_type` object.

    .. note:: The command object is created by :func:`prepare_command()`
              and :func:`~executor.ExternalCommand.start()` is called on
              it before it is returned, so by the time the caller gets the
              command object a synchronous command will have already
              ended. Asynchronous commands don't have this limitation of
              course.
    """
    started = self.prepare_command(command, options)
    started.start()
    return started
def exists(self, pathname):
    """
    Report whether the given pathname exists.

    :param pathname: The pathname to check (a string).
    :returns: The result of :func:`test()` for the command
              ``test -e <pathname>`` (:data:`True` if the pathname exists,
              :data:`False` otherwise).
    """
    command = ('test', '-e', pathname)
    return self.test(*command)
def find_chroots(self, namespace=DEFAULT_NAMESPACE):
    """
    Find the chroots available in the current context.

    :param namespace: The chroot namespace to look for (a string, defaults
                      to :data:`~executor.schroot.DEFAULT_NAMESPACE`).
                      Refer to the schroot_ documentation for more
                      information about chroot namespaces.
    :returns: A generator of :class:`SecureChangeRootContext` objects whose
              :attr:`~AbstractContext.parent` is set to the context where
              the chroots were found.
    :raises: :exc:`~executor.ExternalCommandFailed` (or a subclass) when
             the ``schroot`` program isn't installed or the ``schroot
             --list`` command fails.

    Each line of the ``--list`` output is split on its last colon into a
    namespace and a name; a line without a namespace is treated as
    belonging to the default namespace. Chroots in the default namespace
    are identified by their bare name, all others by the complete line.
    """
    listing = self.capture(SCHROOT_PROGRAM_NAME, '--list')
    for entry in listing.splitlines():
        entry_ns, _, entry_name = entry.rpartition(':')
        effective_ns = entry_ns or DEFAULT_NAMESPACE
        if effective_ns != namespace:
            continue
        if effective_ns == DEFAULT_NAMESPACE:
            short_name = entry_name
        else:
            short_name = entry
        yield SecureChangeRootContext(chroot_name=short_name, parent=self)
def find_program(self, program_name, *args):
    """
    Find the absolute pathname(s) of one or more programs.

    :param program_name: Each of the positional arguments is expected to
                         be a string containing the name of a program to
                         search for in the ``$PATH``. At least one is
                         required.
    :returns: A list of strings, one for each line of output of the
              ``which`` command (the list is empty when ``which`` prints
              nothing; a failing ``which`` command is not reported as an
              error).
    """
    program_names = (program_name,) + args
    which_output = self.capture('which', *program_names, check=False)
    return which_output.splitlines()
def get_options(self):
    """
    Get the options that are passed to commands created by the context.

    :returns: A dictionary of command options.

    The base implementation returns the :attr:`options` dictionary itself
    (not a copy). The method exists as a hook: subclasses can override it
    to customize the options passed to commands on the fly.
    """
    default_options = self.options
    return default_options
def glob(self, pattern):
    """
    Find matches for a given filename pattern.

    :param pattern: A filename pattern (a string).
    :returns: A list of strings with matches.

    Some implementation notes:

    - This method *emulates* filename globbing as supported by system
      shells like Bash and ZSH. It runs a ``python`` command in the
      context, feeds it a small script on standard input that calls
      :func:`glob.glob()` and prints the matches separated by NUL bytes,
      and splits the captured output on NUL bytes. This approach is of
      course rather heavyweight.

    - Initially this method used Bash for filename matching (similar to
      `this StackOverflow answer <https://unix.stackexchange.com/a/34012/44309>`_)
      but I found it impossible to make this work well for patterns
      containing whitespace.

    - I took the whitespace issue as a sign that I was heading down the
      wrong path (trying to add robustness to a fragile solution) and so
      the new implementation was born (which prioritizes robustness over
      performance).
    """
    # The pattern is embedded as a Python literal (its repr()) so that the
    # script doesn't depend on any shell quoting.
    script = dedent(
        r'''
import glob
matches = glob.glob({pattern})
print('\x00'.join(matches))
''',
        pattern=repr(pattern),
    )
    listing = self.capture('python', input=script)
    return split(listing, '\x00')
def is_directory(self, pathname):
    """
    Report whether the given pathname points to an existing directory.

    :param pathname: The pathname to check (a string).
    :returns: The result of :func:`test()` for the command
              ``test -d <pathname>`` (:data:`True` if the pathname points
              to an existing directory, :data:`False` otherwise).
    """
    command = ('test', '-d', pathname)
    return self.test(*command)
def is_executable(self, pathname):
    """
    Report whether the given pathname points to an executable file.

    :param pathname: The pathname to check (a string).
    :returns: The result of :func:`test()` for the command
              ``test -x <pathname>`` (:data:`True` if the pathname points
              to an executable file, :data:`False` otherwise).
    """
    command = ('test', '-x', pathname)
    return self.test(*command)
def is_file(self, pathname):
    """
    Report whether the given pathname points to an existing file.

    :param pathname: The pathname to check (a string).
    :returns: The result of :func:`test()` for the command
              ``test -f <pathname>`` (:data:`True` if the pathname points
              to an existing file, :data:`False` otherwise).
    """
    command = ('test', '-f', pathname)
    return self.test(*command)
def is_readable(self, pathname):
    """
    Report whether the given pathname exists and is readable.

    :param pathname: The pathname to check (a string).
    :returns: The result of :func:`test()` for the command
              ``test -r <pathname>`` (:data:`True` if the pathname exists
              and is readable, :data:`False` otherwise).
    """
    command = ('test', '-r', pathname)
    return self.test(*command)
def is_writable(self, pathname):
    """
    Report whether the given pathname exists and is writable.

    :param pathname: The pathname to check (a string).
    :returns: The result of :func:`test()` for the command
              ``test -w <pathname>`` (:data:`True` if the pathname exists
              and is writable, :data:`False` otherwise).
    """
    command = ('test', '-w', pathname)
    return self.test(*command)
def list_entries(self, directory):
    """
    List the entries in a directory.

    :param directory: The pathname of the directory (a string).
    :returns: A list of strings with the names of the directory entries.

    The listing is produced by ``find -mindepth 1 -maxdepth 1 -print0``
    rather than the more obvious ``ls -A1`` because NUL separated output
    can be parsed reliably (with regards to whitespace). Only the base name
    of each reported pathname is returned.
    """
    find_command = ['find', directory, '-mindepth', '1', '-maxdepth', '1', '-print0']
    raw_listing = self.capture(*find_command)
    entries = []
    for pathname in raw_listing.split('\0'):
        # The output ends with a NUL byte, which leaves an empty last field.
        if pathname:
            entries.append(os.path.basename(pathname))
    return entries
def merge_options(self, overrides):
    """
    Merge default options and overrides into a single dictionary.

    :param overrides: A dictionary with any keyword arguments given to
                      :func:`execute()` or :func:`start_interactive_shell()`.
    :returns: The same `overrides` dictionary (it is modified in place):
              every option returned by :func:`get_options()` that is not
              set in the overrides is added to it.

    The :attr:`~executor.ExternalCommand.ionice` option is automatically
    unset when :attr:`have_ionice` is :data:`False`, regardless of whether
    the option was set from defaults or overrides.
    """
    for name, value in self.get_options().items():
        if name not in overrides:
            overrides[name] = value
    # have_ionice is only consulted when ionice was actually requested.
    requested_ionice = overrides.get('ionice')
    if requested_ionice and not self.have_ionice:
        logger.debug("Ignoring `ionice' option because required program isn't installed.")
        del overrides['ionice']
    return overrides
def prepare(self, *command, **options):
    """
    Prepare to execute an external command in the current context.

    :param command: Positional arguments for the initializer of the
                    :attr:`command_type` class.
    :param options: Keyword arguments for the initializer of the
                    :attr:`command_type` class.
    :returns: The :attr:`command_type` object.

    .. note:: This method is the variadic front end of
              :func:`prepare_command()`. It doesn't call
              :func:`~executor.ExternalCommand.start()` which means you
              control if and when the command is started. This can be
              useful to prepare a large batch of commands and execute them
              concurrently using a :class:`.CommandPool`.
    """
    prepared = self.prepare_command(command, options)
    return prepared
def prepare_command(self, command, options):
    """
    Create a :attr:`command_type` object based on :attr:`options`.

    :param command: A tuple of strings (the positional arguments to the
                    initializer of the :attr:`command_type` class).
    :param options: A dictionary (the keyword arguments to the initializer
                    of the :attr:`command_type` class).
    :returns: A command object *that hasn't been started yet*.

    The given options are first completed using :func:`merge_options()`.
    When :attr:`parent` is set the command line of the freshly created
    command is handed to the parent's :func:`prepare_command()` and the
    command created by the parent is returned instead.
    """
    merged = self.merge_options(options)
    nested_command = self.command_type(*command, **merged)
    if not self.parent:
        return nested_command
    # Options that only our own command type knows about must not reach the
    # parent context, whose command type wouldn't understand them. The
    # attribute names of both command types are compared to find them.
    own_names = set(dir(self.command_type))
    parent_names = set(dir(self.parent.command_type))
    for name in own_names - parent_names:
        swallowed = merged.pop(name, None)
        if swallowed is not None:
            logger.debug("Swallowing %r option! (parent context won't understand)", name)
    return self.parent.prepare_command(nested_command.command_line, merged)
def prepare_interactive_shell(self, options):
    """
    Create a :attr:`command_type` object that starts an interactive shell.

    :param options: A dictionary (the keyword arguments to the initializer
                    of the :attr:`command_type` class).
    :returns: A :attr:`command_type` object *that hasn't been started yet*.

    The options are completed using :func:`merge_options()`, after which
    ``shell`` is forced to :data:`False` and ``tty`` to :data:`True`.
    """
    shell_options = self.merge_options(options)
    shell_options['shell'] = False
    shell_options['tty'] = True
    return self.prepare(DEFAULT_SHELL, **shell_options)
def read_file(self, filename, **options):
    """
    Read the contents of a file.

    :param filename: The pathname of the file to read (a string).
    :param options: Optional keyword arguments to :func:`execute()`
                    (``capture`` is always forced to :data:`True`).
    :returns: The contents of the file (a byte string), taken from the
              ``stdout`` attribute of the executed command.

    The file is read by running cat_ through :func:`execute()` so that
    options like :attr:`~.ExternalCommand.sudo` are respected (regardless
    of whether we're dealing with a :class:`LocalContext` or
    :class:`RemoteContext`).

    .. _cat: http://linux.die.net/man/1/cat
    """
    options['capture'] = True
    finished = self.execute('cat', filename, **options)
    return finished.stdout
def start_interactive_shell(self, **options):
    """
    Start an interactive shell in the current context.

    :param options: Keyword arguments for the initializer of the
                    :attr:`command_type` class.
    :returns: The :attr:`command_type` object.

    .. note:: The command object is created by
              :func:`prepare_interactive_shell()` and
              :func:`~executor.ExternalCommand.start()` is called on it
              before it is returned, so by the time the caller gets the
              command object a synchronous command will have already
              ended. Asynchronous commands don't have this limitation of
              course.
    """
    shell = self.prepare_interactive_shell(options)
    shell.start()
    return shell
def test(self, *command, **options):
    """
    Run an external command in the current context and report whether it succeeded.

    :param command: Positional arguments for the initializer of the
                    :attr:`command_type` class.
    :param options: Keyword arguments for the initializer of the
                    :attr:`command_type` class.
    :returns: The value of :attr:`.ExternalCommand.succeeded`.

    This method always sets :attr:`~.ExternalCommand.check` to
    :data:`False` and :attr:`~.ExternalCommand.silent` to :data:`True`,
    overriding any values given by the caller.
    """
    options['check'] = False
    options['silent'] = True
    started = self.prepare_command(command, options)
    started.start()
    return started.succeeded
def write_file(self, filename, contents, **options):
    """
    Change the contents of a file.

    :param filename: The pathname of the file to write (a string).
    :param contents: The contents to write to the file (a byte string).
    :param options: Optional keyword arguments to :func:`execute()`
                    (``input`` and ``shell`` are always overridden).
    :returns: The command object returned by :func:`execute()`.

    The contents are fed to the standard input of a shell command that
    combines cat_ with `output redirection`_ to the (quoted) filename. This
    way options like :attr:`~.ExternalCommand.sudo` are respected
    (regardless of whether we're dealing with a :class:`LocalContext` or
    :class:`RemoteContext`).

    Due to the use of cat_ this method will create files that don't exist
    yet, assuming the directory containing the file already exists and the
    context provides permission to write to the directory.

    .. _output redirection: https://en.wikipedia.org/wiki/Redirection_(computing)
    """
    options['input'] = contents
    options['shell'] = True
    shell_command = 'cat > %s' % quote(filename)
    return self.execute(shell_command, **options)
class LocalContext(AbstractContext):

    """Context for executing commands on the local system."""

    @property
    def command_type(self):
        """The type of command objects created by this context (:class:`.ExternalCommand`)."""
        return ExternalCommand

    @lazy_property
    def cpu_count(self):
        """
        The number of CPUs in the system (an integer).

        The value is obtained from :func:`multiprocessing.cpu_count()`.
        """
        return multiprocessing.cpu_count()

    def glob(self, pattern):
        """
        Find matches for a given filename pattern.

        :param pattern: A filename pattern (a string).
        :returns: A list of strings with matches.

        This override of :func:`AbstractContext.glob()` calls
        :func:`glob.glob()` in the current process instead of forking a new
        Python interpreter.

        The shortcut is not taken when :attr:`~AbstractContext.options`
        contains a true value for :attr:`~executor.ExternalCommand.sudo`,
        :attr:`~executor.ExternalCommand.uid` or
        :attr:`~executor.ExternalCommand.user`, to avoid reporting wrong
        matches due to insufficient filesystem permissions.
        """
        runs_as_other_user = any(self.options.get(name) for name in ('sudo', 'uid', 'user'))
        if not runs_as_other_user:
            return glob.glob(pattern)
        return super(LocalContext, self).glob(pattern)

    def __str__(self):
        """Render a human friendly string representation of the context."""
        hostname = socket.gethostname()
        return "local system (%s)" % hostname
class ChangeRootContext(AbstractContext):

    """Context for executing commands in change roots using chroot_."""

    def __init__(self, *args, **options):
        """
        Initialize a :class:`ChangeRootContext` object.

        :param args: Positional arguments are passed on to the initializer of
                     the :class:`AbstractContext` class (for future
                     extensibility).
        :param options: Any keyword arguments are passed on to the initializer
                        of the :class:`AbstractContext` class.

        When the keyword argument `chroot` is missing (or :data:`None`) and
        at least one positional argument was given, the first positional
        argument is consumed to set the :attr:`chroot` property.
        """
        # Copy into a list because a tuple of arguments can't be popped from.
        positional = list(args)
        # The keyword argument wins; otherwise fall back to the first
        # positional argument (when there is one).
        if positional and options.get('chroot') is None:
            options['chroot'] = positional.pop(0)
        super(ChangeRootContext, self).__init__(*positional, **options)

    @required_property
    def chroot(self):
        """The pathname of the root directory of the chroot (a string)."""

    @property
    def command_type(self):
        """The type of command objects created by this context (:class:`.ChangeRootCommand`)."""
        return ChangeRootCommand

    @lazy_property
    def cpu_count(self):
        """
        The number of CPUs in the system (an integer).

        This property's value is computed using :func:`multiprocessing.cpu_count()`.
        """
        return multiprocessing.cpu_count()

    def get_options(self):
        """The :attr:`~AbstractContext.options` including :attr:`chroot`."""
        # Build a fresh dictionary so that self.options is left untouched.
        return dict(self.options, chroot=self.chroot)

    def __str__(self):
        """Render a human friendly string representation of the context."""
        root_directory = self.chroot
        return "chroot (%s)" % root_directory
class SecureChangeRootContext(AbstractContext):

    """Context for executing commands in change roots using schroot_."""

    def __init__(self, *args, **options):
        """
        Initialize a :class:`SecureChangeRootContext` object.

        :param args: Positional arguments are passed on to the initializer of
                     the :class:`AbstractContext` class (for future
                     extensibility).
        :param options: Any keyword arguments are passed on to the initializer
                        of the :class:`AbstractContext` class.

        When the keyword argument `chroot_name` is missing (or :data:`None`)
        and at least one positional argument was given, the first positional
        argument is consumed to set the :attr:`chroot_name` property.
        """
        # Copy into a list because a tuple of arguments can't be popped from.
        remaining = list(args)
        # The keyword argument wins; otherwise fall back to the first
        # positional argument (when there is one).
        if remaining and options.get('chroot_name') is None:
            options['chroot_name'] = remaining.pop(0)
        super(SecureChangeRootContext, self).__init__(*remaining, **options)

    @required_property
    def chroot_name(self):
        """The name of a chroot managed by schroot_ (a string)."""

    @property
    def command_type(self):
        """The type of command objects created by this context (:class:`.SecureChangeRootCommand`)."""
        return SecureChangeRootCommand

    @lazy_property
    def cpu_count(self):
        """
        The number of CPUs in the system (an integer).

        This property's value is computed using :func:`multiprocessing.cpu_count()`.
        """
        return multiprocessing.cpu_count()

    def get_options(self):
        """The :attr:`~AbstractContext.options` including :attr:`chroot_name`."""
        # Build a fresh dictionary so that self.options is left untouched.
        return dict(self.options, chroot_name=self.chroot_name)

    def __str__(self):
        """Render a human friendly string representation of the context."""
        schroot_label = self.chroot_name
        return "secure chroot (%s)" % schroot_label
class RemoteContext(RemoteAccount, AbstractContext):

    """Context for executing commands on a remote system over SSH."""

    @property
    def command_type(self):
        """The type of command objects created by this context (:class:`.RemoteCommand`)."""
        return RemoteCommand

    @lazy_property
    def cpu_count(self):
        """
        The number of CPUs in the system (an integer).

        The value is obtained by running nproc_ through :func:`capture()`
        and converting its output to an integer. When that raises any
        exception, :attr:`cpu_count` falls back to the command
        ``grep -ci '^processor\\s*:' /proc/cpuinfo``.

        .. _nproc: http://linux.die.net/man/1/nproc
        """
        try:
            nproc_output = self.capture('nproc', shell=False, silent=True)
            return int(nproc_output)
        except Exception:
            # Deliberately broad: any failure of the preferred command (or of
            # parsing its output) triggers the /proc/cpuinfo fallback.
            processor_lines = self.capture('grep', '-ci', r'^processor\s*:', '/proc/cpuinfo')
            return int(processor_lines)

    def get_options(self):
        """The :attr:`~AbstractContext.options` including the SSH alias and remote user."""
        # Build a fresh dictionary so that self.options is left untouched.
        return dict(self.options, ssh_alias=self.ssh_alias, ssh_user=self.ssh_user)

    def __str__(self):
        """Render a human friendly string representation of the context."""
        alias = self.ssh_alias
        return "remote system (%s)" % alias
def normalize_mirror_url(value):
    """
    Normalize a package mirror URL to enable string equality comparison.

    :param value: The mirror URL to normalize (a string).
    :returns: The lowercased URL with ``https://`` replaced by ``http://``
              and any trailing slashes removed (a string).
    """
    # Domain names are case insensitive (technically anything after the domain
    # name isn't, but for our limited purposes it really doesn't matter). This
    # runs first so that the scheme replacement below also catches spellings
    # like "HTTPS://" and "Https://".
    value = value.lower()
    # If Debian and/or Ubuntu ever decide to switch from HTTP to HTTPS I don't
    # want this to invalidate the matching against MIRROR_TO_DISTRIB_MAPPING.
    value = value.replace(u'https://', u'http://')
    # Ignore insignificant trailing slash.
    return value.rstrip(u'/')