# Source code for executor.ssh.server

# Programmer friendly subprocess wrapper.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: May 4, 2018
# URL: https://executor.readthedocs.io

"""
OpenSSH server automation for testing.

The :mod:`executor.ssh.server` module defines the :class:`SSHServer` class
which can be used to start temporary OpenSSH servers that are isolated enough
from the host system to make them usable in the :mod:`executor` test suite (to
test remote command execution).
"""

# Standard library modules.
import getpass
import logging
import os
import shutil
import tempfile

# Modules included in our package.
from executor import execute, which
from executor.tcp import EphemeralTCPServer, TimeoutError

# External dependencies.
from humanfriendly import Timer

# Public identifiers that require documentation.
# `EphemeralTCPServer` and `TimeoutError` are imported from `executor.tcp`
# above and re-exported here under the "backwards compatibility" heading.
__all__ = (
    'SSHD_PROGRAM_NAME',
    'SSHServer',
    'logger',
    # Backwards compatibility.
    'EphemeralTCPServer',
    'TimeoutError',
)

# Initialize a logger (named after this module); `SSHServer` uses it as the
# default value of its `logger` option.
logger = logging.getLogger(__name__)

# Program name that `SSHServer.sshd_path` resolves using `which()`; when no
# match is found the bare name is used as is.
SSHD_PROGRAM_NAME = 'sshd'
"""The name of the SSH server executable (a string)."""


class SSHServer(EphemeralTCPServer):

    """
    Subclass of :class:`.EphemeralTCPServer` that manages a temporary SSH server.

    The OpenSSH server spawned by the :class:`SSHServer` class doesn't need
    `superuser privileges`_ and doesn't require any changes to ``/etc/passwd``
    or ``/etc/shadow``.
    """

    def __init__(self, **options):
        """
        Initialize an :class:`SSHServer` object.

        :param options: All keyword arguments are passed on to the
                        initializer of the superclass. The options ``scheme``
                        (``'ssh'``) and ``logger`` (the module level
                        :data:`logger`) are given defaults when the caller
                        doesn't provide them.

        A fresh temporary directory is created immediately; the key files and
        the server configuration are only generated once :func:`start()` is
        called.
        """
        self.temporary_directory = tempfile.mkdtemp(prefix='executor-', suffix='-ssh-server')
        """
        The pathname of the temporary directory used to store the files
        required to run the SSH server (a string).
        """
        self.client_key_file = os.path.join(self.temporary_directory, 'client-key')
        """The pathname of the generated OpenSSH client key file (a string)."""
        self.config_file = os.path.join(self.temporary_directory, 'config')
        """The pathname of the generated OpenSSH server configuration file (a string)."""
        self.host_key_file = os.path.join(self.temporary_directory, 'host-key')
        """The pathname of the generated OpenSSH host key file (a string)."""
        # Initialize the superclass (the pathnames above have to be known
        # first because the configuration file is part of the command line).
        options.setdefault('scheme', 'ssh')
        options.setdefault('logger', logger)
        super(SSHServer, self).__init__(self.sshd_path, '-D', '-f', self.config_file, **options)

    @property
    def sshd_path(self):
        """
        The absolute pathname of :data:`SSHD_PROGRAM_NAME` (a string).

        Falls back to the bare program name when :func:`which()` doesn't
        report any matching executables.
        """
        executables = which(SSHD_PROGRAM_NAME)
        return executables[0] if executables else SSHD_PROGRAM_NAME

    @property
    def client_options(self):
        """
        The options for the OpenSSH client (required to connect with the server).

        This is a dictionary of keyword arguments for :class:`.RemoteCommand`
        to make it connect with the OpenSSH server (assuming the remote command
        connects to an IP address in the 127.0.0.0/24 range).
        """
        return dict(
            identity_file=self.client_key_file,
            ignore_known_hosts=True,
            port=self.port_number,
        )

    def start(self, **options):
        """
        Start the SSH server and wait for it to start accepting connections.

        :param options: Any keyword arguments are passed to the
                        :func:`~EphemeralTCPServer.start()` method of the
                        superclass.
        :raises: Any exceptions raised by the
                 :func:`~EphemeralTCPServer.start()` method of the superclass.

        The :func:`start()` method automatically calls the
        :func:`generate_key_file()` and :func:`generate_config()` methods
        (only when the server wasn't started before).
        """
        if not self.was_started:
            self.logger.debug("Preparing to start SSH server ..")
            for key_file in (self.host_key_file, self.client_key_file):
                self.generate_key_file(key_file)
            self.generate_config()
        # Forward the caller's options as documented (they used to be
        # silently discarded).
        super(SSHServer, self).start(**options)

    def generate_key_file(self, filename):
        """
        Generate a temporary host or client key for the OpenSSH server.

        :param filename: The pathname of the private key file to generate (a
                         string). Nothing is done when this file already
                         exists.

        The :func:`start()` method automatically calls
        :func:`generate_key_file()` to generate :attr:`host_key_file` and
        :attr:`client_key_file`. This method uses the ``ssh-keygen`` program
        to generate the keys (RSA keys with an empty passphrase).
        """
        if os.path.isfile(filename):
            return
        timer = Timer()
        self.logger.debug("Generating SSH key file (%s) ..", filename)
        execute('ssh-keygen', '-f', filename, '-N', '', '-t', 'rsa', silent=True, logger=self.logger)
        self.logger.debug("Generated key file %s in %s.", filename, timer)

    def generate_config(self):
        """
        Generate a configuration file for the OpenSSH server.

        The :func:`start()` method automatically calls
        :func:`generate_config()`. Nothing is done when :attr:`config_file`
        already exists.

        The server only accepts the current user, identified by the
        environment variable ``$USER`` with a fallback to
        :func:`getpass.getuser()` when that variable is unset or empty.
        """
        if os.path.isfile(self.config_file):
            return
        self.logger.debug("Generating SSH server configuration (%s) ..", self.config_file)
        # $USER isn't guaranteed to be set (think of containers and cron jobs)
        # so fall back to the lookup performed by the standard library.
        username = os.environ.get('USER') or getpass.getuser()
        directives = (
            "AllowUsers %s\n" % username,
            "AuthorizedKeysFile %s.pub\n" % (self.client_key_file),
            "HostKey %s\n" % self.host_key_file,
            "LogLevel QUIET\n",
            "PasswordAuthentication no\n",
            "PidFile %s/sshd.pid\n" % self.temporary_directory,
            "Port %i\n" % self.port_number,
            "StrictModes no\n",
            "UsePAM no\n",
            "UsePrivilegeSeparation no\n",
        )
        with open(self.config_file, 'w') as handle:
            handle.writelines(directives)

    def cleanup(self):
        """
        Clean up :attr:`temporary_directory` after the test server finishes.

        The directory (when it still exists) is removed recursively and
        :attr:`temporary_directory` is reset to :data:`None`, after which the
        ``cleanup()`` method of the superclass is called.
        """
        if self.temporary_directory:
            if os.path.isdir(self.temporary_directory):
                self.logger.debug("Cleaning up temporary directory %s ..", self.temporary_directory)
                shutil.rmtree(self.temporary_directory)
            self.temporary_directory = None
        super(SSHServer, self).cleanup()