diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index 77d0edea..19cabb17 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -13,15 +13,15 @@ env: BASE_IMAGE_USER: ghcr.io/driplineorg BASE_IMAGE_REPO: dripline-cpp DEV_SUFFIX: '-dev' - BASE_IMAGE_TAG: 'v2.10.4' -# BASE_IMAGE_TAG: 'hf2.10.4' + BASE_IMAGE_TAG: 'v2.10.6' +# BASE_IMAGE_TAG: 'cancelation' # DEV_SUFFIX: '' jobs: test_docker: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 env: TAG: gha-test @@ -94,7 +94,7 @@ jobs: build_and_push: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 if: | github.event_name == 'push' || @@ -204,6 +204,16 @@ jobs: tags: ${{ steps.docker_meta_integration.outputs.tags }} platforms: linux/amd64,linux/arm64,linux/arm/v7 - - name: Release - uses: softprops/action-gh-release@v2 + - name: Release with a changelog + uses: rasmus-saks/release-a-changelog-action@v1.2.0 if: ${{ github.event_name == 'push' && contains(github.ref, 'refs/tags/') }} + with: + github-token: '${{ secrets.GITHUB_TOKEN }}' + path: 'changelog.md' + title-template: 'dripline-python v{version} -- Release Notes' + tag-template: 'v{version}' + + # This should be removed if the use of rasmus-saks/release-a-changelog-action works + #- name: Release + # uses: softprops/action-gh-release@v2 + # if: ${{ github.event_name == 'push' && contains(github.ref, 'refs/tags/') }} diff --git a/CMakeLists.txt b/CMakeLists.txt index 6589732d..ad2576a5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.5) # <3.5 is deprecated by CMake -project( DriplinePy VERSION 5.0.1 ) +project( DriplinePy VERSION 5.1.0 ) cmake_policy( SET CMP0074 NEW ) diff --git a/Dockerfile b/Dockerfile index 2bd456da..93ab7c9d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,9 @@ ARG img_user=ghcr.io/driplineorg ARG img_repo=dripline-cpp #ARG img_tag=develop -ARG img_tag=v2.10.4 +ARG img_tag=v2.10.6 -FROM ${img_user}/${img_repo}:${img_tag} +FROM ${img_user}/${img_repo}:${img_tag} AS deps ## would prefer not to do this, just run ldconfig after the build to get things ## into the ld.so.conf cache... use this only when developing and adding libs @@ -14,7 +14,10 @@ RUN apt-get update && \ apt-get --fix-missing -y install \ libpq-dev && \ rm -rf /var/lib/apt/lists/* && \ - pip install ipython pytest + pip install ipython pytest &&\ + pip install aiohttp sqlalchemy psycopg2 PyYAML uuid asteval colorlog + +FROM deps COPY . 
/usr/local/src_py/ diff --git a/bin/dl-serve b/bin/dl-serve index b3986cf6..19376b42 100755 --- a/bin/dl-serve +++ b/bin/dl-serve @@ -32,7 +32,16 @@ class Serve(dripline.core.ObjectCreator): try: import colorlog modified_format = base_format.format('%(log_color)s', '%(purple)s') - self._logging_format = colorlog.ColoredFormatter(modified_format, datefmt=time_format[:-4]) + self._logging_format = colorlog.ColoredFormatter(modified_format, + datefmt=time_format[:-4], + log_colors={ + 'DEBUG': 'cyan', + 'INFO': 'green', + 'WARNING': 'yellow', + 'ERROR': 'red', + 'CRITICAL': 'red,bg_white', + }, + ) except ImportError: modified_format = base_format.format(' ', '') self._logging_format = logging.Formatter(modified_format, time_format[:-4]) @@ -52,9 +61,10 @@ class Serve(dripline.core.ObjectCreator): this_config_param = the_app.primary_config this_auth = the_app.auth - verbosity = the_app.global_verbosity + verbosity = scarab.s2py_verbosity(the_app.global_verbosity) #print(f"verbosity is {verbosity}") self.handler.setLevel(verbosity) + logger.setLevel(verbosity) sig_handler = scarab.SignalHandler() @@ -74,11 +84,14 @@ class Serve(dripline.core.ObjectCreator): # Create the service and register it with the signal handler the_service = self.create_object( this_config, 'Service', this_auth ) sig_handler.add_cancelable(the_service) + #scarab.SignalHandler.add_cancelable(the_service) logger.info(f'Service {the_service.name} has been built') # Run the service the_service.run() + scarab.SignalHandler.remove_cancelable(the_service) + if __name__ == '__main__': # App object the_main = scarab.MainApp() diff --git a/changelog.md b/changelog.md new file mode 100644 index 00000000..decc9406 --- /dev/null +++ b/changelog.md @@ -0,0 +1,44 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+ +## [Unreleased] + +## [5.1.0] - 2025-08-26 + +### Added + +- Heartbeat Monitor (implementations.HeartbeatMonitor) +- New logic for how logging is handled by Entities + - The `log_interval` is now the interval with which an entity's value is checked, not necessarily logged + - Whether a value is logged at the `log_interval` is controlled by: + - `max_interval`: if this time is exceeded since the last log entry, then it will be logged; if 0 (default), then logging occurs every `log_interval` + - `max_absolute_change`: if the value changes by more than this since the last log entry, then it will be logged + - `max_fractional_change`: if the value's fractional change is more than this since the last log entry, then it will be logged + - The field that's checked for the `max_fractional_change` and `max_absolute_change` is given by `check_field` + +### Changed + +- Methods for sending and receiving messages are moved to the mixin classes core.RequestHandler and core.RequestSender + to capture how dl-py handles requests for both services and endpoints +- Upgrade dl-cpp to v2.10.6 +- Docker build now separates the installation of dependencies into a separate stage + +### Fixed + +- Postgres syntax +- Application cancelation -- can use ctrl-c or other system signals to cancel an executable +- Alerts exchange not hard-coded in the alerts consumer + +## [5.0.1] - 2023-03-05 + +### Incompatibility + +Messages sent with this version of dl-py are not compatible with: +- dl-py v5.0.0 and earlier +- dl-py v5.1.0 and later +- dl-cpp v2.10.3 and earlier +- dl-cpp v2.10.6 and later. diff --git a/chart/Chart.yaml b/chart/Chart.yaml index 80e33a20..93922655 100644 --- a/chart/Chart.yaml +++ b/chart/Chart.yaml @@ -1,7 +1,7 @@ apiVersion: v1 ## the appVersion is used as the container image tag for the main container in the pod from the deployment ## it can be overridden by a values.yaml file in image.tag -appVersion: "v5.0.0" +appVersion: "v5.1.0" description: Deploy a dripline-python microservice name: dripline-python version: 1.1.2 diff --git a/dripline/core/__init__.py b/dripline/core/__init__.py index a98e28d0..3166a117 100644 --- a/dripline/core/__init__.py +++ b/dripline/core/__init__.py @@ -9,6 +9,8 @@ from .entity import * from .interface import * from .object_creator import * +from .request_handler import * +from .request_sender import * from .return_codes import * from .service import * from .throw_reply import * diff --git a/dripline/core/alert_consumer.py b/dripline/core/alert_consumer.py index 82926329..9434dd99 100644 --- a/dripline/core/alert_consumer.py +++ b/dripline/core/alert_consumer.py @@ -35,7 +35,7 @@ def bind_keys(self): to_return = Service.bind_keys(self); for a_key in self._alert_keys: logger.debug(f" binding alert key {a_key}") - to_return = to_return and self.bind_key("alerts", a_key) + to_return = to_return and self.bind_key(self.alerts_exchange, a_key) return to_return def on_alert_message(self, an_alert): diff --git a/dripline/core/endpoint.py b/dripline/core/endpoint.py index ab408ccc..e1250a80 100644 --- a/dripline/core/endpoint.py +++ b/dripline/core/endpoint.py @@ -1,8 +1,7 @@ __all__ = [] -import scarab from _dripline.core import _Endpoint -from .throw_reply import ThrowReply +from .request_handler import RequestHandler import logging @@ -11,7 +10,7 @@ __all__.append('Endpoint') -class Endpoint(_Endpoint): +class Endpoint(_Endpoint, RequestHandler): ''' Base class for all objects which can be sent dripline requests.
Every object described in a runtime configuration passed to `dl-serve` should derive from this class (either directly or indirectly). @@ -24,88 +23,54 @@ def __init__(self, name): ''' _Endpoint.__init__(self, name) - def result_to_scarab_payload(self, result: str): - """ - Intercept result values and throw error if scarab is unable to convert to param - TODO: Handles global Exception case, could be more specific + def do_get_request(self, a_request_message): + ''' + Default function for handling an OP_GET request message addressed to this endpoint. + + .. note: For dripline extension developers -- This function, as defined in RequestHandler, implements the characteristic + dripline-python behavior for an endpoint receiving a get request, including using the specifier to access attributes, + and calling on_get() when there is no specifier. + As an extension author you might typically override RequestHandler.on_get(), but leave this function alone. + + .. note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from + the C++ base class. It intentionally just calls the version of do_get_request() in RequestHandler. + Args: - result (str): request message passed - """ - try: - return scarab.to_param(result) - except Exception as e: - raise ThrowReply('service_error_bad_payload', - f"{self.name} unable to convert result to scarab payload: {result}") + a_request_message (MsgRequest): the message received by this endpoint + ''' - def do_get_request(self, a_request_message): - logger.info("in do_get_request") - a_specifier = a_request_message.specifier.to_string() - if (a_specifier): - logger.debug("has specifier") - try: - logger.debug(f"specifier is: {a_specifier}") - an_attribute = getattr(self, a_specifier) - logger.debug(f"attribute '{a_specifier}' value is [{an_attribute}]") - the_node = scarab.ParamNode() - the_node.add("values", scarab.ParamArray()) - the_node["values"].push_back(scarab.ParamValue(an_attribute)) - return a_request_message.reply(payload=the_node) - except AttributeError as this_error: - raise ThrowReply('service_error_invalid_specifier', - f"endpoint {self.name} has no attribute {a_specifier}, unable to get") - else: - logger.debug('no specifier') - the_value = self.on_get() - return a_request_message.reply(payload=self.result_to_scarab_payload(the_value)) + return RequestHandler.do_get_request(self, a_request_message) def do_set_request(self, a_request_message): - a_specifier = a_request_message.specifier.to_string() - try: - new_value = a_request_message.payload["values"][0]() - except Exception as err: - raise ThrowReply('service_error_bad_payload', f'Set called with invalid values: {err}') - new_value = getattr(new_value, "as_"+new_value.type())() - logger.debug(f'new_value is [{new_value}]') - if ( a_specifier ): - if not hasattr(self, a_specifier): - raise ThrowReply('service_error_invalid_specifier', - "endpoint {} has no attribute {}, unable to set".format(self.name, a_specifier)) - setattr(self, a_specifier, new_value) - return a_request_message.reply() - else: - result = self.on_set(new_value) - return a_request_message.reply(payload=self.result_to_scarab_payload(result)) - - def do_cmd_request(self, a_request_message): - # Note: any command executed in this way must return a python data structure which is - # able to be converted to a Param object (to be returned in the reply message) - method_name = a_request_message.specifier.to_string() - try: - method_ref = getattr(self, method_name) - except AttributeError
as e: - raise ThrowReply('service_error_invalid_method', - "error getting command's corresponding method: {}".format(str(e))) - the_kwargs = a_request_message.payload.to_python() - the_args = the_kwargs.pop('values', []) - try: - result = method_ref(*the_args, **the_kwargs) - except TypeError as e: - raise ThrowReply('service_error_invalid_value', - f'A TypeError occurred while calling the requested method for endpoint {self.name}: {method_name}. Values provided may be invalid.\nOriginal error: {str(e)}') - return a_request_message.reply(payload=self.result_to_scarab_payload(result)) - - def on_get(self): ''' - placeholder method for getting the value of an endpoint. - Implementations may override to enable OP_GET operations. - The implementation must return a value which is able to be passed to the ParamValue constructor. + Default function for handling an OP_SET request message addressed to this endpoint. + + .. note: For dripline extension developers -- This function, as defined in RequestHandler, implements the characteristic + dripline-python behavior for an endpoint receiving a set request, including using the specifier to access attributes, + and calling on_set() when there is no specifier. + As an extension author you might typically override RequestHandler.on_set(), but leave this function alone. + + .. note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from + the C++ base class. It intentionally just calls the version of do_set_request() in RequestHandler. + + Args: + a_request_message (MsgRequest): the message received by this endpoint ''' - raise ThrowReply('service_error_invalid_method', "{} does not implement on_get".format(self.__class__)) - def on_set(self, _value): + return RequestHandler.do_set_request(self, a_request_message) + + def do_cmd_request(self, a_request_message): ''' - placeholder method for setting the value of an endpoint. - Implementations may override to enable OP_SET operations. - Any returned object must already be a scarab::Param object + Default function for handling an OP_CMD request message addressed to this endpoint. + + .. note: For dripline extension developers -- This function, as defined in RequestHandler, implements the characteristic + dripline-python behavior for an endpoint receiving a cmd request, namely using the specifier to call endpoint methods. + + .. note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from + the C++ base class. It intentionally just calls the version of do_cmd_request() in RequestHandler.
+ + Args: + a_request_message (MsgRequest): the message received by this endpoint ''' - raise ThrowReply('service_error_invalid_method', "{} does not implement on_set".format(self.__class__)) + + return RequestHandler.do_cmd_request(self, a_request_message) diff --git a/dripline/core/entity.py b/dripline/core/entity.py index cb9ef3bd..fae6ae3b 100644 --- a/dripline/core/entity.py +++ b/dripline/core/entity.py @@ -1,13 +1,13 @@ import datetime import functools -import types import numbers import scarab +from _dripline.core import MsgAlert from .endpoint import Endpoint -from dripline.core import MsgAlert +from .throw_reply import ThrowReply __all__ = [] import logging @@ -31,6 +31,11 @@ def wrapper(*args, **kwargs): values.update({'value_raw': args[0]}) logger.debug('set done, now log') self.log_a_value(values) + try: + this_value = float(values[self._check_field]) + except (TypeError, ValueError): + this_value = False + self._last_log_value = this_value return result return wrapper @@ -49,29 +54,60 @@ def wrapper(*args, **kwargs): __all__.append("Entity") class Entity(Endpoint): ''' - Subclass of Endpoint which adds logic related to logging and confirming values. + Subclass of Endpoint that adds logic related to logging and confirming values. In particular, there is support for: - get_on_set -> setting the endpoint's value returns a get() result rather than an empty success (particularly useful for devices which may round assignment values) - log_on_set -> further extends get_on_set to send an alert message in addtion to returning the value in a reply - log_interval -> leverages the scheduler class to log the on_get result at a regular cadence + get_on_set -> setting the endpoint's value returns an on_get() result rather than an empty success (particularly useful for devices that may round assignment values) + log_on_set -> further extends get_on_set to send a logging alert message in addition to returning the value in a reply + log_interval -> leverages the scheduler class to log the on_get result at a regular cadence and if the value changes significantly ''' #check_on_set -> allows for more complex logic to confirm successful value updates # (for example, the success condition may be measuring another endpoint) - def __init__(self, get_on_set=False, log_routing_key_prefix='sensor_value', log_interval=0, log_on_set=False, calibration=None, **kwargs): + def __init__(self, + get_on_set=False, + log_on_set=False, + log_routing_key_prefix='sensor_value', + log_interval=0, + max_interval=0, + max_absolute_change=0, + max_fractional_change=0, + check_field='value_cal', + calibration=None, + **kwargs): ''' Args: - get_on_set: if true, calls to on_set are immediately followed by an on_get, which is returned - log_routing_key_prefix: first term in routing key used in alert messages which log values - log_interval: how often to log the Entity's value.
If 0 then scheduled logging is disabled; - if a number, interpreted as number of seconds; if a dict, unpacked as arguments - to the datetime.time_delta initializer; if a datetime.timedelta taken as the new value - log_on_set: if true, always call log_a_value() immediately after on_set + get_on_set: bool (default is False) + If true, calls to on_set() are immediately followed by an on_get(), which is returned + log_on_set: bool (default is False) + If true, always call log_a_value() immediately after on_set() **Note:** requires get_on_set be true, overrides must be equivalent - calibration (string || dict) : if string, updated with raw on_get() result via str.format() in - @calibrate decorator, used to populate raw and calibrated values - fields of a result payload. If a dictionary, the raw result is used - to index the dict with the calibrated value being the dict's value. + log_routing_key_prefix: string (default is 'sensor_value') + First term in routing key used in alert messages that log values + log_interval: 0 (default), float, dict, datetime.timedelta + Defines how often to check the Entity's value to determine if it should be logged + If 0, scheduled logging is disabled; + If a number, interpreted as number of seconds; + If a dict, unpacked as arguments to the datetime.timedelta initializer; + If a datetime.timedelta, taken as the new value + max_interval: float + Maximum time interval between logging in seconds. + Logging will take place at the next log_interval after max_interval since the last logged value. + If less than log_interval, then logging occurs every log_interval. + max_absolute_change: float + Absolute change in the numeric value that will trigger the value to be logged + If 0, then any change in the value will be logged + If < 0, then the value will always be logged (recommend instead max_interval=0) + max_fractional_change: float + Fractional change in the value that will trigger the value to be logged + If 0, then any change in the value will be logged + If < 0, then the value will always be logged (recommend instead max_interval=0) + check_field: string + Field in the dict returned by `on_get()` that's used to check for changes in the value + Typically is either 'value_cal' or 'value_raw' + calibration: string or dict + If string, updated with raw on_get() result via str.format() in the @calibrate decorator, + used to populate raw and calibrated values fields of a result payload. + If a dictionary, the raw result is used to index the dict with the calibrated value being the dict's value.
''' Endpoint.__init__(self, **kwargs) @@ -81,13 +117,19 @@ def __init__(self, get_on_set=False, log_routing_key_prefix='sensor_value', log_ # keep a reference to the on_set (possibly decorated in a subclass), needed for changing *_on_set configurations self.__initial_on_set = self.on_set - self._get_on_set = None self._log_on_set = None self.get_on_set = get_on_set self.log_on_set = log_on_set self.log_interval = log_interval + self._max_interval = max_interval + self._max_absolute_change = max_absolute_change + self._max_fractional_change = max_fractional_change + self._check_field = check_field + self._log_action_id = None + self._last_log_time = None + self._last_log_value = None @property def get_on_set(self): @@ -136,12 +178,42 @@ def log_interval(self, new_interval): def scheduled_log(self): logger.debug("in a scheduled log event") result = self.on_get() + try: + this_value = float(result[self._check_field]) + is_float = True + except ValueError: + is_float = False + this_value = result[self._check_field] + + # Various checks for log condition + if self._last_log_time is None: + logger.debug("Logging because this is the first logged value") + elif (datetime.datetime.now(datetime.timezone.utc) - self._last_log_time).total_seconds() > self._max_interval: + logger.debug("Logging because enough time has elapsed") + # Treatment of non-numeric value + elif not is_float: + if this_value != self._last_log_value: + logger.debug("Logging because the value has changed") + else: + logger.debug("No log condition met for string data, therefore not logging") + return + elif abs(self._last_log_value - this_value) > self._max_absolute_change: + logger.debug("Logging because the value has changed significantly") + # this condition is |x1-x0|/(|x1+x0|/2) > max_fractional_change, but safe in case the denominator is 0 + elif 2 * abs(self._last_log_value - this_value) > self._max_fractional_change * abs(self._last_log_value + this_value): + logger.debug("Logging because the value has fractionally changed significantly") + else: + logger.debug("No log condition met for numeric data, therefore not logging") + return + + self._last_log_value = this_value self.log_a_value(result) def log_a_value(self, the_value): - logger.debug(f"value to log is:\n{the_value}") + logger.info(f"value to log for {self.name} is:\n{the_value}") + self._last_log_time = datetime.datetime.now(datetime.timezone.utc) the_alert = MsgAlert.create(payload=scarab.to_param(the_value), routing_key=f'{self.log_routing_key_prefix}.{self.name}') - alert_sent = self.service.send(the_alert) + _ = self.service.send(the_alert) def start_logging(self): if self._log_action_id is not None: diff --git a/dripline/core/interface.py b/dripline/core/interface.py index 05182b1d..61ee69ac 100644 --- a/dripline/core/interface.py +++ b/dripline/core/interface.py @@ -2,16 +2,19 @@ import scarab -from _dripline.core import op_t, create_dripline_auth_spec, Core, DriplineConfig, Receiver, MsgRequest, MsgReply, DriplineError +from _dripline.core import create_dripline_auth_spec, Core, DriplineConfig, Receiver +from .request_sender import RequestSender import logging logger = logging.getLogger(__name__) __all__.append("Interface") -class Interface(Core): +class Interface(Core, RequestSender): ''' A class that provides user-friendly methods for dripline interactions in a Python interpreter. Intended for use as a dripline client in scripts or interactive sessions. + + See :py:class:dripline.core.RequestSender for the message-sending interface. 
''' def __init__(self, username: str | dict=None, password: str | dict=None, dripline_mesh: dict=None, timeout_s: int=10, confirm_retcodes: bool=True): ''' @@ -62,93 +65,9 @@ def __init__(self, username: str | dict=None, password: str | dict=None, driplin auth.process_spec() Core.__init__(self, config=scarab.to_param(dripline_config), auth=auth) + RequestSender.__init__(self, sender=self) self._confirm_retcode = confirm_retcodes self.timeout_s = timeout_s self._receiver = Receiver() - - def _send_request(self, msgop, target, specifier=None, payload=None, timeout=None, lockout_key=None): - ''' - internal helper method to standardize sending request messages - ''' - a_specifier = specifier if specifier is not None else "" - a_request = MsgRequest.create(payload=scarab.to_param(payload), msg_op=msgop, routing_key=target, specifier=a_specifier) - a_request.lockout_key = lockout_key if lockout_key is not None else "" - receive_reply = self.send(a_request) - if not receive_reply.successful_send: - raise DriplineError('unable to send request') - return receive_reply - - def _receive_reply(self, reply_pkg, timeout_s): - ''' - internal helper method to standardize receiving reply messages - ''' - sig_handler = scarab.SignalHandler() - sig_handler.add_cancelable(self._receiver) - result = self._receiver.wait_for_reply(reply_pkg, timeout_s * 1000) # receiver expects ms - sig_handler.remove_cancelable(self._receiver) - return result - - def get(self, endpoint: str, specifier: str=None, lockout_key=None, timeout_s: int=0) -> MsgReply: - ''' - Send a get request to an endpoint and return the reply message. - - Parameters - ---------- - endpoint: str - Routing key to which the request should be sent. - specifier: str, optional - Specifier to add to the request, if needed. - timeout_s: int | float, optional - Maximum time to wait for a reply in seconds (default is 0) - A timeout of 0 seconds means no timeout will be used. - ''' - reply_pkg = self._send_request( msgop=op_t.get, target=endpoint, specifier=specifier, lockout_key=lockout_key ) - result = self._receive_reply( reply_pkg, timeout_s ) - return result - - def set(self, endpoint: str, value: str | int | float | bool, specifier: str=None, lockout_key=None, timeout_s: int | float=0) -> MsgReply: - ''' - Send a set request to an endpoint and return the reply message. - - Parameters - ---------- - endpoint: str - Routing key to which the request should be sent. - value: str | int | float | bool - Value to assign in the set operation - specifier: str, optional - Specifier to add to the request, if needed. - timeout_s: int | float, optional - Maximum time to wait for a reply in seconds (default is 0) - A timeout of 0 seconds means no timeout will be used. - ''' - payload = {'values':[value]} - reply_pkg = self._send_request( msgop=op_t.set, target=endpoint, specifier=specifier, payload=payload, lockout_key=lockout_key ) - result = self._receive_reply( reply_pkg, timeout_s ) - return result - - def cmd(self, endpoint: str, specifier: str, ordered_args=None, keyed_args=None, lockout_key=None, timeout_s: int | float=0) -> MsgReply: - ''' - Send a cmd request to an endpoint and return the reply message. - - Parameters - ---------- - endpoint: str - Routing key to which the request should be sent. - ordered_args: array, optional - Array of values to assign under 'values' in the payload, if any - keyed_args: dict, optional - Keyword arguments to assign to the payload, if any - specifier: str - Specifier to add to the request. 
For a dripline-python endpoint, this will be the method executed. - timeout_s: int | float, optional - Maximum time to wait for a reply in seconds (default is 0) - A timeout of 0 seconds means no timeout will be used. - ''' - payload = {'values': [] if ordered_args is None else ordered_args} - payload.update({} if keyed_args is None else keyed_args) - reply_pkg = self._send_request( msgop=op_t.cmd, target=endpoint, specifier=specifier, lockout_key=lockout_key, payload=payload ) - result = self._receive_reply( reply_pkg, timeout_s ) - return result diff --git a/dripline/core/request_handler.py b/dripline/core/request_handler.py new file mode 100644 index 00000000..705145b5 --- /dev/null +++ b/dripline/core/request_handler.py @@ -0,0 +1,123 @@ +__all__ = [] + +import scarab + +from .throw_reply import ThrowReply + + +import logging +logger = logging.getLogger(__name__) + +__all__.append("RequestHandler") +class RequestHandler(): + ''' + A mixin class that provides methods for dripline responses in dripline objects. + Intended for use as a centralized object containing response methods in dripline. + + This class is used by Endpoint and Service to include the dripline-standard behavior + for handling and responding to requests. + + Any class using RequestHandler will get the following features: + + * When handling a get/set request that does *not* include a specifier, + the function ``on_[get/set]()`` will be called. + * The default implementations of ``on_[get/set]()`` raise a ``ThrowReply``; + If a response to get/set requests with no specifier is desired, + a derived class should override these functions. + * When handling a cmd request, the specifier is required. + The specifier provides the method name to be called. + If no method exists with the name given by the specifier, an error is returned.
+ ''' + + def do_get_request(self, a_request_message): + logger.info("in get_request") + a_specifier = a_request_message.specifier.to_string() + if (a_specifier): + logger.debug("has specifier") + try: + logger.debug(f"specifier is: {a_specifier}") + an_attribute = getattr(self, a_specifier) + the_node = scarab.ParamNode() + the_node.add("values", scarab.ParamArray()) + the_node["values"].push_back(self.result_to_scarab_payload(an_attribute)) + logger.debug(f"attribute '{a_specifier}' value is [{an_attribute}]") + return a_request_message.reply(payload=the_node) + except AttributeError: + raise ThrowReply('service_error_invalid_specifier', + f"endpoint {self.name} has no attribute {a_specifier}, unable to get") + else: + logger.debug('no specifier') + the_value = self.on_get() + return a_request_message.reply(payload=self.result_to_scarab_payload(the_value)) + + def do_set_request(self, a_request_message): + + a_specifier = a_request_message.specifier.to_string() + if not "values" in a_request_message.payload: + raise ThrowReply('service_error_bad_payload', + 'setting called without values, but values are required for set') + new_value = a_request_message.payload["values"][0]() + new_value = getattr(new_value, "as_" + new_value.type())() + logger.debug(f'Attempting to set new_value to [{new_value}]') + + if (a_specifier): + if not hasattr(self, a_specifier): + raise ThrowReply('service_error_invalid_specifier', + "endpoint {} has no attribute {}, unable to set".format(self.name, a_specifier)) + setattr(self, a_specifier, new_value) + return a_request_message.reply() + else: + result = self.on_set(new_value) + return a_request_message.reply(payload=self.result_to_scarab_payload(result)) + + def do_cmd_request(self, a_request_message): + # Note: any command executed in this way must return a python data structure which is + # able to be converted to a Param object (to be returned in the reply message) + method_name = a_request_message.specifier.to_string() + if method_name == "": + raise ThrowReply('service_error_invalid_method', 'No specifier was provided for an OP_CMD request') + try: + method_ref = getattr(self, method_name) + except AttributeError as e: + raise ThrowReply('service_error_invalid_method', + f'error getting command\'s corresponding method: {str(e)}') + the_kwargs = a_request_message.payload.to_python() + the_args = the_kwargs.pop('values', []) + print(f'args: {the_args} -- kwargs: {the_kwargs}') + try: + result = method_ref(*the_args, **the_kwargs) + except TypeError as e: + raise ThrowReply('service_error_invalid_value', + f'A TypeError occurred while calling the requested method for endpoint {self.name}: {method_name}. Values provided may be invalid.\nOriginal error: {str(e)}') + return a_request_message.reply(payload=self.result_to_scarab_payload(result)) + + def on_get(self): + ''' + Placeholder method for getting the value of an endpoint. + Implementations should override to enable OP_GET operations. + The implementation must return a value which is convertible to a scarab param object. + ''' + raise ThrowReply('service_error_invalid_method', "{} does not implement on_get".format(self.__class__)) + + def on_set(self, _value): + ''' + Placeholder method for setting the value of an endpoint. + Implementations should override to enable OP_SET operations. 
+ Any returned object must already be a scarab::Param object + ''' + raise ThrowReply('service_error_invalid_method', "{} does not implement on_set".format(self.__class__)) + + def result_to_scarab_payload(self, result: str): + """ + Intercept result values and throw error if scarab is unable to convert to param + TODO: Handles global Exception case, could be more specific + Args: + result (str): request message passed + """ + try: + return scarab.to_param(result) + except Exception as e: + result_str = str(result) + logger.warning(f"Bad payload: [{result_str}] is not of type bool, int, float, str, or dict. Converting to str.") + return scarab.to_param(result_str) + diff --git a/dripline/core/request_sender.py b/dripline/core/request_sender.py new file mode 100644 index 00000000..b4023743 --- /dev/null +++ b/dripline/core/request_sender.py @@ -0,0 +1,122 @@ +__all__ = [] + +import scarab + +from _dripline.core import op_t, Core, Receiver, MsgRequest, MsgReply, DriplineError +import uuid +import logging +logger = logging.getLogger(__name__) + +__all__.append("RequestSender") +class RequestSender(): + ''' + A mixin class that provides convenient methods for dripline interactions in a dripline objects. + Intended for use as a dripline client in scripts or interactive sessions. + ''' + def __init__(self, sender, timeout_s: int=10, confirm_retcodes: bool=True): + ''' + Configures an interface with the necessary parameters. + + Parameters + ---------- + sender : Core, Service, or other class that implements send(MsgRequest) + Provide an object that will actually send the messages as it implements the send() function + timeout_s: int, optional + Time to wait for a reply, in seconds -- default is 10 s + confirm_retcodes: bool, optional + If True, and if a reply is received with retcode != 0, raises an exception -- default is True + ''' + self.sender = sender + self._confirm_retcode = confirm_retcodes + self.timeout_s = timeout_s + self._receiver = Receiver() + + def _send_request(self, msgop, target, specifier=None, payload=None, timeout=None, lockout_key:str|uuid.UUID=None): + ''' + internal helper method to standardize sending request messages + ''' + a_specifier = specifier if specifier is not None else "" + a_request = MsgRequest.create(payload=scarab.to_param(payload), msg_op=msgop, routing_key=target, specifier=a_specifier) + if lockout_key is not None: + try: + a_request.lockout_key = lockout_key + except RuntimeError as err: + err.add_note(f"Lockout key [{lockout_key}] is not uuid format compliant.") + raise + receive_reply = self.sender.send(a_request) + if not receive_reply.successful_send: + raise DriplineError('unable to send request') + return receive_reply + + def _receive_reply(self, reply_pkg, timeout_s): + ''' + internal helper method to standardize receiving reply messages + ''' + sig_handler = scarab.SignalHandler() + sig_handler.add_cancelable(self._receiver) + result = self._receiver.wait_for_reply(reply_pkg, timeout_s * 1000) # receiver expects ms + sig_handler.remove_cancelable(self._receiver) + return result.payload.to_python() + + def get(self, endpoint: str, specifier: str=None, lockout_key: str | uuid.UUID=None, timeout_s: int=0) -> MsgReply: + ''' + Send a get request to an endpoint and return the reply message. + + Parameters + ---------- + endpoint: str + Routing key to which the request should be sent. + specifier: str, optional + Specifier to add to the request, if needed. 
+ timeout_s: int | float, optional + Maximum time to wait for a reply in seconds (default is 0) + A timeout of 0 seconds means no timeout will be used. + ''' + reply_pkg = self._send_request( msgop=op_t.get, target=endpoint, specifier=specifier, lockout_key=lockout_key ) + result = self._receive_reply( reply_pkg, timeout_s ) + return result + + def set(self, endpoint: str, value: str | int | float | bool, specifier: str=None, lockout_key: str | uuid.UUID=None, timeout_s: int | float=0) -> MsgReply: + ''' + Send a set request to an endpoint and return the reply message. + + Parameters + ---------- + endpoint: str + Routing key to which the request should be sent. + value: str | int | float | bool + Value to assign in the set operation + specifier: str, optional + Specifier to add to the request, if needed. + timeout_s: int | float, optional + Maximum time to wait for a reply in seconds (default is 0) + A timeout of 0 seconds means no timeout will be used. + ''' + payload = {'values':[value]} + reply_pkg = self._send_request( msgop=op_t.set, target=endpoint, specifier=specifier, payload=payload, lockout_key=lockout_key ) + result = self._receive_reply( reply_pkg, timeout_s ) + return result + + def cmd(self, endpoint: str, specifier: str, ordered_args=None, keyed_args=None, lockout_key: str | uuid.UUID=None, timeout_s: int | float=0) -> MsgReply: + ''' + Send a cmd request to an endpoint and return the reply message. + + Parameters + ---------- + endpoint: str + Routing key to which the request should be sent. + ordered_args: array, optional + Array of values to assign under 'values' in the payload, if any + keyed_args: dict, optional + Keyword arguments to assign to the payload, if any + specifier: str + Specifier to add to the request. For a dripline-python endpoint, this will be the method executed. + timeout_s: int | float, optional + Maximum time to wait for a reply in seconds (default is 0) + A timeout of 0 seconds means no timeout will be used. + ''' + payload = {'values': [] if ordered_args is None else ordered_args} + payload.update({} if keyed_args is None else keyed_args) + reply_pkg = self._send_request( msgop=op_t.cmd, target=endpoint, specifier=specifier, lockout_key=lockout_key, payload=payload ) + result = self._receive_reply( reply_pkg, timeout_s ) + return result diff --git a/dripline/core/service.py b/dripline/core/service.py index 3a185220..ad4574bd 100644 --- a/dripline/core/service.py +++ b/dripline/core/service.py @@ -4,6 +4,8 @@ from _dripline.core import _Service, DriplineConfig, create_dripline_auth_spec from .throw_reply import ThrowReply from .object_creator import ObjectCreator +from .request_sender import RequestSender +from .request_handler import RequestHandler import datetime import logging @@ -11,7 +13,7 @@ logger = logging.getLogger(__name__) __all__.append('Service') -class Service(_Service, ObjectCreator): +class Service(_Service, ObjectCreator, RequestSender, RequestHandler): ''' The primary unit of software that connects to a broker and typically provides an interface with an instrument or other software. 
@@ -129,6 +131,7 @@ def __init__(self, name, make_connection=True, endpoints=None, add_endpoints_now auth.process_spec() _Service.__init__(self, config=scarab.to_param(config), auth=auth, make_connection=make_connection) + RequestSender.__init__(self, sender=self) # Endpoints self.endpoint_configs = endpoints @@ -146,93 +149,55 @@ def add_endpoints_from_config(self): if getattr(an_endpoint, 'log_interval', datetime.timedelta(seconds=0)) > datetime.timedelta(seconds=0): logger.debug("queue up start logging for '{}'".format(an_endpoint.name)) an_endpoint.start_logging() + + def do_get_request(self, a_request_message): + ''' + Default function for handling an OP_GET request message addressed to this service. + .. note: For dripline extension developers -- This function, as defined in RequestHandler, implements the characteristic + dripline-python behavior for a service receiving a get request, including using the specifier to access attributes, + and calling on_get() when there is no specifier. + As an extension author you might typically override RequestHandler.on_get(), but leave this function alone. + + .. note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from + the C++ base class. It intentionally just calls the version of do_get_request() in RequestHandler. - def result_to_scarab_payload(self, result: str): - """ - Intercept result values and throw error if scarab is unable to convert to param - TODO: Handles global Exception case, could be more specific Args: - result (str): request message passed - """ - try: - return scarab.to_param(result) - except Exception as e: - raise ThrowReply('service_error_bad_payload', - f"{self.name} unable to convert result to scarab payload: {result}") + a_request_message (MsgRequest): the message received by this service + ''' - def do_get_request(self, a_request_message): - logger.info("in get_request") - a_specifier = a_request_message.specifier.to_string() - if (a_specifier): - logger.debug("has specifier") - try: - logger.debug(f"specifier is: {a_specifier}") - an_attribute = getattr(self, a_specifier) - logger.debug(f"attribute '{a_specifier}' value is [{an_attribute}]") - the_node = scarab.ParamNode() - the_node["values"] = scarab.ParamArray() - the_node["values"].push_back(scarab.ParamValue(an_attribute)) - return a_request_message.reply(payload=the_node) - except AttributeError as this_error: - raise ThrowReply('service_error_invalid_specifier', - f"endpoint {self.name} has no attribute {a_specifier}, unable to get") - else: - logger.debug('no specifier') - the_value = self.on_get() - return a_request_message.reply(payload=self.result_to_scarab_payload(the_value)) + return RequestHandler.do_get_request(self, a_request_message) def do_set_request(self, a_request_message): + ''' + Default function for handling an OP_SET request message addressed to this service.
- a_specifier = a_request_message.specifier.to_string() - if not "values" in a_request_message.payload: - raise ThrowReply('service_error_bad_payload', - 'setting called without values, but values are required for set') - new_value = a_request_message.payload["values"][0]() - new_value = getattr(new_value, "as_" + new_value.type())() - logger.debug(f'Attempting to set new_value to [{new_value}]') - - if (a_specifier): - if not hasattr(self, a_specifier): - raise ThrowReply('service_error_invalid_specifier', - "endpoint {} has no attribute {}, unable to set".format(self.name, a_specifier)) - setattr(self, a_specifier, new_value) - return a_request_message.reply() - else: - result = self.on_set(new_value) - return a_request_message.reply(payload=self.result_to_scarab_payload(result)) + .. note: For dripline extension developers -- This function, as defined in RequestHandler, implements the characteristic + dripline-python behavior for a service receiving a set request, including using the specifier to access attributes, + and calling on_set() when there is no specifier. + As an extension author you might typically override RequestHandler.on_set(), but leave this function alone. - def do_cmd_request(self, a_request_message): - # Note: any command executed in this way must return a python data structure which is - # able to be converted to a Param object (to be returned in the reply message) - method_name = a_request_message.specifier.to_string() - try: - method_ref = getattr(self, method_name) - except AttributeError as e: - raise ThrowReply('service_error_invalid_method', - "error getting command's corresponding method: {}".format(str(e))) - the_kwargs = a_request_message.payload.to_python() - the_args = the_kwargs.pop('values', []) - print(f'args: {the_args} -- kwargs: {the_kwargs}') - try: - result = method_ref(*the_args, **the_kwargs) - except TypeError as e: - raise ThrowReply('service_error_invalid_value', - f'A TypeError occurred while calling the requested method for endpoint {self.name}: {method_name}. Values provided may be invalid.\nOriginal error: {str(e)}') - return a_request_message.reply(payload=self.result_to_scarab_payload(result)) - - def on_get(self): - ''' - placeholder method for getting the value of an endpoint. - Implementations may override to enable OP_GET operations. - The implementation must return a value which is able to be passed to the ParamValue constructor. + .. note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from + the C++ base class. It intentionally just calls the version of do_set_request() in RequestHandler. + + Args: + a_request_message (MsgRequest): the message received by this service ''' - raise ThrowReply('service_error_invalid_method', "{} does not implement on_get".format(self.__class__)) - def on_set(self, _value): + return RequestHandler.do_set_request(self, a_request_message) + + def do_cmd_request(self, a_request_message): ''' - placeholder method for setting the value of an endpoint. - Implementations may override to enable OP_SET operations. - Any returned object must already be a scarab::Param object + Default function for handling an OP_CMD request message addressed to this service. + + .. note: For dripline extension developers -- This function, as defined in RequestHandler, implements the characteristic + dripline-python behavior for a service receiving a cmd request, namely using the specifier to call service methods. + + ..
note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from + the C++ base class. It intentionally just calls the version of do_cmd_request() in RequestHandler. + + Args: + a_request_message (MsgRequest): the message receveived by this service ''' - raise ThrowReply('service_error_invalid_method', "{} does not implement on_set".format(self.__class__)) + + return RequestHandler.do_cmd_request(self, a_request_message) diff --git a/dripline/implementations/__init__.py b/dripline/implementations/__init__.py index 3d89a48c..2e602f03 100644 --- a/dripline/implementations/__init__.py +++ b/dripline/implementations/__init__.py @@ -4,6 +4,7 @@ from .config import * from .ethernet_scpi_service import * from .entity_endpoints import * +from .heartbeat_monitor import * from .http_server import * from .key_value_store import * from .postgres_interface import * diff --git a/dripline/implementations/entity_endpoints.py b/dripline/implementations/entity_endpoints.py index fb3d21de..fb795a82 100644 --- a/dripline/implementations/entity_endpoints.py +++ b/dripline/implementations/entity_endpoints.py @@ -39,7 +39,7 @@ def __init__(self, ''' Entity.__init__(self, **kwargs) if base_str is None: - raise ThrowReply('service_error_invalid_value', ' is required to __init__ SimpleSCPIEntity instance') + raise ValueError(' is required to __init__ SimpleSCPIEntity instance') else: self.cmd_base = base_str @@ -96,7 +96,6 @@ class FormatEntity(Entity): def __init__(self, get_str=None, - get_reply_float=False, set_str=None, set_value_lowercase=True, set_value_map=None, @@ -105,7 +104,6 @@ def __init__(self, ''' Args: get_str (str): sent verbatim in the event of on_get; if None, getting of endpoint is disabled - get_reply_float (bool): apply special formatting to get return set_str (str): sent as set_str.format(value) in the event of on_set; if None, setting of endpoint is disabled set_value_lowercase (bool): default option to map all string set value to .lower() **WARNING**: never set to False if using a set_value_map dict @@ -113,17 +111,16 @@ def __init__(self, extract_raw_regex (str): regular expression search pattern applied to get return. 
Must be constructed with an extraction group keyed with the name "value_raw" (ie r'(?P<value_raw>...)' ) ''' Entity.__init__(self, **kwargs) - self._get_reply_float = get_reply_float self._get_str = get_str self._set_str = set_str self._set_value_map = set_value_map self._extract_raw_regex = extract_raw_regex self.evaluator = asteval.Interpreter() if set_value_map is not None and not isinstance(set_value_map, (dict,str)): - raise ThrowReply('service_error_invalid_value', f"Invalid set_value_map config for {self.name}; type is {type(set_value_map)} not dict") + raise ValueError(f"Invalid set_value_map config for {self.name}; type is {type(set_value_map)} not dict") self._set_value_lowercase = set_value_lowercase if isinstance(set_value_map, dict) and not set_value_lowercase: - raise ThrowReply('service_error_invalid_value', f"Invalid config option for {self.name} with set_value_map and set_value_lowercase=False") + raise ValueError(f"Invalid config option for {self.name} with set_value_map and set_value_lowercase=False") @calibrate() def on_get(self): @@ -131,7 +128,7 @@ def on_get(self): # exceptions.DriplineMethodNotSupportedError raise ThrowReply('message_error_invalid_method', f"endpoint '{self.name}' does not support get") result = self.service.send_to_device([self._get_str]) - logger.debug(f'result is: {result}') + logger.debug(f'raw result is: {result}') if self._extract_raw_regex is not None: first_result = result matches = re.search(self._extract_raw_regex, first_result) diff --git a/dripline/implementations/ethernet_scpi_service.py b/dripline/implementations/ethernet_scpi_service.py index 2fbec19e..8cbcd334 100644 --- a/dripline/implementations/ethernet_scpi_service.py +++ b/dripline/implementations/ethernet_scpi_service.py @@ -52,10 +52,10 @@ def __init__(self, (ip,port) = re.findall(re_str,socket_info)[0] socket_info = (ip,int(port)) if response_terminator is None or response_terminator == '': - raise ThrowReply('service_error_invalid_value', f"Invalid response terminator: <{repr(response_terminator)}>! Expect string") + raise ValueError(f"Invalid response terminator: <{repr(response_terminator)}>! Expect string") if not isinstance(cmd_at_reconnect, list) or len(cmd_at_reconnect)==0: if cmd_at_reconnect is not None: - raise ThrowReply('service_error_invalid_value', f"Invalid cmd_at_reconnect: <{repr(cmd_at_reconnect)}>! Expect non-zero length list") + raise ValueError(f"Invalid cmd_at_reconnect: <{repr(cmd_at_reconnect)}>!
Expect non-zero length list") self.alock = threading.Lock() self.socket = socket.socket() diff --git a/dripline/implementations/heartbeat_monitor.py b/dripline/implementations/heartbeat_monitor.py new file mode 100644 index 00000000..315a50e5 --- /dev/null +++ b/dripline/implementations/heartbeat_monitor.py @@ -0,0 +1,220 @@ +''' +A service for monitoring service heartbeats ''' + +from __future__ import absolute_import + +# standard libs +import logging + +import time +from datetime import datetime, timedelta +from enum import Enum +import threading + +# internal imports +from dripline.core import AlertConsumer, Endpoint +import scarab + +__all__ = [] +logger = logging.getLogger(__name__) + +__all__.append('HeartbeatTracker') +class HeartbeatTracker(Endpoint): + ''' + Tracks the time since the last heartbeat received from a single monitored service. + ''' + def __init__(self, service_name, **kwargs): + ''' + Args: + service_name (str): Name of the service to be monitored + ''' + Endpoint.__init__(self, **kwargs) + self.service_name = service_name + self.last_timestamp = time.time() + self.is_active = True + self.status = HeartbeatTracker.Status.UNKNOWN + + def process_heartbeat(self, timestamp): + ''' + Record the timestamp of a newly received heartbeat. + ''' + logger.debug(f'New timestamp for {self.service_name}: {timestamp}') + dt = datetime.fromisoformat(timestamp) + posix_time = dt.timestamp() + logger.debug(f'Time since epoch: {posix_time}') + self.last_timestamp = posix_time + + def check_delay(self): + ''' + Compare the time since the last heartbeat to the warning and critical thresholds and report the resulting status. + ''' + diff = time.time() - self.last_timestamp + if self.is_active: + if diff > self.service.critical_threshold_s: + # report critical + logger.critical(f'Missing heartbeat: {self.service_name}') + self.status = HeartbeatTracker.Status.CRITICAL + else: + if diff > self.service.warning_threshold_s: + # report warning + logger.warning(f'Missing heartbeat: {self.service_name}') + self.status = HeartbeatTracker.Status.WARNING + else: + logger.debug(f'Heartbeat status ok: {self.service_name}') + self.status = HeartbeatTracker.Status.OK + else: + # report inactive heartbeat received + logger.debug(f'Inactive heartbeat: time difference: {diff}') + self.status = HeartbeatTracker.Status.UNKNOWN + return {'status': self.status, 'time_since_last_hb': diff} + + class Status(Enum): + OK = 0 + WARNING = 1 + CRITICAL = 2 + UNKNOWN = -1 + +__all__.append('HeartbeatMonitor') +class HeartbeatMonitor(AlertConsumer): + ''' + An alert consumer which listens to heartbeat messages and keeps track of the time since the last was received + + ''' + def __init__(self, + time_between_checks_s=20, + warning_threshold_s=120, + critical_threshold_s=300, + add_unknown_heartbeats=True, + endpoint_name_prefix='hbmon_', + **kwargs): + ''' + Args: + time_between_checks_s (int): number of seconds between heartbeat status checks + warning_threshold_s (int): warning threshold for missing heartbeats (in seconds) + critical_threshold_s (int): critical threshold for missing heartbeats (in seconds) + add_unknown_heartbeats (bool): whether or not to add a new endpoint if an unknown heartbeat is received + endpoint_name_prefix (str): prefix added to monitored-service names for hbmon endpoints + ''' + AlertConsumer.__init__(self, **kwargs) + + # Total sleep time is made of multiple smaller sleeps between checking whether the application is cancelled, + # assuming that time_between_checks_s is larger than the approximately 5 seconds between checking whether the application is cancelled + # Sleep time shouldn't be less than time_between_checks_s, so n_sleeps is forced to be 1 or more.
+ self.time_between_checks_s = time_between_checks_s + self.n_sleeps = max(1, round(time_between_checks_s / 5)) + self.sleep_time_s = self.time_between_checks_s / self.n_sleeps + #logger.warning(f'Time between checks: {self.time_between_checks_s}, n_sleeps: {self.n_sleeps}, sleep_time: {self.sleep_time_s}') + + self.warning_threshold_s = warning_threshold_s + self.critical_threshold_s = critical_threshold_s + self.add_unknown_heartbeats = add_unknown_heartbeats + self.endpoint_name_prefix = endpoint_name_prefix + + # Fill the dictionary mapping monitoring name to service name + self.monitoring_names = {} + for an_endpoint in self.sync_children.values(): + try: + self.monitoring_names[an_endpoint.service_name] = an_endpoint.name + except Exception as err: + logger.error(f'Error while attempting to fill monitoring_names: {err}') + logger.debug(f'Initial set of services monitored:\n{self.monitoring_names}') + + def run(self): + monitor_thread = threading.Thread(target=self.monitor_heartbeats) + + monitor_thread.start() + + # Run the standard service in the main thread + AlertConsumer.run(self) + + monitor_thread.join() + + def monitor_heartbeats(self): + ''' + Performs heartbeat monitoring + ''' + while not self.is_canceled(): + try: + logger.debug('Checking endpoints') + self.process_report(self.run_checks()) + + #logger.debug(f'Sleeping for {self.time_between_checks_s} s') + for i in range(self.n_sleeps): + if self.is_canceled(): + return + time.sleep(self.sleep_time_s) + except Exception as err: + logger.error(f'Exception caught in monitor_heartbeats\' outer check: {err}') + scarab.SignalHandler.cancel_all(1) + + def run_checks(self): + ''' + Checks all endpoints and collects endpoint names by heartbeat tracker status. + ''' + report_data = { + HeartbeatTracker.Status.OK: [], + HeartbeatTracker.Status.WARNING: [], + HeartbeatTracker.Status.CRITICAL: [], + HeartbeatTracker.Status.UNKNOWN: [], + } + for an_endpoint in self.sync_children.values(): + try: + endpoint_report = an_endpoint.check_delay() + report_data[endpoint_report['status']].append( + { + 'name': an_endpoint.name, + 'time_since_last_hb': endpoint_report['time_since_last_hb'], + } + ) + except Exception as err: + logger.error(f'Unable to get status of endpoint {an_endpoint.name}: {err}') + return report_data + + def process_report(self, report_data): + ''' + Print out the information from the monitoring report data. + + This function can be overridden to handle the monitoring report differently. 
+ ''' + logger.info('Heartbeat Monitor Status Check') + if report_data[HeartbeatTracker.Status.CRITICAL]: + logger.error('Services with CRITICAL status:') + for endpoint_data in report_data[HeartbeatTracker.Status.CRITICAL]: + logger.error(f'\t{endpoint_data['name']} -- TSLH: {timedelta(seconds=endpoint_data['time_since_last_hb'])}') + if report_data[HeartbeatTracker.Status.WARNING]: + logger.warning('Services with WARNING status:') + for endpoint_data in report_data[HeartbeatTracker.Status.WARNING]: + logger.warning(f'\t{endpoint_data['name']} -- TSLH: {timedelta(seconds=endpoint_data['time_since_last_hb'])}') + if report_data[HeartbeatTracker.Status.OK]: + logger.info(f'Services with OK status:') + for endpoint_data in report_data[HeartbeatTracker.Status.OK]: + logger.info(f'\t{endpoint_data['name']} -- TSLH: {timedelta(seconds=endpoint_data['time_since_last_hb'])}') + if report_data[HeartbeatTracker.Status.UNKNOWN]: + logger.info(f'Services with UNKNOWN status:') + for endpoint_data in report_data[HeartbeatTracker.Status.UNKNOWN]: + logger.info(f'\t{endpoint_data['name']} -- TSLH: {timedelta(seconds=endpoint_data['time_since_last_hb'])}') + + def process_payload(self, a_payload, a_routing_key_data, a_message_timestamp): + service_name = a_routing_key_data['service_name'] + if not service_name in self.monitoring_names: + logger.warning(f'received unexpected heartbeat;\npayload: {a_payload}\nrouting key data: {a_routing_key_data}\ntimestamp: {a_message_timestamp}') + if self.add_unknown_heartbeats: + binding = self.endpoint_name_prefix+service_name + self.add_child(HeartbeatTracker(service_name=service_name, name=binding)) + self.monitoring_names[service_name] = binding + logger.debug(f'Started monitoring heartbeats from {service_name}') + logger.warning(f'Heartbeat monitor is currently unable to listen for requests addressed to the endpoints of new heartbeat trackers; you will not be able to send messages to {binding}') + # We'd like to be able to bind the new endpoint to the service's connection. + # However, we're unable to bind while the connection is being listened on. + # We either need a way to stop the service and restart (at which point it would bind all of the endpoints, including the new one), + # or we use the endpoint as an asynchronous endpoint, in which case we need a way to start its thread (there isn't a separate function in dl-cpp to do this at this point).
+                #self.bind_key(self.requests_exchange, binding+'.#')
+                #logger.debug(f'Added endpoint for unknown heartbeat from {service_name}')
+            return
+
+        try:
+            self.sync_children[self.monitoring_names[service_name]].process_heartbeat(a_message_timestamp)
+        except Exception as err:
+            logger.error(f'Unable to handle payload for heartbeat from service {service_name}: {err}')
+
+    def do_get(self):
+        return self.run_checks()
+
\ No newline at end of file
diff --git a/dripline/implementations/postgres_interface.py b/dripline/implementations/postgres_interface.py
index 080fea1e..b2095cf7 100644
--- a/dripline/implementations/postgres_interface.py
+++ b/dripline/implementations/postgres_interface.py
@@ -131,27 +131,26 @@ def do_select(self, return_cols=[], where_eq_dict={}, where_lt_dict={}, where_gt
         Returns: a tuple, 1st element is list of column names, 2nd is a list of tuples of the rows that matched the select
         '''
         if not return_cols:
-            return_cols = self.table.c
+            this_select = sqlalchemy.select(self.table)
         else:
-            return_cols = [sqlalchemy.text(col) for col in return_cols]
-        this_select = sqlalchemy.select(return_cols)
+            this_select = sqlalchemy.select(*[getattr(self.table.c,col) for col in return_cols])
         for c,v in where_eq_dict.items():
             this_select = this_select.where(getattr(self.table.c,c)==v)
         for c,v in where_lt_dict.items():
             this_select = this_select.where(getattr(self.table.c,c)<v)
         for c,v in where_gt_dict.items():
             this_select = this_select.where(getattr(self.table.c,c)>v)
-        conn = self.service.engine.connect()
-        result = conn.execute(this_select)
+        with self.service.engine.connect() as conn:
+            result = conn.execute(this_select)
         return (result.keys(), [i for i in result])

     def _insert_with_return(self, insert_kv_dict, return_col_names_list):
         ins = self.table.insert().values(**insert_kv_dict)
         if return_col_names_list:
             ins = ins.returning(*[self.table.c[col_name] for col_name in return_col_names_list])
-        conn = self.service.engine.connect()
-        insert_result = conn.execute(ins)
-        conn.commit()
+        with self.service.engine.connect() as conn:
+            insert_result = conn.execute(ins)
+            conn.commit()
         if return_col_names_list:
             return_values = insert_result.first()
         else:
diff --git a/module_bindings/dripline_core/_endpoint_pybind.hh b/module_bindings/dripline_core/_endpoint_pybind.hh
index 61e44c8e..fadf1aca 100644
--- a/module_bindings/dripline_core/_endpoint_pybind.hh
+++ b/module_bindings/dripline_core/_endpoint_pybind.hh
@@ -17,7 +17,9 @@ namespace dripline_pybind
        std::list< std::string > all_items;
        all_items.push_back( "_Endpoint" );

-        pybind11::classh< dripline::endpoint, _endpoint_trampoline >( mod, "_Endpoint", "Endpoint binding" )
+        pybind11::classh< dripline::endpoint,
+                          _endpoint_trampoline
+                        >( mod, "_Endpoint", "Endpoint binding" )
            .def( pybind11::init< const std::string& >(), DL_BIND_CALL_GUARD_STREAMS )
            // mv_ properties
diff --git a/module_bindings/dripline_core/_service_pybind.hh b/module_bindings/dripline_core/_service_pybind.hh
index 244876a1..555fc156 100644
--- a/module_bindings/dripline_core/_service_pybind.hh
+++ b/module_bindings/dripline_core/_service_pybind.hh
@@ -26,6 +26,7 @@ namespace dripline_pybind
                          _service_trampoline,
                          dripline::core,
                          dripline::endpoint,
+                         dripline::receiver,
                          dripline::scheduler<>,
                          scarab::cancelable
                        >( mod, "_Service", "Service binding" )
@@ -41,9 +42,29 @@ namespace dripline_pybind
            .def_property( "auth", (scarab::authentication& (dripline::service::*)()) &dripline::service::auth, [](_service& a_service, const scarab::authentication& a_auth){a_service.auth() = a_auth;}, pybind11::return_value_policy::reference_internal )
+
+            // Notes on send() bindings
+            //
The Service.send() functions are useful because they set the sender service name in the message before sending. + // The bound functions use lambdas because the dripline::service functions include amqp_ptr_t arguments which aren't known to pybind11. + // Therefore when called from Python, the send process will use the default parameter, a new AMQP connection. + // The bindings to these functions are not included in the trampoline class because we're not directly overriding the C++ send() functions. + .def( "send", + [](_service& a_service, dripline::request_ptr_t a_request){return a_service.send(a_request);}, + DL_BIND_CALL_GUARD_STREAMS_AND_GIL, + "send a request message" + ) + .def( "send", + [](_service& a_service, dripline::reply_ptr_t a_reply){return a_service.send(a_reply);}, + DL_BIND_CALL_GUARD_STREAMS_AND_GIL, + "send a reply message" + ) + .def( "send", + [](_service& a_service, dripline::alert_ptr_t an_alert){return a_service.send(an_alert);}, + DL_BIND_CALL_GUARD_STREAMS_AND_GIL, + "send an alert message" + ) + .def_property( "enable_scheduling", &dripline::service::get_enable_scheduling, &dripline::service::set_enable_scheduling ) - .def_property_readonly( "alerts_exchange", (std::string& (dripline::service::*)()) &dripline::service::alerts_exchange ) - .def_property_readonly( "requests_exchange", (std::string& (dripline::service::*)()) &dripline::service::requests_exchange ) .def_property_readonly( "sync_children", (std::map& (dripline::service::*)()) &dripline::service::sync_children ) //TODO: need to deal with lr_ptr_t to bind this //.def_property_readonly( "async_children", &dripline::service::async_children ) diff --git a/module_bindings/dripline_core/_service_trampoline.hh b/module_bindings/dripline_core/_service_trampoline.hh index 5c7047e5..f0400d07 100644 --- a/module_bindings/dripline_core/_service_trampoline.hh +++ b/module_bindings/dripline_core/_service_trampoline.hh @@ -25,7 +25,7 @@ namespace dripline_pybind { public: using _service::_service; //inherit constructors - _service_trampoline(_service &&base) : _service(std::move(base)) {} + //_service_trampoline(_service &&base) : _service(std::move(base)) {} // Local overrides bool bind_keys() override diff --git a/module_bindings/dripline_core/constants_pybind.hh b/module_bindings/dripline_core/constants_pybind.hh index 9c86fa7d..1e5a5049 100644 --- a/module_bindings/dripline_core/constants_pybind.hh +++ b/module_bindings/dripline_core/constants_pybind.hh @@ -19,9 +19,9 @@ namespace dripline_pybind .value( "cmd", dripline::op_t::cmd ) .value( "unknown", dripline::op_t::unknown ) // helpers for type conversion - .def( "to_int", (int (*)(dripline::op_t))&dripline::to_int, "Convert an op_t to int" ) + .def( "to_uint", (unsigned (*)(dripline::op_t))&dripline::to_uint, "Convert an op_t to integer" ) .def( "to_string", (std::string (*)(dripline::op_t))&dripline::to_string, "Convert an op_t to string" ) - .def_static( "to_op_t", (dripline::op_t (*)(int))&dripline::to_op_t, "Convert an int to op_t" ) + .def_static( "to_op_t", (dripline::op_t (*)(unsigned))&dripline::to_op_t, "Convert an integer to op_t" ) .def_static( "to_op_t", (dripline::op_t (*)(std::string))&dripline::to_op_t, "Convert an string to op_t" ) ; @@ -33,9 +33,9 @@ namespace dripline_pybind .value( "alert", dripline::msg_t::alert ) .value( "unknown", dripline::msg_t::unknown ) // helpers for type conversion - .def( "to_int", (int (*)(dripline::msg_t))&dripline::to_int, "Convert a msg_t to int" ) + .def( "to_uint", (unsigned 
(*)(dripline::msg_t))&dripline::to_uint, "Convert a msg_t to integer" ) .def( "to_string", (std::string (*)(dripline::msg_t))&dripline::to_string, "Convert a msg_t to string" ) - .def_static( "to_msg_t", (dripline::msg_t (*)(int))&dripline::to_msg_t, "Convert an int to msg_t" ) + .def_static( "to_msg_t", (dripline::msg_t (*)(unsigned))&dripline::to_msg_t, "Convert an integer to msg_t" ) .def_static( "to_msg_t", (dripline::msg_t (*)(std::string))&dripline::to_msg_t, "Convert a string to msg_t" ) ; diff --git a/module_bindings/dripline_core/core_pybind.hh b/module_bindings/dripline_core/core_pybind.hh index 65338d16..792c1954 100644 --- a/module_bindings/dripline_core/core_pybind.hh +++ b/module_bindings/dripline_core/core_pybind.hh @@ -23,11 +23,11 @@ namespace dripline_pybind std::shared_ptr< dripline::sent_msg_pkg > >( mod, "SentMessagePackage", "Data structure for sent messages" ) .def_property_readonly( "successful_send", [](const dripline::sent_msg_pkg& an_obj){ return an_obj.f_successful_send; } ) + .def_property_readonly( "send_error_message", [](const dripline::sent_msg_pkg& an_obj){ return an_obj.f_send_error_message; } ) ; all_items.push_back( "Core" ); - pybind11::class_< dripline::core, - std::shared_ptr< dripline::core > + pybind11::classh< dripline::core > t_core( mod, "Core", "lower-level class for AMQP message sending and receiving" ); // bind the core class @@ -42,6 +42,11 @@ namespace dripline_pybind pybind11::arg( "make_connection" ) = true ) + // Notes on send() bindings + // The bound functions use lambdas because the dripline::core functions include amqp_ptr_t arguments which aren't known to pybind11. + // Therefore when called from Python, the send process will use the default parameter, a new AMQP connection. + // The bindings to these functions are not included in a trampoline class because we're not directly overriding the C++ send() functions. + // Therefore calls to send() from a base-class pointer will not redirect appropriately to the derived-class versions of send(). 
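To make the note above concrete, here is a minimal usage sketch from the Python side. It is a sketch only, not part of this changeset: the service name, target endpoint, and reply queue are placeholders, and it mirrors the offline (make_connection=False) pattern exercised in tests/test_service.py further down.

```python
# Sketch only (not part of this diff); names are placeholders.
# With make_connection=False there is no AMQP connection, so the C++ side throws the
# request and it surfaces in Python as a RuntimeError whose text begins "Thrown request:".
import scarab
import dripline.core

a_service = dripline.core.Service("example_service", make_connection=False)
a_request = dripline.core.MsgRequest.create(
    scarab.Param(), dripline.core.op_t.get, "some_endpoint", "on_get", "a_reply_queue"
)

try:
    # Service.send() stamps the sender service name before handing off to core::send()
    sent_pkg = a_service.send(a_request)
    if not sent_pkg.successful_send:
        # SentMessagePackage.send_error_message is newly exposed in this changeset
        print(f"send failed: {sent_pkg.send_error_message}")
except RuntimeError as err:
    print(f"offline send raised: {err}")
```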
.def( "send", [](dripline::core& a_core, dripline::request_ptr_t a_request){return a_core.send(a_request);}, DL_BIND_CALL_GUARD_STREAMS_AND_GIL, @@ -57,7 +62,17 @@ namespace dripline_pybind DL_BIND_CALL_GUARD_STREAMS_AND_GIL, "send an alert message" ) - + //.def_property( "address", std::static_cast< const std::string& (const dripline::core::*) >( &dripline::core::address ), [](dripline::core& a_core, const std::string& a_value){a_core.address() = a_value;} ) + .def_property( "address", [](const dripline::core& a_core){return a_core.address();}, [](dripline::core& a_core, const std::string& a_value){a_core.address() = a_value;} ) + .def_property( "port", &dripline::core::get_port, &dripline::core::set_port ) + .def_property( "username", [](const dripline::core& a_core){return a_core.username();}, [](dripline::core& a_core, const std::string& a_value){a_core.username() = a_value;} ) + .def_property( "password", [](const dripline::core& a_core){return a_core.password();}, [](dripline::core& a_core, const std::string& a_value){a_core.password() = a_value;} ) + .def_property( "requests_exchange", [](const dripline::core& a_core){return a_core.requests_exchange();}, [](dripline::core& a_core, const std::string& a_value){a_core.requests_exchange() = a_value;} ) + .def_property( "alerts_exchange", [](const dripline::core& a_core){return a_core.alerts_exchange();}, [](dripline::core& a_core, const std::string& a_value){a_core.alerts_exchange() = a_value;} ) + .def_property( "heartbeat_routing_key", [](const dripline::core& a_core){return a_core.heartbeat_routing_key();}, [](dripline::core& a_core, const std::string& a_value){a_core.heartbeat_routing_key() = a_value;} ) + .def_property( "max_payload_size", &dripline::core::get_max_payload_size, &dripline::core::set_max_payload_size ) + .def_property( "make_connection", &dripline::core::get_make_connection, &dripline::core::set_make_connection ) + .def_property( "max_connection_attempts", &dripline::core::get_max_connection_attempts, &dripline::core::set_max_connection_attempts ) ; // bind core's internal types diff --git a/module_bindings/dripline_core/error_pybind.hh b/module_bindings/dripline_core/error_pybind.hh index ee3524ff..11184989 100644 --- a/module_bindings/dripline_core/error_pybind.hh +++ b/module_bindings/dripline_core/error_pybind.hh @@ -2,32 +2,118 @@ #define DRIPLINE_PYBIND_ERROR #include "dripline_exceptions.hh" +#include "message.hh" +#include "typename.hh" + #include "pybind11/pybind11.h" +#include +#include + namespace dripline_pybind { + // For use in tests of thowing exceptions on the C++ side + void throw_dripline_error( const std::string& what_msg = "" ) + { + throw dripline::dripline_error() << what_msg; + } + + // For use in tests of throwing messages on the C++ side (this is done in core::do_send()) + void throw_message() + { + throw dripline::msg_request::create(scarab::param_ptr_t(new scarab::param()), dripline::op_t::get, "hey"); + } + + // Utility function so we can load the string version of a message into an error message + template< typename T > + std::string stream_message_ptr_to_error_message( T a_ptr, const std::string& a_prefix ) + { + std::stringstream sstr; + sstr << a_prefix << *a_ptr; + std::string error_msg(sstr.str()); + return std::string(sstr.str()); + } std::list< std::string > export_error( pybind11::module& mod ) { std::list< std::string > all_items; - //TODO how do we actually want to deal with errors? 
- all_items.push_back( "DriplineError" ); - pybind11::register_exception< dripline::dripline_error >( mod, "DriplineError", PyExc_RuntimeError ); + // For use in tests thowing exceptions on the C++ side and catching them on the Python side + all_items.push_back( "throw_dripline_error" ); + mod.def( "throw_dripline_error", &dripline_pybind::throw_dripline_error, + "Test function for throwing a dripline_error on the C++ side" ); + + // For use in tests thowing messages on the C++ side and catching them on the Python side + all_items.push_back( "throw_message" ); + mod.def( "throw_message", &dripline_pybind::throw_message, + "Test function for throwing a message on the C++ side" ); + + // Exception and exception translator registrations go below here. + // Per Pybind11 docs, exception translation happens in reverse order from how they appear here. + // Therefore we put the register_exception_translator try/catch block first, because it has a catch-all term for unknown exception types. + // Within the try-catch block, we start with known classes that we want to catch, and finish with the `...` catch-all. + // Following that, we have registered exceptions of known type. /* + // this static definition is used for the C++ --> Python throw_reply translation + PYBIND11_CONSTINIT static py::gil_safe_call_once_and_store throw_reply_storage; + throw_reply_storage.call_once_and_store_result( + [&]() { return dripline.core.ThrowReply; } + ); +*/ pybind11::register_exception_translator( [](std::exception_ptr p) { try { if ( p ) std::rethrow_exception( p ); } - catch ( const dripline::dripline_error &e ) +/* + catch ( const dripline::throw_reply& e ) { - // Set dripline_error as the active python error - pybind11::set_error( PyExc_Exception, e.what() ); + // Usually throw replies go Python --> C++, but this is here in case there's a C++ --> Python situation + pybind11::set_error( dripline.core.ThrowReply ) + } +*/ + catch ( const dripline::message_ptr_t& e ) + { + std::string error_msg = std::move(stream_message_ptr_to_error_message(e, "Thrown message:\n")); + //std::cerr << "Caught message thrown:\n" << error_msg << std::endl; + pybind11::set_error( PyExc_RuntimeError, error_msg.c_str() ); + } + catch ( const dripline::request_ptr_t& e ) + { + std::string error_msg = std::move(stream_message_ptr_to_error_message(e, "Thrown request:\n")); + //::cerr << "Caught request thrown:\n" << error_msg << std::endl; + pybind11::set_error( PyExc_RuntimeError, error_msg.c_str() ); + } + catch ( const dripline::reply_ptr_t& e ) + { + std::string error_msg = std::move(stream_message_ptr_to_error_message(e, "Thrown reply:\n")); + //std::cerr << "Caught reply thrown:\n" << error_msg << std::endl; + pybind11::set_error( PyExc_RuntimeError, error_msg.c_str() ); + } + catch ( const dripline::alert_ptr_t& e ) + { + std::string error_msg = std::move(stream_message_ptr_to_error_message(e, "Thrown alert:\n")); + //std::cerr << "Caught alert thrown:\n" << error_msg << std::endl; + pybind11::set_error( PyExc_RuntimeError, error_msg.c_str() ); + } + catch (...) 
+ { + // catch-all for unknown exception types + std::string exName(abi::__cxa_current_exception_type()->name()); + std::stringstream sstr; + sstr << "Unknown exception: " << exName; + std::string error_msg(sstr.str()); + //std::cerr << error_msg << std::endl; + pybind11::set_error( PyExc_RuntimeError, error_msg.c_str() ); } } - );*/ + ); + + all_items.push_back( "DriplineError" ); + pybind11::register_exception< dripline::dripline_error >( mod, "DriplineError", PyExc_RuntimeError ); + + return all_items; } diff --git a/module_bindings/dripline_core/scheduler_pybind.hh b/module_bindings/dripline_core/scheduler_pybind.hh index 4bde3b89..d089d5b7 100644 --- a/module_bindings/dripline_core/scheduler_pybind.hh +++ b/module_bindings/dripline_core/scheduler_pybind.hh @@ -20,9 +20,8 @@ namespace dripline_pybind using executor_t = dripline::simple_executor; using executable_t = std::function< void() >; using clock_t = std::chrono::system_clock; - pybind11::class_< dripline::scheduler< executor_t, clock_t >, - scarab::cancelable, - std::shared_ptr< dripline::scheduler< executor_t, clock_t > > + pybind11::classh< dripline::scheduler< executor_t, clock_t >, + scarab::cancelable >( mod, "Scheduler", "schedule future function calls" ) .def( pybind11::init<>() ) diff --git a/tests/integration/docker-compose-dev.yaml b/tests/integration/docker-compose-dev.yaml new file mode 100644 index 00000000..334dc249 --- /dev/null +++ b/tests/integration/docker-compose-dev.yaml @@ -0,0 +1,22 @@ +# Compose file that can be used for a development workflow, integrated with the rest of the integration environment +# The dripline-python source path is mounted into the container, assuming that this is being run from [dl-py top]/tests/integration +# Once the container is started, run `pip install -e /usr/local/src_dev` +# Then you should be able to run dripline applications and edit library source files on the host. +# Note that the _dripline library does not get installed in the normal location with a `pip -e` installation, +# and it instead ends up in the source directory. +# You can preface any dl-serve command with `PYTHONPATH=/usr/local/src_dev` to set the python path correctly. 
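A quick way to confirm that the editable install is actually the copy being imported inside the dev container is a check along these lines (a sketch only, assuming the mount path described in the comments above):

```python
# Sanity-check sketch for the dev workflow above (not part of this diff).
# Run inside the 'dev' container after `pip install -e /usr/local/src_dev`.
import sys

sys.path.insert(0, "/usr/local/src_dev")   # same effect as PYTHONPATH=/usr/local/src_dev

import dripline

print(dripline.__file__)          # expected to resolve under /usr/local/src_dev, not site-packages
print(dripline.core.Service)      # bound C++ Service class, loaded from the editable checkout
```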
+ +services: + dev: + image: ghcr.io/driplineorg/dripline-python:${DLPY_IMG_TAG:-latest-dev} + depends_on: + rabbit-broker: + condition: service_healthy + volumes: + - ../..:/usr/local/src_dev + - ./dripline_mesh.yaml:/root/.dripline_mesh.yaml + environment: + - DRIPLINE_USER=dripline + - DRIPLINE_PASSWORD=dripline + command: > + bash diff --git a/tests/integration/docker-compose-services.yaml b/tests/integration/docker-compose-services.yaml index 46b9c649..1f65fbe2 100644 --- a/tests/integration/docker-compose-services.yaml +++ b/tests/integration/docker-compose-services.yaml @@ -94,6 +94,18 @@ services: configs: - dl_pw.txt + heartbeat-monitor: + image: ghcr.io/driplineorg/dripline-python:${DLPY_IMG_TAG:-latest-dev} + depends_on: + rabbit-broker: + condition: service_healthy + volumes: + - ./services/heartbeat-monitor.yaml:/root/heartbeat-monitor.yaml + command: + bash -c "dl-serve -vv -b rabbit-broker -u dripline --password-file /dl_pw.txt -c /root/heartbeat-monitor.yaml heartbeat_interval_s=10" + configs: + - dl_pw.txt + configs: dl_pw.txt: file: ./password.txt diff --git a/tests/integration/docker-compose.yaml b/tests/integration/docker-compose.yaml index aa0c27ab..d22546cb 100644 --- a/tests/integration/docker-compose.yaml +++ b/tests/integration/docker-compose.yaml @@ -2,7 +2,7 @@ services: # The broker for the mesh rabbit-broker: - image: rabbitmq:3-management + image: rabbitmq:4-management ports: - "15672:15672" environment: diff --git a/tests/integration/services/heartbeat-monitor.yaml b/tests/integration/services/heartbeat-monitor.yaml new file mode 100644 index 00000000..1ab4fd6f --- /dev/null +++ b/tests/integration/services/heartbeat-monitor.yaml @@ -0,0 +1,15 @@ +name: heartbeat_monitor +module: HeartbeatMonitor + +heartbeat_interval_s: 30 +warning_threshold_s: 45 + +# AlertConsumer Inits +alert_keys: + - "heartbeat.#" +alert_key_parser_re: 'heartbeat\.(?P\w+)' + +endpoints: + - name: hbmon_heartbeat_monitor + service_name: heartbeat_monitor + module: HeartbeatTracker diff --git a/tests/test_constants.py b/tests/test_constants.py index 657998bd..313d7de7 100644 --- a/tests/test_constants.py +++ b/tests/test_constants.py @@ -3,10 +3,10 @@ def test_op_t_to_int(): item = dripline.core.op_t - assert(item.to_int(item.set) == item.set.value) - assert(item.to_int(item.get) == item.get.value) - assert(item.to_int(item.cmd) == item.cmd.value) - assert(item.to_int(item.unknown) == item.unknown.value) + assert(item.to_uint(item.set) == item.set.value) + assert(item.to_uint(item.get) == item.get.value) + assert(item.to_uint(item.cmd) == item.cmd.value) + assert(item.to_uint(item.unknown) == item.unknown.value) def test_op_t_to_string(): item = dripline.core.op_t @@ -35,10 +35,10 @@ def test_op_t_string_to_op_t(): def test_msg_t_to_int(): item = dripline.core.msg_t - assert(item.to_int(item.reply) == item.reply.value) - assert(item.to_int(item.request) == item.request.value) - assert(item.to_int(item.alert) == item.alert.value) - assert(item.to_int(item.unknown) == item.unknown.value) + assert(item.to_uint(item.reply) == item.reply.value) + assert(item.to_uint(item.request) == item.request.value) + assert(item.to_uint(item.alert) == item.alert.value) + assert(item.to_uint(item.unknown) == item.unknown.value) def test_msg_t_to_string(): item = dripline.core.msg_t diff --git a/tests/test_endpoint.py b/tests/test_endpoint.py index 20291206..6efe6c9e 100644 --- a/tests/test_endpoint.py +++ b/tests/test_endpoint.py @@ -1,5 +1,7 @@ import scarab, dripline.core +import pytest + def 
test_endpoint_creation(): a_name = "an_endpoint" an_endpoint = dripline.core.Endpoint(a_name) @@ -28,43 +30,26 @@ def test_on_request_message(): def test_on_reply_message(): an_endpoint = dripline.core.Endpoint("hello") a_reply = dripline.core.MsgReply.create() - print(a_reply) - flag = False - try: + with pytest.raises(dripline.core.DriplineError) as excinfo: an_endpoint.on_reply_message(a_reply) - except Exception: # seems like this is not throwing an actual DriplineError, just a generic Exception. dripline.core.DriplineError - flag = True - assert(flag) def test_on_alert_message(): an_endpoint = dripline.core.Endpoint("hello") an_alert = dripline.core.MsgAlert.create() - flag = False - try: + with pytest.raises(dripline.core.DriplineError) as excinfo: an_endpoint.on_alert_message(an_alert) - except Exception: # seems like this is not throwing an actual DriplineError, just a generic Exception. dripline.core.DriplineError - flag =True - assert(flag) def test_do_get_request_no_specifier(): an_endpoint = dripline.core.Endpoint("hello") a_get_request = dripline.core.MsgRequest.create() - flag = False - try: + with pytest.raises(dripline.core.ThrowReply) as excinfo: a_reply = an_endpoint.do_get_request(a_get_request) - except dripline.core.ThrowReply: - flag = True - assert(flag) def test_do_get_request_invalid_specifier(): an_endpoint = dripline.core.Endpoint("an_endpoint") a_get_request = dripline.core.MsgRequest.create(scarab.ParamValue(5), dripline.core.op_t.get, "hey", "namee", "a_receiver") - correct_error = False - try: + with pytest.raises(dripline.core.ThrowReply) as excinfo: a_reply = an_endpoint.do_get_request(a_get_request) - except dripline.core.ThrowReply as e: - correct_error = True - assert(correct_error) def test_do_get_request_valid_specifier(): a_name = "an_endpoint" @@ -83,12 +68,8 @@ def test_do_set_request_no_specifier(): the_node.add("values", scarab.ParamArray()) the_node["values"].push_back(scarab.ParamValue("a_better_endpoint")) a_set_request = dripline.core.MsgRequest.create(the_node, dripline.core.op_t.set, "hey") - correct_error = False - try: + with pytest.raises(dripline.core.ThrowReply) as excinfo: a_reply = an_endpoint.do_set_request(a_set_request) - except dripline.core.ThrowReply as e: - correct_error = True - assert(correct_error) def test_do_set_request_invalid_specifier(): an_endpoint = dripline.core.Endpoint("an_endpoint") @@ -96,13 +77,8 @@ def test_do_set_request_invalid_specifier(): the_node.add("values", scarab.ParamArray()) the_node["values"].push_back(scarab.ParamValue("a_better_endpoint")) a_set_request = dripline.core.MsgRequest.create(the_node, dripline.core.op_t.set, "hey", "namee", "a_receiver") - correct_error = False - try: + with pytest.raises(dripline.core.ThrowReply) as excinfo: a_reply = an_endpoint.do_set_request(a_set_request) - except dripline.core.ThrowReply as e: - correct_error = True - print('content of error:\n{}'.format(dir(e))) - assert(correct_error) def test_do_set_request_valid_specifier(): value1 = "an_endpoint" @@ -125,14 +101,8 @@ class EndpointWithMember(dripline.core.Endpoint): def test_do_cmd_request_invalid_specifier(): an_endpoint = dripline.core.Endpoint("an_endpoint") a_cmd_request = dripline.core.MsgRequest.create(scarab.Param(), dripline.core.op_t.cmd, "hey", "on_gett", "a_receiver") - correct_error = False - try: + with pytest.raises(dripline.core.ThrowReply) as excinfo: a_reply = an_endpoint.do_cmd_request(a_cmd_request) - except dripline.core.ThrowReply as e: - correct_error = True - except Exception as e: - 
print("got a [{}]".format(type(e))) - assert(correct_error) def test_do_cmd_request_valid_specifier(): class AnotherEndpoint(dripline.core.Endpoint): diff --git a/tests/test_entity.py b/tests/test_entity.py new file mode 100644 index 00000000..6cfddbf0 --- /dev/null +++ b/tests/test_entity.py @@ -0,0 +1,12 @@ +import dripline +import pytest + +def test_log_a_value(): + a_service = dripline.core.Service("hello", make_connection=False) + a_entity = dripline.core.Entity(name="ent") + a_service.add_child(a_entity) + with pytest.raises(RuntimeError) as excinfo: + a_entity.log_a_value(5) + assert excinfo.type is RuntimeError + assert "Thrown alert:" in str(excinfo.value) + assert "Payload: 5" in str(excinfo.value) diff --git a/tests/test_error.py b/tests/test_error.py new file mode 100644 index 00000000..ffba0f33 --- /dev/null +++ b/tests/test_error.py @@ -0,0 +1,21 @@ +import dripline +import pytest + +def test_raise_dripline_error(): + with pytest.raises(dripline.core.DriplineError) as excinfo: + raise dripline.core.DriplineError + assert excinfo.type is dripline.core.DriplineError + +def test_throw_dripline_error(): + message = "test_throw" + with pytest.raises(dripline.core.DriplineError, match=message) as excinfo: + dripline.core.throw_dripline_error(message) + assert excinfo.type is dripline.core.DriplineError + +# In dripline::core::do_send() we throw a dripline message object. +# Let's make sure that gets caught in a reasonable way from the Python side +def test_throw_message(): + with pytest.raises(RuntimeError) as excinfo: + dripline.core.throw_message() + print(f'Exception value: {excinfo.value}') + assert excinfo.type is RuntimeError diff --git a/tests/test_messages.py b/tests/test_messages.py index 1780532f..ea0ae337 100644 --- a/tests/test_messages.py +++ b/tests/test_messages.py @@ -50,6 +50,11 @@ def test_lockout_key_roundtrip(): lk_rt = a_request.lockout_key # get the id back assert(lk_rt == random_uuid) # test being able to set and then get the id + random_uuid_2 = uuid.uuid4() + a_request.lockout_key = str(random_uuid_2) # set the id from a string + lk_rt_2 = a_request.lockout_key # get the id back + assert(lk_rt_2 == random_uuid_2) # test being able to set and then get the id + def test_request_reply_default(): a_request = _dripline.core.MsgRequest.create(reply_to = "a_receiver") a_reply = a_request.reply() diff --git a/tests/test_service.py b/tests/test_service.py new file mode 100644 index 00000000..7b56b394 --- /dev/null +++ b/tests/test_service.py @@ -0,0 +1,140 @@ +import scarab, dripline + +import pytest + +def test_service_creation(): + a_name = "a_service" + a_service = dripline.core.Service(a_name, make_connection=False) + assert(a_service.name == a_name) + +def test_submit_request_message(): + a_name = "a_service" + a_service = dripline.core.Service(a_name, make_connection=False) + a_request = dripline.core.MsgRequest.create(scarab.ParamValue(5), dripline.core.op_t.get, a_name, "name", "a_receiver") + # Should throw a reply: on_request_message results in a reply message that gets sent; in core, since the service is offline, the reply is thrown + with pytest.raises(RuntimeError) as excinfo: + a_reply = a_service.submit_request_message(a_request) + assert excinfo.type is RuntimeError + assert "Thrown reply:" in str(excinfo.value) + #assert(isinstance(a_reply, dripline.core.MsgReply)) + #assert(a_reply.return_code == 0) + #assert(a_reply.correlation_id == a_request.correlation_id) + #a_reply.payload.to_python()['values'] == [a_name] + +def test_on_request_message(): + 
a_name = "a_service" + a_service = dripline.core.Service(a_name, make_connection=False) + a_request = dripline.core.MsgRequest.create(scarab.ParamValue(5), dripline.core.op_t.get, a_name, "name", "a_receiver") + # Should throw a reply: on_request_message results in a reply message that gets sent; in core, since the service is offline, the reply is thrown + with pytest.raises(RuntimeError) as excinfo: + a_reply = a_service.on_request_message(a_request) + assert excinfo.type is RuntimeError + assert "Thrown reply:" in str(excinfo.value) +# assert(isinstance(a_reply, dripline.core.MsgReply)) +# assert(a_reply.return_code == 0) # 0 +# assert(a_reply.correlation_id == a_request.correlation_id) +# a_reply.payload.to_python()['values'] == [a_name] + +def test_on_reply_message(): + a_service = dripline.core.Service("hello", make_connection=False) + a_reply = dripline.core.MsgReply.create() + with pytest.raises(dripline.core.DriplineError) as excinfo: + a_service.on_reply_message(a_reply) + +def test_on_alert_message(): + a_service = dripline.core.Service("hello", make_connection=False) + an_alert = dripline.core.MsgAlert.create() + with pytest.raises(dripline.core.DriplineError) as excinfo: + a_service.on_alert_message(an_alert) + +def test_do_get_request_no_specifier(): + a_service = dripline.core.Service("hello", make_connection=False) + a_get_request = dripline.core.MsgRequest.create() + with pytest.raises(dripline.core.ThrowReply) as excinfo: + a_reply = a_service.do_get_request(a_get_request) + +def test_do_get_request_invalid_specifier(): + a_service = dripline.core.Service("a_service", make_connection=False) + a_get_request = dripline.core.MsgRequest.create(scarab.ParamValue(5), dripline.core.op_t.get, "hey", "namee", "a_receiver") + with pytest.raises(dripline.core.ThrowReply) as excinfo: + a_reply = a_service.do_get_request(a_get_request) + +def test_do_get_request_valid_specifier(): + a_name = "a_service" + a_service = dripline.core.Service(a_name, make_connection=False) + a_get_request = dripline.core.MsgRequest.create(scarab.ParamValue(5), dripline.core.op_t.get, "hey", "name", "a_receiver") + a_reply = a_service.do_get_request(a_get_request) + assert(isinstance(a_reply, dripline.core.MsgReply)) + assert(a_reply.return_code == 0) + assert(a_reply.correlation_id == a_get_request.correlation_id) + a_reply.payload.to_python()['values'] == [a_name] + +def test_do_set_request_no_specifier(): + print("start test") + a_service = dripline.core.Service("hello", make_connection=False) + the_node = scarab.ParamNode() + the_node.add("values", scarab.ParamArray()) + the_node["values"].push_back(scarab.ParamValue("a_better_service")) + a_set_request = dripline.core.MsgRequest.create(the_node, dripline.core.op_t.set, "hey") + with pytest.raises(dripline.core.ThrowReply) as excinfo: + a_reply = a_service.do_set_request(a_set_request) + +def test_do_set_request_invalid_specifier(): + a_service = dripline.core.Service("a_service", make_connection=False) + the_node = scarab.ParamNode() + the_node.add("values", scarab.ParamArray()) + the_node["values"].push_back(scarab.ParamValue("a_better_service")) + a_set_request = dripline.core.MsgRequest.create(the_node, dripline.core.op_t.set, "hey", "namee", "a_receiver") + with pytest.raises(dripline.core.ThrowReply) as excinfo: + a_reply = a_service.do_set_request(a_set_request) + +def test_do_set_request_valid_specifier(): + value1 = "a_service" + value2 = "a_better_service" + ## the service base class doesn't have any settable members, create one: + class 
ServiceWithMember(dripline.core.Service): + a_value = value1 + a_service = ServiceWithMember("a_service", make_connection=False) + the_node = scarab.ParamNode() + the_node.add("values", scarab.ParamArray()) + the_node["values"].push_back(scarab.ParamValue(value2)) + a_set_request = dripline.core.MsgRequest.create(the_node, dripline.core.op_t.set, "hey", "a_value", "a_receiver") + a_reply = a_service.do_set_request(a_set_request) + assert(isinstance(a_reply, dripline.core.MsgReply)) + assert(a_reply.return_code == 0) + assert(a_reply.correlation_id == a_set_request.correlation_id) + print(a_service.name) + assert(a_service.a_value == value2) + +def test_do_cmd_request_invalid_specifier(): + a_service = dripline.core.Service("a_service", make_connection=False) + a_cmd_request = dripline.core.MsgRequest.create(scarab.Param(), dripline.core.op_t.cmd, "hey", "on_gett", "a_receiver") + with pytest.raises(dripline.core.ThrowReply) as excinfo: + a_reply = a_service.do_cmd_request(a_cmd_request) + +def test_do_cmd_request_valid_specifier(): + class AnotherService(dripline.core.Service): + def __init__(self, name, **kwargs): + dripline.core.Service.__init__(self, name, **kwargs) + def a_method(self, n1, n2): + return n1 + n2 + a_service = AnotherService("a_service", make_connection=False) + the_node = scarab.ParamNode() + the_node.add("values", scarab.ParamArray()) + n1, n2 = 10, 13 + the_node["values"].push_back(scarab.ParamValue(n1)) + the_node["values"].push_back(scarab.ParamValue(n2)) + a_cmd_request = dripline.core.MsgRequest.create(the_node, dripline.core.op_t.cmd, "hey", "a_method", "a_receiver") + a_reply = a_service.do_cmd_request(a_cmd_request) + assert(isinstance(a_reply, dripline.core.MsgReply)) + assert(a_reply.return_code == 0) + assert(a_reply.correlation_id == a_cmd_request.correlation_id) + assert(a_reply.payload.to_python() == n1 + n2) + +def test_send_request(): + a_service = dripline.core.Service("a_service", make_connection=False) + a_request = dripline.core.MsgRequest.create(scarab.Param(), dripline.core.op_t.get, "hey", "on_get", "a_receiver") + with pytest.raises(RuntimeError, match="Thrown request:*") as excinfo: + a_sent_msg = a_service.send(a_request) + print(f'Exception value:\n{excinfo.value}') + assert excinfo.type is RuntimeError
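The new HeartbeatMonitor explicitly allows process_report() to be overridden, so a mesh-specific monitor can consume the same report_data structure produced by run_checks(). A rough sketch of such a subclass follows; it is hypothetical, and the class name and import path are assumptions rather than part of this diff.

```python
# Hypothetical subclass (not part of this diff).  report_data is the dict built by
# HeartbeatMonitor.run_checks(): keyed by HeartbeatTracker.Status, each value a list of
# {'name': ..., 'time_since_last_hb': ...} entries.  The import path is assumed.
import logging
from datetime import timedelta

from dripline.implementations import HeartbeatMonitor, HeartbeatTracker

logger = logging.getLogger(__name__)

class AlertingHeartbeatMonitor(HeartbeatMonitor):
    def process_report(self, report_data):
        # Keep the stock per-status logging...
        super().process_report(report_data)
        # ...then escalate anything CRITICAL so it stands out to operators.
        for endpoint_data in report_data[HeartbeatTracker.Status.CRITICAL]:
            tslh = timedelta(seconds=endpoint_data['time_since_last_hb'])
            logger.critical(f"{endpoint_data['name']} has sent no heartbeat for {tslh}")
```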