diff --git a/launch/launch/utilities/__init__.py b/launch/launch/utilities/__init__.py index f3efb4de5..fd044f2ee 100644 --- a/launch/launch/utilities/__init__.py +++ b/launch/launch/utilities/__init__.py @@ -20,6 +20,7 @@ from .normalize_to_list_of_substitutions_impl import normalize_to_list_of_substitutions from .perform_substitutions_impl import perform_substitutions from .signal_management import AsyncSafeSignalManager +from .signal_management import install_signal_handlers, on_sigint, on_sigquit, on_sigterm from .visit_all_entities_and_collect_futures_impl import visit_all_entities_and_collect_futures __all__ = [ @@ -30,6 +31,10 @@ 'ensure_argument_type', 'perform_substitutions', 'AsyncSafeSignalManager', + 'install_signal_handlers', + 'on_sigint', + 'on_sigquit', + 'on_sigterm', 'normalize_to_list_of_substitutions', 'visit_all_entities_and_collect_futures', ] diff --git a/launch/launch/utilities/signal_management.py b/launch/launch/utilities/signal_management.py index a144fc5fb..619e4428d 100644 --- a/launch/launch/utilities/signal_management.py +++ b/launch/launch/utilities/signal_management.py @@ -15,32 +15,19 @@ """Module for signal management functionality.""" import asyncio +from contextlib import ExitStack import os import platform import signal import socket import threading - from typing import Callable from typing import Optional from typing import Tuple # noqa: F401 from typing import Union +import warnings - -def is_winsock_handle(fd): - """Check if the given file descriptor is WinSock handle.""" - if platform.system() != 'Windows': - return False - try: - # On Windows, WinSock handles and regular file handles - # have disjoint APIs. This test leverages the fact that - # attempting to get an MSVC runtime file handle from a - # WinSock handle will fail. - import msvcrt - msvcrt.get_osfhandle(fd) - return False - except OSError: - return True +import osrf_pycommon.process_utils class AsyncSafeSignalManager: @@ -66,8 +53,19 @@ class AsyncSafeSignalManager: :func:`signal.signal`. All signals received are forwarded to the previously setup file descriptor, if any. + + ..warning:: + Within (potentially nested) contexts, :func:`signal.set_wakeup_fd` + calls are intercepted such that the given file descriptor overrides + the previously setup file descriptor for the outermost manager. + This ensures the manager's chain of signal wakeup file descriptors + is not broken by third-party code or by asyncio itself in some platforms. """ + __current = None # type: AsyncSafeSignalManager + + __set_wakeup_fd = signal.set_wakeup_fd # type: Callable[[int], int] + def __init__( self, loop: asyncio.AbstractEventLoop @@ -77,21 +75,57 @@ def __init__( :param loop: event loop that will handle the signals. """ + self.__parent = None # type: AsyncSafeSignalManager self.__loop = loop # type: asyncio.AbstractEventLoop self.__background_loop = None # type: Optional[asyncio.AbstractEventLoop] self.__handlers = {} # type: dict self.__prev_wakeup_handle = -1 # type: Union[int, socket.socket] - self.__wsock, self.__rsock = socket.socketpair() # type: Tuple[socket.socket, socket.socket] # noqa - self.__wsock.setblocking(False) - self.__rsock.setblocking(False) + self.__wsock = None + self.__rsock = None + self.__close_sockets = None def __enter__(self): + pair = socket.socketpair() # type: Tuple[socket.socket, socket.socket] # noqa + with ExitStack() as stack: + self.__wsock = stack.enter_context(pair[0]) + self.__rsock = stack.enter_context(pair[1]) + self.__wsock.setblocking(False) + self.__rsock.setblocking(False) + self.__close_sockets = stack.pop_all().close + + self.__add_signal_readers() + try: + self.__install_signal_writers() + except Exception: + self.__remove_signal_readers() + self.__close_sockets() + self.__rsock = None + self.__wsock = None + self.__close_sockets = None + raise + self.__chain() + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + try: + try: + self.__uninstall_signal_writers() + finally: + self.__remove_signal_readers() + finally: + self.__unchain() + self.__close_sockets() + self.__rsock = None + self.__wsock = None + self.__close_sockets = None + + def __add_signal_readers(self): try: self.__loop.add_reader(self.__rsock.fileno(), self.__handle_signal) except NotImplementedError: # Some event loops, like the asyncio.ProactorEventLoop # on Windows, do not support asynchronous socket reads. - # So we emulate it. + # Emulate it. self.__background_loop = asyncio.SelectorEventLoop() self.__background_loop.add_reader( self.__rsock.fileno(), @@ -102,29 +136,69 @@ def run_background_loop(): asyncio.set_event_loop(self.__background_loop) self.__background_loop.run_forever() - self.__background_thread = threading.Thread(target=run_background_loop) + self.__background_thread = threading.Thread( + target=run_background_loop, daemon=True) self.__background_thread.start() - self.__prev_wakeup_handle = signal.set_wakeup_fd(self.__wsock.fileno()) - if self.__prev_wakeup_handle != -1 and is_winsock_handle(self.__prev_wakeup_handle): - # On Windows, os.write will fail on a WinSock handle. There is no WinSock API - # in the standard library either. Thus we wrap it in a socket.socket instance. - self.__prev_wakeup_handle = socket.socket(fileno=self.__prev_wakeup_handle) - return self - def __exit__(self, type_, value, traceback): - if isinstance(self.__prev_wakeup_handle, socket.socket): - # Detach (Windows) socket and retrieve the raw OS handle. - prev_wakeup_handle = self.__prev_wakeup_handle.fileno() - self.__prev_wakeup_handle.detach() - self.__prev_wakeup_handle = prev_wakeup_handle - assert self.__wsock.fileno() == signal.set_wakeup_fd(self.__prev_wakeup_handle) + def __remove_signal_readers(self): if self.__background_loop: self.__background_loop.call_soon_threadsafe(self.__background_loop.stop) self.__background_thread.join() self.__background_loop.close() + self.__background_loop = None else: self.__loop.remove_reader(self.__rsock.fileno()) + def __install_signal_writers(self): + prev_wakeup_handle = self.__set_wakeup_fd(self.__wsock.fileno()) + try: + self.__chain_wakeup_handle(prev_wakeup_handle) + except Exception: + own_wakeup_handle = self.__set_wakeup_fd(prev_wakeup_handle) + assert self.__wsock.fileno() == own_wakeup_handle + raise + + def __uninstall_signal_writers(self): + prev_wakeup_handle = self.__chain_wakeup_handle(-1) + own_wakeup_handle = self.__set_wakeup_fd(prev_wakeup_handle) + assert self.__wsock.fileno() == own_wakeup_handle + + def __chain(self): + self.__parent = AsyncSafeSignalManager.__current + AsyncSafeSignalManager.__current = self + if self.__parent is None: + # Do not trust signal.set_wakeup_fd calls within context. + # Overwrite handle at the start of the managers' chain. + def modified_set_wakeup_fd(signum): + if threading.current_thread() is not threading.main_thread(): + raise ValueError( + 'set_wakeup_fd only works in main' + ' thread of the main interpreter' + ) + return self.__chain_wakeup_handle(signum) + signal.set_wakeup_fd = modified_set_wakeup_fd + + def __unchain(self): + if self.__parent is None: + signal.set_wakeup_fd = self.__set_wakeup_fd + AsyncSafeSignalManager.__current = self.__parent + + def __chain_wakeup_handle(self, wakeup_handle): + prev_wakeup_handle = self.__prev_wakeup_handle + if isinstance(prev_wakeup_handle, socket.socket): + # Detach (Windows) socket and retrieve the raw OS handle. + prev_wakeup_handle = prev_wakeup_handle.detach() + if wakeup_handle != -1 and platform.system() == 'Windows': + # On Windows, os.write will fail on a WinSock handle. There is no WinSock API + # in the standard library either. Thus we wrap it in a socket.socket instance. + try: + wakeup_handle = socket.socket(fileno=wakeup_handle) + except WindowsError as e: + if e.winerror != 10038: # WSAENOTSOCK + raise + self.__prev_wakeup_handle = wakeup_handle + return prev_wakeup_handle + def __handle_signal(self): while True: try: @@ -168,3 +242,108 @@ def handle( else: old_handler = self.__handlers.pop(signum, None) return old_handler + + +__global_signal_manager_activated_lock = threading.Lock() +__global_signal_manager_activated = False +__global_signal_manager = AsyncSafeSignalManager( + loop=osrf_pycommon.process_utils.get_loop()) + + +def on_sigint(handler): + """ + Set the signal handler to be called on SIGINT. + + Pass None for no custom handler. + + install_signal_handlers() must have been called in the main thread before. + + .. deprecated:: Foxy + + Use AsyncSafeSignalManager instead + """ + warnings.warn( + 'Global signal management APIs are deprecated. Do not use on_sigint(). ' + 'Use the AsyngSafeSignalManager instead.', + DeprecationWarning + ) + __global_signal_manager.handle(signal.SIGINT, handler) + + +def on_sigquit(handler): + """ + Set the signal handler to be called on SIGQUIT. + + Note Windows does not have SIGQUIT, so it can be set with this function, + but the handler will not be called. + + Pass None for no custom handler. + + install_signal_handlers() must have been called in the main thread before. + + .. deprecated:: Foxy + + Use AsyncSafeSignalManager instead + """ + warnings.warn( + 'Global signal management APIs are deprecated. Do not use on_sigquit(). ' + 'Use the AsyngSafeSignalManager instead.', + DeprecationWarning + ) + if platform.system() != 'Windows': + __global_signal_manager.handle(signal.SIGQUIT, handler) + + +def on_sigterm(handler): + """ + Set the signal handler to be called on SIGTERM. + + Pass None for no custom handler. + + install_signal_handlers() must have been called in the main thread before. + + .. deprecated:: Foxy + + Use AsyncSafeSignalManager instead + """ + warnings.warn( + 'Global signal management APIs are deprecated. Do not use on_sigterm(). ' + 'Use the AsyngSafeSignalManager instead.', + DeprecationWarning + ) + __global_signal_manager.handle(signal.SIGTERM, handler) + + +def install_signal_handlers(): + """ + Install custom signal handlers so that hooks can be setup from other threads. + + Calling this multiple times does not fail, but the signals are only + installed once. + + If called outside of the main-thread, a ValueError is raised, see: + https://docs.python.org/3.6/library/signal.html#signal.signal + + Also, if you register your own signal handlers after calling this function, + then you should store and forward to the existing signal handlers. + + If you register signal handlers before calling this function, then your + signal handler will automatically be called by the signal handlers in this + thread. + + .. deprecated:: Foxy + + Use AsyncSafeSignalManager instead + """ + global __global_signal_manager_activated + with __global_signal_manager_activated_lock: + if __global_signal_manager_activated: + return + __global_signal_manager_activated = True + warnings.warn( + 'Global signal management APIs are deprecated. ' + 'Do not use install_signal_handlers(). ' + 'Use the AsyngSafeSignalManager instead.', + DeprecationWarning + ) + __global_signal_manager.__enter__() diff --git a/launch/test/launch/utilities/test_signal_management.py b/launch/test/launch/utilities/test_signal_management.py index 92d504cc8..55c2f9ac5 100644 --- a/launch/test/launch/utilities/test_signal_management.py +++ b/launch/test/launch/utilities/test_signal_management.py @@ -20,9 +20,15 @@ import signal from launch.utilities import AsyncSafeSignalManager +from launch.utilities import install_signal_handlers +from launch.utilities import on_sigint +from launch.utilities import on_sigquit +from launch.utilities import on_sigterm import osrf_pycommon.process_utils +import pytest + def cap_signals(*signals): def _noop(*args): @@ -51,6 +57,14 @@ def _wrapper(*args, **kwargs): SIGNAL = signal.SIGUSR1 ANOTHER_SIGNAL = signal.SIGUSR2 +if not hasattr(signal, 'raise_signal'): + # Only available for Python 3.8+ + def raise_signal(signum): + import os + os.kill(os.getpid(), signum) +else: + raise_signal = signal.raise_signal + @cap_signals(SIGNAL, ANOTHER_SIGNAL) def test_async_safe_signal_manager(): @@ -70,7 +84,7 @@ def test_async_safe_signal_manager(): manager.handle(ANOTHER_SIGNAL, got_another_signal.set_result) # Verify signal handling is working - loop.call_soon(signal.raise_signal, SIGNAL) + loop.call_soon(raise_signal, SIGNAL) loop.run_until_complete(asyncio.wait( [got_signal, got_another_signal], return_when=asyncio.FIRST_COMPLETED, @@ -84,14 +98,14 @@ def test_async_safe_signal_manager(): manager.handle(SIGNAL, None) # Verify signal handler is no longer there - loop.call_soon(signal.raise_signal, SIGNAL) + loop.call_soon(raise_signal, SIGNAL) loop.run_until_complete(asyncio.wait( [got_another_signal], timeout=1.0 )) assert not got_another_signal.done() # Signal handling is (now) inactive outside context - loop.call_soon(signal.raise_signal, ANOTHER_SIGNAL) + loop.call_soon(raise_signal, ANOTHER_SIGNAL) loop.run_until_complete(asyncio.wait( [got_another_signal], timeout=1.0 )) @@ -99,9 +113,73 @@ def test_async_safe_signal_manager(): # Managers' context may be re-entered with manager: - loop.call_soon(signal.raise_signal, ANOTHER_SIGNAL) + loop.call_soon(raise_signal, ANOTHER_SIGNAL) loop.run_until_complete(asyncio.wait( [got_another_signal], timeout=1.0 )) assert got_another_signal.done() assert got_another_signal.result() == ANOTHER_SIGNAL + + +def test_install_signal_handlers(): + """Test the install_signal_handlers() function.""" + with pytest.deprecated_call(): + install_signal_handlers() + install_signal_handlers() + install_signal_handlers() + + +def test_on_sigint(): + """Test the on_sigint() function.""" + with pytest.deprecated_call(): + # None is acceptable + on_sigint(None) + + def mock_sigint_handler(): + pass + + on_sigint(mock_sigint_handler) + + # Non-callable is not + with pytest.raises(ValueError): + on_sigint('non-callable') + + # TODO(jacobperron): implement a functional test by using subprocess.Popen + + +@pytest.mark.skipif(platform.system() == 'Windows', + reason='No SIGQUIT on Windows') +def test_on_sigquit(): + """Test the on_sigquit() function.""" + with pytest.deprecated_call(): + # None is acceptable + on_sigquit(None) + + def mock_sigquit_handler(): + pass + + on_sigquit(mock_sigquit_handler) + + # Non-callable is not + with pytest.raises(ValueError): + on_sigquit('non-callable') + + # TODO(jacobperron): implement a functional test by using subprocess.Popen + + +def test_on_sigterm(): + """Test the on_sigterm() function.""" + with pytest.deprecated_call(): + # None is acceptable + on_sigterm(None) + + def mock_sigterm_handler(): + pass + + on_sigterm(mock_sigterm_handler) + + # Non-callable is not + with pytest.raises(ValueError): + on_sigterm('non-callable') + + # TODO(jacobperron): implement a functional test by using subprocess.Popen