Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
f9a86d9
Added RequestSender class that should handle sending dl messages. Mod…
pkolbeck Feb 15, 2025
63d0d55
Added request_sender class. Tested with service. Not tested with Inte…
pkolbeck Feb 19, 2025
7907603
Removed weird encapsulation fo response in do_get_request.
pkolbeck Mar 4, 2025
a3c2ca2
Changed do_get_request to handle returning dictionaries
pkolbeck Apr 7, 2025
e344704
Changed result_to_scarab_payload to handle dictionaries and any.
pkolbeck Apr 7, 2025
e19cbb0
Adjusted do_get_request and result_to_scarab_payload to handle dicts …
pkolbeck Apr 7, 2025
5f19450
Merge branch 'bugfix/do_get_dicts' into feature/getsetcmd-mixin
pkolbeck Apr 7, 2025
4d595e4
Added RequestReceiver class as a mixin for endpoint and service. Get …
pkolbeck Apr 8, 2025
bb17798
updated versions.
pkolbeck Apr 10, 2025
d54beca
undo
pkolbeck Apr 10, 2025
3cfb40c
Add UUID setting from string test
nsoblath Apr 10, 2025
2ca25d2
_check_lockout_key handles uuid objs and strs
pkolbeck Apr 10, 2025
5cdfe03
Implemented changes from pull request comments.
pkolbeck Apr 11, 2025
c28bde1
Add bindings to service::send() and comments on how send bindings are…
nsoblath Apr 11, 2025
705e32c
Undoing the breaking changes I made in the previous commit to the _se…
nsoblath Apr 11, 2025
1310ae4
Simplified on_get/set/cmd_request() function calls and added some doc…
nsoblath Apr 12, 2025
1488b1b
Fixing on_[op]_request() functions to correctly get trampolined calls…
nsoblath Apr 16, 2025
e9326be
Improving tests in test_endpoint.py that check for raising of exceptions
nsoblath Apr 16, 2025
49d46f8
Merge commit '51b4ec9d303b3ac54512e3028b96bf7c6887864c' into feature/…
nsoblath Apr 17, 2025
8a29b91
Change class name to avoid confusion with dl-cpp classes: RequestRece…
nsoblath Apr 17, 2025
d6f1c7e
Fixed a couple more names
nsoblath Apr 17, 2025
ff8e495
Added classes to __init__.py
nsoblath Apr 17, 2025
f0fd43a
Merge branch 'develop' into feature/getsetcmd-mixin
nsoblath Apr 18, 2025
1096230
Merge branch 'feature/getsetcmd-mixin' into develop
nsoblath Apr 18, 2025
f3d492d
Fixing deprecated utc timezone use
nsoblath Apr 26, 2025
a07dd2b
Removing unnecessary imports
nsoblath Apr 26, 2025
98d1b6f
Fixing Service send-mesage API
nsoblath Apr 26, 2025
a0315a7
Fixing service binding
nsoblath Apr 26, 2025
bf2d05a
Add send_error_message binding
nsoblath Apr 26, 2025
93610f4
Removing unnecessary line
nsoblath Apr 26, 2025
6484826
Adding test_service.py
nsoblath Apr 26, 2025
56368a8
Specify no connection made in test_service
nsoblath Apr 29, 2025
30d3b2a
More complete exception translation
nsoblath May 2, 2025
647e07a
Minor fix in entity.py
nsoblath May 2, 2025
ce14b84
Add test_error.py and update test_service.py
nsoblath May 2, 2025
d8d6efd
Add very-incomplete test_entity.py
nsoblath May 2, 2025
e6eaad3
Use an alternate image for the dl-cpp base class
nsoblath May 2, 2025
1f9ae8e
Fixing test_entity test
nsoblath May 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ env:
REGISTRY_OLD: docker.io
BASE_IMAGE_USER: ghcr.io/driplineorg
BASE_IMAGE_REPO: dripline-cpp
DEV_SUFFIX: '-dev'
BASE_IMAGE_TAG: 'v2.10.4'
# BASE_IMAGE_TAG: 'hf2.10.4'
# DEV_SUFFIX: ''
#DEV_SUFFIX: '-dev'
#BASE_IMAGE_TAG: 'v2.10.4'
BASE_IMAGE_TAG: 'mols'
DEV_SUFFIX: ''

jobs:

Expand Down
2 changes: 2 additions & 0 deletions dripline/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from .entity import *
from .interface import *
from .object_creator import *
from .request_handler import *
from .request_sender import *
from .return_codes import *
from .service import *
from .throw_reply import *
123 changes: 44 additions & 79 deletions dripline/core/endpoint.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
__all__ = []

import scarab
from _dripline.core import _Endpoint
from .throw_reply import ThrowReply
from .request_handler import RequestHandler

import logging

Expand All @@ -11,7 +10,7 @@
__all__.append('Endpoint')


class Endpoint(_Endpoint):
class Endpoint(_Endpoint, RequestHandler):
'''
Base class for all objects which can be sent dripline requests.
Every object described in a runtime configuration passed to `dl-serve` should derive from this class (either directly or indirectly).
Expand All @@ -24,88 +23,54 @@ def __init__(self, name):
'''
_Endpoint.__init__(self, name)

def result_to_scarab_payload(self, result: str):
"""
Intercept result values and throw error if scarab is unable to convert to param
TODO: Handles global Exception case, could be more specific
def do_get_request(self, a_request_message):
'''
Default function for handling an OP_GET request message addressed to this endpoint.

.. note: For dripline extension developers -- This function, as defined in RequestHandler, implements the characteristic
dripline-python behavior for an endpoint receiving a get request, including using the specifier to access attributes,
and calling on_get() when there is no specifier.
As an extension author you might typically override RequestReciever.on_get(), but leave this function alone.

.. note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from
the C++ base class. It intentionally just calls the version of do_get_request() in RequestHandler.

Args:
result (str): request message passed
"""
try:
return scarab.to_param(result)
except Exception as e:
raise ThrowReply('service_error_bad_payload',
f"{self.name} unable to convert result to scarab payload: {result}")
a_request_message (MsgRequest): the message receveived by this endpoint
'''

def do_get_request(self, a_request_message):
logger.info("in do_get_request")
a_specifier = a_request_message.specifier.to_string()
if (a_specifier):
logger.debug("has specifier")
try:
logger.debug(f"specifier is: {a_specifier}")
an_attribute = getattr(self, a_specifier)
logger.debug(f"attribute '{a_specifier}' value is [{an_attribute}]")
the_node = scarab.ParamNode()
the_node.add("values", scarab.ParamArray())
the_node["values"].push_back(scarab.ParamValue(an_attribute))
return a_request_message.reply(payload=the_node)
except AttributeError as this_error:
raise ThrowReply('service_error_invalid_specifier',
f"endpoint {self.name} has no attribute {a_specifier}, unable to get")
else:
logger.debug('no specifier')
the_value = self.on_get()
return a_request_message.reply(payload=self.result_to_scarab_payload(the_value))
return RequestHandler.do_get_request(self, a_request_message)

def do_set_request(self, a_request_message):
a_specifier = a_request_message.specifier.to_string()
try:
new_value = a_request_message.payload["values"][0]()
except Exception as err:
raise ThrowReply('service_error_bad_payload', f'Set called with invalid values: {err}')
new_value = getattr(new_value, "as_"+new_value.type())()
logger.debug(f'new_value is [{new_value}]')
if ( a_specifier ):
if not hasattr(self, a_specifier):
raise ThrowReply('service_error_invalid_specifier',
"endpoint {} has no attribute {}, unable to set".format(self.name, a_specifier))
setattr(self, a_specifier, new_value)
return a_request_message.reply()
else:
result = self.on_set(new_value)
return a_request_message.reply(payload=self.result_to_scarab_payload(result))

def do_cmd_request(self, a_request_message):
# Note: any command executed in this way must return a python data structure which is
# able to be converted to a Param object (to be returned in the reply message)
method_name = a_request_message.specifier.to_string()
try:
method_ref = getattr(self, method_name)
except AttributeError as e:
raise ThrowReply('service_error_invalid_method',
"error getting command's corresponding method: {}".format(str(e)))
the_kwargs = a_request_message.payload.to_python()
the_args = the_kwargs.pop('values', [])
try:
result = method_ref(*the_args, **the_kwargs)
except TypeError as e:
raise ThrowReply('service_error_invalid_value',
f'A TypeError occurred while calling the requested method for endpoint {self.name}: {method_name}. Values provided may be invalid.\nOriginal error: {str(e)}')
return a_request_message.reply(payload=self.result_to_scarab_payload(result))

def on_get(self):
'''
placeholder method for getting the value of an endpoint.
Implementations may override to enable OP_GET operations.
The implementation must return a value which is able to be passed to the ParamValue constructor.
Default function for handling an OP_SET request message addressed to this endpoint.

.. note: For dripline extension developers -- This function, as defined in RequestHandler, implements the characteristic
dripline-python behavior for an endpoint receiving a set request, including using the specifier to access attributes,
and calling on_set() when there is no specifier.
As an extension author you might typically override RequestReciever.on_set(), but leave this function alone.

.. note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from
the C++ base class. It intentionally just calls the version of do_set_request() in RequestHandler.

Args:
a_request_message (MsgRequest): the message receveived by this endpoint
'''
raise ThrowReply('service_error_invalid_method', "{} does not implement on_get".format(self.__class__))

def on_set(self, _value):
return RequestHandler.do_set_request(self, a_request_message)

def do_cmd_request(self, a_request_message):
'''
placeholder method for setting the value of an endpoint.
Implementations may override to enable OP_SET operations.
Any returned object must already be a scarab::Param object
Default function for handling an OP_CMD request message addressed to this endpoint.

.. note: For dripline extension developers -- This function, as defined in RequestHandler, implements the characteristic
dripline-python behavior for an endpoint receiving a cmd request, namesly using the specifier to call endpoint methods.

.. note: For core dripline developers -- This function has to be here to correctly receive trampolined calls from
the C++ base class. It intentionally just calls the version of do_cmd_request() in RequestHandler.

Args:
a_request_message (MsgRequest): the message receveived by this endpoint
'''
raise ThrowReply('service_error_invalid_method', "{} does not implement on_set".format(self.__class__))

return RequestHandler.do_cmd_request(self, a_request_message)
6 changes: 3 additions & 3 deletions dripline/core/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def scheduled_log(self):
# Various checks for log condition
if self._last_log_time is None:
logger.debug("log because no last log")
elif (datetime.datetime.utcnow() - self._last_log_time).total_seconds() > self._max_interval:
elif (datetime.datetime.now(datetime.timezone.utc) - self._last_log_time).total_seconds() > self._max_interval:
logger.debug("log because too much time")
elif this_value is False:
logger.warning(f"cannot check value change for {self.name}")
Expand All @@ -181,9 +181,9 @@ def scheduled_log(self):

def log_a_value(self, the_value):
logger.info(f"value to log for {self.name} is:\n{the_value}")
self._last_log_time = datetime.datetime.utcnow()
self._last_log_time = datetime.datetime.now(datetime.timezone.utc)
the_alert = MsgAlert.create(payload=scarab.to_param(the_value), routing_key=f'{self.log_routing_key_prefix}.{self.name}')
alert_sent = self.service.send(the_alert)
_ = self.service.send(the_alert)

def start_logging(self):
if self._log_action_id is not None:
Expand Down
93 changes: 6 additions & 87 deletions dripline/core/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@

import scarab

from _dripline.core import op_t, create_dripline_auth_spec, Core, DriplineConfig, Receiver, MsgRequest, MsgReply, DriplineError
from _dripline.core import create_dripline_auth_spec, Core, DriplineConfig, Receiver
from .request_sender import RequestSender

import logging
logger = logging.getLogger(__name__)

__all__.append("Interface")
class Interface(Core):
class Interface(Core, RequestSender):
'''
A class that provides user-friendly methods for dripline interactions in a Python interpreter.
Intended for use as a dripline client in scripts or interactive sessions.

See :py:class:dripline.core.RequestSender for the message-sending interface.
'''
def __init__(self, username: str | dict=None, password: str | dict=None, dripline_mesh: dict=None, timeout_s: int=10, confirm_retcodes: bool=True):
'''
Expand Down Expand Up @@ -62,93 +65,9 @@ def __init__(self, username: str | dict=None, password: str | dict=None, driplin
auth.process_spec()

Core.__init__(self, config=scarab.to_param(dripline_config), auth=auth)
RequestSender.__init__(self, sender=self)

self._confirm_retcode = confirm_retcodes
self.timeout_s = timeout_s
self._receiver = Receiver()


def _send_request(self, msgop, target, specifier=None, payload=None, timeout=None, lockout_key=None):
'''
internal helper method to standardize sending request messages
'''
a_specifier = specifier if specifier is not None else ""
a_request = MsgRequest.create(payload=scarab.to_param(payload), msg_op=msgop, routing_key=target, specifier=a_specifier)
a_request.lockout_key = lockout_key if lockout_key is not None else ""
receive_reply = self.send(a_request)
if not receive_reply.successful_send:
raise DriplineError('unable to send request')
return receive_reply

def _receive_reply(self, reply_pkg, timeout_s):
'''
internal helper method to standardize receiving reply messages
'''
sig_handler = scarab.SignalHandler()
sig_handler.add_cancelable(self._receiver)
result = self._receiver.wait_for_reply(reply_pkg, timeout_s * 1000) # receiver expects ms
sig_handler.remove_cancelable(self._receiver)
return result

def get(self, endpoint: str, specifier: str=None, lockout_key=None, timeout_s: int=0) -> MsgReply:
'''
Send a get request to an endpoint and return the reply message.

Parameters
----------
endpoint: str
Routing key to which the request should be sent.
specifier: str, optional
Specifier to add to the request, if needed.
timeout_s: int | float, optional
Maximum time to wait for a reply in seconds (default is 0)
A timeout of 0 seconds means no timeout will be used.
'''
reply_pkg = self._send_request( msgop=op_t.get, target=endpoint, specifier=specifier, lockout_key=lockout_key )
result = self._receive_reply( reply_pkg, timeout_s )
return result

def set(self, endpoint: str, value: str | int | float | bool, specifier: str=None, lockout_key=None, timeout_s: int | float=0) -> MsgReply:
'''
Send a set request to an endpoint and return the reply message.

Parameters
----------
endpoint: str
Routing key to which the request should be sent.
value: str | int | float | bool
Value to assign in the set operation
specifier: str, optional
Specifier to add to the request, if needed.
timeout_s: int | float, optional
Maximum time to wait for a reply in seconds (default is 0)
A timeout of 0 seconds means no timeout will be used.
'''
payload = {'values':[value]}
reply_pkg = self._send_request( msgop=op_t.set, target=endpoint, specifier=specifier, payload=payload, lockout_key=lockout_key )
result = self._receive_reply( reply_pkg, timeout_s )
return result

def cmd(self, endpoint: str, specifier: str, ordered_args=None, keyed_args=None, lockout_key=None, timeout_s: int | float=0) -> MsgReply:
'''
Send a cmd request to an endpoint and return the reply message.

Parameters
----------
endpoint: str
Routing key to which the request should be sent.
ordered_args: array, optional
Array of values to assign under 'values' in the payload, if any
keyed_args: dict, optional
Keyword arguments to assign to the payload, if any
specifier: str
Specifier to add to the request. For a dripline-python endpoint, this will be the method executed.
timeout_s: int | float, optional
Maximum time to wait for a reply in seconds (default is 0)
A timeout of 0 seconds means no timeout will be used.
'''
payload = {'values': [] if ordered_args is None else ordered_args}
payload.update({} if keyed_args is None else keyed_args)
reply_pkg = self._send_request( msgop=op_t.cmd, target=endpoint, specifier=specifier, lockout_key=lockout_key, payload=payload )
result = self._receive_reply( reply_pkg, timeout_s )
return result
Loading