Source code for executor.tcp
# Programmer friendly subprocess wrapper.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: March 2, 2020
# URL: https://executor.readthedocs.io
"""
Miscellaneous TCP networking functionality.
The functionality in this module originated in the :class:`executor.ssh.server`
module with the purpose of facilitating a robust automated test suite for the
:class:`executor.ssh.client` module. While working on SSH tunnel support I
needed similar logic again and I decided to extract this code from the
:class:`executor.ssh.server` module.
"""
# Standard library modules.
import itertools
import logging
import random
import socket
# Modules included in our package.
from executor import ExternalCommand
# External dependencies.
from humanfriendly import Timer, format_timespan
from humanfriendly.terminal.spinners import Spinner
from humanfriendly.text import format, pluralize
from property_manager import (
PropertyManager,
lazy_property,
mutable_property,
required_property,
set_property,
)
# Public identifiers that require documentation.
__all__ = (
'EphemeralPortAllocator',
'EphemeralTCPServer',
'TimeoutError',
'WaitUntilConnected',
'logger',
)
# Initialize a logger.
logger = logging.getLogger(__name__)
[docs]class WaitUntilConnected(PropertyManager):
"""Wait for a TCP endpoint to start accepting connections."""
[docs] @mutable_property
def connect_timeout(self):
"""The timeout in seconds for individual connection attempts (a number, defaults to 2)."""
return 2
@property
def endpoint(self):
"""A human friendly representation of the TCP endpoint (a string containing a URL)."""
return format("%s://%s:%i", self.scheme, self.hostname, self.port_number)
[docs] @mutable_property
def hostname(self):
"""The host name or IP address to connect to (a string, defaults to ``localhost``)."""
return 'localhost'
@property
def is_connected(self):
""":data:`True` if a connection was accepted, :data:`False` otherwise."""
timer = Timer()
logger.debug("Checking whether %s is accepting connections ..", self.endpoint)
try:
socket.create_connection((self.hostname, self.port_number), self.connect_timeout)
logger.debug("Yes %s is accepting connections (took %s).", self.endpoint, timer)
return True
except Exception:
logger.debug("No %s isn't accepting connections (took %s).", self.endpoint, timer)
return False
[docs] @required_property
def port_number(self):
"""The port number to connect to (an integer)."""
[docs] @mutable_property
def scheme(self):
"""A URL scheme that indicates the purpose of the ephemeral port (a string, defaults to 'tcp')."""
return 'tcp'
[docs] @mutable_property
def wait_timeout(self):
"""The timeout in seconds for :func:`wait_until_connected()` (a number, defaults to 30)."""
return 30
[docs] def wait_until_connected(self):
"""
Wait until connections are being accepted.
:raises: :exc:`TimeoutError` when the SSH server isn't fast enough to
initialize.
"""
timer = Timer()
with Spinner(timer=timer) as spinner:
while not self.is_connected:
if timer.elapsed_time > self.wait_timeout:
raise TimeoutError(format(
"Failed to establish connection to %s within configured timeout of %s!",
self.endpoint, format_timespan(self.wait_timeout),
))
spinner.step(label="Waiting for %s to accept connections" % self.endpoint)
spinner.sleep()
logger.debug("Waited %s for %s to accept connections.", timer, self.endpoint)
[docs]class EphemeralPortAllocator(WaitUntilConnected):
"""
Allocate a free `ephemeral port number`_.
.. _ephemeral port number: \
http://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic.2C_private_or_ephemeral_ports
"""
[docs] @lazy_property
def port_number(self):
"""A dynamically selected free ephemeral port number (an integer between 49152 and 65535)."""
timer = Timer()
logger.debug("Looking for free ephemeral port number ..")
for i in itertools.count(1):
value = self.ephemeral_port_number
set_property(self, 'port_number', value)
if not self.is_connected:
logger.debug("Found free ephemeral port number %s after %s (took %s).",
value, pluralize(i, "attempt"), timer)
return value
@property
def ephemeral_port_number(self):
"""A random ephemeral port number (an integer between 49152 and 65535)."""
return random.randint(49152, 65535)
[docs]class EphemeralTCPServer(ExternalCommand, EphemeralPortAllocator):
"""
Make it easy to launch ephemeral TCP servers.
The :class:`EphemeralTCPServer` class makes it easy to allocate an
`ephemeral port number`_ that is not (yet) in use.
.. _ephemeral port number: \
http://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic.2C_private_or_ephemeral_ports
"""
@property
def asynchronous(self):
"""Ephemeral TCP servers always set :attr:`.ExternalCommand.asynchronous` to :data:`True`."""
return True
[docs] def start(self, **options):
"""
Start the TCP server and wait for it to start accepting connections.
:param options: Any keyword arguments are passed to the
:func:`~executor.ExternalCommand.start()` method of the
superclass.
:raises: Any exceptions raised by :func:`~executor.ExternalCommand.start()`
and :func:`~executor.tcp.WaitUntilConnected.wait_until_connected()`.
If the TCP server doesn't start accepting connections within the
configured timeout (see :attr:`~executor.tcp.WaitUntilConnected.wait_timeout`)
the process will be terminated and the timeout exception is propagated.
"""
if not self.was_started:
logger.debug("Preparing to start %s server ..", self.scheme.upper())
super(EphemeralTCPServer, self).start(**options)
try:
self.wait_until_connected()
except TimeoutError:
self.terminate()
raise
[docs]class TimeoutError(Exception):
"""
Raised when a TCP server doesn't start accepting connections quickly enough.
This exception is raised by :func:`~executor.tcp.WaitUntilConnected.wait_until_connected()`
when the TCP server doesn't start accepting connections within a reasonable time.
"""