From a9199410d5a7954a3be81aa9b5756451e3923dea Mon Sep 17 00:00:00 2001 From: Rene Reimann Date: Wed, 6 Aug 2025 09:58:21 +0200 Subject: [PATCH 1/6] inital setup for run_state --- dripline/extensions/__init__.py | 1 + dripline/extensions/run_state.py | 68 ++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 dripline/extensions/run_state.py diff --git a/dripline/extensions/__init__.py b/dripline/extensions/__init__.py index e9cb836..7656a18 100644 --- a/dripline/extensions/__init__.py +++ b/dripline/extensions/__init__.py @@ -10,3 +10,4 @@ from .add_auth_spec import * from .thermo_fisher_endpoint import * from .ethernet_thermo_fisher_service import * +from .run_state import * diff --git a/dripline/extensions/run_state.py b/dripline/extensions/run_state.py new file mode 100644 index 0000000..e8a5805 --- /dev/null +++ b/dripline/extensions/run_state.py @@ -0,0 +1,68 @@ +import threading +import os +import json +from dripline.core import Service, ThrowReply, Entity + +import logging +logger = logging.getLogger(__name__) + +__all__ = [] + +__all__.append('StateGetEntity') +class StateGetEntity(Entity): + def __init__(self, key, **kwargs): + self.key = key + Entity.__init__(self, **kwargs) + + def on_get(self): + return self.service.get_state()[key] + + def on_set(self, value): + raise ThrowReply("on_set_error", f"on_set not available for {self.name}") + + +__all__.append('RunStateService') +class RunStateService(Service): + ''' + A service providing information about the current run status. + ''' + def __init__(self, + state_file=None, + **kwargs + ): + ''' + Args: + state_file (str): File that is used to store the current run state + ''' + Service.__init__(self, **kwargs) + + if state_file is None or not isinstance(state_file, str): + raise ThrowReply('service_error_invalid_value', f"Invalid state file: <{state_file}>! Expect string") + + self.alock = threading.Lock() + self.state_file + + def get_state(self): + ''' + Read the state of the run from a file. + ''' + logger.info(f"Ethernet socket {self.socket_info} established") + self.alock.acquire() + try: + if os.path.exists(self.state_file): + with open(self.state_file, "r") as open_file: + state = json.load(open_file) + else: + state = {"run_number": 1, + "run_comment": "Initial run", + "run_active": False} + except Exception as err: + logger.critical("") + finally: + self.alock.release() + return state + + def save_state(self): + with open(self.state_file, "w") as open_file: + json.dump(state, open_file) + return From 35430642b19e92e6eca6a391e3b2682184220151 Mon Sep 17 00:00:00 2001 From: Rene Reimann Date: Wed, 6 Aug 2025 13:23:11 +0200 Subject: [PATCH 2/6] updating to current dripline version --- Dockerfile | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 27455bb..1ff0f10 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,6 @@ ARG img_user=ghcr.io/driplineorg ARG img_repo=dripline-python -#ARG img_tag=develop-dev -ARG img_tag=receiver-test +ARG img_tag=develop-dev FROM ${img_user}/${img_repo}:${img_tag} From 0cca0681cc27e29185bdff08c7b4a03ccc1821ac Mon Sep 17 00:00:00 2001 From: Rene Reimann Date: Wed, 6 Aug 2025 13:25:25 +0200 Subject: [PATCH 3/6] adding start run and stop run, adding logging in database, fixing issues --- dripline/extensions/run_state.py | 41 +++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/dripline/extensions/run_state.py b/dripline/extensions/run_state.py index e8a5805..5699c1f 100644 --- a/dripline/extensions/run_state.py +++ b/dripline/extensions/run_state.py @@ -1,9 +1,9 @@ import threading import os import json -from dripline.core import Service, ThrowReply, Entity - +from dripline.core import Service, ThrowReply, Entity, MsgAlert import logging +import scarab logger = logging.getLogger(__name__) __all__ = [] @@ -15,7 +15,7 @@ def __init__(self, key, **kwargs): Entity.__init__(self, **kwargs) def on_get(self): - return self.service.get_state()[key] + return self.service.get_state()[self.key] def on_set(self, value): raise ThrowReply("on_set_error", f"on_set not available for {self.name}") @@ -40,13 +40,13 @@ def __init__(self, raise ThrowReply('service_error_invalid_value', f"Invalid state file: <{state_file}>! Expect string") self.alock = threading.Lock() - self.state_file + self.state_file = state_file def get_state(self): ''' Read the state of the run from a file. ''' - logger.info(f"Ethernet socket {self.socket_info} established") + logger.info(f"reading state") self.alock.acquire() try: if os.path.exists(self.state_file): @@ -62,7 +62,36 @@ def get_state(self): self.alock.release() return state - def save_state(self): + def save_state(self, state): with open(self.state_file, "w") as open_file: json.dump(state, open_file) return + + def stop_run(self): + state = self.get_state() + if not state["run_active"]: + logger.info("There is no run to stop") + else: + state["run_active"] = False + self.save_state(state) + logger.info(f'Stopped run {state["run_number"]}') + return state["run_number"] + + def start_run(self, comment): + logger.info(f"Starting run with comment {comment}") + state = self.get_state() + if state["run_active"]: + self.stop_run() + + state["run_number"] = state["run_number"]+1 + state["run_comment"] = comment + state["run_active"] = True + self.save_state(state) + the_alert = MsgAlert.create(payload=scarab.to_param(state["run_number"]), routing_key=f'sensor_value.run_number') + alert_sent = self.send(the_alert) + the_alert = MsgAlert.create(payload=scarab.to_param(state["run_comment"]), routing_key=f'sensor_value.run_comment') + alert_sent = self.send(the_alert) + the_alert = MsgAlert.create(payload=scarab.to_param(state["run_active"]), routing_key=f'sensor_value.run_active') + alert_sent = self.send(the_alert) + logger.info(f'Started run {state["run_number"]} with comment: {state["run_comment"]}') + return state["run_number"] From 20f5b7adfccebb01a8f6c4c515c172289da8c35e Mon Sep 17 00:00:00 2001 From: Rene Reimann Date: Wed, 6 Aug 2025 13:25:47 +0200 Subject: [PATCH 4/6] example script for run_state --- examples/run_state.yaml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 examples/run_state.yaml diff --git a/examples/run_state.yaml b/examples/run_state.yaml new file mode 100644 index 0000000..0fb4ff8 --- /dev/null +++ b/examples/run_state.yaml @@ -0,0 +1,16 @@ +name: run_state +module: RunStateService +state_file: ./state.json +dripline_mesh: + broker: rabbit-broker + broker_port: 5672 +endpoints: + - name: run_number + module: StateGetEntity + key: run_number + - name: run_comment + module: StateGetEntity + key: run_comment + - name: run_is_active + module: StateGetEntity + key: run_active From 12ff6a02ffc1a1a521caebe40f3336b242318185 Mon Sep 17 00:00:00 2001 From: Rene Reimann Date: Wed, 6 Aug 2025 13:33:17 +0200 Subject: [PATCH 5/6] adding doc strings --- dripline/extensions/run_state.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/dripline/extensions/run_state.py b/dripline/extensions/run_state.py index 5699c1f..9bb1331 100644 --- a/dripline/extensions/run_state.py +++ b/dripline/extensions/run_state.py @@ -10,7 +10,16 @@ __all__.append('StateGetEntity') class StateGetEntity(Entity): + """ + Simple Entity to get the state information of the RunStateService. + While the information are stored in RunStateService itself, this entity can excess the information from the information dict by `key`. + """ + def __init__(self, key, **kwargs): + """ + Args: + * key (str): key of the information from the state dictionary + """ self.key = key Entity.__init__(self, **kwargs) @@ -24,7 +33,8 @@ def on_set(self, value): __all__.append('RunStateService') class RunStateService(Service): ''' - A service providing information about the current run status. + A service providing information about the current run status. Information are stored in a state dict. + Two functions `start_run` and `stop_run` are available. start_run takes a run comment (str) as argument which should describe the current run. ''' def __init__(self, state_file=None, @@ -63,11 +73,17 @@ def get_state(self): return state def save_state(self, state): + ''' + Save the state of the run to a file. + ''' with open(self.state_file, "w") as open_file: json.dump(state, open_file) return def stop_run(self): + ''' + Sets the state of the run to "inactive" and returns the run number of the stopped run. + ''' state = self.get_state() if not state["run_active"]: logger.info("There is no run to stop") @@ -78,6 +94,10 @@ def stop_run(self): return state["run_number"] def start_run(self, comment): + ''' + Starts a new run by increasing the run number by 1, setting a new run comment and changing the state to "active". + If a run is still ongoing, it will be stopped first. + ''' logger.info(f"Starting run with comment {comment}") state = self.get_state() if state["run_active"]: From 66d133b6bd095bac79eb225404fde266939b12b1 Mon Sep 17 00:00:00 2001 From: Rene Reimann Date: Mon, 6 Oct 2025 15:16:00 +0200 Subject: [PATCH 6/6] update dripline to recent version --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 1ff0f10..c4a291f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ ARG img_user=ghcr.io/driplineorg ARG img_repo=dripline-python -ARG img_tag=develop-dev +ARG img_tag=v5.1.0 FROM ${img_user}/${img_repo}:${img_tag}