diff --git a/Dockerfile b/Dockerfile index f881e57a..5b51f331 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ ARG img_user=ghcr.io/driplineorg ARG img_repo=dripline-cpp #ARG img_tag=develop -ARG img_tag=v2.10.0 +ARG img_tag=v2.10.1 FROM ${img_user}/${img_repo}:${img_tag} diff --git a/dripline/core/calibrate.py b/dripline/core/calibrate.py index f9c50ab8..10f72502 100644 --- a/dripline/core/calibrate.py +++ b/dripline/core/calibrate.py @@ -49,10 +49,12 @@ def wrapper(self, *args, **kwargs): val_dict['value_cal'] = cal elif isinstance(self._calibration, dict): logger.debug('calibration is dictionary, looking up value') - if val_dict['value_raw'] in self._calibration: - val_dict['value_cal'] = self._calibration[val_dict['value_raw']] + value_raw_str = str(val_dict['value_raw']) + if value_raw_str in self._calibration: + val_dict['value_cal'] = self._calibration[str(val_dict['value_raw'])] else: - raise ThrowReply('service_error_invalid_value', f"raw value <{repr(val_dict['value_raw'])}> not in cal dict") + raise ThrowReply('service_error_invalid_value', f"raw value <{str(val_dict['value_raw'])}> {type(val_dict['value_raw'])} not in cal dict with calibrate dict {self._calibration}") + logger.debug(f"formatted cal is:\n{ val_dict['value_cal'] }") else: logger.warning('the _calibration property is of unknown type') return val_dict diff --git a/dripline/core/entity.py b/dripline/core/entity.py index cb9ef3bd..13b7db20 100644 --- a/dripline/core/entity.py +++ b/dripline/core/entity.py @@ -1,13 +1,13 @@ import datetime import functools -import types import numbers import scarab +from _dripline.core import MsgAlert from .endpoint import Endpoint -from dripline.core import MsgAlert +from .throw_reply import ThrowReply __all__ = [] import logging @@ -31,6 +31,11 @@ def wrapper(*args, **kwargs): values.update({'value_raw': args[0]}) logger.debug('set done, now log') self.log_a_value(values) + try: + this_value = float(values[self._check_field]) + except (TypeError, ValueError): + this_value = False + self._last_log_value = this_value return result return wrapper @@ -58,16 +63,29 @@ class Entity(Endpoint): ''' #check_on_set -> allows for more complex logic to confirm successful value updates # (for example, the success condition may be measuring another endpoint) - def __init__(self, get_on_set=False, log_routing_key_prefix='sensor_value', log_interval=0, log_on_set=False, calibration=None, **kwargs): + def __init__(self, + get_on_set=False, + log_on_set=False, + log_routing_key_prefix='sensor_value', + log_interval=0, + max_interval=0, + max_fractional_change=0, + check_field='value_cal', + calibration=None, + **kwargs): ''' Args: get_on_set: if true, calls to on_set are immediately followed by an on_get, which is returned + log_on_set: if true, always call log_a_value() immediately after on_set + **Note:** requires get_on_set be true, overrides must be equivalent log_routing_key_prefix: first term in routing key used in alert messages which log values - log_interval: how often to log the Entity's value. If 0 then scheduled logging is disabled; + log_interval: how often to check the Entity's value. If 0 then scheduled logging is disabled; if a number, interpreted as number of seconds; if a dict, unpacked as arguments to the datetime.time_delta initializer; if a datetime.timedelta taken as the new value - log_on_set: if true, always call log_a_value() immediately after on_set - **Note:** requires get_on_set be true, overrides must be equivalent + max_interval: max allowed time interval between logging, allows usage of conditional logging. If 0, + then logging values occurs every log_interval. + max_fractional_change: max allowed fractional difference between subsequent values to trigger log condition. + check_field: result field to check, 'value_cal' or 'value_raw' calibration (string || dict) : if string, updated with raw on_get() result via str.format() in @calibrate decorator, used to populate raw and calibrated values fields of a result payload. If a dictionary, the raw result is used @@ -81,13 +99,16 @@ def __init__(self, get_on_set=False, log_routing_key_prefix='sensor_value', log_ # keep a reference to the on_set (possibly decorated in a subclass), needed for changing *_on_set configurations self.__initial_on_set = self.on_set - self._get_on_set = None self._log_on_set = None self.get_on_set = get_on_set self.log_on_set = log_on_set self.log_interval = log_interval + self._max_interval = max_interval + self._max_fractional_change = max_fractional_change + self._check_field = check_field self._log_action_id = None + self._last_log_time = None @property def get_on_set(self): @@ -136,10 +157,31 @@ def log_interval(self, new_interval): def scheduled_log(self): logger.debug("in a scheduled log event") result = self.on_get() + try: + this_value = float(result[self._check_field]) + except (TypeError, ValueError): + this_value = False + # Various checks for log condition + if self._last_log_time is None: + logger.debug("log because no last log") + elif (datetime.datetime.utcnow() - self._last_log_time).total_seconds() > self._max_interval: + logger.debug("log because too much time") + elif this_value is False: + logger.warning(f"cannot check value change for {self.name}") + return + elif ((self._last_log_value == 0 and this_value != 0) or + (self._last_log_value != 0 and\ + abs((self._last_log_value - this_value)/self._last_log_value)>self._max_fractional_change)): + logger.debug("log because change magnitude") + else: + logger.debug("no log condition met, not logging for {self.name}") + return + self._last_log_value = this_value self.log_a_value(result) def log_a_value(self, the_value): - logger.debug(f"value to log is:\n{the_value}") + logger.info(f"value to log for {self.name} is:\n{the_value}") + self._last_log_time = datetime.datetime.utcnow() the_alert = MsgAlert.create(payload=scarab.to_param(the_value), routing_key=f'{self.log_routing_key_prefix}.{self.name}') alert_sent = self.service.send(the_alert) diff --git a/dripline/core/interface.py b/dripline/core/interface.py index 05182b1d..77dec191 100644 --- a/dripline/core/interface.py +++ b/dripline/core/interface.py @@ -13,7 +13,7 @@ class Interface(Core): A class that provides user-friendly methods for dripline interactions in a Python interpreter. Intended for use as a dripline client in scripts or interactive sessions. ''' - def __init__(self, username: str | dict=None, password: str | dict=None, dripline_mesh: dict=None, timeout_s: int=10, confirm_retcodes: bool=True): + def __init__(self, authentication_obj = None, username: str | dict=None, password: str | dict=None, dripline_mesh: dict=None, timeout_s: int=10, confirm_retcodes: bool=True): ''' Configures an interface with the necessary parameters. @@ -48,18 +48,21 @@ def __init__(self, username: str | dict=None, password: str | dict=None, driplin if dripline_mesh is not None: dripline_config.update(dripline_mesh) - dl_auth_spec = create_dripline_auth_spec() - auth_args = { - 'username': {} if username is None else username, - 'password': {} if password is None else password, - } - dl_auth_spec.merge( scarab.to_param(auth_args) ) - auth_spec = scarab.ParamNode() - auth_spec.add('dripline', dl_auth_spec) - logger.debug(f'Loading auth spec:\n{auth_spec}') - auth = scarab.Authentication() - auth.add_groups(auth_spec) - auth.process_spec() + if authentication_obj is not None: + auth = authentication_obj + else: + dl_auth_spec = create_dripline_auth_spec() + auth_args = { + 'username': {} if username is None else username, + 'password': {} if password is None else password, + } + dl_auth_spec.merge( scarab.to_param(auth_args) ) + auth_spec = scarab.ParamNode() + auth_spec.add('dripline', dl_auth_spec) + logger.debug(f'Loading auth spec:\n{auth_spec}') + auth = scarab.Authentication() + auth.add_groups(auth_spec) + auth.process_spec() Core.__init__(self, config=scarab.to_param(dripline_config), auth=auth) diff --git a/dripline/core/service.py b/dripline/core/service.py index 3a185220..36815a17 100644 --- a/dripline/core/service.py +++ b/dripline/core/service.py @@ -2,11 +2,16 @@ import scarab from _dripline.core import _Service, DriplineConfig, create_dripline_auth_spec +from _dripline.core import MsgRequest, Receiver, op_t from .throw_reply import ThrowReply from .object_creator import ObjectCreator -import datetime import logging +import types +import datetime +import numbers +import subprocess +import time logger = logging.getLogger(__name__) @@ -59,7 +64,10 @@ def __init__(self, name, make_connection=True, endpoints=None, add_endpoints_now broadcast_key='broadcast', loop_timeout_ms=1000, message_wait_ms=1000, heartbeat_interval_s=60, username=None, password=None, authentication_obj=None, - dripline_mesh=None, **kwargs): + dripline_mesh=None, + heartbeat_broker_s=60, + rk_aliveness="state", + **kwargs): ''' Configures a service with the necessary parameters. @@ -97,6 +105,8 @@ def __init__(self, name, make_connection=True, endpoints=None, add_endpoints_now Authentication information provided as a scarab.Authentication object; this will override the auth parameter. dripline_mesh : dict, optional Provide optional dripline mesh configuration information (see dripline_config for more information) + heartbeat_broker_s (int): self defined scheduler interval (seconds) to check the aliveness + rk_aliveness (str): the routing key to check the aliveness (on_get run by default) ''' # Final dripline_mesh config should be the default updated by the parameters passed by the caller dripline_config = DriplineConfig().to_python() @@ -138,6 +148,54 @@ def __init__(self, name, make_connection=True, endpoints=None, add_endpoints_now if kwargs: logger.debug(f'Service received some kwargs that it doesn\'t handle, which will be ignored: {kwargs}') + self._heartbeat_action_id = None + self._message_wait_ms = message_wait_ms + self.heartbeat_broker_s = heartbeat_broker_s + self.rk_aliveness = rk_aliveness + self.broker = config["dripline_mesh"]["broker"] + self.start_heartbeat() + + @property + def heartbeat_broker_s(self): + return self._heartbeat_broker_s + @heartbeat_broker_s.setter + def heartbeat_broker_s(self, new_interval): + if isinstance(new_interval, numbers.Number): + self._heartbeat_broker_s = datetime.timedelta(seconds=new_interval) + elif isinstance(new_interval, dict): + self._heartbeat_broker_s = datetime.timedelta(**new_interval) + elif isinstance(new_interval, datetime.timedelta): + self._heartbeat_broker_s = new_interval + else: + raise ThrowReply('service_error_invalid_value', f"unable to interpret a new_interval for heartbeat test of type <{type(new_interval)}>") + + def scheduled_heartbeat(self): + logger.info("in a scheduled hearbeat event") + a_node= scarab.ParamNode() + a_receiver = Receiver() + #a_node.add('values',scarab.ParamValue('5')) + the_request = MsgRequest.create(a_node, op_t.get,self.rk_aliveness) + reply_pkg = self.send(the_request) + if not reply_pkg.successful_send: + raise ThrowReply("Failed rabbitmq connection test.") + sig_handler = scarab.SignalHandler() + sig_handler.add_cancelable(a_receiver) + result = a_receiver.wait_for_reply(reply_pkg, self._message_wait_ms) # receiver expects ms + sig_handler.remove_cancelable(a_receiver) + the_value = result.payload + logger.info(f"get {self.rk_aliveness}, result: {the_value}") + logger.info(f"rabbitmq connection is found in this scheduler") + + def start_heartbeat(self): + if self._heartbeat_action_id is not None: + self.unschedule(self._heartbeat_action_id) + if self.heartbeat_broker_s: + logger.info(f'should start heart beat connection check every {self.heartbeat_broker_s}') + self._heartbeat_action_id = self.schedule(self.scheduled_heartbeat, self.heartbeat_broker_s, datetime.datetime.now() + self.execution_buffer*3) + else: + raise ValueError('unable to start logging when heartbeat_broker_s evaluates false') + logger.debug(f'heartbeat action id is {self._heartbeat_action_id}') + def add_endpoints_from_config(self): if self.endpoint_configs is not None: for an_endpoint_conf in self.endpoint_configs: diff --git a/dripline/implementations/postgres_interface.py b/dripline/implementations/postgres_interface.py index 080fea1e..9ebfb606 100644 --- a/dripline/implementations/postgres_interface.py +++ b/dripline/implementations/postgres_interface.py @@ -104,6 +104,10 @@ def __init__(self, table_name, self._optional_insert_names = self._ensure_col_key_map(optional_insert_names) self._default_insert_dict = default_insert_values + def on_get(self): + result = {"default":"this is a default output"} + return result + def _ensure_col_key_map(self, column_list): to_return = [] for a_col in column_list: @@ -134,7 +138,8 @@ def do_select(self, return_cols=[], where_eq_dict={}, where_lt_dict={}, where_gt return_cols = self.table.c else: return_cols = [sqlalchemy.text(col) for col in return_cols] - this_select = sqlalchemy.select(return_cols) + + this_select = sqlalchemy.select(*return_cols) for c,v in where_eq_dict.items(): this_select = this_select.where(getattr(self.table.c,c)==v) for c,v in where_lt_dict.items(): @@ -143,6 +148,7 @@ def do_select(self, return_cols=[], where_eq_dict={}, where_lt_dict={}, where_gt this_select = this_select.where(getattr(self.table.c,c)>v) conn = self.service.engine.connect() result = conn.execute(this_select) + conn.commit() return (result.keys(), [i for i in result]) def _insert_with_return(self, insert_kv_dict, return_col_names_list):