diff --git a/dev-requirements.txt b/dev-requirements.txt index 7534146..4b14ec4 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -5,6 +5,9 @@ tox>=2.3.1 twine==1.8.1 +# for test and development +pytest-pycharm>=0.4.0 + #Used by setup.py rosdevelop gitpython>=2.1.0 pyyaml>=3.12 diff --git a/pyzmp/__init__.py b/pyzmp/__init__.py index 69e0690..1548034 100644 --- a/pyzmp/__init__.py +++ b/pyzmp/__init__.py @@ -24,14 +24,11 @@ from .exceptions import UnknownServiceException, UnknownRequestTypeException, UnknownResponseTypeException from .master import manager +from .process import Process from .node import Node, current_node from .service import Service, services, discover, ServiceCallTimeout -topics = manager.list() -params = manager.list() - - from .helpers import process_watcher from .helpers import process_watcher_cm diff --git a/pyzmp/node.py b/pyzmp/node.py index cea310d..0bd78c2 100644 --- a/pyzmp/node.py +++ b/pyzmp/node.py @@ -13,6 +13,8 @@ import uuid import errno + +import re import zmq import socket import logging @@ -100,181 +102,101 @@ from .master import manager from .exceptions import UnknownServiceException, UnknownRequestTypeException +from .registry import FileBasedRegistry from .message import ServiceRequest, ServiceRequest_dictparse, ServiceResponse, ServiceException -from .service import service_provider_cm -#from .service import RequestMsg, ResponseMsg, ErrorMsg # only to access message types +from .service import service_provider_cm, Service +from .process import Process +# from .service import RequestMsg, ResponseMsg, ErrorMsg # only to access message types current_node = multiprocessing.current_process -# Lock is definitely needed ( not implemented in proxy objects, unless the object itself already has it, like Queue ) -nodes_lock = manager.Lock() -nodes = manager.dict() -@contextlib.contextmanager -def dummy_cm(): - yield None +### +#This implements a local registry, relying on the local tmp file system. 
+### + +def _get_registry_filepath(): + """ + A deterministic way to find the path to a registry, so it can be used by any process. + :return: + """ + _zmp_froot = os.path.join(tempfile.gettempdir(), 'zmp') + return _zmp_froot + + +def _get_node_zmp_filepath(name): + # trying to follow the de-facto standard way to register daemon process info, + # while adding an extra information : the socket opened. + fname = os.path.join(_get_registry_filepath(), name + ".zmp") + return fname + + +def register_node(name, pid, zmp_addr): + zmpfname = _get_node_zmp_filepath(name) + with open(zmpfname, "xt") as fh: + fh.write(zmp_addr) + + +def unregister_node(name): + zmpfname = _get_node_zmp_filepath(name) + os.remove(zmpfname) -@contextlib.contextmanager -def node_cm(node_name, svc_address): - # advertise itself - nodes_lock.acquire() - nodes[node_name] = {'service_conn': svc_address} - nodes_lock.release() +def get_node_zmp(name): + fname = _get_node_zmp_filepath(name) + with open(fname, "rt") as fh: + zmp_addr = fh.read() + return zmp_addr - yield - # concealing itself - nodes_lock.acquire() - nodes[node_name] = {} - nodes_lock.release() + + + + + +# REF : http://stackoverflow.com/questions/3024925/python-create-a-with-block-on-several-context-managers # TODO : Nodelet ( thread, with fast intraprocess zmq comm - entity system design /vs/threadpool ?) + # CAREFUL here : multiprocessing documentation specifies that a process object can be started only once... -class Node(object): +class Node(Process): EndPoint = namedtuple("EndPoint", "self func") # TODO : allow just passing target to be able to make a Node from a simple function, and also via decorator... 
- def __init__(self, name=None, socket_bind=None, context_manager=None, target=None, args=None, kwargs=None): + def __init__(self, name=None, socket_bind=None, target_context=None, target=None, args=None, kwargs=None): """ Initializes a ZMP Node (Restartable Python Process communicating via ZMQ) :param name: Name of the node :param socket_bind: the string describing how to bind the ZMQ socket ( IPC, TCP, etc. ) - :param context_manager: a context_manager to be used with run (in a with statement) + :param target_context: a context_manager to be used during "run()". + This is used to ensure the target is called with the appropriate context + :param target: the function to call in child process. It will be called while ti returns None. + When an int is returned the loop will stop. + That loop can also be stopped by setting the terminate event (see Process). + :param args: the arguments to pass to the target + :param kwargs: the keywords arguments to pass to the target :return: """ - # TODO check name unicity - # using process as delegate - self._pargs = { - 'name': name or str(uuid.uuid4()), - 'args': args or (), - 'kwargs': kwargs or {}, - 'target': self.run, # Careful : our run() is not the same as the one for Process - } - # Careful : our own target is not the same as the one for Process - self._target = target or self.update # we expect the user to overload update in child class. - - #: the actual process instance. lazy creation on start() call only. 
- self._process = None - - #: whether or not the node name should be set as the actual process title - #: replacing the string duplicated from the python interpreter run - self.new_title = True - - self.context_manager = context_manager or dummy_cm # TODO: extend to list if possible ( available for python >3.1 only ) - self.exit = multiprocessing.Event() - self.started = multiprocessing.Event() + + self._target = target or self.target # User overload can choose to call Process.target or not + target_wrap = self._update + + # careful we need to swap context managers to keep the order of calling as expected + self.user_required_context = target_context or self.target_context # getting basic target context from Process. User overload can choose to call it or not. + # we only register the node context in for the process instance + super(Node, self).__init__(name=name, target_context=self._node_context, target_override=target_wrap, args=args, kwargs=kwargs) + self.listeners = {} self._providers = {} + # tmpdir for now. if moved to lowlevel system stuff -> /var/run would be more appropriate self.tmpdir = tempfile.mkdtemp(prefix='zmp-' + self.name + '-') # if no socket is specified the services of this node will be available only through IPC self._svc_address = socket_bind if socket_bind else 'ipc://' + self.tmpdir + '/services.pipe' - def __enter__(self): - # __enter__ is called only if we pass this instance to with statement ( after __init__ ) - # start only if needed (so that we can hook up a context manager to a running node) : - if not self.is_alive(): - self.start() - return self - - def __exit__(self, exception_type, exception_value, traceback): - # make sure we cleanup when we exit - self.shutdown() - - def has_started(self): - """ - :return: True if the node has started (update() might not have been called yet). Might still be alive, or not... 
- """ - return self.started.is_set() - - ### Process API delegation ### - def is_alive(self): - if self and self._process: - return self._process.is_alive() - - def join(self, timeout=None): - if not self._process: - # blocking on started event before blocking on join - self.started.wait(timeout=timeout) - return self._process.join(timeout=timeout) - - @property - def name(self): - if self and self._process: - return self._process.name - else: - return self._pargs.get('name', "ZMPNode") - - @name.setter - def name(self, name): - if self and self._process: - self._process.name = name - # only reset the name arg if it was accepted by the setter - self._pargs.set('name', self._process.name) - else: - # TODO : maybe we should be a bit more strict here ? - self._pargs.set('name', name) - - @property - def daemon(self): - """ - Return whether process is a daemon - :return: - """ - if self._process: - return self._process.daemon - else: - return self._pargs.get('daemonic', False) - - @daemon.setter - def daemon(self, daemonic): - """ - Set whether process is a daemon - :param daemonic: - :return: - """ - if self._process: - self._process.daemonic = daemonic - else: - self._pargs['daemonic'] = daemonic - - @property - def authkey(self): - return self._process.authkey - - @authkey.setter - def authkey(self, authkey): - """ - Set authorization key of process - """ - self._process.authkey = authkey - - @property - def exitcode(self): - """ - Return exit code of process or `None` if it has yet to stop - """ - if self._process: - return self._process.exitcode - else: - return None - - @property - def ident(self): - """ - Return identifier (PID) of process or `None` if it has yet to start - """ - if self._process: - return self._process.ident - else: - return None - - def __repr__(self): - # TODO : improve this - return self._process.__repr__() + self.provides(self.index) def start(self, timeout=None): """ @@ -283,48 +205,21 @@ def start(self, timeout=None): None waits until the 
update has been called at least once. """ - # we lazily create our process delegate (with same arguments) - if self.daemon: - daemonic = True - else: - daemonic = False - - pargs = self._pargs.copy() - pargs.pop('daemonic', None) - - self._process = multiprocessing.Process(**pargs) - - self._process.daemon = daemonic - - if self.is_alive(): - # if already started, we shutdown and join before restarting - # not timeout will bock here (default join behavior). - # otherwise we simply use the same timeout. - self.shutdown(join=True, timeout=timeout) # TODO : only restart if no error (check exitcode) - self.start(timeout=timeout) # recursive to try again if needed - else: - self._process.start() + started = super(Node, self).start(timeout=timeout) # timeout None means we want to wait and ensure it has started # deterministic behavior, like is_alive() from multiprocess.Process is always true after start() - if self.started.wait(timeout=timeout): # blocks until we know true or false + if started: return self._svc_address # returning the zmp url as a way to connect to the node # CAREFUL : doesnt make sense if this node only run a one-time task... # TODO: futures and ThreadPoolExecutor (so we dont need to manage the pool ourselves) else: return False + # TODO : Implement a way to redirect stdout/stderr, or even forward to parent ? # cf http://ryanjoneil.github.io/posts/2014-02-14-capturing-stdout-in-a-python-child-process.html - def terminate(self): - """Forcefully terminates the underlying process (using SIGTERM)""" - return self._process.terminate() - # TODO : maybe redirect to shutdown here to avoid child process leaks ? 
- - ### Node specific API ### - # TODO : find a way to separate process management and service provider API - def provides(self, svc_callback, service_name=None): service_name = service_name or svc_callback.__name__ # we store an endpoint ( bound method or unbound function ) @@ -338,62 +233,31 @@ def withholds(self, service_name): # we store an endpoint ( bound method or unbound function ) self._providers.pop(service_name) - # TODO : shortcut to discover/build only services provided by this node ? + def index(self): + # TODO : return Services instance directly + return self._providers.keys() - # Careful : this is NOT the same usage as "run()" from Process : - # it is called inside a loop that it does not directly control... - # TOOD : think about it and improve (Entity System integration ? Pool + Futures integration ?) - def update(self, *args, **kwargs): - """ - Runs at every update cycle in the node process/thread. - Usually you want to override this method to extend the behavior of the node in your implementation - :return: integer as exitcode to stop the node, or None to keep looping... - """ - # TODO : Check which way is better (can also be used to run external process, other functions, like Process) - return None # we keep looping by default (need to deal with services here...) - - def shutdown(self, join=True, timeout=None): - """ - Clean shutdown of the node. 
- :param join: optionally wait for the process to end (default : True) - :return: None - """ - if self.is_alive(): # check if process started - print("Shutdown initiated") - self.exit.set() - if join: - self.join(timeout=timeout) - # TODO : timeout before forcing terminate (SIGTERM) - - exitcode = self._process.exitcode if self._process else None # we return None if the process was never started - return exitcode - - def run(self, *args, **kwargs): - """ - The Node main method, running in a child process (similar to Process.run() but also accepts args) - A children class can override this method, but it needs to call super().run(*args, **kwargs) - for the node to start properly and call update() as expected. - :param args: arguments to pass to update() - :param kwargs: keyword arguments to pass to update() - :return: last exitcode returned by update() - """ - # TODO : make use of the arguments ? since run is now the target for Process... - - exitstatus = None # keeping the semantic of multiprocessing.Process : running process has None - - if setproctitle and self.new_title: - setproctitle.setproctitle("{0}".format(self.name)) - - print('[{node}] Node started as [{pid} <= {address}]'.format(node=self.name, pid=self.ident, address=self._svc_address)) + @contextlib.contextmanager + def _node_context(self): + # declaring our services first + with service_provider_cm(self.name, self._svc_address, self._providers) as spcm: + # advertise itself + nodes[self.name] = self._svc_address + # Do not yield until we are register (otherwise noone can find us, there is no point.) + yield + # concealing itself + nodes.pop(self.name) + @contextlib.contextmanager + def _zmq_poller_context(self, ): zcontext = zmq.Context() # check creating context in init ( compatibility with multiple processes ) # Apparently not needed ? Ref : https://github.com/zeromq/pyzmq/issues/770 zcontext.setsockopt(socket.SO_REUSEADDR, 1) # required to make restart easy and avoid debugging traps... 
svc_socket = zcontext.socket(zmq.REP) # Ref : http://api.zeromq.org/2-1:zmq-socket # TODO : ROUTER instead ? try: # attempting binding socket - svc_socket.bind(self._svc_address,) + svc_socket.bind(self._svc_address, ) except zmq.ZMQError as ze: if ze.errno == errno.ENOENT: # No such file or directory # TODO : handle all possible cases @@ -414,104 +278,160 @@ def run(self, *args, **kwargs): except Exception as e: raise - poller = zmq.Poller() poller.register(svc_socket, zmq.POLLIN) - # Initializing all context managers - with service_provider_cm( - self.name, self._svc_address, self._providers - ), node_cm(self.name, self._svc_address), self.context_manager() as cm: - - # Starting the clock - start = time.time() - - first_loop = True - # loop listening to connection - while not self.exit.is_set(): - - # signalling startup only the first time, just after having check for exit request. - # We need to guarantee at least ONE call to update. - if first_loop: - self.started.set() - - # blocking. messages are received ASAP. timeout only determine update/shutdown speed. - socks = dict(poller.poll(timeout=100)) - if svc_socket in socks and socks[svc_socket] == zmq.POLLIN: - req = None - try: - req_unparsed = svc_socket.recv() - req = ServiceRequest_dictparse(req_unparsed) - if isinstance(req, ServiceRequest): - if req.service and req.service in self._providers.keys(): - - request_args = pickle.loads(req.args) if req.args else () - # add 'self' if providers[req.service] is a bound method. - if self._providers[req.service].self: - request_args = (self, ) + request_args - request_kwargs = pickle.loads(req.kwargs) if req.kwargs else {} - - resp = self._providers[req.service].func(*request_args, **request_kwargs) - svc_socket.send(ServiceResponse( - service=req.service, - response=pickle.dumps(resp), - ).serialize()) - - else: - raise UnknownServiceException("Unknown Service {0}".format(req.service)) - else: # should not happen : dictparse would fail before reaching here... 
- raise UnknownRequestTypeException("Unknown Request Type {0}".format(type(req.request))) - except Exception: # we transmit back all errors, and keep spinning... - exctype, excvalue, tb = sys.exc_info() - # trying to make a pickleable traceback - try: - ftb = Traceback(tb) - except TypeError as exc: - ftb = "Traceback manipulation error: {exc}. Verify that python-tblib is installed.".format(exc=exc) - - # sending back that exception with traceback + yield {'poller': poller, 'socket': svc_socket} + + zcontext.term() + + # Careful : this is NOT the same usage as "run()" from Process : + # it is called inside a loop that it does not directly control... + # TOOD : think about it and improve (Entity System integration ? Pool + Futures integration ?) + def _update(self, poller, svc_socket, **kwargs): + """ + Runs at every update cycle in the node process/thread. + ######## Usually you want to override this method to extend the behavior of the node in your implementation #### still true ??? + :return: integer as exitcode to stop the node, or None to keep looping... + """ + + # blocking. messages are received ASAP. timeout only determine update/shutdown speed. + socks = dict(poller.poll(timeout=100)) + if svc_socket in socks and socks[svc_socket] == zmq.POLLIN: + req = None + try: + req_unparsed = svc_socket.recv() + req = ServiceRequest_dictparse(req_unparsed) + if isinstance(req, ServiceRequest): + if req.service and req.service in self._providers.keys(): + + request_args = pickle.loads(req.args) if req.args else () + # add 'self' if providers[req.service] is a bound method. 
+ if self._providers[req.service].self: + request_args = (self,) + request_args + request_kwargs = pickle.loads(req.kwargs) if req.kwargs else {} + + resp = self._providers[req.service].func(*request_args, **request_kwargs) svc_socket.send(ServiceResponse( service=req.service, - exception=ServiceException( - exc_type=pickle.dumps(exctype), - exc_value=pickle.dumps(excvalue), - traceback=pickle.dumps(ftb), - ) + response=pickle.dumps(resp), ).serialize()) - # time is ticking - # TODO : move this out of here. this class should require only generic interface to update method. - now = time.time() - timedelta = now - start - start = now + else: + raise UnknownServiceException("Unknown Service {0}".format(req.service)) + else: # should not happen : dictparse would fail before reaching here... + raise UnknownRequestTypeException("Unknown Request Type {0}".format(type(req.request))) + except Exception: # we transmit back all errors, and keep spinning... + exctype, excvalue, tb = sys.exc_info() + # trying to make a pickleable traceback + try: + ftb = Traceback(tb) + except TypeError as exc: + ftb = "Traceback manipulation error: {exc}. Verify that python-tblib is installed.".format(exc=exc) + + # sending back that exception with traceback + svc_socket.send(ServiceResponse( + service=req.service, + exception=ServiceException( + exc_type=pickle.dumps(exctype), + exc_value=pickle.dumps(excvalue), + traceback=pickle.dumps(ftb), + ) + ).serialize()) + + return None # we keep looping by default (need to deal with services here...) + + class Control(object): + """ + Node Client is an object to gather stateful services for which the actual node (real world context of service) called matters + Note this usually leads to a brittle distributed design, and stateless services should be preferred. + """ + + def __init__(self, node_name, svc_address): + self.node_name = node_name + + # we assume all nodes have an "index" service. 
+ self.index_svc = Service(name='index', providers=[(node_name, svc_address)]) + + # we call it + svc_list = self.index_svc.call() + + # and dynamically setup proxy calls for services RPC style + for s in svc_list: + if not hasattr(self, s): # only if we do not have a similar attribute yet + svc = Service(name=s, providers=[(node_name, svc_address)]) + svc_method = svc.call + svc_method.__doc__ = "Remote call for {s}".format(**locals()) + svc_method.__name__ = s + setattr(self, svc_method.__name__, svc_method) + + # TODO : NodeObserver : inverted control flow (to get stream data callback), but in a nice way ? + # something symmetrical to function call.... + + @staticmethod + def discover(name_regex='.*', timeout=None): + """ + IMPORTANT : This method is not meant to be used by final clients, + as it is easy to misuse and tends to produce brittle distributed software. + Ideally, the nodes should not matter for the user (client of the zmp multiprocess system). + However it is provided here because it can be useful to call stateful remote procedures. + + Discovers all nodes. + Note : we do not want to make the discovery block undefinitely since we never know for sure if a node is running or not + TODO : improve with future... + :param name_regex: regex to filter hte nodes by name/uuid + :param timeout: maximum number of seconds the discover can wait for a discovery matching requirements. if None, doesn't wait. 
+ """ + start = time.time() + endtime = timeout if timeout else 0 + + reg = re.compile(name_regex) + + while True: + timed_out = time.time() - start > endtime + res = nodes.get_all() + if res: + return { + res.get('name'): Node.Control(n.get('name'), n.get('address')) + for n in res if reg.match(n.get('name')) + # filtering by regex here TODO : move that feature to the Registry + } # return right away if we have something + + if timed_out: + break + # else we keep looping after a short sleep ( to allow time to refresh services list ) + time.sleep(0.2) # sleep + return None - # replacing the original Process.run() call, passing arguments to our target - if self._target: - # bwcompat - kwargs['timedelta'] = timedelta + def run(self, *args, **kwargs): + """ + The Node main method, running in a child process (similar to Process.run() but also accepts args) + A children class can override this method, but it needs to call super().run(*args, **kwargs) + for the node to start properly and call update() as expected. + :param args: arguments to pass to update() + :param kwargs: keyword arguments to pass to update() + :return: last exitcode returned by update() + """ + # TODO : make use of the arguments ? since run is now the target for Process... - # TODO : use return code to determine when/how we need to run this the next time... - # Also we need to keep the exit status to be able to call external process as an update... 
+ print('[{node}] Node available at [{address}]'.format(node=self.name, address=self._svc_address)) - logging.debug("[{self.name}] calling {self._target.__name__} with args {args} and kwargs {kwargs}...".format(**locals())) - exitstatus = self._target(*args, **kwargs) + # Initializing all context managers + with self.user_required_context() as cm: # user context first - if first_loop: - first_loop = False + # setting up our event poller + with self._zmq_poller_context() as zcm: - if exitstatus is not None: - break + # This will start looping and calling our target... + exitstatus = super(Node, self).run(poller=zcm.get('poller'), svc_socket=zcm.get('socket')) - if self.started.is_set() and exitstatus is None and self.exit.is_set(): - # in the not so special case where we started, we didnt get exit code and we exited, - # this is expected as a normal result and we set an exitcode here of 0 - # As 0 is the conventional success for unix process successful run - exitstatus = 0 + # all context managers are destroyed properly here logging.debug("[{self.name}] Node stopped.".format(**locals())) return exitstatus # returning last exit status from the update function - # all context managers are destroyed properly here + + diff --git a/pyzmp/process.py b/pyzmp/process.py new file mode 100644 index 0000000..15e7e3c --- /dev/null +++ b/pyzmp/process.py @@ -0,0 +1,463 @@ +# -*- coding: utf-8 -*- +# This python package is implementing a very simple multiprocess framework +# The point of it is to be able to fully tests the multiprocess behavior, +# in pure python, without having to run a ROS system. +from __future__ import absolute_import +from __future__ import print_function + +import os +import sys +import tempfile +if os.name == 'posix' and sys.version_info[0] < 3: + import subprocess32 as subprocess +else: + import subprocess + +import multiprocessing, multiprocessing.reduction # TODO we should probably use subprocess + psutil instead... 
+import psutil +import types +import uuid + +import errno + +import re +import zmq +import socket +import logging +import pickle +import contextlib +#import dill as pickle + +# allowing pickling of exceptions to transfer it +from collections import namedtuple + +from .registry import FileBasedRegistry + +import time + +try: + from tblib.decorators import Traceback + # TODO : potential candidates for pickle + tblib replacement for easier serialization + # TODO : - https://github.com/uqfoundation/dill + # TODO : - OR https://github.com/cloudpipe/cloudpickle + # TODO : - OR https://github.com/irmen/Serpent ? + # TODO : - OR https://github.com/esnme/ultrajson ? + # TODO : - OR something else ? +except ImportError: + Traceback = None + +try: + import setproctitle +except ImportError: + setproctitle = None + + +# TODO : Nodelet ( thread, with fast intraprocess zmq comm - entity system design /vs/threadpool ?) + +pid_registry = FileBasedRegistry("pid") + + +def on_terminate(proc): + print("process {} terminated with exit code {}".format(proc, proc.returncode)) + + +class ProcessObserver(object): + """A ProcessObserver can observe any local running process (even if we did not launch it and are not the parent)""" + + # local storage of all our child process which we are responsible for + _watched_pids = {} + + @staticmethod + def monitor_all(): # TODO : maybe one per processobserver instance is easier ? + """ function to monitor the registry entry. Needs to be called by the update method of the parent process""" + # NEED this for a delayed cleanup in case of process termination/crash + gone_pids = [p for p in ProcessObserver._watched_pids if not psutil.pid_exists(p)] + for p in gone_pids: + pid_registry.pop(p) + + def __init__(self, pid=None, infanticist=False): + """ + Creates a ProcessObserver for the process matching the pid (or hte current process if pid is None). + If infanticist is set to true, the current process will attempt to kill this pid (his child) when dying. 
+ :param pid: + :param infanticist: + """ + self.infanticist = infanticist + self._process = psutil.Process(pid) + self._watched_pids[pid] = self + + def monitor(self): + """ + Function to monitor the registry entry for this process. + This needs to be called by the update method of the parent process + """ + # need this for a delayed cleanup in case of process termination/crash + if not psutil.pid_exists(self._process.pid): + pid_registry.pop(self._process.pid) + + def __del__(self): + if self.infanticist: + self._process.terminate() + gone, still_alive = psutil.wait_procs(self._process, timeout=3, callback=on_terminate) + for p in still_alive: + p.kill() + + +def discover_process(name_regex='.*', timeout=None): + """ + Discovers all processes. + Note : we do not want to make the discovery block undefinitely since we never know for sure if a process is running or not + TODO : improve with future... + :param name_regex: regex to filter the nodes by name/uuid + :param timeout: maximum number of seconds the discover can wait for a discovery matching requirements. if None, doesn't wait. + """ + start = time.time() + endtime = timeout if timeout else 0 + + reg = re.compile(name_regex) + + while True: + timed_out = time.time() - start > endtime + dp = { + p: ProcessObserver(pid_registry[p]) + for p in pid_registry if reg.match(p) + # filtering by regex here TODO : move that feature to the Registry + } # return right away if we have something + + if dp: + return dp + elif timed_out: + break + # else we keep looping after a short sleep ( to allow time to refresh services list ) + time.sleep(0.2) # sleep + return None + + +class Process(object): + """ + Process class that model how a process is started and stopped, can start / stop child processes, + all in a synchronous deterministic manner. + It mainly add synchronization primitives to multiprocessing.Process. 
+ """ + + class Observer(object): + """ + ProcessObserver that provide a observe interface to an already running process. + """ + + def __init__(self, pid=None): + self.started = multiprocessing.Event() + self._osproc = psutil.Process(pid) + + # TODO : inverted control flow, but in a nice way ??? + def wait_for_start(self, timeout): + return self.started.wait(timeout=timeout) + + def has_started(self): + """ + :return: True if the node has started (update() might not have been called yet). Might still be alive, or not... + """ + return self.started.is_set() + + # TODO : we need to monitor a process and cleanup pid files if needed... + class Control(Observer): + # inheritance since there is no point to try to control without feedback, + # and users usually expect both in same place... + """ + ProcessControl that provide a control interface to an already running process. + """ + + def __init__(self, pid=None): + self.exit = multiprocessing.Event() + super(Process.Control, self).__init__(pid=pid) + + def set_exit_flag(self): + """Request a process termination""" + return self.exit.set() + + def monitor_registry_entry(self): + """ function to monitor the registry entry. Needs to be called by the update method of the parent process""" + # need this for a delayed cleanup in case of process termination/crash + if psutil.pid_exists(self._osproc.pid): + pid_registry.pop(self._osproc.pid) + + # TODO : we can extend this later (see psutil) for debugging and more... 
+ + def __init__(self, name=None, target_context=None, target_override=None, args=None, kwargs=None): + """ + Initializes a ZMP Node (Restartable Python Process communicating via ZMQ) + :param name: Name of the node + :param target_context: a context_manager to be used with run (in a with statement) + :param target_override: a function to override this class target method + :return: + """ + # TODO check name unicity + # using process as delegate + self._pargs = { + 'name': name or str(uuid.uuid4()), + 'args': args or (), + 'kwargs': kwargs or {}, + 'target': self.run, # Careful : our run() is not the same as the one for Process + } + # TODO : we should ensure our args + kwargs are compatible with our target (to avoid later errors) + # Careful : our own target is not the same as the one for Process + self._target = target_override or self.target + self.target_call_start = None + self.target_call_timedelta = None + + #: the actual process instance. lazy creation on start() call only. + self._process = None + self._control = Process.Control() + + #: whether or not the node name should be set as the actual process title + #: replacing the string duplicated from the python interpreter run + self.new_title = True + + self._target_context = target_context or self.target_context # TODO: extend to list if possible ( available for python >3.1 only ) + super(Process, self).__init__() + + def __enter__(self): + # __enter__ is called only if we pass this instance to with statement ( after __init__ ) + # start only if needed (so that we can hook up a context manager to a running node) : + if not self.is_alive(): + self.start() + return self + + def __exit__(self, exception_type, exception_value, traceback): + # make sure we cleanup when we exit + self.shutdown() + + def is_alive(self): + if self and self._process: + return self._process.is_alive() + + def join(self, timeout=None): + if not self._process: + # blocking on started event before blocking on join + 
self._control.started.wait(timeout=timeout) + return self._process.join(timeout=timeout) + + @property + def name(self): + if self and self._process: + return self._process.name + else: + return self._pargs.get('name', "ZMPProcess") + + @name.setter + def name(self, name): + if self and self._process: + self._process.name = name + # only reset the name arg if it was accepted by the setter + self._pargs.set('name', self._process.name) + else: + # TODO : maybe we should be a bit more strict here ? + self._pargs.set('name', name) + + @property + def daemon(self): + """ + Return whether process is a daemon + :return: + """ + if self._process: + return self._process.daemon + else: + return self._pargs.get('daemonic', False) + + @daemon.setter + def daemon(self, daemonic): + """ + Set whether process is a daemon + :param daemonic: + :return: + """ + if self._process: + self._process.daemonic = daemonic + else: + self._pargs['daemonic'] = daemonic + + @property + def authkey(self): + return self._process.authkey + + @authkey.setter + def authkey(self, authkey): + """ + Set authorization key of process + """ + self._process.authkey = authkey + + @property + def exitcode(self): + """ + Return exit code of process or `None` if it has yet to stop + """ + if self._process: + return self._process.exitcode + else: + return None + + @property + def ident(self): + """ + Return identifier (PID) of process or `None` if it has yet to start + """ + if self._process: + return self._process.ident + else: + return None + + def __repr__(self): + # TODO : improve this + return self._process.__repr__() + + def start(self, timeout=None): + """ + Start child process + :param timeout: the maximum time to wait for child process to report it has actually started. + None waits until the update is ready to be called. 
+ """ + + # we lazily create our process delegate (with same arguments) + if self.daemon: + daemonic = True + else: + daemonic = False + + pargs = self._pargs.copy() + pargs.pop('daemonic', None) + + self._process = multiprocessing.Process(**pargs) + + self._process.daemon = daemonic + + # CAREFUL here : multiprocessing documentation specifies that a process object can be started only once... + if self.is_alive(): + # if already started, we shutdown and join before restarting + # not timeout will bock here (default join behavior). + # otherwise we simply use the same timeout. + self.shutdown(join=True, timeout=timeout) # TODO : only restart if no error (check exitcode) + self.start(timeout=timeout) # recursive to try again if needed + else: + self._process.start() + + # timeout None means we want to wait and ensure it has started + # deterministic behavior, like is_alive() from multiprocess.Process is always true after start() + if self._control.wait_for_start(timeout=timeout): # blocks until we know true or false + # TODO: futures, somehow... + return ProcessObserver(self._process.ident) + + # TODO : Implement a way to redirect stdout/stderr, or even forward to parent ? + # cf http://ryanjoneil.github.io/posts/2014-02-14-capturing-stdout-in-a-python-child-process.html + + def terminate(self): + """ + Forcefully terminates the underlying process (using SIGTERM) + CAREFUL : in that case the finally clauses, and context exits will NOT run. + """ + return self._process.terminate() + # TODO : maybe redirect to shutdown here to avoid child process leaks ? + + def shutdown(self, join=True, timeout=None): + """ + Clean shutdown of the node from the parent. 
+ :param join: optionally wait for the process to end (default : True) + :return: None + """ + if self.is_alive(): # check if process started + print("Shutdown initiated") + self._control.set_exit_flag() + if join: + self.join(timeout=timeout) + # TODO : timeout before forcing terminate (SIGTERM) + + exitcode = self._process.exitcode if self._process else None # we return None if the process was never started + return exitcode + + @contextlib.contextmanager + def target_context(self): + self.target_call_start = time.time() + self.target_call_timedelta = 0 + yield + + # TODO : extract that into a (asyncio) task... + def target(self, *args, **kwargs): + """ + The function to overload if inheriting the Process class to implement a specific behavior. + :param args: + :param kwargs: + :return: + """ + # tracking time, so a target defining timedelta parameter will get the time delta (should be optional) + target_call_time = time.time() + self.target_call_timedelta = target_call_time - self.target_call_start + self.target_call_start = target_call_time + + # TODO : this is probably where we could implement a sleep to enforce frequency of calls... + return None + + def run(self, *args, **kwargs): + """ + The Node main method, running in a child process (similar to Process.run() but also accepts args) + A children class can override this method, but it needs to call super().run(*args, **kwargs) + for the node to start properly and call update() as expected. + :param args: arguments to pass to update() + :param kwargs: keyword arguments to pass to update() + :return: last exitcode returned by update() + """ + # TODO : make use of the arguments ? since run is now the target for Process... + + exitstatus = None # keeping the semantic of multiprocessing.Process : running process has None + + try : + # Initializing the required context managers + with pid_registry.registered(self.name, self.ident) as pcm: # TODO : careful about reusing PIDs here... 
+ + if setproctitle and self.new_title: + setproctitle.setproctitle("{0}".format(self.name)) + + print('[{procname}] Process started as [{pid}]'.format(procname=self.name, pid=self.ident)) + + with self._target_context() as cm: + + first_loop = True + # loop listening to connection + while not self._control.exit.is_set(): + + # signalling startup only the first time, just after having check for exit request. + # We need to guarantee at least ONE call to update. + if first_loop: + self._control.started.set() + + # replacing the original Process.run() call, passing arguments to our target + if self._target: + # TODO : use return code to determine when/how we need to run this the next time... + # Also we need to keep the exit status to be able to call external process as an update... + + logging.debug("[{self.name}] calling {self._target.__name__} with args {args} and kwargs {kwargs}...".format(**locals())) + exitstatus = self._target(*args, **kwargs) + + if first_loop: + first_loop = False + + if exitstatus is not None: + break + + if self._control.started.is_set() and exitstatus is None and self._control.exit.is_set(): + # in the not so special case where we started, we didnt get exit code and we exited, + # this is expected as a normal result and we set an exitcode here of 0 + # As 0 is the conventional success for unix process successful run + exitstatus = 0 + + except KeyboardInterrupt: + raise + except Exception: + raise + finally: + logging.debug("[{self.name}] Process stopped.".format(**locals())) + return exitstatus # returning last exit status from the update function + + + + diff --git a/pyzmp/process_async.py b/pyzmp/process_async.py new file mode 100644 index 0000000..33c37dc --- /dev/null +++ b/pyzmp/process_async.py @@ -0,0 +1,191 @@ +# Ref : https://docs.python.org/3.6/library/asyncio-subprocess.html + + +""" +An asyncio implementation of process. 
+""" +import contextlib +import uuid + +import time + +import logging +import setproctitle +import asyncio + + +class ProcessAsync(object): + """ + Process class that model how a process is started and stopped, can start / stop child processes, + all in a synchronous deterministic manner. + It mainly add synchronization primitives to multiprocessing.Process. + """ + + def __init__(self, name=None, target_context=None, target_override=None, args=None, kwargs=None): + """ + Initializes a ZMP Node (Restartable Python Process communicating via ZMQ) + :param name: Name of the node + :param target_context: a context_manager to be used with run (in a with statement) + :param target_override: a function to override this class target method + :return: + """ + # TODO check name unicity + # using process as delegate + self._pargs = { + 'name': name or str(uuid.uuid4()), + 'args': args or (), + 'kwargs': kwargs or {}, + 'target': self.run, # Careful : our run() is not the same as the one for Process + } + # TODO : we should ensure our args + kwargs are compatible with our target (to avoid later errors) + # Careful : our own target is not the same as the one for Process + self._target = target_override or self.target + self.target_call_start = None + self.target_call_timedelta = None + + #: the actual process instance. lazy creation on start() call only. + self._process = None + self._control = ProcessAsync.Control() + + #: whether or not the node name should be set as the actual process title + #: replacing the string duplicated from the python interpreter run + self.new_title = True + + self._target_context = target_context or self.target_context # TODO: extend to list if possible ( available for python >3.1 only ) + super(ProcessAsync, self).__init__() + + @asyncio.corountine() + await def start(self, timeout=None): + """ + Start child process + :param timeout: the maximum time to wait for child process to report it has actually started. 
+ None waits until the update is ready to be called. + """ + + # we lazily create our process delegate (with same arguments) + if self.daemon: + daemonic = True + else: + daemonic = False + + pargs = self._pargs.copy() + pargs.pop('daemonic', None) + + + # https://github.com/dano/aioprocessing + + + # https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/ + # self._process = await asyncio.create_subprocess_exec(**pargs, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + # + # await asyncio.wait([ + # _read_stream(process.stdout, stdout_cb), + # _read_stream(process.stderr, stderr_cb) + # ]) + # return await process.wait() + + + self._process.daemon = daemonic + + # CAREFUL here : multiprocessing documentation specifies that a process object can be started only once... + if self.is_alive(): + # if already started, we shutdown and join before restarting + # not timeout will bock here (default join behavior). + # otherwise we simply use the same timeout. + self.shutdown(join=True, timeout=timeout) # TODO : only restart if no error (check exitcode) + self.start(timeout=timeout) # recursive to try again if needed + else: + self._process.start() + + # timeout None means we want to wait and ensure it has started + # deterministic behavior, like is_alive() from multiprocess.Process is always true after start() + if self._control.wait_for_start(timeout=timeout): # blocks until we know true or false + # TODO: futures, somehow... + return ProcessObserver(self._process.ident) + + # TODO : Implement a way to redirect stdout/stderr, or even forward to parent ? + # cf http://ryanjoneil.github.io/posts/2014-02-14-capturing-stdout-in-a-python-child-process.html + + + @contextlib.contextmanager + def target_context(self): + self.target_call_start = time.time() + self.target_call_timedelta = 0 + yield + + # TODO : extract that into a (asyncio) task... 
+ def target(self, *args, **kwargs): + """ + The function to overload if inheriting the Process class to implement a specific behavior. + :param args: + :param kwargs: + :return: + """ + # tracking time, so a target defining timedelta parameter will get the time delta (should be optional) + target_call_time = time.time() + self.target_call_timedelta = target_call_time - self.target_call_start + self.target_call_start = target_call_time + + # TODO : this is probably where we could implement a sleep to enforce frequency of calls... + return None + + def run(self, *args, **kwargs): + """ + The Node main method, running in a child process (similar to Process.run() but also accepts args) + A children class can override this method, but it needs to call super().run(*args, **kwargs) + for the node to start properly and call update() as expected. + :param args: arguments to pass to update() + :param kwargs: keyword arguments to pass to update() + :return: last exitcode returned by update() + """ + # TODO : make use of the arguments ? since run is now the target for Process... + + exitstatus = None # keeping the semantic of multiprocessing.Process : running process has None + + try : + # Initializing the required context managers + with pid_registry.registered(self.name, self.ident) as pcm: # TODO : careful about reusing PIDs here... + + if setproctitle and self.new_title: + setproctitle.setproctitle("{0}".format(self.name)) + + print('[{procname}] Process started as [{pid}]'.format(procname=self.name, pid=self.ident)) + + with self._target_context() as cm: + + first_loop = True + # loop listening to connection + while not self._control.exit.is_set(): + + # signalling startup only the first time, just after having check for exit request. + # We need to guarantee at least ONE call to update. 
+ if first_loop: + self._control.started.set() + + # replacing the original Process.run() call, passing arguments to our target + if self._target: + # TODO : use return code to determine when/how we need to run this the next time... + # Also we need to keep the exit status to be able to call external process as an update... + + logging.debug("[{self.name}] calling {self._target.__name__} with args {args} and kwargs {kwargs}...".format(**locals())) + exitstatus = self._target(*args, **kwargs) + + if first_loop: + first_loop = False + + if exitstatus is not None: + break + + if self._control.started.is_set() and exitstatus is None and self._control.exit.is_set(): + # in the not so special case where we started, we didnt get exit code and we exited, + # this is expected as a normal result and we set an exitcode here of 0 + # As 0 is the conventional success for unix process successful run + exitstatus = 0 + + except KeyboardInterrupt: + raise + except Exception: + raise + finally: + logging.debug("[{self.name}] Process stopped.".format(**locals())) + return exitstatus # returning last exit status from the update function \ No newline at end of file diff --git a/pyzmp/process_observer.py b/pyzmp/process_observer.py new file mode 100644 index 0000000..9a3dfd5 --- /dev/null +++ b/pyzmp/process_observer.py @@ -0,0 +1,508 @@ +# -*- coding: utf-8 -*- +# This python package is implementing a very simple multiprocess framework +# The point of it is to be able to fully tests the multiprocess behavior, +# in pure python, without having to run a ROS system. +from __future__ import absolute_import +from __future__ import print_function + +import os +import pty +import shlex +import sys +import tempfile + +import signal + +import mmap +import tty + +if os.name == 'posix' and sys.version_info[0] < 3: + import subprocess32 as subprocess +else: + import subprocess + +import multiprocessing, multiprocessing.reduction #TODO we should probably use subprocess + psutil instead... 
+import threading +import psutil +import pexpect.fdpexpect, pexpect.popen_spawn, pexpect.spawnbase +import types +import uuid +import io +import errno + +import re +import zmq +import socket +import logging +import pickle +import contextlib +#import dill as pickle +import concurrent.futures + +try: + from queue import Queue, Empty # Python 3 +except ImportError: + from Queue import Queue, Empty # Python 2 + +# allowing pickling of exceptions to transfer it +from collections import namedtuple, OrderedDict + +from .registry import FileBasedRegistry + +import time + +try: + from tblib.decorators import Traceback + # TODO : potential candidates for pickle + tblib replacement for easier serialization + # TODO : - https://github.com/uqfoundation/dill + # TODO : - OR https://github.com/cloudpipe/cloudpickle + # TODO : - OR https://github.com/irmen/Serpent ? + # TODO : - OR https://github.com/esnme/ultrajson ? + # TODO : - OR something else ? +except ImportError: + Traceback = None + +try: + import setproctitle +except ImportError: + setproctitle = None + + +# TODO : Nodelet ( thread, with fast intraprocess zmq comm - entity system design /vs/threadpool ?) 
+ +pid_registry = FileBasedRegistry("pid") + + +def on_terminate(proc): + print("process {} terminated with exit code {}".format(proc, proc.returncode)) + + +class AttachBase(pexpect.spawnbase.SpawnBase): + def __init__(self, timeout=30, maxread=2000, searchwindowsize=None, + logfile=None, encoding=None, codec_errors='strict'): + super(AttachBase, self).__init__(timeout=30, maxread=2000, searchwindowsize=None, + logfile=None, encoding=None, codec_errors='strict') + + +class PopenAttach(AttachBase): + """ Extending http://pexpect.readthedocs.io/en/stable/_modules/pexpect/popen_spawn.html#PopenSpawn to change API """ + if pexpect.spawnbase.PY3: + crlf = '\n'.encode('ascii') + else: + crlf = '\n' + + def __init__(self, cmd, timeout=30, maxread=2000, searchwindowsize=None, + logfile=None, cwd=None, env=None, encoding=None, + codec_errors='strict'): + super(PopenAttach, self).__init__(timeout=timeout, maxread=maxread, + searchwindowsize=searchwindowsize, logfile=logfile, + encoding=encoding, codec_errors=codec_errors) + + kwargs = dict(bufsize=0, stdin=subprocess.PIPE, + stderr=subprocess.STDOUT, stdout=subprocess.PIPE, + cwd=cwd, env=env) + + if sys.platform == 'win32': + startupinfo = subprocess.STARTUPINFO() + startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW + kwargs['startupinfo'] = startupinfo + kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP + + if not isinstance(cmd, (list, tuple)): + cmd = shlex.split(cmd) + + # self.proc = subprocess.Popen(cmd, **kwargs) + self.opened = False + self.closed = False + self._buf = self.string_type() + + self._read_queue = Queue() + self._read_thread = threading.Thread(target=self._read_incoming) + self._read_thread.setDaemon(True) + self._read_thread.start() + + _read_reached_eof = False + + def read_nonblocking(self, size, timeout): + buf = self._buf + if self._read_reached_eof: + # We have already finished reading. 
Use up any buffered data, + # then raise EOF + if buf: + self._buf = buf[size:] + return buf[:size] + else: + self.flag_eof = True + raise pexpect.exceptions.EOF('End Of File (EOF).') + + if timeout == -1: + timeout = self.timeout + elif timeout is None: + timeout = 1e6 + + t0 = time.time() + while (time.time() - t0) < timeout and size and len(buf) < size: + try: + incoming = self._read_queue.get_nowait() + except Empty: + break + else: + if incoming is None: + self._read_reached_eof = True + break + + buf += self._decoder.decode(incoming, final=False) + + r, self._buf = buf[:size], buf[size:] + + self._log(r, 'read') + return r + + def _read_incoming(self): + """Run in a thread to move output from a pipe to a queue.""" + while not self.proc: + fileno = self.proc.stdout.fileno() + while 1: + buf = b'' + try: + buf = os.read(fileno, 1024) + except OSError as e: + self._log(e, 'read') + + if not buf: + # This indicates we have reached EOF + self._read_queue.put(None) + return + + self._read_queue.put(buf) + + def write(self, s): + '''This is similar to send() except that there is no return value. + ''' + self.send(s) + + def writelines(self, sequence): + '''This calls write() for each element in the sequence. + + The sequence can be any iterable object producing strings, typically a + list of strings. This does not add line separators. There is no return + value. + ''' + for s in sequence: + self.send(s) + + def send(self, s): + '''Send data to the subprocess' stdin. + + Returns the number of bytes written. + ''' + s = self._coerce_send_string(s) + self._log(s, 'send') + + b = self._encoder.encode(s, final=False) + if pexpect.spawnbase.PY3: + return self.proc.stdin.write(b) + else: + # On Python 2, .write() returns None, so we return the length of + # bytes written ourselves. This assumes they all got written. 
+ self.proc.stdin.write(b) + return len(b) + + def sendline(self, s=''): + '''Wraps send(), sending string ``s`` to child process, with os.linesep + automatically appended. Returns number of bytes written. ''' + + n = self.send(s) + return n + self.send(self.linesep) + + def wait(self): + '''Wait for the subprocess to finish. + + Returns the exit code. + ''' + status = self.proc.wait() + if status >= 0: + self.exitstatus = status + self.signalstatus = None + else: + self.exitstatus = None + self.signalstatus = -status + self.terminated = True + return status + + def kill(self, sig): + '''Sends a Unix signal to the subprocess. + + Use constants from the :mod:`signal` module to specify which signal. + ''' + + if not self.proc: + return + + if sys.platform == 'win32': + if sig in [signal.SIGINT, signal.CTRL_C_EVENT]: + sig = signal.CTRL_C_EVENT + elif sig in [signal.SIGBREAK, signal.CTRL_BREAK_EVENT]: + sig = signal.CTRL_BREAK_EVENT + else: + sig = signal.SIGTERM + + os.kill(self.proc.pid, sig) + + def sendeof(self): + '''Closes the stdin pipe from the writing end.''' + if not self.proc: + return + self.proc.stdin.close() + + +class ProcessPipeInterface(io.BufferedRWPair): + def __init__(self, buffer_size=io.DEFAULT_BUFFER_SIZE): + super(ProcessPipeInterface, self).__init__(io.BytesIO(), io.BytesIO(), buffer_size) + + + + + + +class ProcessWatcher(object): + """A ProcessWatcher can observe any local running process (even if we did not launch it and are not the parent) + Heavily insprired from http://pexpect.readthedocs.io/en/stable/_modules/pexpect/popen_spawn.html#PopenSpawn """ + + @classmethod + def from_ptyprocess(cls, pexpect_spawn): + # We want to use pexpect tty interactive feature to control a process + return cls(pid=pexpect_spawn.pid, + expect_out=pexpect_spawn, + expect_err=pexpect_spawn) + + @classmethod + def from_subprocess(cls, subprocess_popen): + # building pexpect objects on file descriptors + return cls(pid=subprocess_popen.pid, + 
expect_out=pexpect.fdpexpect.fdspawn(subprocess_popen.stdout), + expect_err=pexpect.fdpexpect.fdspawn(subprocess_popen.stderr)) + + def __init__(self, out_watchers=None, err_watchers=None, async=True): + """ + Creates a ProcessObserver for the process matching the pid (or the current process if pid is None). + :param err_watcher: a list of pattern to watch for, along with the callback to call. + :param async: On py2 will create another thread to run the err_watcher callbacks. On py3 will use corountines instead. + Setting async to false means the monitor() method need to be called periodically in order to check for pattern in the output + """ + + self._lock = threading.RLock() + self._out_watcher = OrderedDict() + self._out_cpl_pattern = None + self._err_watcher = OrderedDict() + self._err_cpl_pattern = None + + # same as add_err_watcher and add_out_watcher + # careful we need to keep keys order here... + if out_watchers: + self.add_out_watcher(out_watchers) + if err_watchers: + self.add_err_watcher(err_watchers) + + # if async: + # # Optional function, can be used if we are not calling monitor from current process + # # CAREFUL : callback will be done in another thread + # def event_loop(self): # TODO : make this an event loop with asyncio and python 3 + # """ + # Function to monitor the registry entry for this process. + # This needs to be called by the update method of the parent process + # """ + # # with self.monitor_context() as mc: + # while self._expect_out.isalive() or self._expect_err.isalive(): + # stop = self.monitor(mc) + # time.sleep(0.1) + # if stop: + # break + + # async in python3 doesnt need a thread... 
+ # executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) + # f = executor.submit(event_loop) # ignoring return here + + # t = threading.Thread(name='threaded_eventloop', target=event_loop, args=(self,)) + # t.start() + + master_fd, slave_fd = pty.openpty() # Ref : https://stackoverflow.com/questions/12419198/python-subprocess-readlines-hangs/12471855#12471855 + + # p = subprocess.Popen(['python'], stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # pin = os.fdopen(master, 'w') + # tty.setcbreak(sys.stdin) + + + # stdin_fileno, fpath_stdin = tempfile.mkstemp(suffix="-in", prefix="watched-") + # stdout_fileno, fpath_stdout = tempfile.mkstemp(suffix="-out", prefix="watched-") + # stderr_fileno, fpath_stderr = tempfile.mkstemp(suffix="-err", prefix="watched-") + # + # stdin_fileno.write("\0") + + # mmapped_in = mmap.mmap(slave, length=0, access=mmap.ACCESS_WRITE) + # mmapped_out = mmap.mmap(master, length=0, access=mmap.ACCESS_READ) + # #mmapped_err = mmap.mmap(stderr_fileno, length=0, access=mmap.ACCESS_READ) + # + # # Here we return a pair of buffered io, to enable starting a subprocess afterwards. + # # This is required for short run processes to not miss any message. + # self.in_pipe = io.BufferedWriter(mmapped_in, buffer_size=io.DEFAULT_BUFFER_SIZE) + # self.out_pipe = io.BufferedReader(mmapped_out, buffer_size=io.DEFAULT_BUFFER_SIZE) + # self.err_pipe = io.BufferedReader(mmapped_err, buffer_size=io.DEFAULT_BUFFER_SIZE) + # + # self._expect_out = pexpect.fdpexpect.fdspawn(self.out_pipe) + # self._expect_err = pexpect.fdpexpect.fdspawn(self.err_pipe) + + #self._process = psutil.Process(pid=process.pid) + # TODO : detect if the process has virtual terminal attached or not... 
+ + self.pty_master = master_fd + self.pty_slave = slave_fd + + self._process = None + + def ppid(self): + """ delegating to _process if available""" + if self._process: + return self._process.ppid() + + + # def expect_out(self, pattern_list, timeout=-1, searchwindowsize=-1, async=False): + # if self._expect_out: + # return self._expect_out.expect(pattern=pattern_list, timeout=timeout, searchwindowsize=searchwindowsize, async=async) + # + # def expect_err(self, pattern_list, timeout=-1, searchwindowsize=-1, async=False): + # if self._expect_err: + # return self._expect_err.expect(pattern=pattern_list, timeout=timeout, searchwindowsize=searchwindowsize, async=async) + # + # def expect_out_exact(self, pattern_list, timeout=-1, searchwindowsize=-1, async=False): + # if self._expect_out: + # return self._expect_out.expect_exact(pattern_list=pattern_list, timeout=timeout, searchwindowsize=searchwindowsize, async=async) + # + # def expect_err_exact(self, pattern_list, timeout=-1, searchwindowsize=-1, async=False): + # if self._expect_err: + # return self._expect_err.expect_exact(pattern_list=pattern_list, timeout=timeout, searchwindowsize=searchwindowsize, async=async) + + def add_err_watcher(self, watchers): + with self._lock: + for pattern, fun in watchers.items(): + assert callable(fun) + self._err_watcher[pattern] = fun + + def add_out_watcher(self, watchers): + with self._lock: + for pattern, fun in watchers.items(): + assert callable(fun) + self._out_watcher[pattern] = fun + + # @contextlib.contextmanager + # def monitor_context(self): + # last_err_cpl = self._err_watcher.keys() + # last_out_cpl = self._out_watcher.keys() + # self._err_cpl_pattern = self._expect_err.compile_pattern_list(last_err_cpl) + # self._out_cpl_pattern = self._expect_out.compile_pattern_list(last_out_cpl) + # yield last_out_cpl, last_err_cpl + # pass # we should keep cleanup as minimal as possible (will not be run when process crashes/is killed) + + def attach(self, pid): + self._process = 
psutil.Process(pid) + of = self._process.open_files() + + # write into registry + pid_registry[pid] = self._process + + def monitor(self): #, monitor_context): + """ + Function to monitor the registry entry for this process. + This needs to be called by the update method of the parent process + """ + + # # if there is a change in patterns to watch, we recompile it + # if monitor_context[1] != self._err_watcher.keys(): + # self._err_cpl_pattern = self._expect_err.compile_pattern_list(self._err_watcher.keys()) + # if monitor_context[0] != self._out_watcher.keys(): + # self._out_cpl_pattern = self._expect_out.compile_pattern_list(self._out_watcher.keys()) + # + # with self._lock: + # try: + # # TODO : make this a corountine with asyncio and python 3 + # i = self.expect_err(self._err_cpl_pattern, 1) + # if i: + # # calling function for this pattern + # self._err_watcher[i]() + # except pexpect.TIMEOUT: + # pass # we pass after timeout waiting + # except pexpect.EOF: + # pass # we pass if there is nothing to read + # + # try: + # # TODO : make this a corountine with asyncio and python 3 + # i = self.expect_out(self._out_cpl_pattern, 1) + # if i: + # # calling function for this pattern + # self._out_watcher[i]() + # except pexpect.TIMEOUT: + # pass # we pass after timeout waiting + # except pexpect.EOF: + # pass # we pass if there is nothing to read + + + # need this for attaching when process is detected + if self.isalive() and not self._process: + # find the pid + for fd in self._process.open_files(): + print(fd) + + #self.attach(self._expect.pid) + + + # need this for a delayed cleanup in case of process termination/crash + if self._process and not self._process.is_running(): + pid_registry.pop(self._process.pid) + return True # return True to stop looping + + def terminate(self): + if self._process: + return self._process.terminate() + + def kill(self): + if self._process: + return self._process.kill() + + def __del__(self): # TODO : is this really NEEDED ? 
+ """Upon deletion, we want to get rid of everything, as properly as possible""" + for p in self._process.children(): + p.terminate() + gone, still_alive = psutil.wait_procs(self, timeout=3, callback=on_terminate) + for p in still_alive: + p.kill() + + +def discover_process(name_regex='.*', timeout=None): + """ + Discovers all processes. + Note : we do not want to make the discovery block undefinitely since we never know for sure if a process is running or not + TODO : improve with future... + :param name_regex: regex to filter the nodes by name/uuid + :param timeout: maximum number of seconds the discover can wait for a discovery matching requirements. if None, doesn't wait. + """ + start = time.time() + endtime = timeout if timeout else 0 + + reg = re.compile(name_regex) + + while True: + timed_out = time.time() - start > endtime + dp = { + p: ProcessObserver(pid_registry[p]) + for p in pid_registry if reg.match(p) + # filtering by regex here TODO : move that feature to the Registry + } # return right away if we have something + + if dp: + return dp + elif timed_out: + break + # else we keep looping after a short sleep ( to allow time to refresh services list ) + time.sleep(0.2) # sleep + return None + diff --git a/pyzmp/registry.py b/pyzmp/registry.py new file mode 100644 index 0000000..eeac10d --- /dev/null +++ b/pyzmp/registry.py @@ -0,0 +1,103 @@ +from __future__ import absolute_import, division, print_function + +import contextlib +from io import open + +import os +import tempfile +import abc + +import collections +import yaml +import errno + + +# TODO : namedtuples ? CRDT ? +class FileBasedRegistry(collections.MutableMapping): + """ + Implements a Registry as a set of files, each one containing only one attribute. 
+ """ + + def __init__(self, value_desc, representer=None, constructor=None): + """ + Initialize the registry + :param value_desc: The description of the value stored in this registry + :param representer: The YAML representer + :param constructor: The YAML constructor + """ + self.desc = value_desc + self.representer = representer + self.constructor = constructor + + @staticmethod + def _get_registry_path(): + """ + A deterministic way to find the path to a registry, so it can be used in any context. + :return: + """ + _zmp_froot = os.path.join(tempfile.gettempdir(), 'zmp') + return _zmp_froot + + def _name2filepath(self, name): + # trying to follow the de-facto standard way to register daemon process info (as "name.pid" file for example) + fname = os.path.join(FileBasedRegistry._get_registry_path(), name + os.extsep + self.desc) + return fname + + def _filepath2name(self): + for f in os.listdir(FileBasedRegistry._get_registry_path()): + if f.endswith(os.extsep + self.desc): + yield os.path.basename(f)[:-len(os.extsep + self.desc)] + + def __setitem__(self, key, value): + attrfname = self._name2filepath(key) + try: + with open(attrfname, "w") as fh: + # Note : we use yaml as a codec + yaml.dump(value, fh, default_flow_style=False) + except IOError as ioe: + if ioe.errno == errno.ENOENT: # No such file or directory + # TODO : handle all possible cases + os.makedirs(os.path.dirname(attrfname)) + # now we can try again... 
+ with open(attrfname, "w") as fh: + yaml.dump(value, fh, default_flow_style=False) + + def __delitem__(self, key): + pidfname = self._name2filepath(key) + os.remove(pidfname) + + def __getitem__(self, item): + fname = self._name2filepath(item) + try: + with open(fname, "r") as fh: + attr = yaml.load(fh) + return attr + except IOError as ioe: + if ioe.errno == errno.ENOENT: + raise KeyError + + def __iter__(self): + for name in self._filepath2name(): + yield name + + def __len__(self): + return len([a for a in self._filepath2name()]) + + def __str__(self): + return str({n: getattr(self, n) for n in self}) + + def __repr__(self): + return str({n: getattr(self, n) for n in self}) + + @contextlib.contextmanager + def registered(self, name, value): + # advertise itself + self[name] = value + + # Do not yield until we are register (otherwise noone can find us, there is no point.) + yield + + # concealing itself + # Note this will not be done if the process is killed or crash... + self.pop(name) + diff --git a/pyzmp/service.py b/pyzmp/service.py index cf654fa..33cfa67 100644 --- a/pyzmp/service.py +++ b/pyzmp/service.py @@ -28,6 +28,9 @@ # Lock is definitely needed ( not implemented in proxy objects, unless the object itself already has it, like Queue ) services_lock = manager.Lock() services = manager.dict() +# TODO : drop this and retrieve directly from Nodes. +# TODO : later we can create a "cache" of services, but this will create more problems, +# so it should just be an option, not a basis feature.... @contextlib.contextmanager @@ -91,7 +94,7 @@ def discover(name, timeout=None, minimum_providers=1): def __init__(self, name, providers=None): self.name = name self.providers = providers - # TODO : make a provide just a list of node names, and have connection URLs somewhere else... + # TODO : make a provide just a list of node names/ids, and retrieve connection URLs from registry... 
# TODO : implement async_call ( and return future ) def call(self, args=None, kwargs=None, node=None, send_timeout=1000, recv_timeout=5000, zmq_ctx=None): @@ -153,3 +156,10 @@ def call(self, args=None, kwargs=None, node=None, send_timeout=1000, recv_timeou # convenience discover = Service.discover + + +# TODO : +# class SteamListener: +# """the class with inverted control flow compared to NodeClient : everything is callback""" +# def listen_on(self, svc_name): +# """Setup a callback when some data arrived, stream like... using observables / futures ? """ \ No newline at end of file diff --git a/pyzmp/syncio/__init__.py b/pyzmp/syncio/__init__.py new file mode 100644 index 0000000..28642be --- /dev/null +++ b/pyzmp/syncio/__init__.py @@ -0,0 +1,11 @@ +""" +This package aims at implementing a TRIVIAL asyncio implementation, without coroutines, +and therefore usable on python2. +We will follow python3 asyncio API, but using normal functions instead of coroutines. +We will assume that : + - the function will eventually end +We will enforce that : + - only one function can be passed to an eventloop + - we can have only one task at a time + +""" \ No newline at end of file diff --git a/pyzmp/syncio/events.py b/pyzmp/syncio/events.py new file mode 100644 index 0000000..5bc9479 --- /dev/null +++ b/pyzmp/syncio/events.py @@ -0,0 +1,536 @@ +from __future__ import absolute_import, division, print_function + +import os +import threading + +"""Event loop and event loop policy.""" + +__all__ = ['AbstractEventLoopPolicy', + 'AbstractEventLoop', 'AbstractServer', + 'Handle', 'TimerHandle', + 'get_event_loop_policy', 'set_event_loop_policy', + 'get_event_loop', 'set_event_loop', 'new_event_loop', + 'get_child_watcher', 'set_child_watcher', + '_set_running_loop', '_get_running_loop', + ] + +import functools +import inspect +import os +import reprlib +import socket +import subprocess +import sys +import threading +import traceback + + +def _get_function_source(func): + func = 
inspect.unwrap(func) + if inspect.isfunction(func): + code = func.__code__ + return (code.co_filename, code.co_firstlineno) + if isinstance(func, functools.partial): + return _get_function_source(func.func) + if isinstance(func, functools.partialmethod): + return _get_function_source(func.func) + return None + + +def _format_args_and_kwargs(args, kwargs): + """Format function arguments and keyword arguments. + + Special case for a single parameter: ('hello',) is formatted as ('hello'). + """ + # use reprlib to limit the length of the output + items = [] + if args: + items.extend(reprlib.repr(arg) for arg in args) + if kwargs: + items.extend('{}={}'.format(k, reprlib.repr(v)) + for k, v in kwargs.items()) + return '(' + ', '.join(items) + ')' + + +def _format_callback(func, args, kwargs, suffix=''): + if isinstance(func, functools.partial): + suffix = _format_args_and_kwargs(args, kwargs) + suffix + return _format_callback(func.func, func.args, func.keywords, suffix) + + if hasattr(func, '__qualname__'): + func_repr = getattr(func, '__qualname__') + elif hasattr(func, '__name__'): + func_repr = getattr(func, '__name__') + else: + func_repr = repr(func) + + func_repr += _format_args_and_kwargs(args, kwargs) + if suffix: + func_repr += suffix + return func_repr + +def _format_callback_source(func, args): + func_repr = _format_callback(func, args, None) + source = _get_function_source(func) + if source: + func_repr += ' at %s:%s' % source + return func_repr + + + +""" +Event loop, using asyncio for py3 or custom implementation for py2 + +""" + + +class EventLoop(): # TODO : link with asyncio on py3 + """ + An AbstractEventLoop built for concurrent.futures and py2 + """ + + # Running and stopping the event loop. + + def run_forever(self): + """Run the event loop until stop() is called.""" + raise NotImplementedError + + def run_until_complete(self, future): + """Run the event loop until a Future is done. + + Return the Future's result, or raise its exception. 
+ """ + raise NotImplementedError + + def stop(self): + """Stop the event loop as soon as reasonable. + + Exactly how soon that is may depend on the implementation, but + no more I/O callbacks should be scheduled. + """ + raise NotImplementedError + + def is_running(self): + """Return whether the event loop is currently running.""" + raise NotImplementedError + + def is_closed(self): + """Returns True if the event loop was closed.""" + raise NotImplementedError + + def close(self): + """Close the loop. + + The loop should not be running. + + This is idempotent and irreversible. + + No other methods should be called after this one. + """ + raise NotImplementedError + + def shutdown_asyncgens(self): + """Shutdown all active asynchronous generators.""" + raise NotImplementedError + + # Methods scheduling callbacks. All these return Handles. + + def _timer_handle_cancelled(self, handle): + """Notification that a TimerHandle has been cancelled.""" + raise NotImplementedError + + def call_soon(self, callback, *args): + return self.call_later(0, callback, *args) + + def call_later(self, delay, callback, *args): + raise NotImplementedError + + def call_at(self, when, callback, *args): + raise NotImplementedError + + def time(self): + raise NotImplementedError + + def create_future(self): + raise NotImplementedError + + # Method scheduling a coroutine object: create a task. + + def create_task(self, coro): + raise NotImplementedError + + # Methods for interacting with threads. + + def call_soon_threadsafe(self, callback, *args): + raise NotImplementedError + + def run_in_executor(self, executor, func, *args): + raise NotImplementedError + + def set_default_executor(self, executor): + raise NotImplementedError + + # Network I/O methods returning Futures. 
+ + def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0): + raise NotImplementedError + + def getnameinfo(self, sockaddr, flags=0): + raise NotImplementedError + + def create_connection(self, protocol_factory, host=None, port=None, *, + ssl=None, family=0, proto=0, flags=0, sock=None, + local_addr=None, server_hostname=None): + raise NotImplementedError + + def create_server(self, protocol_factory, host=None, port=None, *, + family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, + sock=None, backlog=100, ssl=None, reuse_address=None, + reuse_port=None): + """A coroutine which creates a TCP server bound to host and port. + + The return value is a Server object which can be used to stop + the service. + + If host is an empty string or None all interfaces are assumed + and a list of multiple sockets will be returned (most likely + one for IPv4 and another one for IPv6). The host parameter can also be a + sequence (e.g. list) of hosts to bind to. + + family can be set to either AF_INET or AF_INET6 to force the + socket to use IPv4 or IPv6. If not set it will be determined + from host (defaults to AF_UNSPEC). + + flags is a bitmask for getaddrinfo(). + + sock can optionally be specified in order to use a preexisting + socket object. + + backlog is the maximum number of queued connections passed to + listen() (defaults to 100). + + ssl can be set to an SSLContext to enable SSL over the + accepted connections. + + reuse_address tells the kernel to reuse a local socket in + TIME_WAIT state, without waiting for its natural timeout to + expire. If not specified will automatically be set to True on + UNIX. + + reuse_port tells the kernel to allow this endpoint to be bound to + the same port as other existing endpoints are bound to, so long as + they all set this flag when being created. This option is not + supported on Windows. 
+ """ + raise NotImplementedError + + def create_unix_connection(self, protocol_factory, path, *, + ssl=None, sock=None, + server_hostname=None): + raise NotImplementedError + + def create_unix_server(self, protocol_factory, path, *, + sock=None, backlog=100, ssl=None): + """A coroutine which creates a UNIX Domain Socket server. + + The return value is a Server object, which can be used to stop + the service. + + path is a str, representing a file systsem path to bind the + server socket to. + + sock can optionally be specified in order to use a preexisting + socket object. + + backlog is the maximum number of queued connections passed to + listen() (defaults to 100). + + ssl can be set to an SSLContext to enable SSL over the + accepted connections. + """ + raise NotImplementedError + + def create_datagram_endpoint(self, protocol_factory, + local_addr=None, remote_addr=None, *, + family=0, proto=0, flags=0, + reuse_address=None, reuse_port=None, + allow_broadcast=None, sock=None): + """A coroutine which creates a datagram endpoint. + + This method will try to establish the endpoint in the background. + When successful, the coroutine returns a (transport, protocol) pair. + + protocol_factory must be a callable returning a protocol instance. + + socket family AF_INET or socket.AF_INET6 depending on host (or + family if specified), socket type SOCK_DGRAM. + + reuse_address tells the kernel to reuse a local socket in + TIME_WAIT state, without waiting for its natural timeout to + expire. If not specified it will automatically be set to True on + UNIX. + + reuse_port tells the kernel to allow this endpoint to be bound to + the same port as other existing endpoints are bound to, so long as + they all set this flag when being created. This option is not + supported on Windows and some UNIX's. If the + :py:data:`~socket.SO_REUSEPORT` constant is not defined then this + capability is unsupported. 
+ + allow_broadcast tells the kernel to allow this endpoint to send + messages to the broadcast address. + + sock can optionally be specified in order to use a preexisting + socket object. + """ + raise NotImplementedError + + # Pipes and subprocesses. + + def connect_read_pipe(self, protocol_factory, pipe): + """Register read pipe in event loop. Set the pipe to non-blocking mode. + + protocol_factory should instantiate object with Protocol interface. + pipe is a file-like object. + Return pair (transport, protocol), where transport supports the + ReadTransport interface.""" + # The reason to accept file-like object instead of just file descriptor + # is: we need to own pipe and close it at transport finishing + # Can got complicated errors if pass f.fileno(), + # close fd in pipe transport then close f and vise versa. + raise NotImplementedError + + def connect_write_pipe(self, protocol_factory, pipe): + """Register write pipe in event loop. + + protocol_factory should instantiate object with BaseProtocol interface. + Pipe is file-like object already switched to nonblocking. + Return pair (transport, protocol), where transport support + WriteTransport interface.""" + # The reason to accept file-like object instead of just file descriptor + # is: we need to own pipe and close it at transport finishing + # Can got complicated errors if pass f.fileno(), + # close fd in pipe transport then close f and vise versa. + raise NotImplementedError + + def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + **kwargs): + raise NotImplementedError + + def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + **kwargs): + raise NotImplementedError + + # Ready-based callback registration methods. + # The add_*() methods return None. + # The remove_*() methods return True if something was removed, + # False if there was nothing to delete. 
+ + def add_reader(self, fd, callback, *args): + raise NotImplementedError + + def remove_reader(self, fd): + raise NotImplementedError + + def add_writer(self, fd, callback, *args): + raise NotImplementedError + + def remove_writer(self, fd): + raise NotImplementedError + + # Completion based I/O methods returning Futures. + + def sock_recv(self, sock, nbytes): + raise NotImplementedError + + def sock_sendall(self, sock, data): + raise NotImplementedError + + def sock_connect(self, sock, address): + raise NotImplementedError + + def sock_accept(self, sock): + raise NotImplementedError + + # Signal handling. + + def add_signal_handler(self, sig, callback, *args): + raise NotImplementedError + + def remove_signal_handler(self, sig): + raise NotImplementedError + + # Task factory. + + def set_task_factory(self, factory): + raise NotImplementedError + + def get_task_factory(self): + raise NotImplementedError + + # Error handlers. + + def get_exception_handler(self): + raise NotImplementedError + + def set_exception_handler(self, handler): + raise NotImplementedError + + def default_exception_handler(self, context): + raise NotImplementedError + + def call_exception_handler(self, context): + raise NotImplementedError + + # Debug flag management. + + def get_debug(self): + raise NotImplementedError + + def set_debug(self, enabled): + raise NotImplementedError + +class SimpleEventLoopPolicy: # TODO : link with asyncio on py3 + """Simple policy implementation for accessing the event loop. + + In this policy, each thread has its own event loop. However, we + only automatically create an event loop by default for the main + thread; other threads by default have no event loop. + """ + + _loop_factory = None + + class _Local(threading.local): + _loop = None + _set_called = False + + def __init__(self): + self._local = self._Local() + + def get_event_loop(self): + """Get the event loop. + + This may be None or an instance of EventLoop. 
+ """ + if (self._local._loop is None and + not self._local._set_called and + isinstance(threading.current_thread(), threading._MainThread)): + self.set_event_loop(self.new_event_loop()) + if self._local._loop is None: + raise RuntimeError('There is no current event loop in thread %r.' + % threading.current_thread().name) + return self._local._loop + + def set_event_loop(self, loop): + """Set the event loop.""" + self._local._set_called = True + assert loop is None or isinstance(loop, SimpleEventLoopPolicy) + self._local._loop = loop + + def new_event_loop(self): + """Create a new event loop. + + You must call set_event_loop() to make this the current event + loop. + """ + return self._loop_factory() + +# Event loop policy. The policy itself is always global, even if the +# policy's rules say that there is an event loop per thread (or other +# notion of context). The default policy is installed by the first +# call to get_event_loop_policy(). +_event_loop_policy = None + +# Lock for protecting the on-the-fly creation of the event loop policy. +_lock = threading.Lock() + + +# A TLS for the running event loop, used by _get_running_loop. +class _RunningLoop(threading.local): + _loop = None + _pid = None + + +_running_loop = _RunningLoop() + + +def _get_running_loop(): + """Return the running event loop or None. + + This is a low-level function intended to be used by event loops. + This function is thread-specific. + """ + running_loop = _running_loop._loop + if running_loop is not None and _running_loop._pid == os.getpid(): + return running_loop + + +def _set_running_loop(loop): + """Set the running event loop. + + This is a low-level function intended to be used by event loops. + This function is thread-specific. 
+ """ + _running_loop._pid = os.getpid() + _running_loop._loop = loop + + +def _init_event_loop_policy(): + global _event_loop_policy + with _lock: + if _event_loop_policy is None: + _event_loop_policy = SimpleEventLoopPolicy() + + +def get_event_loop_policy(): + """Get the current event loop policy.""" + if _event_loop_policy is None: + _init_event_loop_policy() + return _event_loop_policy + + +def set_event_loop_policy(policy): + """Set the current event loop policy. + + If policy is None, the default policy is restored.""" + global _event_loop_policy + assert policy is None or isinstance(policy, SimpleEventLoopPolicy) + _event_loop_policy = policy + + +def get_event_loop(): + """Return an asyncio event loop. + + When called from a coroutine or a callback (e.g. scheduled with call_soon + or similar API), this function will always return the running event loop. + + If there is no running event loop set, the function will return + the result of `get_event_loop_policy().get_event_loop()` call. 
+ """ + current_loop = _get_running_loop() + if current_loop is not None: + return current_loop + return None + + +def set_event_loop(loop): + """Equivalent to calling get_event_loop_policy().set_event_loop(loop).""" + get_event_loop_policy().set_event_loop(loop) + + +def new_event_loop(): + """Equivalent to calling get_event_loop_policy().new_event_loop().""" + return get_event_loop_policy().new_event_loop() + + +def get_child_watcher(): + """Equivalent to calling get_event_loop_policy().get_child_watcher().""" + return get_event_loop_policy().get_child_watcher() + + +def set_child_watcher(watcher): + """Equivalent to calling + get_event_loop_policy().set_child_watcher(watcher).""" + return get_event_loop_policy().set_child_watcher(watcher) \ No newline at end of file diff --git a/pyzmp/syncio/task.py b/pyzmp/syncio/task.py new file mode 100644 index 0000000..fd38dee --- /dev/null +++ b/pyzmp/syncio/task.py @@ -0,0 +1,440 @@ +from __future__ import absolute_import, division, print_function + +import inspect + +import concurrent.futures._base +import reprlib + +from . 
import events + +Error = concurrent.futures._base.Error +CancelledError = concurrent.futures.CancelledError +TimeoutError = concurrent.futures.TimeoutError + + +class InvalidStateError(Error): + """The operation is not allowed in this state.""" + + +def _format_callbacks(cb): + """helper function for Future.__repr__""" + size = len(cb) + if not size: + cb = '' + + def format_cb(callback): + return events._format_callback_source(callback, ()) + + if size == 1: + cb = format_cb(cb[0]) + elif size == 2: + cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1])) + elif size > 2: + cb = '{}, <{} more>, {}'.format(format_cb(cb[0]), + size - 2, + format_cb(cb[-1])) + return 'cb=[%s]' % cb + + +def _future_repr_info(future): + # (Future) -> str + """helper function for Future.__repr__""" + info = [future._state.lower()] + if future._state == concurrent.futures._base.FINISHED: + if future._exception is not None: + info.append('exception={!r}'.format(future._exception)) + else: + # use reprlib to limit the length of the output, especially + # for very long strings + result = reprlib.repr(future._result) + info.append('result={}'.format(result)) + if future._callbacks: + info.append(_format_callbacks(future._callbacks)) + if future._source_traceback: + frame = future._source_traceback[-1] + info.append('created at %s:%s' % (frame[0], frame[1])) + return info + + +def _format_routine(ro): + if not hasattr(ro, 'cr_code') and not hasattr(ro, 'gi_code'): + # Most likely a built-in type. + + # Built-in types might not have __qualname__ or __name__. 
+ ro_name = getattr( + ro, '__qualname__', + getattr(ro, '__name__', type(ro).__name__)) + ro_name = '{}()'.format(ro_name) + + running = False + try: + running = ro.cr_running + except AttributeError: + try: + running = ro.gi_running + except AttributeError: + pass + + if running: + return '{} running'.format(ro_name) + else: + return ro_name + + ro_name = None + func = ro + + if ro_name is None: + ro_name = events._format_callback(func, (), {}) + + try: + ro_code = ro.gi_code + except AttributeError: + ro_code = ro.cr_code + + try: + ro_frame = ro.gi_frame + except AttributeError: + ro_frame = ro.cr_frame + + filename = ro_code.co_filename + lineno = 0 + if ro_frame is not None: + lineno = ro_frame.f_lineno + ro_repr = ('%s running at %s:%s' + % (ro_name, filename, lineno)) + else: + lineno = ro_code.co_firstlineno + ro_repr = ('%s done, defined at %s:%s' + % (ro_name, filename, lineno)) + + return ro_repr + + +import linecache +import traceback + + +def _task_repr_info(task): + info = _future_repr_info(task) + + if task._must_cancel: + # replace status + info[0] = 'cancelling' + + ro = _format_routine(task._ro) + info.insert(1, 'coro=<%s>' % ro) + + if task._fut_waiter is not None: + info.insert(2, 'wait_for=%r' % task._fut_waiter) + return info + + +def _task_get_stack(task, limit): + frames = [] + try: + # 'async def' coroutines + f = task._ro.cr_frame + except AttributeError: + f = task._ro.gi_frame + if f is not None: + while f is not None: + if limit is not None: + if limit <= 0: + break + limit -= 1 + frames.append(f) + f = f.f_back + frames.reverse() + elif task._exception is not None: + tb = task._exception.__traceback__ + while tb is not None: + if limit is not None: + if limit <= 0: + break + limit -= 1 + frames.append(tb.tb_frame) + tb = tb.tb_next + return frames + + +def _task_print_stack(task, limit, file): + extracted_list = [] + checked = set() + for f in task.get_stack(limit=limit): + lineno = f.f_lineno + co = f.f_code + filename = 
co.co_filename + name = co.co_name + if filename not in checked: + checked.add(filename) + linecache.checkcache(filename) + line = linecache.getline(filename, lineno, f.f_globals) + extracted_list.append((filename, lineno, name, line)) + exc = task._exception + if not extracted_list: + print('No stack for %r' % task, file=file) + elif exc is not None: + print('Traceback for %r (most recent call last):' % task, + file=file) + else: + print('Stack for %r (most recent call last):' % task, + file=file) + traceback.print_list(extracted_list, file=file) + if exc is not None: + for line in traceback.format_exception_only(exc.__class__, exc): + print(line, file=file, end='') + + + + +import weakref + +from . import events + + +""" +A Task is a completely serializable, atomic, unit of computing, that can be transferred between Threads (and therefore Processes). + +""" + + +class Task(concurrent.futures.Future): # TODO : asyncio on py3 + """ + A function call wrapped in a Future. + Provides a similar API to asyncio.Task for upward compatibility. + + Here we only have one function call, instead of a set of coroutines. + It is the most trivial implementation of a py3 asyncio Task, but working on python2.7. + """ + + # An important invariant maintained while a Task not done: + # + # - Either _fut_waiter is None, and _step() is scheduled; + # - or _fut_waiter is some Future, and _step() is *not* scheduled. + # + # The only transition from the latter to the former is through + # _wakeup(). When _fut_waiter is not None, one of its callbacks + # must be _wakeup(). + + # Weak set containing all tasks alive. In our case, only 1. + _all_tasks = weakref.WeakSet() + + # Dictionary containing tasks that are currently active in + # all running event loops. 
{EventLoop: Task} + _current_tasks = {} + + # If False, don't log a message if the task is destroyed whereas its + # status is still pending + _log_destroy_pending = True + + @classmethod + def current_task(cls, loop=None): + """Return the currently running task in an event loop or None. + + By default the current task for the current event loop is returned. + + None is returned when called not in the context of a Task. + """ + if loop is None: + loop = events.get_event_loop() + return cls._current_tasks.get(loop) + + @classmethod + def all_tasks(cls, loop=None): + """Return a set of all tasks for an event loop. + + By default all tasks for the current event loop are returned. + """ + if loop is None: + loop = events.get_event_loop() + return {t for t in cls._all_tasks if t._loop is loop} + + def __init__(self, ro, *, loop=None): + super(Task, self).__init__(loop=loop) + if self._source_traceback: + del self._source_traceback[-1] + self._ro = ro + self._fut_waiter = None + self._must_cancel = False + self._loop.call_soon(self._step) + self.__class__._all_tasks.add(self) + + def __del__(self): + if self._state == concurrent.futures._base.PENDING and self._log_destroy_pending: + context = { + 'task': self, + 'message': 'Task was destroyed but it is pending!', + } + if self._source_traceback: + context['source_traceback'] = self._source_traceback + self._loop.call_exception_handler(context) + super(Task, self).__del__(self) + + def _repr_info(self): + return _task_repr_info(self) + + def get_stack(self, *, limit=None): + """Return the list of stack frames for this task's coroutine. + + If the coroutine is not done, this returns the stack where it is + suspended. If the coroutine has completed successfully or was + cancelled, this returns an empty list. If the coroutine was + terminated by an exception, this returns the list of traceback + frames. + + The frames are always ordered from oldest to newest. 
+ + The optional limit gives the maximum number of frames to + return; by default all available frames are returned. Its + meaning differs depending on whether a stack or a traceback is + returned: the newest frames of a stack are returned, but the + oldest frames of a traceback are returned. (This matches the + behavior of the traceback module.) + + For reasons beyond our control, only one stack frame is + returned for a suspended coroutine. + """ + return _task_get_stack(self, limit) + + def print_stack(self, *, limit=None, file=None): + """Print the stack or traceback for this task's coroutine. + + This produces output similar to that of the traceback module, + for the frames retrieved by get_stack(). The limit argument + is passed to get_stack(). The file argument is an I/O stream + to which the output is written; by default output is written + to sys.stderr. + """ + return _task_print_stack(self, limit, file) + + def cancel(self): + """Request that this task cancel itself. + + This arranges for a CancelledError to be thrown into the + wrapped coroutine on the next cycle through the event loop. + The coroutine then has a chance to clean up or even deny + the request using try/except/finally. + + Unlike Future.cancel, this does not guarantee that the + task will be cancelled: the exception might be caught and + acted upon, delaying cancellation of the task or preventing + cancellation completely. The task may also return a value or + raise a different exception. + + Immediately after this method is called, Task.cancelled() will + not return True (unless the task was already cancelled). A + task will be marked as cancelled when the wrapped coroutine + terminates with a CancelledError exception (even if cancel() + was not called). 
+ """ + if self.done(): + return False + if self._fut_waiter is not None: + if self._fut_waiter.cancel(): + # Leave self._fut_waiter; it may be a Task that + # catches and ignores the cancellation so we may have + # to cancel it again later. + return True + # It must be the case that self._step is already scheduled. + self._must_cancel = True + return True + + def _step(self, exc=None): + assert not self.done(), \ + '_step(): already done: {!r}, {!r}'.format(self, exc) + if self._must_cancel: + if not isinstance(exc, CancelledError): + exc = CancelledError() + self._must_cancel = False + coro = self._coro + self._fut_waiter = None + + self.__class__._current_tasks[self._loop] = self + # Call either coro.throw(exc) or coro.send(None). + try: + if exc is None: + # We use the `send` method directly, because coroutines + # don't have `__iter__` and `__next__` methods. + result = coro.send(None) + else: + result = coro.throw(exc) + except StopIteration as exc: + if self._must_cancel: + # Task is cancelled right before coro stops. + self._must_cancel = False + self.set_exception(CancelledError()) + else: + self.set_result(exc.value) + except CancelledError: + super(Task, self).cancel() # I.e., Future.cancel(self). + except Exception as exc: + self.set_exception(exc) + except BaseException as exc: + self.set_exception(exc) + raise + else: + blocking = getattr(result, '_asyncio_future_blocking', None) + if blocking is not None: + # Yielded Future must come from Future.__iter__(). 
+ if result._loop is not self._loop: + self._loop.call_soon( + self._step, + RuntimeError( + 'Task {!r} got Future {!r} attached to a ' + 'different loop'.format(self, result))) + elif blocking: + if result is self: + self._loop.call_soon( + self._step, + RuntimeError( + 'Task cannot await on itself: {!r}'.format( + self))) + else: + result._asyncio_future_blocking = False + result.add_done_callback(self._wakeup) + self._fut_waiter = result + if self._must_cancel: + if self._fut_waiter.cancel(): + self._must_cancel = False + else: + self._loop.call_soon( + self._step, + RuntimeError( + 'yield was used instead of yield from ' + 'in task {!r} with {!r}'.format(self, result))) + elif result is None: + # Bare yield relinquishes control for one event loop iteration. + self._loop.call_soon(self._step) + elif inspect.isgenerator(result): + # Yielding a generator is just wrong. + self._loop.call_soon( + self._step, + RuntimeError( + 'yield was used instead of yield from for ' + 'generator in task {!r} with {}'.format( + self, result))) + else: + # Yielding something else is an error. + self._loop.call_soon( + self._step, + RuntimeError( + 'Task got bad yield: {!r}'.format(result))) + finally: + self.__class__._current_tasks.pop(self._loop) + self = None # Needed to break cycles when an exception occurs. + + def _wakeup(self, future): + try: + future.result() + except Exception as exc: + # This may also be a cancellation. + self._step(exc) + else: + # Don't pass the value of `future.result()` explicitly, + # as `Future.__iter__` and `Future.__await__` don't need it. + # If we call `_step(value, None)` instead of `_step()`, + # Python eval loop would use `.send(value)` method call, + # instead of `__next__()`, which is slower for futures + # that return non-generator iterators from their `__iter__`. + self._step() + self = None # Needed to break cycles when an exception occurs. 
# ---- pyzmp/tests/test_node.py -- node discovery/crash tests over IPC ----
import inspect

from pyzmp.node import Node, discover


# Node kept as a fixture attribute to guarantee cleanup.
# IPC is the main transport here since it is simpler to test than sockets.
class TestNodeIPC(object):
    __test__ = True

    class UnstableNode(Node):
        """A node exposing a service that deliberately crashes the child process."""

        def __init__(self, name):
            super(TestNodeIPC.UnstableNode, self).__init__(name)
            self.magic_number = 666
            # TODO : improvement : autodetect class own methods
            # TODO : assert static ?
            self.provides(self.crash)

        def crash(self):
            # deliberate ZeroDivisionError to kill the node process
            1 / 0

    def setup_method(self, method):
        # services registry is already setup globally
        self.testnode = TestNodeIPC.UnstableNode(name="TestNode")

    def teardown_method(self, method):
        if self.testnode.is_alive():
            self.testnode.shutdown(join=True)
        # if it is still alive, terminate it.
        if self.testnode.is_alive():
            self.testnode.terminate()

    def test_node_discover(self):
        """A node is discoverable exactly while it is running."""
        print("\n" + inspect.currentframe().f_code.co_name)
        assert not self.testnode.is_alive()

        print("Discovering Node...")
        testnode_client = discover("Test.*")
        assert testnode_client is None  # node not found until started.

        self.testnode.start()
        assert self.testnode.is_alive()

        print("Discovering Node...")
        # Note : we should not have to wait here, start() should wait long enough.
        testnode_client = discover("Test.*")
        # FIX: PEP8 (E714) -- "is not None" instead of "not ... is None"
        assert testnode_client is not None

        self.testnode.shutdown()
        assert not self.testnode.is_alive()

        print("Discovering Node...")
        testnode_client = discover("Test.*")
        assert testnode_client is None  # node not found any longer.

    def test_node_crash(self):
        """A service call that crashes the node makes it undiscoverable."""
        print("\n" + inspect.currentframe().f_code.co_name)
        assert not self.testnode.is_alive()

        self.testnode.start()
        assert self.testnode.is_alive()

        print("Discovering Node...")
        testnode_client = discover("Test.*")
        assert testnode_client is not None
        # pick the one
        assert len(testnode_client) == 1
        testnode_client = testnode_client[0]

        # calling a method dynamically setup to crash the child process
        testnode_client.crash()
        assert not self.testnode.is_alive()

        print("Discovering Node...")
        testnode_client = discover("Test.*")
        assert testnode_client is None  # node not found any longer.
- assert not n1.is_alive() +# test that, after the node started, services are immediately available +# test that, after a node stopped / terminated / crashed, services are not available -@pytest.mark.timeout(5) -def test_node_creation_args(): - """Checks that a node can be passed an argument using inheritance""" - ns = multiprocessing.Manager().Namespace() - ns.arg = 42 - - class TestArgNode(pyzmp.Node): - def update(self, *args, **kwargs): - ns.arg -= args[0] - return ns.arg - - n1 = TestArgNode(args=(ns.arg,)) - assert not n1.is_alive() - svc_url = n1.start() - assert n1.is_alive() - assert svc_url - - # starting and shutdown should at least guarantee ONE call of update function. - - exitcode = n1.shutdown() - assert exitcode == 0 - assert not n1.is_alive() - - assert ns.arg == 0 - - -@pytest.mark.timeout(5) -def test_node_creation_args_delegate(): - """Checks that a node can be passed an argument using delegation""" - ns = multiprocessing.Manager().Namespace() - ns.arg = 42 - - def arguser(fortytwo, **kwargs): # kwargs is there to accept extra arguments nicely (timedelta) - ns.arg -= fortytwo - return ns.arg - - n1 = pyzmp.Node(args=(ns.arg,), target=arguser) - assert not n1.is_alive() - svc_url = n1.start() - assert n1.is_alive() - assert svc_url - - exitcode = n1.shutdown() - assert exitcode == 0 - assert not n1.is_alive() - - assert ns.arg == 0 - - -@pytest.mark.timeout(5) -def test_node_creation_kwargs(): - """Checks that a node can be passed a keyword argument using inheritance""" - ns = multiprocessing.Manager().Namespace() - ns.kwarg = 42 - - class TestKWArgNode(pyzmp.Node): - def update(self, *args, **kwargs): - ns.kwarg -= kwargs.get('intval') - return ns.kwarg - - n1 = TestKWArgNode(kwargs={'intval': ns.kwarg, }) - assert not n1.is_alive() - svc_url = n1.start() - assert n1.is_alive() - assert svc_url - - exitcode = n1.shutdown() - assert exitcode == 0 - assert not n1.is_alive() - - assert ns.kwarg == 0 - - -@pytest.mark.timeout(5) -def 
test_node_creation_kwargs_delegate(): - """Checks that a node can be passed a keyword argument using delegation""" - ns = multiprocessing.Manager().Namespace() - ns.kwarg = 42 - - def kwarguser(intval, **kwargs): # kwargs is there to accept extra arguments nicely (timedelta) - ns.kwarg -= intval - return ns.kwarg - - n1 = pyzmp.Node(kwargs={'intval': ns.kwarg, }, target=kwarguser) - assert not n1.is_alive() - svc_url = n1.start() - assert n1.is_alive() - assert svc_url - - exitcode = n1.shutdown() - assert exitcode == 0 - assert not n1.is_alive() - - assert ns.kwarg == 0 - - -# @nose.SkipTest # to help debugging ( FIXME : how to programmatically start only one test - maybe in fixture - ? ) -@pytest.mark.timeout(5) -def test_node_as_context_manager(): - """Checks that a node can be used as a context manager""" - with pyzmp.Node() as n1: # this will __init__ and __enter__ - assert n1.is_alive() - assert not n1.is_alive() - - -# @nose.SkipTest # to help debugging ( FIXME : how to programmatically start only one test - maybe in fixture - ? ) -@pytest.mark.timeout(5) -def test_node_running_as_context_manager(): - """Checks that an already running node can be used as a context manager""" - n1 = pyzmp.Node() - n1.start() - with n1: # hooking to an already started node - # This might restart the node (might be bad but ideally should not matter.) 
- assert n1.is_alive() - assert not n1.is_alive() - - -def test_update_rate(): - """ - Testing that the update methods get a correct timedelta - """ - # TODO : investigate if node multiprocessing plugin would help simplify this - # playing with list to pass a reference to this - testing_last_update = [time.time()] - testing_time_delta = [] - acceptable_timedelta = [] - - def testing_update(self, timedelta, last_update, time_delta, ok_timedelta): - time_delta.append(time.time() - last_update[-1]) - last_update.append(time.time()) - - # if the time delta measured in test and the one passed as argument differ - # too much, one time, test is failed - if abs(time_delta[-1] - timedelta) > 0.005: - ok_timedelta.append(False) - else: - ok_timedelta.append(True) - - # spin like crazy, loads CPU for a bit, and eventually exits. - # We re here trying to disturb the update rate - while True: - if randint(0, 10000) == 42: - break - - # hack to dynamically change the update method - testing_update_onearg = functools.partial(testing_update, - last_update=testing_last_update, - time_delta=testing_time_delta, - ok_timedelta=acceptable_timedelta) - - n1 = pyzmp.Node() - n1.update = types.MethodType(testing_update_onearg, n1) - - assert not n1.is_alive() - - # Starting the node in the same thread, to be able to test simply by shared memory. - # TODO : A Node that can choose process or thread run ( on start() instead of init() maybe ? 
) - runthread = threading.Thread(target=n1.run) - runthread.daemon = True # to kill this when test is finished - runthread.start() - # n1.start() - - # sleep here for a while - time.sleep(10) - - # removing init time only used for delta computation - testing_last_update.pop(0) - # Check time vars modified by update - for i in range(0, len(testing_last_update)): - print("update : {u} | delta: {d} | accept : {a}".format( - u=testing_last_update[i], - d=testing_time_delta[i], - a=acceptable_timedelta[i]) - ) - - assert acceptable_timedelta[i] - - - - - -### TODO : more testing in case of crash in process, exception, signal, etc. if __name__ == '__main__': - - import nose - nose.runmodule() + import pytest + pytest.main(['-s', '-x', __file__]) diff --git a/pyzmp/tests/test_process.py b/pyzmp/tests/test_process.py new file mode 100644 index 0000000..fdd348f --- /dev/null +++ b/pyzmp/tests/test_process.py @@ -0,0 +1,382 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +# To allow python to run these tests as main script +import functools +import inspect +import multiprocessing +import sys +import os +import threading + +import types +from random import randint + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))) + +import time +import pyzmp + +import pytest +# http://pytest.org/latest/contents.html +# https://github.com/ionelmc/pytest-benchmark + +# TODO : PYPY +# http://pypy.org/ + +# TODO : Test Node exception : correctly transmitted, node still keeps spinning... 
@pytest.mark.timeout(5)
def test_process_termination():
    """Shutting down a never-started process is a no-op with no exit code."""
    proc = pyzmp.Process()
    assert not proc.is_alive()
    exit_status = proc.shutdown()  # no-op: the process was never started
    assert exit_status is None  # no exit code since it neither started nor stopped
    assert not proc.is_alive()


@pytest.mark.timeout(5)
def test_process_creation_termination():
    """Start then shutdown reports a clean (0) exit."""
    proc = pyzmp.Process()
    assert not proc.is_alive()
    svc_url = proc.start()
    assert proc.is_alive()
    assert svc_url
    exit_status = proc.shutdown()
    assert exit_status == 0  # a default process spins without issues
    assert not proc.is_alive()


@pytest.mark.timeout(5)
def test_process_timeout_creation_termination():
    """Start with a timeout then shutdown reports a clean (0) exit."""
    proc = pyzmp.Process()
    assert not proc.is_alive()
    svc_url = proc.start(1)
    assert svc_url
    assert proc.is_alive()
    exit_status = proc.shutdown()
    assert exit_status == 0
    assert not proc.is_alive()


# @nose.SkipTest # to help debugging
@pytest.mark.timeout(5)
def test_process_double_creation_termination():
    """Calling start() twice restarts the process; the url stays stable."""
    proc = pyzmp.Process()
    assert not proc.is_alive()
    first_url = proc.start()
    assert proc.is_alive()
    assert first_url
    second_url = proc.start()  # shuts down properly, then restarts
    assert proc.is_alive()
    assert second_url
    # same process identity -> same url
    assert first_url == second_url
    exit_status = proc.shutdown()
    assert exit_status == 0
    assert not proc.is_alive()


@pytest.mark.timeout(5)
def test_process_timeout_double_creation_termination():
    """Calling start(timeout) twice restarts the process; the url stays stable."""
    proc = pyzmp.Process()
    assert not proc.is_alive()
    first_url = proc.start(1)
    assert proc.is_alive()
    assert first_url
    second_url = proc.start(1)  # shuts down, then restarts
    assert proc.is_alive()
    assert second_url
    assert first_url == second_url
    exit_status = proc.shutdown()
    assert exit_status == 0
    assert not proc.is_alive()


@pytest.mark.timeout(5)
def test_process_creation_double_termination():
    """shutdown() is idempotent after a clean stop."""
    proc = pyzmp.Process()
    assert not proc.is_alive()
    svc_url = proc.start()
    assert proc.is_alive()
    assert svc_url
    assert proc.shutdown() == 0
    assert not proc.is_alive()
    # still 0: nothing was restarted in between
    assert proc.shutdown() == 0
    assert not proc.is_alive()


@pytest.mark.timeout(5)
def test_process_creation_args():
    """Positional args reach the child through an overridden target()."""
    ns = multiprocessing.Manager().Namespace()
    ns.arg = 42

    class ArgUserProcess(pyzmp.Process):
        def target(self, *args):
            ns.arg -= args[0]
            return ns.arg

    proc = ArgUserProcess(args=(ns.arg,))
    assert not proc.is_alive()
    svc_url = proc.start()
    # target might not have been called yet at this point
    assert proc.is_alive()
    assert svc_url

    # start + shutdown guarantees at least ONE call of the target function
    assert proc.shutdown() == 0
    assert not proc.is_alive()
    assert ns.arg == 0


@pytest.mark.timeout(5)
def test_process_creation_args_delegate():
    """Positional args reach a delegated target function."""
    ns = multiprocessing.Manager().Namespace()
    ns.arg = 42

    def arguser(fortytwo, **kwargs):  # kwargs absorbs extra arguments (timedelta)
        ns.arg -= fortytwo
        return ns.arg

    proc = pyzmp.Process(args=(ns.arg,), target_override=arguser)
    assert not proc.is_alive()
    svc_url = proc.start()
    assert proc.is_alive()
    assert svc_url

    assert proc.shutdown() == 0
    assert not proc.is_alive()
    assert ns.arg == 0


@pytest.mark.timeout(5)
def test_process_creation_kwargs():
    """Keyword args reach the child through an overridden target()."""
    ns = multiprocessing.Manager().Namespace()
    ns.kwarg = 42

    class KWArgUserProcess(pyzmp.Process):
        def target(self, *args, **kwargs):
            ns.kwarg -= kwargs.get('intval')
            return ns.kwarg

    proc = KWArgUserProcess(kwargs={'intval': ns.kwarg, })
    assert not proc.is_alive()
    svc_url = proc.start()
    assert proc.is_alive()
    assert svc_url

    assert proc.shutdown() == 0
    assert not proc.is_alive()
    assert ns.kwarg == 0


@pytest.mark.timeout(5)
def test_process_creation_kwargs_delegate():
    """Keyword args reach a delegated target function."""
    ns = multiprocessing.Manager().Namespace()
    ns.kwarg = 42

    def kwarguser(intval, **kwargs):  # kwargs absorbs extra arguments (timedelta)
        ns.kwarg -= intval
        return ns.kwarg

    proc = pyzmp.Process(kwargs={'intval': ns.kwarg, }, target_override=kwarguser)
    assert not proc.is_alive()
    svc_url = proc.start()
    assert proc.is_alive()
    assert svc_url

    assert proc.shutdown() == 0
    assert not proc.is_alive()
    assert ns.kwarg == 0


@pytest.mark.timeout(5)
def test_process_as_context_manager():
    """__enter__ starts the process, __exit__ stops it."""
    with pyzmp.Process() as proc:  # this will __init__ and __enter__
        assert proc.is_alive()
    assert not proc.is_alive()


@pytest.mark.timeout(5)
def test_process_running_as_context_manager():
    """An already-running process can be adopted by a with-block."""
    proc = pyzmp.Process()
    proc.start()
    with proc:  # hooking to an already started process (may restart it)
        assert proc.is_alive()
    assert not proc.is_alive()


# Process as fixture to guarantee cleanup
class TestProc(object):
    __test__ = True

    def setup_method(self, method):
        # services registry is already setup globally
        self.testproc = pyzmp.Process(name="TestProcess")

    def teardown_method(self, method):
        if self.testproc.is_alive():
            self.testproc.shutdown(join=True)
        # if it is still alive, terminate it.
        if self.testproc.is_alive():
            self.testproc.terminate()
) + def test_process_discover(self): + print("\n" + inspect.currentframe().f_code.co_name) + assert not self.testproc.is_alive() + + print("Discovering Node...") + testproc_client = pyzmp.Process.discover("Test.*") + assert testproc_client is None # node not found until started. + + self.testproc.start() + assert self.testproc.is_alive() + + print("Discovering Node...") + testproc_client = pyzmp.Process.discover("Test.*") # Note : we should not have to wait here, start() should wait long enough. + assert not testproc_client is None + + self.testproc.shutdown() + assert not self.testproc.is_alive() + + print("Discovering Node...") + testproc_client = pyzmp.Process.discover("Test.*") + assert testproc_client is None # node not found any longer. + + + def test_process_crash(self): + print("\n" + inspect.currentframe().f_code.co_name) + assert not self.testproc.is_alive() + + self.testproc.start() + assert self.testproc.is_alive() + + print("Discovering Node...") + testproc_clients = pyzmp.Process.discover("Test.*") # Note : we should not have to wait here, start() should wait long enough. + assert not testproc_clients is None + assert len(testproc_clients) == 1 + + # pick the one + testproc_client = testproc_clients.get("TestProcess") + + # sending a signal to kill the child process + self.testproc.terminate() + # TODO : handle all kinds of ways to do that... + + # dies immediately + assert self.testproc.is_alive() + + # but nothing is cleaned up (finally context managers, etc. are not cleaning) + while "TestProcess" in pyzmp.Process.discover("Test.*"): + time.sleep(0.5) + + #TODO : wait a bit (less than gossip period) until processor is not found any more... + print("Discovering Node...") + testproc_client = pyzmp.Process.discover("Test.*") + assert testproc_client is None # node not found any longer. 
def test_update_rate():
    """Check that the update() callback receives an accurate timedelta argument."""
    # lists are used as mutable cells shared with the update callback
    last_updates = [time.time()]
    measured_deltas = []
    delta_ok = []

    def probing_update(self, timedelta, last_update, time_delta, ok_timedelta):
        time_delta.append(time.time() - last_update[-1])
        last_update.append(time.time())

        # the check fails if the measured delta and the reported one ever
        # diverge by more than 5 ms
        ok_timedelta.append(abs(time_delta[-1] - timedelta) <= 0.005)

        # burn CPU for a random while, to disturb the update rate, then exit
        while randint(0, 10000) != 42:
            pass

    # hack to dynamically change the update method
    bound_update = functools.partial(probing_update,
                                     last_update=last_updates,
                                     time_delta=measured_deltas,
                                     ok_timedelta=delta_ok)

    proc = pyzmp.Process()
    proc.update = types.MethodType(bound_update, proc)

    assert not proc.is_alive()

    # Run in a daemon thread (same process) so shared memory keeps working
    # and the thread dies with the test.
    # TODO : a Process that can choose process or thread run (on start() ?)
    runthread = threading.Thread(target=proc.run)
    runthread.daemon = True
    runthread.start()

    # let it spin for a while
    time.sleep(10)

    # drop the init timestamp, only used to seed the delta computation
    last_updates.pop(0)
    # check the time vars modified by update
    for i in range(len(last_updates)):
        print("update : {u} | delta: {d} | accept : {a}".format(
            u=last_updates[i],
            d=measured_deltas[i],
            a=delta_ok[i])
        )
        assert delta_ok[i]
) + runthread = threading.Thread(target=n1.run) + runthread.daemon = True # to kill this when test is finished + runthread.start() + # n1.start() + + # sleep here for a while + time.sleep(10) + + # removing init time only used for delta computation + testing_last_update.pop(0) + # Check time vars modified by update + for i in range(0, len(testing_last_update)): + print("update : {u} | delta: {d} | accept : {a}".format( + u=testing_last_update[i], + d=testing_time_delta[i], + a=acceptable_timedelta[i]) + ) + + assert acceptable_timedelta[i] + + + +### TODO : more testing in case of crash in process, exception, signal, etc. + +if __name__ == '__main__': + import pytest + pytest.main(['-s', '-x', __file__]) diff --git a/pyzmp/tests/test_process_observer.py b/pyzmp/tests/test_process_observer.py new file mode 100644 index 0000000..0746879 --- /dev/null +++ b/pyzmp/tests/test_process_observer.py @@ -0,0 +1,265 @@ +from __future__ import absolute_import, division, print_function + +import threading + +import os +import sys +import io + +if os.name == 'posix' and sys.version_info[0] < 3: + import subprocess32 as subprocess +else: + import subprocess + +import psutil +import ptyprocess +import pexpect.popen_spawn +from pyzmp.process_observer import ProcessWatcher + +# Here we test basic process observer behavior, with a bunch of different ways to start and control a process + + +class TestSubprocessObserver(object): + __test__ = True + + def setup_method(self, method): + """Emergency cleanup if something happened and the process from a previous test is still there""" + if hasattr(self, 'testproc'): + if self.testproc.poll(): + self.testproc.terminate() + # if it s still alive, just terminate it. 
+ if self.testproc.poll(): + self.testproc.kill() + if hasattr(self, 'testobserver'): + self.testobserver = None + + def test_start_once_detect(self): + # setup the event to be able to come back from the callback (potentially in another thread) + detected = threading.Event() + + def set_detected(): + detected.set() + + # setup the observer + self.testobserver = ProcessWatcher(out_watchers={ + 'test_string': set_detected + }) + + # start the process, using watcher buffered pipes. + self.testproc = subprocess.Popen(["/bin/echo", "test_string"], stdin=self.testobserver.pty_master, stdout=self.testobserver.pty_slave, stderr=subprocess.STDOUT, start_new_session=True) + # this does not last long enough for attach to grab anything. + #self.testobserver.attach(self.testproc) + + # check that we get some output + assert detected.wait(timeout=5) + + # basic checks + assert self.testobserver.ppid() == os.getpid() + + def test_start_once_crash(self): + # setup the event to be able to come back from the callback (potentially in another thread) + detected = threading.Event() + + def set_detected(): + detected.set() + + # setup the observer + self.testobserver = ProcessObserver(err_watchers={ + '"/bin/cat: no_filename_like_this: No such file or directory': set_detected + }) + + # start the process + self.testproc = subprocess.Popen(["/bin/cat", "no_filename_like_this"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True) + self.testobserver.attach(self.testproc) + + # basic checks + assert self.testobserver.ppid() == os.getpid() + + # check that we get some error + assert detected.wait(timeout=5) + + # def test_start_forever_shutdown(self): + # # We are using ed as a long running process to interact with + # + # self.testproc = pexpect.spawn("/bin/ed -p\*") # we need to use pexpect to manage interactive programs via a terminal + # #self.testproc = pexpect.popen_spawn.PopenSpawn(["ed", "-p\*"]) + # #self.testproc = subprocess.Popen(["ed", "-p\*"], 
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True) + # + # # we can branch the observer onto stdout and stderr. + # self.testobserver = ProcessObserver.from_pexpect(self.testproc) + # + # # basic checks + # assert self.testobserver.ppid() == os.getpid() + # + # assert self.testobserver.is_running() + # + # try: + # self.testobserver.expect("\\*", timeout=5) + # + # self.testproc.write('H\n') + # self.testobserver.expect("\\*", timeout=5) + # + # self.testproc.write('a\n') + # self.testobserver.expect("\\*", timeout=5) + # + # self.testproc.write("some test string\n") + # self.testproc.write(".\n") + # self.testobserver.expect("\\*", timeout=5) + # + # self.testproc.write("p\n") + # self.testobserver.expect("\\some test string\n", timeout=5) + # + # self.testproc.write("Q\n") + # self.testobserver.expect("", timeout=5) + # + # except: # something went wrong + # print("Exception was thrown") + # print("debug information:") + # print(str(self.testproc)) + # # print("stdout:") + # # while not self.testobserver._expect_out.eof(): + # # print(self.testobserver._expect_out.readline()) + # # print("stderr:") + # # while not self.testobserver.expecterr.eof(): + # # print(self.testobserver.expecterr.readline()) + # raise + # + # self.testobserver.expect("") + # self.testobserver.expect("") + # + # # pipes are working well, time to shutdown + # pass + # + # + # def test_start_forever_terminate(self): + # self.testproc = psutil.Popen(["ed"],) + # self.testobserver = ProcessObserver(self.testproc.pid) + # + # while self.testobserver.is_running(): + # self.testobserver.monitor() + + +class TestPtyprocessObserver(object): + __test__ = True + + def setup_method(self, method): + """Emergency cleanup if something happened and the process from a previous test is still there""" + if hasattr(self, 'testproc'): + if self.testproc.poll(): + self.testproc.terminate() + # if it s still alive, just terminate it. 
+ if self.testproc.poll(): + self.testproc.kill() + if hasattr(self, 'testobserver'): + self.testobserver = None + + def test_start_once(self): + # setup the event to be able to come back from the callback (potentially in another thread) + detected = threading.Event() + + def set_detected(): + detected.set() + + # setup the observer + self.testobserver = ProcessObserver(out_watchers={ + 'test_string': set_detected + }) + + # start the process + self.testproc = ptyprocess.PtyProcess.spawn(["/bin/echo", "test_string"], cwd=None, env=None, echo=True, preexec_fn=None, dimensions=(24, 80)) + self.testobserver.attach(self.testproc) + + # basic checks + assert self.testobserver.ppid() == os.getpid() + + assert detected.wait(timeout=5) + + def test_start_once_crash(self): + # setup the event to be able to come back from the callback (potentially in another thread) + detected = threading.Event() + + def set_detected(): + detected.set() + + # setup the observer + self.testobserver = ProcessObserver(err_watchers={ + '"/bin/cat: no_filename_like_this: No such file or directory': set_detected + }) + + # start the process + self.testproc = ptyprocess.PtyProcess.spawn(["/bin/cat", "no_filename_like_this"], cwd=None, env=None, echo=True, preexec_fn=None, dimensions=(24, 80)) + self.testobserver = ProcessObserver(self.testproc) + + # basic checks + assert self.testobserver.ppid() == os.getpid() + + assert detected.wait(timeout=5) + + # def test_start_forever_shutdown(self): + # # We are using ed as a long running process to interact with + # + # self.testproc = pexpect.spawn("/bin/ed -p\*") # we need to use pexpect to manage interactive programs via a terminal + # #self.testproc = pexpect.popen_spawn.PopenSpawn(["ed", "-p\*"]) + # #self.testproc = subprocess.Popen(["ed", "-p\*"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True) + # + # # we can branch the observer onto stdout and stderr. 
# ---- pyzmp/tests/test_registry.py ----
from __future__ import absolute_import, division, print_function

import pytest

from pyzmp.registry import FileBasedRegistry

# Testing operation combinations that make sense


class TestFileBasedRegistry(object):

    def setup_method(self, method):
        self.attr_reg = FileBasedRegistry("myattr")

    def teardown_method(self, method):
        # cleaning up what might exist
        # FIX: snapshot the keys first -- popping entries while iterating the
        # live registry mutates it during iteration, which can skip entries or
        # raise, depending on FileBasedRegistry's iterator implementation.
        for a in list(self.attr_reg):
            self.attr_reg.pop(a)

    def test_store_erase(self):
        """A key raises KeyError before store, reads back after, and raises again after pop."""
        with pytest.raises(KeyError):
            self.attr_reg["myid"]

        self.attr_reg["myid"] = 42

        assert self.attr_reg["myid"] == 42

        self.attr_reg.pop("myid")

        with pytest.raises(KeyError):
            self.attr_reg["myid"]


if __name__ == '__main__':
    pytest.main(['-s', '-x', __file__])
changelog" # change version in code and changelog - subprocess.check_call("git commit CHANGELOG.rst pyros/_version.py -m 'v{0}'".format(__version__), shell=True) + subprocess.check_call("git commit CHANGELOG.rst pyzmp/_version.py -m 'v{0}'".format(__version__), shell=True) subprocess.check_call("git push", shell=True) print("You should verify travis checks, and you can publish this release with :") @@ -181,13 +182,15 @@ def run(self): 'tblib', # this might not always install six (latest version does not) 'six', 'pyzmq', + 'pyyaml', + 'psutil', 'pytest-timeout', # Careful : upon install plugins can be resolved instead of core pytest package # => pytest should be listed last here... - 'pytest>=2.9.1', # since tests are embedded in package + 'pytest>=2.5.1', # since tests are embedded in package ], setup_requires=['pytest-runner'], - tests_require=['pytest>=2.9.1'], + tests_require=['pytest>=2.5.1'], cmdclass={ 'rosdevelop': RosDevelopCommand, 'prepare_release': PrepareReleaseCommand, diff --git a/subproc/__init__.py b/subproc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/subproc/echo_server.py b/subproc/echo_server.py new file mode 100644 index 0000000..6783966 --- /dev/null +++ b/subproc/echo_server.py @@ -0,0 +1,48 @@ +import asyncio +import logging +import sys + +SERVER_ADDRESS = ('localhost', 10000) +logging.basicConfig( + level=logging.DEBUG, + format='%(name)s: %(message)s', + stream=sys.stderr, +) +log = logging.getLogger('main') + +event_loop = asyncio.get_event_loop() + +async def echo(reader, writer): + address = writer.get_extra_info('peername') + log = logging.getLogger('echo_{}_{}'.format(*address)) + log.debug('connection accepted') + while True: + data = await reader.read(128) + if data: + log.debug('received {!r}'.format(data)) + writer.write(data) + await writer.drain() + log.debug('sent {!r}'.format(data)) + + else: + log.debug('closing') + writer.close() + return + +# Create the server and let the loop finish the coroutine 
before +# starting the real event loop. +factory = asyncio.start_server(echo, *SERVER_ADDRESS) +server = event_loop.run_until_complete(factory) +log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS)) + +# Enter the event loop permanently to handle all connections. +try: + event_loop.run_forever() +except KeyboardInterrupt: + pass +finally: + log.debug('closing server') + server.close() + event_loop.run_until_complete(server.wait_closed()) + log.debug('closing event loop') + event_loop.close() \ No newline at end of file diff --git a/subproc/protocol.py b/subproc/protocol.py new file mode 100644 index 0000000..d30f92f --- /dev/null +++ b/subproc/protocol.py @@ -0,0 +1,3 @@ + + +class Protocol(object): diff --git a/subproc/subprocess.py b/subproc/subprocess.py new file mode 100644 index 0000000..ea12d6d --- /dev/null +++ b/subproc/subprocess.py @@ -0,0 +1,90 @@ + +import asyncio.subprocess + + +class SubProcess(): + """ + Class wrapping : + - asyncio.process (for async code), + - subprocess.Popen (for bwcompat with non async code - py2.7) + """ + # TODO : attach to an existing process ??? is it doable at this level or we need to go higher (stream and sockets ?) 
+ + + def __init__(self): + pass + + def send_signal(self): + pass + + def terminate(self): + pass + + def kill(self): + pass + + + # async interface + async def wait(self): + await pass + + async def communicate(self): + await pass + + + +async def create_subprocess_shell(*args, stdin=None, stdout=None, stderr=None, loop=None, limit=None, **kwds): + await asyncio.create_subprocess_shell(*args, stdin=stdin, stdout=stdout, stderr=stderr, loop=loop, limit=limit, **kwds) + +async def create_subprocess_exec(*args, stdin=None, stdout=None, stderr=None, loop=None, limit=None, **kwds): + await asyncio.create_subprocess_exec(*args, stdin=stdin, stdout=stdout, stderr=stderr, loop=loop, limit=limit, **kwds) + + +# Simple example code +if __name__ == "__main__": + + testproc = create_subprocess_exec(["ed", "-p\*"]) + + try: + self.testobserver.expect("\\*", timeout=5) + + self.testproc.write('H\n') + self.testobserver.expect("\\*", timeout=5) + + self.testproc.write('a\n') + self.testobserver.expect("\\*", timeout=5) + + self.testproc.write("some test string\n") + self.testproc.write(".\n") + self.testobserver.expect("\\*", timeout=5) + + self.testproc.write("p\n") + self.testobserver.expect("\\some test string\n", timeout=5) + + self.testproc.write("Q\n") + self.testobserver.expect("", timeout=5) + + except: # something went wrong + print("Exception was thrown") + print("debug information:") + print(str(self.testproc)) + # print("stdout:") + # while not self.testobserver._expect_out.eof(): + # print(self.testobserver._expect_out.readline()) + # print("stderr:") + # while not self.testobserver.expecterr.eof(): + # print(self.testobserver.expecterr.readline()) + raise + + self.testobserver.expect("") + self.testobserver.expect("") + + if sys.platform == "win32": + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + else: + loop = asyncio.get_event_loop() + + date = loop.run_until_complete(get_date(loop)) + print("Current date: %s" % date) + loop.close() diff 
--git a/subproc/subprocess_logging_adapter.py b/subproc/subprocess_logging_adapter.py new file mode 100644 index 0000000..e69de29 diff --git a/subproc/subprocess_protocol.py b/subproc/subprocess_protocol.py new file mode 100644 index 0000000..f73bcd7 --- /dev/null +++ b/subproc/subprocess_protocol.py @@ -0,0 +1,171 @@ +import re +import signal + +import six + +try: + import asyncio +except ImportError: + import trollius as asyncio + +import logging.handlers + +""" +This is a text-based (ASCII) protocol, for communicating and synchronizing between processes, through pipes. +The goal is to synchronize process startup / shutdown behavior to be able to implement synchronization at a higher level... + +Note this does the same job as what is found in systemd, upstart, openrc, etc. +See https://en.wikipedia.org/wiki/Init +TODO : compatibility with those, eventually + +""" + +STARTED_FMT = "-STARTED {pid}-" # string to denote startup +STARTED_REGEX = ".*-STARTED (.+?)-" # regex to extract pid +SHUTDOWN_FMT = "-SHUTDOWN {exit_code}-" # string to denote shutdown +SHUTDOWN_REGEX = ".*-SHUTDOWN (.+?)-" # regex to extract exit code + + +class SubprocessProtocol(asyncio.SubprocessProtocol): + + def __init__(self, exit_future, loop=None): + self._logger = logging.getLogger(__name__) + self.transport = None # no connection made yet + self.loop = loop or asyncio.get_event_loop() + self.exit_future = exit_future + self.output = bytearray() # TODO : Change that into a logger to allow parent to redirect output (but not err !) + self.available = False + + def connection_made(self, transport): + """Called when a connection is made. + + The argument is the transport representing the pipe connection. + To receive data, wait for data_received() calls. + When the connection is closed, connection_lost() is called. 
+ """ + self.transport = transport + self._logger.debug("connection made") + + asyncio.run_coroutine_threadsafe(self.on_connected(), self.loop) + + def connection_lost(self, exc): + """Called when the connection is lost or closed. + + The argument is an exception object or None (the latter + meaning a regular EOF is received or the connection was + aborted or closed). + """ + + self.transport = None + if exc is None: # EOF or aborted + pass + else: + self._logger.debug("connection lost") + + asyncio.run_coroutine_threadsafe(self.on_disconnected(), self.loop) + + def pause_writing(self): + """Called when the transport's buffer goes over the high-water mark. + + Pause and resume calls are paired -- pause_writing() is called + once when the buffer goes strictly over the high-water mark + (even if subsequent writes increases the buffer size even + more), and eventually resume_writing() is called once when the + buffer size reaches the low-water mark. + + Note that if the buffer size equals the high-water mark, + pause_writing() is not called -- it must go strictly over. + Conversely, resume_writing() is called when the buffer size is + equal or lower than the low-water mark. These end conditions + are important to ensure that things go as expected when either + mark is zero. + + NOTE: This is the only Protocol callback that is not called + through EventLoop.call_soon() -- if it were, it would have no + effect when it's most needed (when the app keeps writing + without yielding until pause_writing() is called). + """ + self._logger.debug("pause writing") + + def resume_writing(self): + """Called when the transport's buffer drains below the low-water mark. + + See pause_writing() for details. + """ + self._logger.debug("resume writing") + + """Interface for protocol for subprocess calls.""" + def pipe_data_received(self, fd, data): + """Called when the subprocess writes data into stdout/stderr pipe. + + fd is int file descriptor. + data is bytes object. 
+ """ + + self._logger.debug("pipe data received") + + self.output.extend(data) + + if not self.available and re.match(STARTED_REGEX, data): # startup sequence has finished + self.available = True + pid = self.extract_pid(data) # CAREFUL with intermediary processes (shells especially), and how they forward signals... + asyncio.run_coroutine_threadsafe(self.on_started(), self.loop) + # elif data.endswith("STOPPED"): # process has been stopped (received SIGSTOP / Ctrl^Z) + # + # elif data.endswith("RESUMED"): # process has been stopped (received SIGCONT) + + if self.available and re.match(SHUTDOWN_REGEX, data): # shutdown sequence has been initiated. (received SIGTERM or normal shutdown) + # CAREFUL : by design, this is an optimization and should not be necessary for the system to keep working. + exit_code = self.extract_exit_code(data) # => other processes must not rely on exit code. Required messages must be propagated at a higher level... + asyncio.run_coroutine_threadsafe(self.on_shutdown(exit_code), self.loop) + self.available = False + + @asyncio.coroutine + def on_connected(self): + raise NotImplemented + + @asyncio.coroutine + def on_disconnected(self): + raise NotImplemented + + @asyncio.coroutine + def on_started(self): + raise NotImplemented + + @asyncio.coroutine + def on_shutdown(self): + raise NotImplemented + + @asyncio.coroutine + def on_exited(self): + raise NotImplemented + + def extract_pid(self, started_str): + try: + pid = re.search(STARTED_REGEX, started_str).group(1) + return pid + except AttributeError: + return None + + def extract_exit_code(self, shutdown_str): + try: + exit_code = re.search(SHUTDOWN_REGEX, shutdown_str).group(1) + return exit_code + except AttributeError: + return None + + + def pipe_connection_lost(self, fd, exc): + """Called when a file descriptor associated with the child process is + closed. + + fd is the int file descriptor that was closed. 
+ """ + self._logger.debug('pipe connection lost') + + def process_exited(self): + """Called when subprocess has exited.""" + self._logger.debug('process exited') + asyncio.run_coroutine_threadsafe(self.on_exited(), self.loop) + self.exit_future.set_result(True) + diff --git a/subproc/subprocess_protocol_implement.py b/subproc/subprocess_protocol_implement.py new file mode 100644 index 0000000..6bc0241 --- /dev/null +++ b/subproc/subprocess_protocol_implement.py @@ -0,0 +1,86 @@ +import os +import re +import signal + +import six + +try: + import asyncio +except ImportError: + import trollius as asyncio + +import logging.handlers + +""" +This is a text-based (ASCII) protocol, for communicating and synchronizing between processes, through pipes. +The goal is to synchronize process startup / shutdown behavior to be able to implement synchronization at a higher level... + +Note this does the same job as what is found in systemd, upstart, openrc, etc. +See https://en.wikipedia.org/wiki/Init +TODO : compatibility with those, eventually + +""" + + +from .subprocess_protocol import STARTED_FMT, SHUTDOWN_FMT + + +class SubprocessProtocolImplement(object): + """helper for a process to implement the protocol. + """ + + # Ref : https://stackoverflow.com/questions/17558552/how-do-i-add-custom-field-to-python-log-format-string + # to add extra fields to the log format... + + def __init__(self, logger): + self.logger = logger + self.prepend_format_str = '[%(process)d %(threadName)s %(relativeCreated)d]' + + ensure_console = False + ensure_syslog = False + for h in self.logger.handlers: + if isinstance(h, logging.StreamHandler): # we have one stream handler + ensure_console = True + # We just modify the existing format (no API for this ?) 
+ h.setFormatter(logging.Formatter(fmt=self.prepend_format_str + h.formatter._fmt, datefmt=h.formatter.datefmt, style=h.formatter._style)) + + if isinstance(h, logging.handlers.SysLogHandler): # we have one syslog handler + ensure_syslog = True + # We just modify the existing format (no API for this ?) + h.setFormatter(logging.Formatter(fmt=self.prepend_format_str + h.formatter._fmt, datefmt=h.formatter.datefmt, style=h.formatter._style)) + + # enforce minimum handlers + if not ensure_console: + # configure stream handler + consoleHandler = logging.StreamHandler() + consoleHandler.setFormatter(logging.Formatter(self.prepend_format_str + logging.BASIC_FORMAT)) + self.logger.addHandler(consoleHandler) + + if not ensure_syslog: + # configure syslog handler + # TODO : add syslog config in /etc/rsyslog.d/ on install/startup... + syslogHandler = logging.handlers.SysLogHandler(address='/dev/log') + syslogHandler.setFormatter(logging.Formatter(self.prepend_format_str + ' %(filename)s:%(lineno)d ' + logging.BASIC_FORMAT)) + self.logger.addHandler(syslogHandler) + + if not self.logger.isEnabledFor(logging.INFO): # we need to be enabled at minimum for info + self.logger.setLevel(logging.INFO) + + def started_event(self): + """Needs to be called after startup, once all initialization has been done. + """ + self.logger.info(STARTED_FMT.format(pid=os.getpid())) + + def shutdown_event(self, exit_code): + """Needs to be called before shutdown, before cleaning up. + """ + if exit_code < 0: + for s in signal.Signals: + if s.value == -exit_code: + exit_code = s.name + break # we found it and changed exit_code to str. 
+ + self.logger.info(SHUTDOWN_FMT.format(exit_code=exit_code)) + + + diff --git a/subproc/subprocess_transport.py b/subproc/subprocess_transport.py new file mode 100644 index 0000000..2173895 --- /dev/null +++ b/subproc/subprocess_transport.py @@ -0,0 +1,56 @@ +try: + import asyncio +except ImportError: + import trollius as asyncio + + +class SubprocessTransport(asyncio.SubprocessTransport): + + def get_pid(self): + """Get subprocess id.""" + raise NotImplementedError + + def get_returncode(self): + """Get subprocess returncode. + + See also + http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode + """ + raise NotImplementedError + + def get_pipe_transport(self, fd): + """Get transport for pipe with number fd.""" + raise NotImplementedError + + def send_signal(self, signal): + """Send signal to subprocess. + + See also: + docs.python.org/3/library/subprocess#subprocess.Popen.send_signal + """ + raise NotImplementedError + + def terminate(self): + """Stop the subprocess. + + Alias for close() method. + + On Posix OSs the method sends SIGTERM to the subprocess. + On Windows the Win32 API function TerminateProcess() + is called to stop the subprocess. + + See also: + http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate + """ + raise NotImplementedError + + def kill(self): + """Kill the subprocess. + + On Posix OSs the function sends SIGKILL to the subprocess. + On Windows kill() is an alias for terminate(). 
+ + See also: + http://docs.python.org/3/library/subprocess#subprocess.Popen.kill + """ + raise NotImplementedError diff --git a/subproc/tests/__init__.py b/subproc/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/subproc/tests/minimal_long_process.py b/subproc/tests/minimal_long_process.py new file mode 100644 index 0000000..b53f57b --- /dev/null +++ b/subproc/tests/minimal_long_process.py @@ -0,0 +1,39 @@ +import signal +import os +import sys +import time + +import logging + +from subproc.subprocess_protocol_implement import SubprocessProtocolImplement + +protocol = SubprocessProtocolImplement(logging.getLogger(__name__)) + +protocol.started_event() +#print('{0} -STARTED-'.format(os.getpid())) + + +def shutdown(signum, frame=None): + protocol.shutdown_event(exit_code=-signum) + #print('{signum} triggered -SHUTDOWN-'.format(**locals())) + sys.stdout.flush() + sys.exit(-signum) # using a convention already seen somewhere else... (REF ?) + +signal.signal(signal.SIGTERM, shutdown) + +if signal.getsignal(signal.SIGINT) is signal.SIG_IGN: # if sigint is ignored (python didnt setup default_int_handler) + signal.signal(signal.SIGINT, shutdown) # to get Ctrl-C trigger shutdown, even in child process + +# NOTE : SIGKILL cannot be caught + +try: + # do not block for ever, so that, in case something is broken in test, we can see it. + time.sleep(10) +except KeyboardInterrupt as ki: + # CAREFUL this is not caught in child process (only top parent), but SIGINT will still be triggered. 
+ # REF : https://stackoverflow.com/questions/40775054/capturing-sigint-using-keyboardinterrupt-exception-works-in-terminal-not-in-scr/40785230#40785230 + print('KeyboardInterrupt') + shutdown(signal.SIGINT) + +shutdown(0) + diff --git a/subproc/tests/minimal_short_process.py b/subproc/tests/minimal_short_process.py new file mode 100644 index 0000000..8338608 --- /dev/null +++ b/subproc/tests/minimal_short_process.py @@ -0,0 +1,22 @@ +import signal +import os + +import logging + +from subproc.subprocess_protocol_implement import SubprocessProtocolImplement + +protocol = SubprocessProtocolImplement(logging.getLogger(__name__)) + +protocol.started_event() +#print('{0} -STARTED-'.format(os.getpid())) + + +def shutdown(signum, frame): + protocol.shutdown_event(exit_code=-signum) + #print('{signum} triggered -SHUTDOWN-'.format(**locals())) + +signal.signal(signal.SIGTERM, shutdown) + +# do not do anything to exit immediately +protocol.shutdown_event(0) + diff --git a/subproc/tests/prereq/__init__.py b/subproc/tests/prereq/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/subproc/tests/prereq/test_subprocess_protocol.py b/subproc/tests/prereq/test_subprocess_protocol.py new file mode 100644 index 0000000..63b1648 --- /dev/null +++ b/subproc/tests/prereq/test_subprocess_protocol.py @@ -0,0 +1,167 @@ + +import sys + +import pytest + +""" +Test different version of async framework to verify our original assumptions +""" + + +@pytest.mark.skipif(sys.version_info < (2, 7), reason="requires python2.7 minimum") +@pytest.mark.skipif(sys.version_info >= (3, 4), reason="can do better for python 3.4 and up") +def test_subprocess_protocol_2_7(): + """ + Ref : https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.asyncio.subprocess.Process + Adapted for syncio for python 2.7 + """ + import trollius + + class DateProtocol(trollius.SubprocessProtocol): + def __init__(self, exit_future): + self.exit_future = exit_future + self.output = bytearray() + + def 
pipe_data_received(self, fd, data): + self.output.extend(data) + + def process_exited(self): + self.exit_future.set_result(True) + + @trollius.coroutine + def get_date(loop): + code = 'import datetime; print(datetime.datetime.now())' + exit_future = trollius.Future(loop=loop) + + # Create the subprocess controlled by the protocol DateProtocol, + # redirect the standard output into a pipe + create = loop.subprocess_exec(lambda: DateProtocol(exit_future), + sys.executable, '-c', code, + stdin=None, stderr=None) + transport, protocol = yield trollius.From(create) + + # Wait for the subprocess exit using the process_exited() method + # of the protocol + yield trollius.From(exit_future) + + # Close the stdout pipe + transport.close() + + # Read the output which was collected by the pipe_data_received() + # method of the protocol + data = bytes(protocol.output) + return data.decode('ascii').rstrip() + + if sys.platform == "win32": + loop = trollius.ProactorEventLoop() + else: + loop = trollius.new_event_loop() + trollius.set_event_loop(loop) + + date = loop.run_until_complete(get_date(loop)) + print("Current date: %s" % date) + loop.close() + + +@pytest.mark.skipif(sys.version_info < (3, 4), reason="requires python3.4") +def test_subprocess_protocol_3_4(): + import asyncio + + """Ref : https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.asyncio.subprocess.Process""" + class DateProtocol(asyncio.SubprocessProtocol): + def __init__(self, exit_future): + self.exit_future = exit_future + self.output = bytearray() + + def pipe_data_received(self, fd, data): + self.output.extend(data) + + def process_exited(self): + self.exit_future.set_result(True) + + @asyncio.coroutine + def get_date(loop): + code = 'import datetime; print(datetime.datetime.now())' + exit_future = asyncio.Future(loop=loop) + + # Create the subprocess controlled by the protocol DateProtocol, + # redirect the standard output into a pipe + create = loop.subprocess_exec(lambda: 
DateProtocol(exit_future), + sys.executable, '-c', code, + stdin=None, stderr=None) + transport, protocol = yield from create + + # Wait for the subprocess exit using the process_exited() method + # of the protocol + yield from exit_future + + # Close the stdout pipe + transport.close() + + # Read the output which was collected by the pipe_data_received() + # method of the protocol + data = bytes(protocol.output) + return data.decode('ascii').rstrip() + + if sys.platform == "win32": + loop = asyncio.ProactorEventLoop() + else: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + date = loop.run_until_complete(get_date(loop)) + print("Current date: %s" % date) + loop.close() + + +@pytest.mark.skipif(sys.version_info < (3, 5), reason="requires python3.5") +def test_subprocess_protocol_3_5(): + import asyncio + + """Ref : https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.asyncio.subprocess.Process""" + class DateProtocol(asyncio.SubprocessProtocol): + def __init__(self, exit_future): + self.exit_future = exit_future + self.output = bytearray() + + def pipe_data_received(self, fd, data): + self.output.extend(data) + + def process_exited(self): + self.exit_future.set_result(True) + + async def get_date(loop): + code = 'import datetime; print(datetime.datetime.now())' + exit_future = asyncio.Future(loop=loop) + + # Create the subprocess controlled by the protocol DateProtocol, + # redirect the standard output into a pipe + create = loop.subprocess_exec(lambda: DateProtocol(exit_future), + sys.executable, '-c', code, + stdin=None, stderr=None) + transport, protocol = await create + + # Wait for the subprocess exit using the process_exited() method + # of the protocol + await exit_future + + # Close the stdout pipe + transport.close() + + # Read the output which was collected by the pipe_data_received() + # method of the protocol + data = bytes(protocol.output) + return data.decode('ascii').rstrip() + + if sys.platform == "win32": + loop = 
asyncio.ProactorEventLoop() + else: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + date = loop.run_until_complete(get_date(loop)) + print("Current date: %s" % date) + loop.close() + +if __name__ == '__main__': + pytest.main(['-s', '-x', __file__]) diff --git a/subproc/tests/test_minimal_long_process.py b/subproc/tests/test_minimal_long_process.py new file mode 100644 index 0000000..f1c7107 --- /dev/null +++ b/subproc/tests/test_minimal_long_process.py @@ -0,0 +1,63 @@ +"""Verify that the minimal_long_process behave as expected in all cases. +This is not related to the protocol, but related to other external/system events (signals, interruptions, etc.) +""" +import os +import signal +import subprocess + +# Ref : https://stackoverflow.com/questions/40775054/capturing-sigint-using-keyboardinterrupt-exception-works-in-terminal-not-in-scr/40785230#40785230 +import threading +import time + +# TODO : change into logging protocol adapter test + +def test_shutdown_triggers_from_main(): + mpath = os.path.join(os.path.dirname(__name__), 'minimal_long_process.py') + proc = subprocess.Popen(['python', mpath], stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + def late_killer(p): + time.sleep(2) # sleep a bit so main task has time to communicate() + p.send_signal(signal.SIGINT) + + t = threading.Thread(target=late_killer, args=(proc,)) + t.start() + + out, _ = proc.communicate() + + assert '-STARTED-'.encode() in out, print(out) + assert '-SHUTDOWN SIGINT-'.encode() in out, print(out) + assert proc.returncode == -2 % 256, print(proc.returncode) # check signal is returned + +def test_shutdown_triggers_from_attached_child(): + mpath = os.path.join(os.path.dirname(__name__), 'minimal_long_process.py') + proc = subprocess.Popen('python '+mpath, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) + + def late_killer(p): + time.sleep(2) # sleep a bit so main task has time to communicate() + p.send_signal(signal.SIGINT) + + t = 
threading.Thread(target=late_killer, args=(proc,)) + t.start() + + out, _ = proc.communicate() + + assert '-STARTED-'.encode() in out, print(out) + assert '-SHUTDOWN SIGINT-'.encode() in out, print(out) + assert proc.returncode == -2 % 256, print(proc.returncode) # check signal is returned + +def test_shutdown_triggers_from_detached_child(): + mpath = os.path.join(os.path.dirname(__name__), 'minimal_long_process.py') + proc = subprocess.Popen('python '+mpath+' &', stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) + + def late_killer(p): + time.sleep(2) # sleep a bit so main task has time to communicate() + p.send_signal(signal.SIGINT) + + t = threading.Thread(target=late_killer, args=(proc,)) + t.start() + + out, _ = proc.communicate() + + assert '-STARTED-'.encode() in out, print(out) + assert '-SHUTDOWN SIGINT-'.encode() in out, print(out) + assert proc.returncode == -2 % 256, print(proc.returncode) # check signal is returned \ No newline at end of file diff --git a/subproc/tests/test_subprocess.py b/subproc/tests/test_subprocess.py new file mode 100644 index 0000000..49d5f3b --- /dev/null +++ b/subproc/tests/test_subprocess.py @@ -0,0 +1,68 @@ +import asyncio.subprocess +import sys + +import pytest + + + +@pytest.mark.skipif(sys.version_info < (3,4), reason="requires python3.4") +def test_subprocess_stream_3_4(): + """Ref : https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.asyncio.subprocess.Process""" + @asyncio.coroutine + def get_date(): + code = 'import datetime; print(datetime.datetime.now())' + + # Create the subprocess, redirect the standard output into a pipe + create = asyncio.create_subprocess_exec(sys.executable, '-c', code, + stdout=asyncio.subprocess.PIPE) + proc = yield from create + + # Read one line of output + data = yield from proc.stdout.readline() + line = data.decode('ascii').rstrip() + + # Wait for the subprocess exit + yield from proc.wait() + return line + + if sys.platform == "win32": + loop = 
asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + else: + loop = asyncio.get_event_loop() + + date = loop.run_until_complete(get_date()) + print("Current date: %s" % date) + loop.close() + +@pytest.mark.skipif(sys.version_info < (3,5), reason="requires python3.5") +def test_subprocess_stream_3_5(): + """Ref : https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.asyncio.subprocess.Process""" + async def get_date(): + code = 'import datetime; print(datetime.datetime.now())' + + # Create the subprocess, redirect the standard output into a pipe + create = asyncio.create_subprocess_exec(sys.executable, '-c', code, + stdout=asyncio.subprocess.PIPE) + proc = await create + + # Read one line of output + data = await proc.stdout.readline() + line = data.decode('ascii').rstrip() + + # Wait for the subprocess exit + await proc.wait() + return line + + if sys.platform == "win32": + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + else: + loop = asyncio.get_event_loop() + + date = loop.run_until_complete(get_date()) + print("Current date: %s" % date) + loop.close() + +if __name__ == '__main__': + pytest.main(['-s', '-x', __file__]) diff --git a/subproc/tests/test_subprocess_protocol.py b/subproc/tests/test_subprocess_protocol.py new file mode 100644 index 0000000..7c06ef5 --- /dev/null +++ b/subproc/tests/test_subprocess_protocol.py @@ -0,0 +1,126 @@ +import os +import sys +# THis needs to be done early to avoid problem with other async-related import +try: + import asyncio +except ImportError: + import trollius as asyncio + + +import asynctest +import pytest + +from subproc.subprocess_protocol import SubprocessProtocol + +# We need to carefully setup the logging, to make sure we can see the debug logging messages from the protocol. 
+import logging.config +logging.config.dictConfig({ + 'version': 1, + 'disable_existing_loggers': True, + 'formatters': { + 'pid_aware': { + 'format': '%(name)s %(levelname)s : %(message)s' + }, + }, + 'handlers': { + 'console': { + 'class': 'logging.StreamHandler', + 'formatter': 'pid_aware', + } + }, + 'root': { + 'level': 'DEBUG', + 'handlers': ['console'], + }, + 'subproc.subprocess_protocol': { + 'level': 'DEBUG', + 'handlers': ['console'], + 'propagate': True, + }, +}) + +# @pytest.mark.skipif(sys.version_info < (2, 7), reason="requires python2.7 minimum") +# @pytest.mark.skipif(sys.version_info >= (3, 4), reason="can do better for python 3.4 and up") +# def test_subprocess_protocol_2_7(): +# """ +# Ref : https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.asyncio.subprocess.Process +# Adapted for syncio for python 2.7 +# """ +# import trollius + + +@pytest.mark.asyncio +@asyncio.coroutine +def test_minimal_long_process(event_loop): + with open(os.path.join(os.path.dirname(__file__), "minimal_long_process.py"), "r") as min_proc: + code = min_proc.read() # file should be small enough + + exit_future = asyncio.Future(loop=event_loop) + + # In case mock doesnt work as expected + class TestSubprocessProtocol(SubprocessProtocol): + @asyncio.coroutine + def on_started(self): + print("ON STARTED") + + @asyncio.coroutine + def on_shutdown(self): + print("ON SHUTDOWN") + + @asyncio.coroutine + def on_exited(self): + print("ON EXIT") + + def mock_protocol_factory(): + """To mock only the async API""" + p = TestSubprocessProtocol(exit_future) + # Just comment out these lines to test the actual behavior with mock + # p = SubprocessProtocol(exit_future) + #p.on_shutdown = asynctest.CoroutineMock() + #p.on_started = asynctest.CoroutineMock() + #p.on_exited = asynctest.CoroutineMock() + return p + + # Create the subprocess controlled by the protocol DateProtocol, + # redirect the standard output into a pipe + create = 
event_loop.subprocess_exec(mock_protocol_factory, + sys.executable, '-c', code, + stdin=None, stderr=None) + try: + transport, protocol = yield from create + except Exception as e: # TODO : catch exact exception on python < 3.4 + transport, protocol = yield asyncio.From(create) + + #protocol.on_started.assert_called_once() + + + try: + # Wait for the subprocess exit using the process_exited() method + # of the protocol + yield from exit_future + except Exception as e: # TODO : catch exact exception on python < 3.4 + yield asyncio.From(exit_future) + + #protocol.on_shutdown.assert_called_once() + + # Close the stdout pipe + transport.close() + # + # # Read the output which was collected by the pipe_data_received() + # # method of the protocol + # data = bytes(protocol.output) + # return data.decode('ascii').rstrip() + +# if sys.platform == "win32": +# loop = trollius.ProactorEventLoop() +# else: +# loop = trollius.new_event_loop() +# trollius.set_event_loop(loop) +# +# date = loop.run_until_complete(run_minimal_process(loop)) +# print("Current date: %s" % date) +# loop.close() + + +if __name__ == '__main__': + pytest.main(['-s', '-x', __file__]) diff --git a/subproc/zmqserver.py b/subproc/zmqserver.py new file mode 100644 index 0000000..fe59eac --- /dev/null +++ b/subproc/zmqserver.py @@ -0,0 +1,17 @@ + + +# Copying asyncio Server API + +# TODO +class ZMQServer(): + """Server listening on zmq sockets""" + + def __init__(self): + self.sockets = [] + + def close(self): + pass + + async def wait_closed(self): + await pass +