From 701dca49e59bedf29c978dd78c5d0f293e8dfa02 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Jul 2019 12:24:50 -0400 Subject: [PATCH 01/25] Add some additional `logdir` passing tests Relates to #20 --- tests/test_agent.py | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/tests/test_agent.py b/tests/test_agent.py index db396bb..25ce27c 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -51,7 +51,7 @@ def test_logdir(funcname): check_log_files(ua, tempfile.gettempdir()) -def test_scen_logdir(): +def test_scen_assign_logdir(): """Verify log file arguments when logdir is set using Scenario.defaults """ scen = pysipp.scenario() @@ -61,6 +61,32 @@ def test_scen_logdir(): check_log_files(ua, logdir) +def test_scen_pass_logdir(): + """Verify log file arguments when logdir is set using Scenario.defaults + """ + logdir = tempfile.mkdtemp(suffix='_pysipp') + scen = pysipp.scenario(logdir=logdir) + assert scen.defaults.logdir == logdir + + # logdir isn't set until the scenario is "prepared" + assert scen.agents['uac'].logdir is None + + # logdir is set once scenario is "rendered" + for ua in scen.prepare(): + check_log_files(ua, logdir) + + +def test_walk_pass_logdir(): + logdir = tempfile.mkdtemp(suffix='_pysipp') + scen = next(pysipp.walk( + './tests/scens/default/', logdir=logdir))[1] + assert scen.logdir == logdir + + # logdir is set once scenario is "rendered" + for ua in scen.prepare(): + check_log_files(ua, logdir) + + def test_client(): # check the built-in uac xml scenario remote_sock = ('192.168.1.1', 5060) From b83378937417c2c762e4a2d155057cec02886b3c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Jul 2019 12:26:40 -0400 Subject: [PATCH 02/25] Assign kwargs to `Scenario.defaults` at creation The `defaults` kwarg to `pysipp.agent.Scenario` isn't used so just drop it and instead comb through provided `kwargs` and pop out values passed by the user into `pysipp.agent.Scenario.defaults` making these the "new" defaults applied to agents. Also, rename the default settings dicts more explicitly. Resolves #20 --- pysipp/agent.py | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/pysipp/agent.py b/pysipp/agent.py index 2947755..5af5a6e 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -76,7 +76,9 @@ def is_client(self): def is_server(self): return 'uas' in self.name.lower() - def iter_logfile_items(self, types_attr='_log_types', enable_screen_file=True): + def iter_logfile_items( + self, types_attr='_log_types', enable_screen_file=True + ): for name in getattr(self, types_attr): if name != 'screen' or enable_screen_file: attr_name = name + '_file' @@ -118,11 +120,14 @@ def enable_tracing(self): attr_name = 'trace_' + name setattr(self, attr_name, True) - def enable_logging(self, logdir=None, debug=False, enable_screen_file=True): + def enable_logging( + self, logdir=None, debug=False, enable_screen_file=True + ): """Enable agent logging by appending appropriately named log file arguments to the underlying command. """ - logattrs = self.iter_logfile_items(enable_screen_file=enable_screen_file) + logattrs = self.iter_logfile_items( + enable_screen_file=enable_screen_file) if debug: logattrs = itertools.chain( logattrs, @@ -196,21 +201,22 @@ def client(**kwargs): return ua(**defaults) -_dd = { +# default values every scenario should define at a minimum +_minimum_defaults_template = { 'key_vals': {}, 'global_vars': {}, } -_defaults = { +_scen_defaults_template = { 'recv_timeout': 5000, 'call_count': 1, 'rate': 1, 'limit': 1, 'logdir': tempfile.gettempdir(), } -_defaults.update(deepcopy(_dd)) +_scen_defaults_template.update(deepcopy(_minimum_defaults_template)) -def Scenario(agents, defaults=None, **kwargs): +def Scenario(agents, **kwargs): """Wraps (subsets of) user agents in global state pertaining to configuration, routing, and default arguments. @@ -218,9 +224,12 @@ def Scenario(agents, defaults=None, **kwargs): """ scentype = type('Scenario', (ScenarioType,), {}) - _defs = OrderedDict(deepcopy(_defaults)) - if defaults: - _defs.update(defaults) + _defs = OrderedDict(deepcopy(_scen_defaults_template)) + # for any passed kwargs that have keys in ``_defaults_template``, set them + # as the new defaults for the scenario + for key, val in kwargs.copy().items(): + if key in _defs: + _defs[key] = kwargs.pop(key) # this gives us scen. attribute access to scen.defaults utils.DictProxy(_defs, UserAgent.keys(), cls=scentype) @@ -245,11 +254,13 @@ def __init__(self, agents, defaults, clientdefaults=None, self.defaults = utils.DictProxy(self._defaults, ua_attrs)() # client settings - self._clientdefaults = OrderedDict(clientdefaults or deepcopy(_dd)) + self._clientdefaults = OrderedDict( + clientdefaults or deepcopy(_minimum_defaults_template)) self.clientdefaults = utils.DictProxy(self._clientdefaults, ua_attrs)() # server settings - self._serverdefaults = OrderedDict(serverdefaults or deepcopy(_dd)) + self._serverdefaults = OrderedDict( + serverdefaults or deepcopy(_minimum_defaults_template)) self.serverdefaults = utils.DictProxy(self._serverdefaults, ua_attrs)() # hook module From c3d6bbd348c3b48aecda5aa18d1e9c0dcb3f4d4b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Jul 2019 12:33:17 -0400 Subject: [PATCH 03/25] Add official release stamp and be looser on pluggy version --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index a13dc99..6bed2d6 100755 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ setup( name="pysipp", - version='0.1.alpha', + version='0.1.0', description='pysipp is a SIPp scenario launcher for for use in' ' automated VoIP testing', long_description=readme, @@ -36,7 +36,7 @@ url='https://github.com/SIPp/pysipp', platforms=['linux'], packages=['pysipp', 'pysipp.cli'], - install_requires=['pluggy==0.11.0'], + install_requires=['pluggy>=0.11.0'], tests_require=['pytest'], entry_points={ 'console_scripts': [ From c275334f3f80f4dcb8337d7bd8fc9694fb8fd2dd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Jul 2019 12:41:50 -0400 Subject: [PATCH 04/25] Raise `ValueError` on non-tuple assignment These fancy tuple-attribute-properties are supposed to be strictly assigned tuple types; let's enforce that. Resolves #12 --- pysipp/agent.py | 2 ++ tests/test_agent.py | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/pysipp/agent.py b/pysipp/agent.py index 5af5a6e..568c410 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -23,6 +23,8 @@ def getter(self): return None def setter(self, pair): + if not isinstance(pair, tuple): + raise ValueError("{} must be a tuple".format(pair)) for attr, val in zip(attrs, pair or itertools.repeat(None)): setattr(self, attr, val) diff --git a/tests/test_agent.py b/tests/test_agent.py index 25ce27c..0545952 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -186,3 +186,8 @@ def test_scenario(): assert uac.uri_username != doggy assert scen.name == 'uas_uac' + + +def test_pass_bad_socket_addr(): + with pytest.raises(ValueError): + pysipp.client(proxyaddr='10.10.8.88') From 4bfe5db6e18ec420275a9f8cba5771dc87470593 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Jul 2019 18:52:11 -0400 Subject: [PATCH 05/25] Further fixes As per the change for #20, handle the case where the user passes in a `defaults` kwarg to `pysipp.scenario` and use it override the normal template. To fully solve #12 also support assignment of `None` which results in the tuple attribute taking a value of `(None, None)` implicitly. --- pysipp/agent.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pysipp/agent.py b/pysipp/agent.py index 568c410..988eaeb 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -24,8 +24,11 @@ def getter(self): def setter(self, pair): if not isinstance(pair, tuple): - raise ValueError("{} must be a tuple".format(pair)) - for attr, val in zip(attrs, pair or itertools.repeat(None)): + if pair is None: + pair = (None, None) + else: + raise ValueError("{} must be a tuple".format(pair)) + for attr, val in zip(attrs, pair): setattr(self, attr, val) doc = "{} parameters composed as a tuple".format(', '.join(attrs)) @@ -233,6 +236,12 @@ def Scenario(agents, **kwargs): if key in _defs: _defs[key] = kwargs.pop(key) + # if a `defaults` kwarg is passed in by the user override template values with + # values from that as well + user_defaults = kwargs.pop('defaults', None) + if user_defaults: + _defs.update(user_defaults) + # this gives us scen. attribute access to scen.defaults utils.DictProxy(_defs, UserAgent.keys(), cls=scentype) return scentype(agents, _defs, **kwargs) From 7544b5d7ba1903f6b7b57dff3b01b73556652617 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Jul 2019 19:09:06 -0400 Subject: [PATCH 06/25] Doc tweak --- pysipp/netplug.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pysipp/netplug.py b/pysipp/netplug.py index 4d2e87a..fa3dffa 100644 --- a/pysipp/netplug.py +++ b/pysipp/netplug.py @@ -28,8 +28,8 @@ def getsockaddr(host, family=socket.AF_INET, port=0, sockmod=socket): @plugin.hookimpl def pysipp_conf_scen(agents, scen): - """Allocate a random socket addresses from the local OS for - each agent in the scenario. + """Automatically allocate random socket addresses from the local OS for + each agent in the scenario if not previously set by the user. """ host = scen.defaults.local_host or socket.getfqdn() for ua in scen.agents.values(): From 8ecbdb092a4fb2d498b62530c82b68ec91092d4a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Jul 2019 19:09:25 -0400 Subject: [PATCH 07/25] Use latest official sipp release in CI --- .travis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1d07d66..1acac4a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,9 +28,9 @@ install: - pip list before_script: - - wget https://github.com/SIPp/sipp/releases/download/v3.5.2/sipp-3.5.2.tar.gz - - tar -xvzf sipp-3.5.2.tar.gz - - cd sipp-3.5.2 + - wget https://github.com/SIPp/sipp/releases/download/v3.6.0/sipp-3.6.0.tar.gz + - tar -xvzf sipp-3.6.0.tar.gz + - cd sipp-3.6.0 - ./configure - make - export PATH="$PWD:$PATH" From a41df327d8b5af1715907a0691e484716fa2db14 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 6 Jul 2019 14:20:19 -0400 Subject: [PATCH 08/25] Moderninze setup.py to new PyPa standards --- setup.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/setup.py b/setup.py index 6bed2d6..ced8544 100755 --- a/setup.py +++ b/setup.py @@ -27,12 +27,12 @@ setup( name="pysipp", version='0.1.0', - description='pysipp is a SIPp scenario launcher for for use in' - ' automated VoIP testing', + description="A SIPp scenario launcher", long_description=readme, + long_description_content_type="text/markdown", license='GPLv2', author='Tyler Goodlet', - author_email='tgoodlet@gmail.com', + author_email='jgbt@protonmail.com', url='https://github.com/SIPp/pysipp', platforms=['linux'], packages=['pysipp', 'pysipp.cli'], @@ -46,8 +46,8 @@ classifiers=[ 'Development Status :: 3 - Alpha', 'Intended Audience :: Developers', - 'License :: OSI Approved :: GNU General Public License v2', - 'Operating System :: Linux', + 'License :: OSI Approved :: GNU General Public License v2 (GPLv2)', + 'Operating System :: POSIX :: Linux', 'Programming Language :: Python :: 2.7', 'Topic :: Software Development', 'Topic :: Software Development :: Testing', From 1ecc5139655931a49195c4a26ff47e1b2f0f6d5a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 21:46:01 -0400 Subject: [PATCH 09/25] Drop runner related hooks --- pysipp/hookspec.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/pysipp/hookspec.py b/pysipp/hookspec.py index c82a21e..fa953c0 100644 --- a/pysipp/hookspec.py +++ b/pysipp/hookspec.py @@ -66,18 +66,3 @@ def pysipp_conf_scen(agents, scen): socket arguments. It it the recommended hook for applying a default scenario configuration. """ - - -@hookspec(firstresult=True) -def pysipp_new_runner(): - """Create and return a runner instance to be used for invoking - multiple SIPp commands. The runner must be callable and support both a - `block` and `timeout` kwarg. - """ - - -@hookspec(firstresult=True) -def pysipp_run_protocol(scen, runner, block, timeout, raise_exc): - """Perform steps to execute all SIPp commands usually by calling a - preconfigured command launcher/runner. - """ From 060e9b50b5da4180fcdad9be25eaa54f55e1cc2f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 21:47:12 -0400 Subject: [PATCH 10/25] Use `trio` for process and scenario launching! After attempting to find an OS portable way to spawn subprocesses using the stdlib and coming out unsatisfied, I've decided use the new subprocess launching support in `trio`! This will of course require that the project moves to python 3.6+ giving us access to a lot of neat features of modern python including async/await support and adherence to the structured concurrency principles prominent in `trio`. It turns out this is a good fit since SIPp already has a built in cancellation mechanism via the SIGUSR1 signal. There's a lot of "core" changes to go over in this commit: - drop the "run protocol" and "runner creation" related hooks since they really shouldn't be overridden until there's some need for it and it's likely smarter to keep those "machinery" details strictly internal for now - the run "protocol" has now been relegated to an async function: `pysipp.launch.run_all_agents()` - many routines have been converted to async functions particularly at the runner (`pysipp.TrioRunner.run()`, `.get()`) and scenario (`pysipp.Scenario.arun()`) levels allowing us to expose both a sync and async interface for running subprocesses / agents - drop all the epoll/select loop stuff as this is entirely delegated to `trio.open_process()` and it's underlying machinery and APIs Resolves #53 --- pysipp/__init__.py | 70 +--------------- pysipp/agent.py | 66 +++++++++++---- pysipp/launch.py | 204 ++++++++++++++++++++++++--------------------- pysipp/report.py | 5 +- 4 files changed, 163 insertions(+), 182 deletions(-) diff --git a/pysipp/__init__.py b/pysipp/__init__.py index 46eb248..fc703b8 100644 --- a/pysipp/__init__.py +++ b/pysipp/__init__.py @@ -20,16 +20,11 @@ ''' import sys from os.path import dirname -from . import launch, report, plugin, netplug, agent +from . import plugin, netplug, agent from .load import iter_scen_dirs from .agent import client, server -class SIPpFailure(RuntimeError): - """SIPp commands failed - """ - - __package__ = 'pysipp' __author__ = 'Tyler Goodlet (tgoodlet@gmail.com)' @@ -106,12 +101,11 @@ def scenario(dirpath=None, proxyaddr=None, autolocalsocks=True, # same as above scen = plugin.mng.hook.pysipp_conf_scen_protocol( agents=[uas, uac], confpy=None, - scenkwargs=scenkwargs ) - if proxyaddr: - assert isinstance( - proxyaddr, tuple), 'proxyaddr must be a (addr, port) tuple' + if proxyaddr is not None: + assert isinstance(proxyaddr, tuple), ( + 'proxyaddr must be a (addr, port) tuple') scen.clientdefaults.proxyaddr = proxyaddr return scen @@ -196,61 +190,5 @@ def pysipp_conf_scen(agents, scen): ua.rtp_echo = True -@plugin.hookimpl -def pysipp_new_runner(): - """Provision and assign a default cmd runner - """ - return launch.PopenRunner() - - -@plugin.hookimpl -def pysipp_run_protocol(scen, runner, block, timeout, raise_exc): - """"Run all rendered commands with the provided runner or the built-in - PopenRunner which runs commands locally. - """ - # use provided runner or default provided by hook - runner = runner or plugin.mng.hook.pysipp_new_runner() - agents = scen.prepare() - - def finalize(cmds2procs=None, timeout=180, raise_exc=True): - """Wait for all remaining agents in the scenario to finish executing - and perform error and logfile reporting. - """ - cmds2procs = cmds2procs or runner.get(timeout=timeout) - agents2procs = list(zip(agents, cmds2procs.values())) - msg = report.err_summary(agents2procs) - if msg: - # report logs and stderr - report.emit_logfiles(agents2procs) - if raise_exc: - # raise RuntimeError on agent failure(s) - # (HINT: to rerun type `scen()` from the debugger) - raise SIPpFailure(msg) - - return cmds2procs - - try: - # run all agents (raises RuntimeError on timeout) - cmds2procs = runner( - (ua.render() for ua in agents), - block=block, timeout=timeout - ) - except launch.TimeoutError: # sucessful timeout - cmds2procs = finalize(timeout=0, raise_exc=False) - if raise_exc: - raise - else: - # async - if not block: - # XXX async run must bundle up results for later processing - scen.finalize = finalize - return finalize - - # sync - finalize(cmds2procs, raise_exc=raise_exc) - - return runner - - # register the default hook set plugin.mng.register(sys.modules[__name__]) diff --git a/pysipp/agent.py b/pysipp/agent.py index 988eaeb..ceb211f 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -5,10 +5,14 @@ import re import itertools import tempfile +from functools import partial from copy import deepcopy from distutils import spawn from collections import namedtuple, OrderedDict -from . import command, plugin, utils + +import trio + +from . import command, plugin, utils, launch, report log = utils.get_logger() @@ -60,20 +64,21 @@ def name(self): ipcaddr = tuple_property(('ipc_host', 'ipc_port')) call_load = tuple_property(('rate', 'limit', 'call_count')) - def __call__(self, block=True, timeout=180, runner=None, raise_exc=True, - **kwargs): + + def __call__(self, *args, **kwargs): + return self.run(*args, **kwargs) + + def run( + self, + timeout=180, + **kwargs + ): # create and configure a temp scenario scen = plugin.mng.hook.pysipp_conf_scen_protocol( agents=[self], confpy=None, scenkwargs={}, ) - # run the standard protocol - # (attach allocted runner for reuse/post-portem) - return plugin.mng.hook.pysipp_run_protocol( - scen=scen, block=block, timeout=timeout, - runner=runner, - raise_exc=raise_exc, **kwargs - ) + return scen.run(timeout=timeout, **kwargs) def is_client(self): return 'uac' in self.name.lower() @@ -254,8 +259,13 @@ class ScenarioType(object): If called it will invoke the standard run hooks. """ - def __init__(self, agents, defaults, clientdefaults=None, - serverdefaults=None, confpy=None, enable_screen_file=True): + def __init__( + self, agents, defaults, clientdefaults=None, + serverdefaults=None, confpy=None, enable_screen_file=True + ): + # placeholder for process "runner" + self._runner = None + # agents iterable in launch-order self._agents = agents ua_attrs = UserAgent.keys() @@ -431,10 +441,30 @@ def from_agents(self, agents=None, autolocalsocks=True, **scenkwargs): return type(self)( self.prepare(agents), self._defaults, confpy=self.mod) - def __call__(self, agents=None, block=True, timeout=180, runner=None, - raise_exc=True, copy_agents=False, **kwargs): - return plugin.mng.hook.pysipp_run_protocol( - scen=self, - block=block, timeout=timeout, runner=runner, - raise_exc=raise_exc, **kwargs + async def arun( + self, + timeout=180, + runner=None, + ): + agents = self.prepare() + runner = runner or launch.TrioRunner() + + return await launch.run_all_agents(runner, agents, timeout=timeout) + + def run( + self, + timeout=180, + **kwargs + ): + """Run scenario blocking to completion.""" + return trio.run( + partial( + self.arun, + timeout=timeout, + **kwargs + ) ) + + def __call__(self, *args, **kwargs): + # TODO: deprecation warning here + return self.run(*args, **kwargs) diff --git a/pysipp/launch.py b/pysipp/launch.py index a750128..63b2d29 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -1,17 +1,18 @@ """ Launchers for invoking SIPp user agents """ -import subprocess -import os import shlex -import select -import threading import signal +import subprocess import time from . import utils from pprint import pformat from collections import OrderedDict, namedtuple +import trio + +from . import report + log = utils.get_logger() Streams = namedtuple("Streams", "stdout stderr") @@ -21,30 +22,29 @@ class TimeoutError(Exception): "SIPp process timeout exception" -class PopenRunner(object): - """Run a sequence of SIPp agents asynchronously. If any process terminates - with a non-zero exit code, immediately kill all remaining processes and - collect std streams. +class SIPpFailure(RuntimeError): + """SIPp commands failed + """ + - Adheres to an interface similar to `multiprocessing.pool.AsyncResult`. +class TrioRunner(object): + """Run a sequence of SIPp cmds asynchronously. If any process terminates + with a non-zero exit code, immediately canacel all remaining processes and + collect std streams. """ def __init__( self, - subprocmod=subprocess, - osmod=os, - poller=select.epoll, ): - # these could optionally be rpyc proxy objs - self.spm = subprocmod - self.osm = osmod - self.poller = poller() - # collector thread placeholder - self._waiter = None # store proc results self._procs = OrderedDict() - def __call__(self, cmds, block=True, rate=300, **kwargs): - if self._waiter and self._waiter.is_alive(): + async def run( + self, + cmds, + rate=300, + **kwargs + ): + if self.is_alive(): raise RuntimeError( "Not all processes from a prior run have completed" ) @@ -52,85 +52,78 @@ def __call__(self, cmds, block=True, rate=300, **kwargs): raise RuntimeError( "Process results have not been cleared from previous run" ) - sp = self.spm - os = self.osm - DEVNULL = open(os.devnull, 'wb') - fds2procs = OrderedDict() - # run agent commands in sequence for cmd in cmds: log.debug( "launching cmd:\n\"{}\"\n".format(cmd)) - proc = sp.Popen( + proc = await trio.open_process( shlex.split(cmd), - stdout=DEVNULL, - stderr=sp.PIPE + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE ) - fd = proc.stderr.fileno() - log.debug("registering fd '{}' for pid '{}'".format( - fd, proc.pid)) - fds2procs[fd] = self._procs[cmd] = proc - # register for stderr hangup events - self.poller.register(proc.stderr.fileno(), select.EPOLLHUP) + self._procs[cmd] = proc + # limit launch rate time.sleep(1. / rate) - # launch waiter - self._waiter = threading.Thread(target=self._wait, args=(fds2procs,)) - self._waiter.daemon = True - self._waiter.start() - - return self.get(**kwargs) if block else self._procs + return self._procs - def _wait(self, fds2procs): - log.debug("started waiter for procs {}".format(fds2procs)) + async def get(self, timeout=180): + '''Block up to `timeout` seconds for all agents to complete. + Either return (cmd, proc) pairs or raise `TimeoutError` on timeout + ''' signalled = None - left = len(fds2procs) - collected = 0 - while collected < left: - pairs = self.poller.poll() # wait on hangup events - log.debug("received hangup for pairs '{}'".format(pairs)) - for fd, status in pairs: - collected += 1 - proc = fds2procs[fd] - # attach streams so they can be read more then once - log.debug("collecting streams for {}".format(proc)) - proc.streams = Streams(*proc.communicate()) # timeout=2)) - if proc.returncode != 0 and not signalled: + + # taken mostly verbatim from ``trio.run_process()`` + async def read_output(stream): + chunks = [] + async with stream: + try: + while True: + chunk = await stream.receive_some(32768) + if not chunk: + break + chunks.append(chunk) + except trio.ClosedResourceError: + pass + + return b"".join(chunks) + + async def wait_on_proc(proc): + nonlocal signalled + async with proc as proc: + rc = await proc.wait() + if rc != 0 and not signalled: # stop all other agents if there is a failure signalled = self.stop() - log.debug("terminating waiter thread") + # collect stderr output + proc.stderr_output = await read_output(proc.stderr) + + try: + with trio.fail_after(timeout): + async with trio.open_nursery() as n: + for cmd, proc in self._procs.items(): + # async wait on each process to complete + n.start_soon(wait_on_proc, proc) + + return self._procs + + except trio.TooSlowError: + # kill all SIPp processes + signalled = self.stop() + # all procs were killed by SIGUSR1 + raise TimeoutError( + "pids '{}' failed to complete after '{}' seconds".format( + pformat([p.pid for p in signalled.values()]), timeout) + ) - def get(self, timeout=180): - '''Block up to `timeout` seconds for all agents to complete. - Either return (cmd, proc) pairs or raise `TimeoutError` on timeout + def iterprocs(self): + '''Iterate all processes which are still alive yielding + (cmd, proc) pairs ''' - if self._waiter.is_alive(): - self._waiter.join(timeout=timeout) - - if self._waiter.is_alive(): - # kill them mfin SIPps - signalled = self.stop() - self._waiter.join(timeout=10) - - if self._waiter.is_alive(): - # try to stop a few more times - for _ in range(3): - signalled = self.stop() - self._waiter.join(timeout=1) - - if self._waiter.is_alive(): - # some procs failed to terminate via signalling - raise RuntimeError("Unable to kill all agents!?") - - # all procs were killed by SIGUSR1 - raise TimeoutError( - "pids '{}' failed to complete after '{}' seconds".format( - pformat([p.pid for p in signalled.values()]), timeout) - ) - - return self._procs + return ((cmd, proc) for cmd, proc in self._procs.items() + if proc and proc.poll() is None) def stop(self): '''Stop all agents with SIGUSR1 as per SIPp's signal handling @@ -151,25 +144,44 @@ def _signalall(self, signum): signalled[cmd] = proc return signalled - def iterprocs(self): - '''Iterate all processes which are still alive yielding - (cmd, proc) pairs - ''' - return ((cmd, proc) for cmd, proc in self._procs.items() - if proc and proc.poll() is None) - def is_alive(self): '''Return bool indicating whether some agents are still alive ''' return any(self.iterprocs()) - def ready(self): - '''Return bool indicating whether all agents have completed - ''' - return not self.is_alive() - def clear(self): '''Clear all processes from the last run ''' - assert self.ready(), "Not all processes have completed" + assert not self.is_alive(), "Not all processes have completed" self._procs.clear() + + +async def run_all_agents(runner, agents, timeout=180): + """Run a sequencec of agents using a ``TrioRunner``. + """ + async def finalize(): + # this might raise TimeoutError + cmds2procs = await runner.get(timeout=timeout) + agents2procs = list(zip(agents, cmds2procs.values())) + msg = report.err_summary(agents2procs) + if msg: + # report logs and stderr + await report.emit_logfiles(agents2procs) + raise SIPpFailure(msg) + + return cmds2procs + + try: + await runner.run( + (ua.render() for ua in agents), + timeout=timeout + ) + await finalize() + return runner + except TimeoutError as terr: + # print error logs even when we timeout + try: + await finalize() + except SIPpFailure as err: + assert 'exit code -9' in str(err) + raise terr diff --git a/pysipp/report.py b/pysipp/report.py index 9643892..b4dbb49 100644 --- a/pysipp/report.py +++ b/pysipp/report.py @@ -16,6 +16,7 @@ 99: "Normal exit without calls processed", -1: "Fatal error", -2: "Fatal error binding a socket", + -9: "Signalled to stop with SIGUSR1", -10: "Signalled to stop with SIGUSR1", 254: "Connection Error: socket already in use", 255: "Command or syntax error: check stderr output", @@ -41,7 +42,7 @@ def err_summary(agents2procs): return msg -def emit_logfiles(agents2procs, level='warn', max_lines=100): +async def emit_logfiles(agents2procs, level='warn', max_lines=100): """Log all available SIPp log-file contents """ emit = getattr(log, level) @@ -49,7 +50,7 @@ def emit_logfiles(agents2procs, level='warn', max_lines=100): # print stderr emit("stderr for '{}' @ {}\n{}\n".format( - ua.name, ua.srcaddr, proc.streams.stderr)) + ua.name, ua.srcaddr, proc.stderr_output)) # FIXME: no idea, but some logs are not being printed without this # logging mod bug? time.sleep(0.01) From de44f506331a06b3450e20e0aabaa85b64549372 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 22:16:23 -0400 Subject: [PATCH 11/25] Adjust tests to match new `trio` apis --- tests/test_agent.py | 13 ++++++++++--- tests/test_launcher.py | 30 +++++++++++++++++++----------- tests/test_scenario.py | 3 +++ tests/test_stack.py | 2 +- 4 files changed, 33 insertions(+), 15 deletions(-) create mode 100644 tests/test_scenario.py diff --git a/tests/test_agent.py b/tests/test_agent.py index 0545952..9f46857 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -118,15 +118,22 @@ def test_server(): (agent.client(destaddr=('99.99.99.99', 5060)), 1, {}, RuntimeError), # test if server times out it is signalled - (agent.server(), 0, {'timeout': 1}, launch.TimeoutError)], + (agent.server(), -9, {'timeout': 1}, launch.TimeoutError)], ids=['ua', 'uac', 'uas'], ) def test_failures(ua, retcode, kwargs, exc): """Test failure cases for all types of agents """ + runner = launch.TrioRunner() + # run it without raising - runner = ua(raise_exc=False, **kwargs) - cmds2procs = runner.get(timeout=0) + if exc: + with pytest.raises(exc): + ua(runner=runner, **kwargs) + + # runner = ua(raise_exc=False, **kwargs) + + cmds2procs = runner._procs assert not runner.is_alive() assert len(list(runner.iterprocs())) == 0 # tests transparency of the defaults config pipeline diff --git a/tests/test_launcher.py b/tests/test_launcher.py index e1d435a..5c17070 100644 --- a/tests/test_launcher.py +++ b/tests/test_launcher.py @@ -1,14 +1,18 @@ ''' Basic agent/scenario launching ''' +import time + +import trio +import pytest + from pysipp.agent import client, server -from pysipp.launch import PopenRunner +from pysipp.launch import TrioRunner, run_all_agents, SIPpFailure -def run_blocking(*agents): - runner = PopenRunner() +def run_blocking(runner, agents): assert not runner.is_alive() - runner(ua.render() for ua in agents) + trio.run(run_all_agents, runner, agents) assert not runner.is_alive() return runner @@ -22,24 +26,28 @@ def test_agent_fails(): uac.recv_timeout = 1 # avoids SIPp issue #176 uac.call_count = 1 # avoids SIPp issue #176 - runner = run_blocking(uas, uac) + runner = TrioRunner() + with pytest.raises(SIPpFailure): + run_blocking(runner, (uas, uac)) # fails due to invalid ip - uasproc = runner.get(timeout=0)[uas.render()] - assert uasproc.streams.stderr + uasproc = runner._procs[uas.render()] + print(uasproc.stderr_output) + assert uasproc.stderr_output assert uasproc.returncode == 255, uasproc.streams.stderr # killed by signal - uacproc = runner.get(timeout=0)[uac.render()] - # assert not uacproc.streams.stderr # sometimes this has a log msg? + uacproc = runner._procs[uac.render()] + # assert not uacproc.stderr_output # sometimes this has a log msg? ret = uacproc.returncode # killed by SIGUSR1 or terminates before it starts (racy) assert ret == -10 or ret == 0 def test_default_scen(default_agents): - runner = run_blocking(*default_agents) + runner = TrioRunner() + runner = run_blocking(runner, default_agents) # both agents should be successful - for cmd, proc in runner.get(timeout=0).items(): + for cmd, proc in runner._procs.items(): assert not proc.returncode diff --git a/tests/test_scenario.py b/tests/test_scenario.py new file mode 100644 index 0000000..60c7d65 --- /dev/null +++ b/tests/test_scenario.py @@ -0,0 +1,3 @@ +''' +pysipp.agent module tests +''' diff --git a/tests/test_stack.py b/tests/test_stack.py index 3aa9b82..2b8b8f6 100644 --- a/tests/test_stack.py +++ b/tests/test_stack.py @@ -76,7 +76,7 @@ def test_sync_run(scenwalk): """ for path, scen in scenwalk(): runner = scen(timeout=6) - for cmd, proc in runner.get(timeout=0).items(): + for cmd, proc in runner._procs.items(): assert proc.returncode == 0 From 3911de9037158d1746a9dfa81508de897c8d307d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 22:20:18 -0400 Subject: [PATCH 12/25] Drop py2.7 from CI --- .travis.yml | 1 - tox.ini | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1acac4a..8454dd0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,6 @@ cache: - pip python: - - 2.7 - 3.5 - 3.6 # - 3.7 diff --git a/tox.ini b/tox.ini index cd9502a..5397180 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = py27, py35, py36, pypy +envlist = py36, py37 [testenv] deps = From d11fe44deb53e868e3e6d046627809238b9a72f6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 22:19:58 -0400 Subject: [PATCH 13/25] Prepare setup script for 1.0 release --- setup.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index ced8544..9d26679 100755 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ setup( name="pysipp", - version='0.1.0', + version='1.0.0.dev', description="A SIPp scenario launcher", long_description=readme, long_description_content_type="text/markdown", @@ -36,7 +36,10 @@ url='https://github.com/SIPp/pysipp', platforms=['linux'], packages=['pysipp', 'pysipp.cli'], - install_requires=['pluggy>=0.11.0'], + install_requires=[ + 'pluggy >= 0.11.0', + 'trio>=0.11.0' + ], tests_require=['pytest'], entry_points={ 'console_scripts': [ @@ -48,7 +51,7 @@ 'Intended Audience :: Developers', 'License :: OSI Approved :: GNU General Public License v2 (GPLv2)', 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.6', 'Topic :: Software Development', 'Topic :: Software Development :: Testing', 'Topic :: Software Development :: Quality Assurance', From 53bf5115b4146df4943e1e9db7e1e94998301421 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 12 Jul 2019 17:35:20 -0400 Subject: [PATCH 14/25] trio.open_process() isn't released yet --- pysipp/__init__.py | 1 + pysipp/launch.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pysipp/__init__.py b/pysipp/__init__.py index fc703b8..d88bdfa 100644 --- a/pysipp/__init__.py +++ b/pysipp/__init__.py @@ -101,6 +101,7 @@ def scenario(dirpath=None, proxyaddr=None, autolocalsocks=True, # same as above scen = plugin.mng.hook.pysipp_conf_scen_protocol( agents=[uas, uac], confpy=None, + scenkwargs=scenkwargs, ) if proxyaddr is not None: diff --git a/pysipp/launch.py b/pysipp/launch.py index 63b2d29..f859586 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -56,7 +56,8 @@ async def run( for cmd in cmds: log.debug( "launching cmd:\n\"{}\"\n".format(cmd)) - proc = await trio.open_process( + # proc = await trio.open_process( + proc = trio.Process( shlex.split(cmd), stdout=subprocess.DEVNULL, stderr=subprocess.PIPE From e8a819afbeafc18e2dab9178fd3cb7081b6bd517 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Jul 2019 12:41:50 -0400 Subject: [PATCH 15/25] Raise `ValueError` on non-tuple assignment These fancy tuple-attribute-properties are supposed to be strictly assigned tuple types; let's enforce that. Resolves #12 --- pysipp/agent.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pysipp/agent.py b/pysipp/agent.py index 988eaeb..2059b2f 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -23,6 +23,7 @@ def getter(self): return None def setter(self, pair): + if not isinstance(pair, tuple): if pair is None: pair = (None, None) From 0492895b893221d4325b15df9632fb40a2ba2935 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 21:46:01 -0400 Subject: [PATCH 16/25] Drop runner related hooks --- pysipp/hookspec.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/pysipp/hookspec.py b/pysipp/hookspec.py index c82a21e..fa953c0 100644 --- a/pysipp/hookspec.py +++ b/pysipp/hookspec.py @@ -66,18 +66,3 @@ def pysipp_conf_scen(agents, scen): socket arguments. It it the recommended hook for applying a default scenario configuration. """ - - -@hookspec(firstresult=True) -def pysipp_new_runner(): - """Create and return a runner instance to be used for invoking - multiple SIPp commands. The runner must be callable and support both a - `block` and `timeout` kwarg. - """ - - -@hookspec(firstresult=True) -def pysipp_run_protocol(scen, runner, block, timeout, raise_exc): - """Perform steps to execute all SIPp commands usually by calling a - preconfigured command launcher/runner. - """ From 3df7a5f871d40e2e436a5d0a315aea6c3e929b1f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 21:47:12 -0400 Subject: [PATCH 17/25] Use `trio` for process and scenario launching! After attempting to find an OS portable way to spawn subprocesses using the stdlib and coming out unsatisfied, I've decided use the new subprocess launching support in `trio`! This will of course require that the project moves to python 3.6+ giving us access to a lot of neat features of modern python including async/await support and adherence to the structured concurrency principles prominent in `trio`. It turns out this is a good fit since SIPp already has a built in cancellation mechanism via the SIGUSR1 signal. There's a lot of "core" changes to go over in this commit: - drop the "run protocol" and "runner creation" related hooks since they really shouldn't be overridden until there's some need for it and it's likely smarter to keep those "machinery" details strictly internal for now - the run "protocol" has now been relegated to an async function: `pysipp.launch.run_all_agents()` - many routines have been converted to async functions particularly at the runner (`pysipp.TrioRunner.run()`, `.get()`) and scenario (`pysipp.Scenario.arun()`) levels allowing us to expose both a sync and async interface for running subprocesses / agents - drop all the epoll/select loop stuff as this is entirely delegated to `trio.open_process()` and it's underlying machinery and APIs Resolves #53 --- pysipp/__init__.py | 70 +--------------- pysipp/agent.py | 66 +++++++++++---- pysipp/launch.py | 204 ++++++++++++++++++++++++--------------------- pysipp/report.py | 5 +- 4 files changed, 163 insertions(+), 182 deletions(-) diff --git a/pysipp/__init__.py b/pysipp/__init__.py index 46eb248..fc703b8 100644 --- a/pysipp/__init__.py +++ b/pysipp/__init__.py @@ -20,16 +20,11 @@ ''' import sys from os.path import dirname -from . import launch, report, plugin, netplug, agent +from . import plugin, netplug, agent from .load import iter_scen_dirs from .agent import client, server -class SIPpFailure(RuntimeError): - """SIPp commands failed - """ - - __package__ = 'pysipp' __author__ = 'Tyler Goodlet (tgoodlet@gmail.com)' @@ -106,12 +101,11 @@ def scenario(dirpath=None, proxyaddr=None, autolocalsocks=True, # same as above scen = plugin.mng.hook.pysipp_conf_scen_protocol( agents=[uas, uac], confpy=None, - scenkwargs=scenkwargs ) - if proxyaddr: - assert isinstance( - proxyaddr, tuple), 'proxyaddr must be a (addr, port) tuple' + if proxyaddr is not None: + assert isinstance(proxyaddr, tuple), ( + 'proxyaddr must be a (addr, port) tuple') scen.clientdefaults.proxyaddr = proxyaddr return scen @@ -196,61 +190,5 @@ def pysipp_conf_scen(agents, scen): ua.rtp_echo = True -@plugin.hookimpl -def pysipp_new_runner(): - """Provision and assign a default cmd runner - """ - return launch.PopenRunner() - - -@plugin.hookimpl -def pysipp_run_protocol(scen, runner, block, timeout, raise_exc): - """"Run all rendered commands with the provided runner or the built-in - PopenRunner which runs commands locally. - """ - # use provided runner or default provided by hook - runner = runner or plugin.mng.hook.pysipp_new_runner() - agents = scen.prepare() - - def finalize(cmds2procs=None, timeout=180, raise_exc=True): - """Wait for all remaining agents in the scenario to finish executing - and perform error and logfile reporting. - """ - cmds2procs = cmds2procs or runner.get(timeout=timeout) - agents2procs = list(zip(agents, cmds2procs.values())) - msg = report.err_summary(agents2procs) - if msg: - # report logs and stderr - report.emit_logfiles(agents2procs) - if raise_exc: - # raise RuntimeError on agent failure(s) - # (HINT: to rerun type `scen()` from the debugger) - raise SIPpFailure(msg) - - return cmds2procs - - try: - # run all agents (raises RuntimeError on timeout) - cmds2procs = runner( - (ua.render() for ua in agents), - block=block, timeout=timeout - ) - except launch.TimeoutError: # sucessful timeout - cmds2procs = finalize(timeout=0, raise_exc=False) - if raise_exc: - raise - else: - # async - if not block: - # XXX async run must bundle up results for later processing - scen.finalize = finalize - return finalize - - # sync - finalize(cmds2procs, raise_exc=raise_exc) - - return runner - - # register the default hook set plugin.mng.register(sys.modules[__name__]) diff --git a/pysipp/agent.py b/pysipp/agent.py index 2059b2f..bc2b9ba 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -5,10 +5,14 @@ import re import itertools import tempfile +from functools import partial from copy import deepcopy from distutils import spawn from collections import namedtuple, OrderedDict -from . import command, plugin, utils + +import trio + +from . import command, plugin, utils, launch, report log = utils.get_logger() @@ -61,20 +65,21 @@ def name(self): ipcaddr = tuple_property(('ipc_host', 'ipc_port')) call_load = tuple_property(('rate', 'limit', 'call_count')) - def __call__(self, block=True, timeout=180, runner=None, raise_exc=True, - **kwargs): + + def __call__(self, *args, **kwargs): + return self.run(*args, **kwargs) + + def run( + self, + timeout=180, + **kwargs + ): # create and configure a temp scenario scen = plugin.mng.hook.pysipp_conf_scen_protocol( agents=[self], confpy=None, scenkwargs={}, ) - # run the standard protocol - # (attach allocted runner for reuse/post-portem) - return plugin.mng.hook.pysipp_run_protocol( - scen=scen, block=block, timeout=timeout, - runner=runner, - raise_exc=raise_exc, **kwargs - ) + return scen.run(timeout=timeout, **kwargs) def is_client(self): return 'uac' in self.name.lower() @@ -255,8 +260,13 @@ class ScenarioType(object): If called it will invoke the standard run hooks. """ - def __init__(self, agents, defaults, clientdefaults=None, - serverdefaults=None, confpy=None, enable_screen_file=True): + def __init__( + self, agents, defaults, clientdefaults=None, + serverdefaults=None, confpy=None, enable_screen_file=True + ): + # placeholder for process "runner" + self._runner = None + # agents iterable in launch-order self._agents = agents ua_attrs = UserAgent.keys() @@ -432,10 +442,30 @@ def from_agents(self, agents=None, autolocalsocks=True, **scenkwargs): return type(self)( self.prepare(agents), self._defaults, confpy=self.mod) - def __call__(self, agents=None, block=True, timeout=180, runner=None, - raise_exc=True, copy_agents=False, **kwargs): - return plugin.mng.hook.pysipp_run_protocol( - scen=self, - block=block, timeout=timeout, runner=runner, - raise_exc=raise_exc, **kwargs + async def arun( + self, + timeout=180, + runner=None, + ): + agents = self.prepare() + runner = runner or launch.TrioRunner() + + return await launch.run_all_agents(runner, agents, timeout=timeout) + + def run( + self, + timeout=180, + **kwargs + ): + """Run scenario blocking to completion.""" + return trio.run( + partial( + self.arun, + timeout=timeout, + **kwargs + ) ) + + def __call__(self, *args, **kwargs): + # TODO: deprecation warning here + return self.run(*args, **kwargs) diff --git a/pysipp/launch.py b/pysipp/launch.py index a750128..63b2d29 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -1,17 +1,18 @@ """ Launchers for invoking SIPp user agents """ -import subprocess -import os import shlex -import select -import threading import signal +import subprocess import time from . import utils from pprint import pformat from collections import OrderedDict, namedtuple +import trio + +from . import report + log = utils.get_logger() Streams = namedtuple("Streams", "stdout stderr") @@ -21,30 +22,29 @@ class TimeoutError(Exception): "SIPp process timeout exception" -class PopenRunner(object): - """Run a sequence of SIPp agents asynchronously. If any process terminates - with a non-zero exit code, immediately kill all remaining processes and - collect std streams. +class SIPpFailure(RuntimeError): + """SIPp commands failed + """ + - Adheres to an interface similar to `multiprocessing.pool.AsyncResult`. +class TrioRunner(object): + """Run a sequence of SIPp cmds asynchronously. If any process terminates + with a non-zero exit code, immediately canacel all remaining processes and + collect std streams. """ def __init__( self, - subprocmod=subprocess, - osmod=os, - poller=select.epoll, ): - # these could optionally be rpyc proxy objs - self.spm = subprocmod - self.osm = osmod - self.poller = poller() - # collector thread placeholder - self._waiter = None # store proc results self._procs = OrderedDict() - def __call__(self, cmds, block=True, rate=300, **kwargs): - if self._waiter and self._waiter.is_alive(): + async def run( + self, + cmds, + rate=300, + **kwargs + ): + if self.is_alive(): raise RuntimeError( "Not all processes from a prior run have completed" ) @@ -52,85 +52,78 @@ def __call__(self, cmds, block=True, rate=300, **kwargs): raise RuntimeError( "Process results have not been cleared from previous run" ) - sp = self.spm - os = self.osm - DEVNULL = open(os.devnull, 'wb') - fds2procs = OrderedDict() - # run agent commands in sequence for cmd in cmds: log.debug( "launching cmd:\n\"{}\"\n".format(cmd)) - proc = sp.Popen( + proc = await trio.open_process( shlex.split(cmd), - stdout=DEVNULL, - stderr=sp.PIPE + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE ) - fd = proc.stderr.fileno() - log.debug("registering fd '{}' for pid '{}'".format( - fd, proc.pid)) - fds2procs[fd] = self._procs[cmd] = proc - # register for stderr hangup events - self.poller.register(proc.stderr.fileno(), select.EPOLLHUP) + self._procs[cmd] = proc + # limit launch rate time.sleep(1. / rate) - # launch waiter - self._waiter = threading.Thread(target=self._wait, args=(fds2procs,)) - self._waiter.daemon = True - self._waiter.start() - - return self.get(**kwargs) if block else self._procs + return self._procs - def _wait(self, fds2procs): - log.debug("started waiter for procs {}".format(fds2procs)) + async def get(self, timeout=180): + '''Block up to `timeout` seconds for all agents to complete. + Either return (cmd, proc) pairs or raise `TimeoutError` on timeout + ''' signalled = None - left = len(fds2procs) - collected = 0 - while collected < left: - pairs = self.poller.poll() # wait on hangup events - log.debug("received hangup for pairs '{}'".format(pairs)) - for fd, status in pairs: - collected += 1 - proc = fds2procs[fd] - # attach streams so they can be read more then once - log.debug("collecting streams for {}".format(proc)) - proc.streams = Streams(*proc.communicate()) # timeout=2)) - if proc.returncode != 0 and not signalled: + + # taken mostly verbatim from ``trio.run_process()`` + async def read_output(stream): + chunks = [] + async with stream: + try: + while True: + chunk = await stream.receive_some(32768) + if not chunk: + break + chunks.append(chunk) + except trio.ClosedResourceError: + pass + + return b"".join(chunks) + + async def wait_on_proc(proc): + nonlocal signalled + async with proc as proc: + rc = await proc.wait() + if rc != 0 and not signalled: # stop all other agents if there is a failure signalled = self.stop() - log.debug("terminating waiter thread") + # collect stderr output + proc.stderr_output = await read_output(proc.stderr) + + try: + with trio.fail_after(timeout): + async with trio.open_nursery() as n: + for cmd, proc in self._procs.items(): + # async wait on each process to complete + n.start_soon(wait_on_proc, proc) + + return self._procs + + except trio.TooSlowError: + # kill all SIPp processes + signalled = self.stop() + # all procs were killed by SIGUSR1 + raise TimeoutError( + "pids '{}' failed to complete after '{}' seconds".format( + pformat([p.pid for p in signalled.values()]), timeout) + ) - def get(self, timeout=180): - '''Block up to `timeout` seconds for all agents to complete. - Either return (cmd, proc) pairs or raise `TimeoutError` on timeout + def iterprocs(self): + '''Iterate all processes which are still alive yielding + (cmd, proc) pairs ''' - if self._waiter.is_alive(): - self._waiter.join(timeout=timeout) - - if self._waiter.is_alive(): - # kill them mfin SIPps - signalled = self.stop() - self._waiter.join(timeout=10) - - if self._waiter.is_alive(): - # try to stop a few more times - for _ in range(3): - signalled = self.stop() - self._waiter.join(timeout=1) - - if self._waiter.is_alive(): - # some procs failed to terminate via signalling - raise RuntimeError("Unable to kill all agents!?") - - # all procs were killed by SIGUSR1 - raise TimeoutError( - "pids '{}' failed to complete after '{}' seconds".format( - pformat([p.pid for p in signalled.values()]), timeout) - ) - - return self._procs + return ((cmd, proc) for cmd, proc in self._procs.items() + if proc and proc.poll() is None) def stop(self): '''Stop all agents with SIGUSR1 as per SIPp's signal handling @@ -151,25 +144,44 @@ def _signalall(self, signum): signalled[cmd] = proc return signalled - def iterprocs(self): - '''Iterate all processes which are still alive yielding - (cmd, proc) pairs - ''' - return ((cmd, proc) for cmd, proc in self._procs.items() - if proc and proc.poll() is None) - def is_alive(self): '''Return bool indicating whether some agents are still alive ''' return any(self.iterprocs()) - def ready(self): - '''Return bool indicating whether all agents have completed - ''' - return not self.is_alive() - def clear(self): '''Clear all processes from the last run ''' - assert self.ready(), "Not all processes have completed" + assert not self.is_alive(), "Not all processes have completed" self._procs.clear() + + +async def run_all_agents(runner, agents, timeout=180): + """Run a sequencec of agents using a ``TrioRunner``. + """ + async def finalize(): + # this might raise TimeoutError + cmds2procs = await runner.get(timeout=timeout) + agents2procs = list(zip(agents, cmds2procs.values())) + msg = report.err_summary(agents2procs) + if msg: + # report logs and stderr + await report.emit_logfiles(agents2procs) + raise SIPpFailure(msg) + + return cmds2procs + + try: + await runner.run( + (ua.render() for ua in agents), + timeout=timeout + ) + await finalize() + return runner + except TimeoutError as terr: + # print error logs even when we timeout + try: + await finalize() + except SIPpFailure as err: + assert 'exit code -9' in str(err) + raise terr diff --git a/pysipp/report.py b/pysipp/report.py index 9643892..b4dbb49 100644 --- a/pysipp/report.py +++ b/pysipp/report.py @@ -16,6 +16,7 @@ 99: "Normal exit without calls processed", -1: "Fatal error", -2: "Fatal error binding a socket", + -9: "Signalled to stop with SIGUSR1", -10: "Signalled to stop with SIGUSR1", 254: "Connection Error: socket already in use", 255: "Command or syntax error: check stderr output", @@ -41,7 +42,7 @@ def err_summary(agents2procs): return msg -def emit_logfiles(agents2procs, level='warn', max_lines=100): +async def emit_logfiles(agents2procs, level='warn', max_lines=100): """Log all available SIPp log-file contents """ emit = getattr(log, level) @@ -49,7 +50,7 @@ def emit_logfiles(agents2procs, level='warn', max_lines=100): # print stderr emit("stderr for '{}' @ {}\n{}\n".format( - ua.name, ua.srcaddr, proc.streams.stderr)) + ua.name, ua.srcaddr, proc.stderr_output)) # FIXME: no idea, but some logs are not being printed without this # logging mod bug? time.sleep(0.01) From 90e8f1d809ff4a8ecad4b50316f9a013892cbcf7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 22:16:23 -0400 Subject: [PATCH 18/25] Adjust tests to match new `trio` apis --- tests/test_agent.py | 13 ++++++++++--- tests/test_launcher.py | 30 +++++++++++++++++++----------- tests/test_scenario.py | 3 +++ tests/test_stack.py | 2 +- 4 files changed, 33 insertions(+), 15 deletions(-) create mode 100644 tests/test_scenario.py diff --git a/tests/test_agent.py b/tests/test_agent.py index aee476f..bda7040 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -118,15 +118,22 @@ def test_server(): (agent.client(destaddr=('99.99.99.99', 5060)), 1, {}, RuntimeError), # test if server times out it is signalled - (agent.server(), 0, {'timeout': 1}, launch.TimeoutError)], + (agent.server(), -9, {'timeout': 1}, launch.TimeoutError)], ids=['ua', 'uac', 'uas'], ) def test_failures(ua, retcode, kwargs, exc): """Test failure cases for all types of agents """ + runner = launch.TrioRunner() + # run it without raising - runner = ua(raise_exc=False, **kwargs) - cmds2procs = runner.get(timeout=0) + if exc: + with pytest.raises(exc): + ua(runner=runner, **kwargs) + + # runner = ua(raise_exc=False, **kwargs) + + cmds2procs = runner._procs assert not runner.is_alive() assert len(list(runner.iterprocs())) == 0 # tests transparency of the defaults config pipeline diff --git a/tests/test_launcher.py b/tests/test_launcher.py index e1d435a..5c17070 100644 --- a/tests/test_launcher.py +++ b/tests/test_launcher.py @@ -1,14 +1,18 @@ ''' Basic agent/scenario launching ''' +import time + +import trio +import pytest + from pysipp.agent import client, server -from pysipp.launch import PopenRunner +from pysipp.launch import TrioRunner, run_all_agents, SIPpFailure -def run_blocking(*agents): - runner = PopenRunner() +def run_blocking(runner, agents): assert not runner.is_alive() - runner(ua.render() for ua in agents) + trio.run(run_all_agents, runner, agents) assert not runner.is_alive() return runner @@ -22,24 +26,28 @@ def test_agent_fails(): uac.recv_timeout = 1 # avoids SIPp issue #176 uac.call_count = 1 # avoids SIPp issue #176 - runner = run_blocking(uas, uac) + runner = TrioRunner() + with pytest.raises(SIPpFailure): + run_blocking(runner, (uas, uac)) # fails due to invalid ip - uasproc = runner.get(timeout=0)[uas.render()] - assert uasproc.streams.stderr + uasproc = runner._procs[uas.render()] + print(uasproc.stderr_output) + assert uasproc.stderr_output assert uasproc.returncode == 255, uasproc.streams.stderr # killed by signal - uacproc = runner.get(timeout=0)[uac.render()] - # assert not uacproc.streams.stderr # sometimes this has a log msg? + uacproc = runner._procs[uac.render()] + # assert not uacproc.stderr_output # sometimes this has a log msg? ret = uacproc.returncode # killed by SIGUSR1 or terminates before it starts (racy) assert ret == -10 or ret == 0 def test_default_scen(default_agents): - runner = run_blocking(*default_agents) + runner = TrioRunner() + runner = run_blocking(runner, default_agents) # both agents should be successful - for cmd, proc in runner.get(timeout=0).items(): + for cmd, proc in runner._procs.items(): assert not proc.returncode diff --git a/tests/test_scenario.py b/tests/test_scenario.py new file mode 100644 index 0000000..60c7d65 --- /dev/null +++ b/tests/test_scenario.py @@ -0,0 +1,3 @@ +''' +pysipp.agent module tests +''' diff --git a/tests/test_stack.py b/tests/test_stack.py index 3aa9b82..2b8b8f6 100644 --- a/tests/test_stack.py +++ b/tests/test_stack.py @@ -76,7 +76,7 @@ def test_sync_run(scenwalk): """ for path, scen in scenwalk(): runner = scen(timeout=6) - for cmd, proc in runner.get(timeout=0).items(): + for cmd, proc in runner._procs.items(): assert proc.returncode == 0 From 6d31c3262d47a4e4f270b46bb70a96015d47f2c9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 22:20:18 -0400 Subject: [PATCH 19/25] Drop py2.7 from CI --- .travis.yml | 1 - tox.ini | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1acac4a..8454dd0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,6 @@ cache: - pip python: - - 2.7 - 3.5 - 3.6 # - 3.7 diff --git a/tox.ini b/tox.ini index cd9502a..5397180 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = py27, py35, py36, pypy +envlist = py36, py37 [testenv] deps = From dbc827478f9abd51555079a01aaabe6ff3c3a836 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jul 2019 22:19:58 -0400 Subject: [PATCH 20/25] Prepare setup script for 1.0 release --- setup.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index ced8544..9d26679 100755 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ setup( name="pysipp", - version='0.1.0', + version='1.0.0.dev', description="A SIPp scenario launcher", long_description=readme, long_description_content_type="text/markdown", @@ -36,7 +36,10 @@ url='https://github.com/SIPp/pysipp', platforms=['linux'], packages=['pysipp', 'pysipp.cli'], - install_requires=['pluggy>=0.11.0'], + install_requires=[ + 'pluggy >= 0.11.0', + 'trio>=0.11.0' + ], tests_require=['pytest'], entry_points={ 'console_scripts': [ @@ -48,7 +51,7 @@ 'Intended Audience :: Developers', 'License :: OSI Approved :: GNU General Public License v2 (GPLv2)', 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.6', 'Topic :: Software Development', 'Topic :: Software Development :: Testing', 'Topic :: Software Development :: Quality Assurance', From 6394960068455f9d040a31691c568015b2911910 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 12 Jul 2019 17:35:20 -0400 Subject: [PATCH 21/25] trio.open_process() isn't released yet --- pysipp/__init__.py | 1 + pysipp/launch.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pysipp/__init__.py b/pysipp/__init__.py index fc703b8..d88bdfa 100644 --- a/pysipp/__init__.py +++ b/pysipp/__init__.py @@ -101,6 +101,7 @@ def scenario(dirpath=None, proxyaddr=None, autolocalsocks=True, # same as above scen = plugin.mng.hook.pysipp_conf_scen_protocol( agents=[uas, uac], confpy=None, + scenkwargs=scenkwargs, ) if proxyaddr is not None: diff --git a/pysipp/launch.py b/pysipp/launch.py index 63b2d29..f859586 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -56,7 +56,8 @@ async def run( for cmd in cmds: log.debug( "launching cmd:\n\"{}\"\n".format(cmd)) - proc = await trio.open_process( + # proc = await trio.open_process( + proc = trio.Process( shlex.split(cmd), stdout=subprocess.DEVNULL, stderr=subprocess.PIPE From 77cf11aa7ec1bd7e1b8bbfd55ba909f6e0692298 Mon Sep 17 00:00:00 2001 From: Kon Tsaki Date: Sat, 26 Dec 2020 10:25:34 +0100 Subject: [PATCH 22/25] Fix running agents with trio --- .gitignore | 3 +++ pysipp/agent.py | 23 +++++++++++++++++++---- pysipp/launch.py | 45 ++++++++++++++++++++++----------------------- 3 files changed, 44 insertions(+), 27 deletions(-) diff --git a/.gitignore b/.gitignore index ba74660..86c28a4 100644 --- a/.gitignore +++ b/.gitignore @@ -43,6 +43,9 @@ nosetests.xml coverage.xml *,cover +# virtual environment +venv + # Translations *.mo *.pot diff --git a/pysipp/agent.py b/pysipp/agent.py index ceb211f..05e6c67 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -18,6 +18,7 @@ SocketAddr = namedtuple('SocketAddr', 'ip port') +DEFAULT_RUNNER_TIMEOUT = 180 def tuple_property(attrs): def getter(self): @@ -443,13 +444,27 @@ def from_agents(self, agents=None, autolocalsocks=True, **scenkwargs): async def arun( self, - timeout=180, + timeout=DEFAULT_RUNNER_TIMEOUT, runner=None, + block=True, ): - agents = self.prepare() - runner = runner or launch.TrioRunner() + self._prepared_agents = agents = self.prepare() + self._runner = runner = runner or launch.TrioRunner() + + return await launch.run_all_agents(runner, agents, timeout=timeout, block=block) - return await launch.run_all_agents(runner, agents, timeout=timeout) + def finalize(self, *, timeout=DEFAULT_RUNNER_TIMEOUT): + assert ( + self._prepared_agents and self._runner + ), "Must run scenario before finalizing." + return trio.run( + partial( + launch.finalize, + self._runner, + self._prepared_agents, + timeout=timeout, + ) + ) def run( self, diff --git a/pysipp/launch.py b/pysipp/launch.py index f859586..0a922dd 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -56,8 +56,7 @@ async def run( for cmd in cmds: log.debug( "launching cmd:\n\"{}\"\n".format(cmd)) - # proc = await trio.open_process( - proc = trio.Process( + proc = await trio.open_process( shlex.split(cmd), stdout=subprocess.DEVNULL, stderr=subprocess.PIPE @@ -157,32 +156,32 @@ def clear(self): self._procs.clear() -async def run_all_agents(runner, agents, timeout=180): - """Run a sequencec of agents using a ``TrioRunner``. - """ - async def finalize(): - # this might raise TimeoutError - cmds2procs = await runner.get(timeout=timeout) - agents2procs = list(zip(agents, cmds2procs.values())) - msg = report.err_summary(agents2procs) - if msg: - # report logs and stderr - await report.emit_logfiles(agents2procs) - raise SIPpFailure(msg) - - return cmds2procs +async def run_all_agents(runner, agents, timeout=180, block=True): + """Run a sequencec of agents using a ``TrioRunner``.""" try: - await runner.run( - (ua.render() for ua in agents), - timeout=timeout - ) - await finalize() + await runner.run((ua.render() for ua in agents), timeout=timeout) + if block: + await finalize(runner, agents, timeout) return runner except TimeoutError as terr: # print error logs even when we timeout try: - await finalize() + await finalize(runner, agents, timeout) except SIPpFailure as err: - assert 'exit code -9' in str(err) + assert "exit code -9" in str(err) raise terr + + +async def finalize(runner, agents, timeout): + """Block up to `timeout` seconds for all agents to complete.""" + # this might raise TimeoutError + cmds2procs = await runner.get(timeout=timeout) + agents2procs = list(zip(agents, cmds2procs.values())) + msg = report.err_summary(agents2procs) + if msg: + # report logs and stderr + await report.emit_logfiles(agents2procs) + raise SIPpFailure(msg) + + return cmds2procs From 2d30b87698c7e26cbaed206c35486597fc2e95ec Mon Sep 17 00:00:00 2001 From: Kon Tsaki Date: Sat, 26 Dec 2020 18:17:09 +0100 Subject: [PATCH 23/25] Fix ScenarioType uninitialized attribute --- pysipp/agent.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pysipp/agent.py b/pysipp/agent.py index 05e6c67..de13a76 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -269,6 +269,7 @@ def __init__( # agents iterable in launch-order self._agents = agents + self._prepared_agents = None ua_attrs = UserAgent.keys() # default settings From 403c46e3cde4e6b969a123bc1a747e792bd229c2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 12 Feb 2021 10:39:21 -0500 Subject: [PATCH 24/25] Be explicit on dev version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 9d26679..a9590a7 100755 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ setup( name="pysipp", - version='1.0.0.dev', + version='1.0.0.dev0', description="A SIPp scenario launcher", long_description=readme, long_description_content_type="text/markdown", From ab27aa5fd9eaeba4775b42ea354b4cb66de517cd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 13 Feb 2021 11:56:02 -0500 Subject: [PATCH 25/25] Update readme to reflect python version requirement --- README.md | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index e9dc7ad..8b6530b 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,12 @@ but (want to) use it for automated testing because it gets the job done... ## What is it? -Python configuring and launching the infamous -[SIPp](http://sipp.sourceforge.net/) using an api inspired by -[requests](http://docs.python-requests.org/) +Python 3.6+ configuring and launching the infamous +[SIPp](http://sipp.sourceforge.net/) using a simple API to +generate commands and spawn them in subprocesses. + +Command subprocess launching now uses +[`trio`](https://trio.readthedocs.io/en/stable/reference-io.html#spawning-subprocesses)! ## It definitely lets you @@ -25,7 +28,7 @@ Python configuring and launching the infamous ## Basic Usage -Launching the default UAC scenario is a short line: +Launching the default UAC script is a short line: ```python import pysipp