From 20ff5df6427e9ccb7003befcefb9cd8b4a047261 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Mon, 1 Dec 2025 16:46:48 +0100 Subject: [PATCH 1/3] feat: add beamline states --- bec_lib/bec_lib/beamline_states.py | 307 +++++++++ bec_lib/bec_lib/client.py | 3 + bec_lib/bec_lib/endpoints.py | 34 + bec_lib/bec_lib/messages.py | 48 ++ bec_lib/tests/test_beamline_conditions.py | 591 ++++++++++++++++++ .../scan_server/beamline_condition_manager.py | 74 +++ .../bec_server/scan_server/scan_server.py | 6 + 7 files changed, 1063 insertions(+) create mode 100644 bec_lib/bec_lib/beamline_states.py create mode 100644 bec_lib/tests/test_beamline_conditions.py create mode 100644 bec_server/bec_server/scan_server/beamline_condition_manager.py diff --git a/bec_lib/bec_lib/beamline_states.py b/bec_lib/bec_lib/beamline_states.py new file mode 100644 index 000000000..2917d6f2f --- /dev/null +++ b/bec_lib/bec_lib/beamline_states.py @@ -0,0 +1,307 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +from rich.console import Console +from rich.table import Table + +from bec_lib import messages +from bec_lib.device import DeviceBase +from bec_lib.endpoints import MessageEndpoints + +if TYPE_CHECKING: + from bec_lib.client import BECClient + from bec_lib.redis_connector import MessageObject, RedisConnector + + +class BeamlineConditionConfig: + """Manager for beamline conditions.""" + + def __init__(self, client: BECClient) -> None: + self._client = client + self._connector = client.connector + self._conditions: list[messages.BeamlineConditionUpdateEntry] = [] + self._connector.register( + MessageEndpoints.available_beamline_conditions(), + cb=self._on_condition_update, + parent=self, + from_start=True, + ) + + @staticmethod + def _on_condition_update(msg_dict: dict, *, parent: BeamlineConditionConfig, **_kwargs) -> None: + msg: messages.BeamlineConditionUpdate = msg_dict["data"] # type: ignore ; we know it's a BeamlineConditionUpdateMessage + parent._conditions = msg.conditions + + def add(self, condition: BeamlineCondition) -> None: + """ + Add a new beamline condition to the manager. + Args: + condition (BeamlineCondition): The beamline condition to add. + """ + + if any(cond.name == condition.name for cond in self._conditions): + return # condition already exists + + info: messages.BeamlineConditionUpdateEntry = messages.BeamlineConditionUpdateEntry( + name=condition.name, + title=condition.title, + condition_type=condition.__class__.__name__, + parameters=condition.parameters(), + ) + cls = condition.__class__ + + try: + condi = cls(name=condition.name, redis_connector=self._connector) + condi.configure(**condition.parameters()) + except Exception as e: + raise RuntimeError(f"Failed to add condition {condition.name}: {e}") from e + + if isinstance(condition, DeviceBeamlineCondition): + self._verify_signal_exists(condition) + + self._conditions.append(info) + msg = messages.AvailableBeamlineConditionsMessage(conditions=self._conditions) + self._connector.xadd( + MessageEndpoints.available_beamline_conditions(), {"data": msg}, max_size=1 + ) + + def _verify_signal_exists(self, condition: DeviceBeamlineCondition) -> None: + """ + Verify that the device and signal exist in the device manager. + + Args: + condition (DeviceBeamlineCondition): The condition to verify. + + Raises: RuntimeError if the device or signal does not exist. + """ + device = condition.parameters().get("device") + signal = condition.parameters().get("signal") + if isinstance(device, DeviceBase): + device = device.name + + if not self._client.device_manager.devices.get(device): + raise RuntimeError( + f"Device {device} not found in device manager. Cannot add condition {condition.name}." + ) + if signal is not None: + if signal not in self._client.device_manager.devices[device].read(): + raise RuntimeError( + f"Signal {signal} not found in device {device}. Cannot add condition {condition.name}." + ) + else: + hinted_signals = self._client.device_manager.devices[device]._hints + if hinted_signals: + signal = hinted_signals[0] + else: + signal = device + condition.parameters().update({"device": device, "signal": signal}) + + def remove(self, condition_name: str) -> None: + """ + Remove a beamline condition by name. + + Args: + condition_name (str): The name of the condition to remove. + """ + if not any(cond.name == condition_name for cond in self._conditions): + return # condition does not exist + self._conditions = [cond for cond in self._conditions if cond.name != condition_name] + msg = messages.AvailableBeamlineConditionsMessage(conditions=self._conditions) + self._connector.xadd( + MessageEndpoints.available_beamline_conditions(), {"data": msg}, max_size=1 + ) + + def show_all(self): + """ + Pretty print all beamline conditions using rich. + """ + console = Console() + table = Table(title="Beamline Conditions") + table.add_column("Name", style="cyan", no_wrap=True) + table.add_column("Type", style="magenta") + table.add_column("Parameters", style="green") + + for cond in self._conditions: + params = cond.parameters if cond.parameters else "-" + table.add_row(str(cond.name), str(cond.condition_type), str(params)) + + console.print(table) + + +class BeamlineCondition(ABC): + """Abstract base class for beamline conditions.""" + + def __init__( + self, name: str, redis_connector: RedisConnector | None = None, title: str | None = None + ) -> None: + self.name = name + self.connector = redis_connector + self.title = title if title is not None else name + self._configured = False + self._last_state: messages.BeamlineConditionMessage | None = None + + def configure(self, **kwargs) -> None: + """Configure the condition with given parameters.""" + self._configured = True + + def parameters(self) -> dict: + """Return the configuration parameters of the condition.""" + return {} + + @abstractmethod + def evaluate(self, *args, **kwargs) -> messages.BeamlineConditionMessage | None: + """Evaluate the condition and return its state.""" + + def start(self) -> None: + """Start monitoring the condition if needed.""" + + def stop(self) -> None: + """Stop monitoring the condition if needed.""" + + +class DeviceBeamlineCondition(BeamlineCondition): + """A beamline condition that depends on a device reading.""" + + def configure(self, device: str | DeviceBase, signal: str | None = None, **kwargs) -> None: + self.device = device if isinstance(device, str) else device.name + self.signal = signal + super().configure(**kwargs) + + def parameters(self) -> dict: + params = super().parameters() + params.update({"device": self.device, "signal": self.signal}) + return params + + def start(self) -> None: + if not self._configured: + raise RuntimeError("Condition must be configured before starting.") + if self.connector is None: + raise RuntimeError("Redis connector is not set.") + self.connector.register( + MessageEndpoints.device_readback(self.device), cb=self._update_device_state, parent=self + ) + + def stop(self) -> None: + if not self._configured: + return + if self.connector is None: + return + self.connector.unregister( + MessageEndpoints.device_readback(self.device), cb=self._update_device_state + ) + + @staticmethod + def _update_device_state(msg_obj: MessageObject, parent: DeviceBeamlineCondition) -> None: + + # Since this is called from the Redis connector, we + assert parent.connector is not None + + msg: messages.DeviceMessage = msg_obj.value # type: ignore ; we know it's a DeviceMessage + out = parent.evaluate(msg) + if out is not None and out != parent._last_state: + parent._last_state = out + parent.connector.xadd( + MessageEndpoints.beamline_condition(parent.name), {"data": out}, max_size=1 + ) + + +class ShutterCondition(DeviceBeamlineCondition): + """ + A condition that checks if the shutter is open. + + Example: + shutter_condition = ShutterCondition(name="shutter_open") + shutter_condition.configure(device="shutter1") + bec.beamline_conditions.add(shutter_condition) + """ + + def evaluate(self, msg: messages.DeviceMessage, **kwargs) -> messages.BeamlineConditionMessage: + val = msg.signals.get(self.signal, {}).get("value", "").lower() + if val == "open": + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Shutter is open." + ) + return messages.BeamlineConditionMessage( + name=self.name, status="alarm", message="Shutter is closed." + ) + + +class DeviceWithinLimitsCondition(DeviceBeamlineCondition): + """ + A condition that checks if a positioner is within limits. + + Example: + device_condition = DeviceWithinLimitsCondition(name="sample_x_within_limits") + device_condition.configure(device="sample_x", signal="sample_x_signal_name", min_limit=0.0, max_limit=10.0) + bec.beamline_conditions.add(device_condition) + + """ + + def configure( + self, + device: str, + min_limit: float, + max_limit: float, + tolerance: float = 0.1, + signal: str | None = None, + **kwargs, + ) -> None: + """ + Configure the positioner condition. + + Args: + device (str): The name of the positioner device. + min_limit (float): The minimum limit for the positioner. + max_limit (float): The maximum limit for the positioner. + tolerance (float): The tolerance for warning conditions (default is 0.1). When the positioner is within + 10% of the limits, a warning condition will be issued. + signal (str, optional): The name of the signal to monitor. If not provided, defaults to the device name. + """ + self.min_limit = min_limit + self.max_limit = max_limit + self.tolerance = tolerance + super().configure(device=device, signal=signal, **kwargs) + + def parameters(self) -> dict: + params = super().parameters() + params.update( + { + "device": self.device, + "min_limit": self.min_limit, + "max_limit": self.max_limit, + "tolerance": self.tolerance, + "signal": self.signal, + } + ) + return params + + def evaluate(self, msg: messages.DeviceMessage, **kwargs) -> messages.BeamlineConditionMessage: + """ + Evaluate if the positioner is within the defined limits. If it is outside the limits, + return an alarm condition. Otherwise, return a normal condition. If it is within 10% of the limits, + return a warning condition. + """ + + val = msg.signals.get(self.device, {}).get("value", None) + if val is None: + return messages.BeamlineConditionMessage( + name=self.name, status="alarm", message=f"Positioner {self.device} value not found." + ) + + if val < self.min_limit or val > self.max_limit: + + return messages.BeamlineConditionMessage( + name=self.name, status="alarm", message=f"Positioner {self.device} out of limits" + ) + if val < self.min_limit + self.tolerance * ( + self.max_limit - self.min_limit + ) or val > self.max_limit - self.tolerance * (self.max_limit - self.min_limit): + return messages.BeamlineConditionMessage( + name=self.name, status="warning", message=f"Positioner {self.device} near limits" + ) + + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message=f"Positioner {self.device} within limits" + ) diff --git a/bec_lib/bec_lib/client.py b/bec_lib/bec_lib/client.py index ef8ae52eb..4c2b17ced 100644 --- a/bec_lib/bec_lib/client.py +++ b/bec_lib/bec_lib/client.py @@ -19,6 +19,7 @@ from rich.table import Table from bec_lib.alarm_handler import AlarmHandler, Alarms +from bec_lib.beamline_states import BeamlineConditionConfig from bec_lib.bec_service import BECService from bec_lib.bl_checks import BeamlineChecks from bec_lib.callback_handler import CallbackHandler, EventType @@ -150,6 +151,7 @@ def __init__( self._initialized = True self._username = "" self._system_user = "" + self.beamline_conditions = None def __new__(cls, *args, forced=False, **kwargs): if forced or BECClient._client is None: @@ -227,6 +229,7 @@ def _start_services(self): self.bl_checks.start() self.device_monitor = DeviceMonitorPlugin(self.connector) self._update_username() + self.beamline_conditions = BeamlineConditionConfig(client=self) def alarms(self, severity=Alarms.WARNING): """get the next alarm with at least the specified severity""" diff --git a/bec_lib/bec_lib/endpoints.py b/bec_lib/bec_lib/endpoints.py index 72f073085..d17e39d1d 100644 --- a/bec_lib/bec_lib/endpoints.py +++ b/bec_lib/bec_lib/endpoints.py @@ -1704,6 +1704,40 @@ def macro_update(): endpoint=endpoint, message_type=messages.MacroUpdateMessage, message_op=MessageOp.SEND ) + @staticmethod + def beamline_condition(condition_name: str): + """ + Endpoint for beamline condition. This endpoint is used to publish the beamline condition + state using a messages.BeamlineConditionMessage message. + + Args: + condition_name (str): Condition name. + Returns: + EndpointInfo: Endpoint for beamline condition. + """ + endpoint = f"{EndpointType.INFO.value}/beamline_condition/{condition_name}" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.BeamlineConditionMessage, + message_op=MessageOp.STREAM, + ) + + @staticmethod + def available_beamline_conditions(): + """ + Endpoint for updating the available beamline conditions. This endpoint is used to + publish beamline condition updates using a messages.AvailableBeamlineConditionsMessage message. + + Returns: + EndpointInfo: Endpoint for beamline condition updates. + """ + endpoint = f"{EndpointType.INFO.value}/available_beamline_conditions" + return EndpointInfo( + endpoint=endpoint, + message_type=messages.AvailableBeamlineConditionsMessage, + message_op=MessageOp.STREAM, + ) + @staticmethod def atlas_websocket_state(deployment_name: str, host_id: str): """ diff --git a/bec_lib/bec_lib/messages.py b/bec_lib/bec_lib/messages.py index 43f8907e6..95bb46d6e 100644 --- a/bec_lib/bec_lib/messages.py +++ b/bec_lib/bec_lib/messages.py @@ -1443,3 +1443,51 @@ def check_macro(cls, values): if values.update_type == "add" and not values.file_path: raise ValueError("file_path must be provided for add actions") return values + + +class BeamlineConditionMessage(BECMessage): + """ + Message for beamline condition updates + + Args: + name (str): Name of the beamline condition + status (Literal["normal", "warning", "alarm"]): Status of the beamline condition + message (str): Description of the beamline condition + """ + + msg_type: ClassVar[str] = "beamline_condition_message" + name: str + status: Literal["normal", "warning", "alarm"] + message: str + + +class BeamlineConditionUpdateEntry(BaseModel): + """ + Entry for beamline condition update + + Args: + name (str): Name of the beamline condition + title (str): Title of the beamline condition + condition_type (str): Type of the beamline condition + parameters (dict, optional): Additional parameters for the condition + """ + + name: str + title: str + condition_type: str + parameters: dict = Field(default_factory=dict) + + +class AvailableBeamlineConditionsMessage(BECMessage): + """ + Message for updating beamline conditions + + Args: + conditions (list[BeamlineConditionUpdateEntry]): List of beamline condition update entries + name (str): Name of the beamline condition group + condition_type (str): Type of the beamline condition group + parameters (dict, optional): Additional parameters for the condition group + """ + + msg_type: ClassVar[str] = "beamline_condition_update_message" + conditions: list[BeamlineConditionUpdateEntry] diff --git a/bec_lib/tests/test_beamline_conditions.py b/bec_lib/tests/test_beamline_conditions.py new file mode 100644 index 000000000..775cd7a0a --- /dev/null +++ b/bec_lib/tests/test_beamline_conditions.py @@ -0,0 +1,591 @@ +from unittest import mock + +import pytest + +from bec_lib import messages +from bec_lib.beamline_states import ( + BeamlineCondition, + BeamlineConditionConfig, + DeviceBeamlineCondition, + DeviceWithinLimitsCondition, + ShutterCondition, +) +from bec_lib.endpoints import MessageEndpoints +from bec_lib.redis_connector import MessageObject + + +@pytest.fixture +def beamline_config(connected_connector): + client = mock.MagicMock() + client.connector = connected_connector + client.device_manager = mock.MagicMock() + config = BeamlineConditionConfig(client) + yield config + + +# ============================================================================ +# BeamlineCondition tests +# ============================================================================ + + +class TestBeamlineCondition: + """Tests for the abstract BeamlineCondition base class.""" + + def test_beamline_condition_initialization(self): + """Test basic initialization of a BeamlineCondition.""" + + class ConcreteCondition(BeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteCondition(name="test_condition", title="Test Condition") + assert condition.name == "test_condition" + assert condition.title == "Test Condition" + assert condition.connector is None + assert condition._configured is False + assert condition._last_state is None + + def test_beamline_condition_default_title(self): + """Test that title defaults to name if not provided.""" + + class ConcreteCondition(BeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteCondition(name="test_condition") + assert condition.title == "test_condition" + + def test_beamline_condition_configure(self): + """Test that configure marks the condition as configured.""" + + class ConcreteCondition(BeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteCondition(name="test_condition") + assert condition._configured is False + condition.configure() + assert condition._configured is True + + def test_beamline_condition_parameters(self): + """Test that parameters returns an empty dict by default.""" + + class ConcreteCondition(BeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteCondition(name="test_condition") + assert condition.parameters() == {} + + def test_beamline_condition_with_connector(self, connected_connector): + """Test BeamlineCondition initialization with a connector.""" + + class ConcreteCondition(BeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteCondition(name="test_condition", redis_connector=connected_connector) + assert condition.connector == connected_connector + + +# ============================================================================ +# DeviceBeamlineCondition tests +# ============================================================================ + + +class TestDeviceBeamlineCondition: + """Tests for DeviceBeamlineCondition.""" + + def test_device_condition_configure(self, connected_connector): + """Test DeviceBeamlineCondition configuration.""" + + class ConcreteDeviceCondition(DeviceBeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteDeviceCondition(name="device_test", redis_connector=connected_connector) + condition.configure(device="samx", signal="samx_value") + assert condition.device == "samx" + assert condition.signal == "samx_value" + assert condition._configured is True + + def test_device_condition_configure_default_signal(self, connected_connector): + """Test that signal defaults to device name if not provided.""" + + class ConcreteDeviceCondition(DeviceBeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteDeviceCondition(name="device_test", redis_connector=connected_connector) + condition.configure(device="samx", signal="samx") + assert condition.device == "samx" + assert condition.signal == "samx" + + def test_device_condition_parameters(self, connected_connector): + """Test that parameters includes device and signal.""" + + class ConcreteDeviceCondition(DeviceBeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteDeviceCondition(name="device_test", redis_connector=connected_connector) + condition.configure(device="samx", signal="samx_value") + params = condition.parameters() + assert params["device"] == "samx" + assert params["signal"] == "samx_value" + + def test_device_condition_start_not_configured(self, connected_connector): + """Test that start raises RuntimeError if condition is not configured.""" + + class ConcreteDeviceCondition(DeviceBeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteDeviceCondition(name="device_test", redis_connector=connected_connector) + with pytest.raises(RuntimeError, match="Condition must be configured before starting"): + condition.start() + + def test_device_condition_start_no_connector(self): + """Test that start raises RuntimeError if connector is not set.""" + + class ConcreteDeviceCondition(DeviceBeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteDeviceCondition(name="device_test") + condition.configure(device="samx") + with pytest.raises(RuntimeError, match="Redis connector is not set"): + condition.start() + + def test_device_condition_start_registers_callback(self, connected_connector): + """Test that start registers the callback with the connector.""" + + class ConcreteDeviceCondition(DeviceBeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteDeviceCondition(name="device_test", redis_connector=connected_connector) + condition.configure(device="samx") + + with mock.patch.object(connected_connector, "register") as mock_register: + condition.start() + mock_register.assert_called_once() + call_args = mock_register.call_args + assert call_args[0][0] == MessageEndpoints.device_readback("samx") + + def test_device_condition_stop(self, connected_connector): + """Test that stop unregisters the callback.""" + + class ConcreteDeviceCondition(DeviceBeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteDeviceCondition(name="device_test", redis_connector=connected_connector) + condition.configure(device="samx") + + with mock.patch.object(connected_connector, "unregister") as mock_unregister: + condition.stop() + mock_unregister.assert_called_once() + + def test_device_condition_stop_not_configured(self, connected_connector): + """Test that stop doesn't raise an error if not configured.""" + + class ConcreteDeviceCondition(DeviceBeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteDeviceCondition(name="device_test", redis_connector=connected_connector) + # Should not raise an error + condition.stop() + + def test_device_condition_stop_no_connector(self): + """Test that stop doesn't raise an error if connector is not set.""" + + class ConcreteDeviceCondition(DeviceBeamlineCondition): + def evaluate(self, *args, **kwargs): + return messages.BeamlineConditionMessage( + name=self.name, status="normal", message="Test" + ) + + condition = ConcreteDeviceCondition(name="device_test") + condition.configure(device="samx") + # Should not raise an error + condition.stop() + + def test_device_condition_update_device_state(self, connected_connector): + """Test that _update_device_state calls evaluate and updates _last_state.""" + + msg = messages.BeamlineConditionMessage(name="device_test", status="normal", message="Test") + + class ConcreteDeviceCondition(DeviceBeamlineCondition): + def evaluate(self, *args, **kwargs): + return msg + + condition = ConcreteDeviceCondition(name="device_test", redis_connector=connected_connector) + condition.configure(device="samx") + + msg_obj = MessageObject(value=msg, topic="test_topic") + condition._update_device_state(msg_obj, parent=condition) + assert condition._last_state == msg + out = condition.connector.xread( + MessageEndpoints.beamline_condition("device_test"), from_start=True + ) + assert out is not None + assert out[0]["data"] == msg + + +# ============================================================================ +# ShutterCondition tests +# ============================================================================ + + +class TestShutterCondition: + """Tests for ShutterCondition.""" + + def test_shutter_open(self, connected_connector): + """Test evaluation when shutter is open.""" + condition = ShutterCondition(name="shutter_open", redis_connector=connected_connector) + condition.configure(device="shutter1", signal="shutter1") + + msg = messages.DeviceMessage( + signals={"shutter1": {"value": "open", "timestamp": 1234567890.0}}, + metadata={"stream": "primary"}, + ) + + result = condition.evaluate(msg) + assert result.name == "shutter_open" + assert result.status == "normal" + assert result.message == "Shutter is open." + + def test_shutter_open_uppercase(self, connected_connector): + """Test evaluation when shutter value is uppercase and gets lowercased.""" + condition = ShutterCondition(name="shutter_open", redis_connector=connected_connector) + condition.configure(device="shutter1", signal="shutter1") + + msg = messages.DeviceMessage( + signals={"shutter1": {"value": "OPEN", "timestamp": 1234567890.0}}, + metadata={"stream": "primary"}, + ) + + result = condition.evaluate(msg) + assert result.status == "normal" + assert result.message == "Shutter is open." + + def test_shutter_closed(self, connected_connector): + """Test evaluation when shutter is closed.""" + condition = ShutterCondition(name="shutter_open", redis_connector=connected_connector) + condition.configure(device="shutter1") + + msg = messages.DeviceMessage( + signals={"shutter1": {"value": "closed", "timestamp": 1234567890.0}}, + metadata={"stream": "primary"}, + ) + + result = condition.evaluate(msg) + assert result.name == "shutter_open" + assert result.status == "alarm" + assert result.message == "Shutter is closed." + + def test_shutter_missing_value(self, connected_connector): + """Test evaluation when value is missing.""" + condition = ShutterCondition(name="shutter_open", redis_connector=connected_connector) + condition.configure(device="shutter1") + + msg = messages.DeviceMessage( + signals={"shutter1": {"timestamp": 1234567890.0}}, metadata={"stream": "primary"} + ) + + result = condition.evaluate(msg) + assert result.status == "alarm" + assert result.message == "Shutter is closed." + + +# ============================================================================ +# DeviceWithinLimitsCondition tests +# ============================================================================ + + +class TestDeviceWithinLimitsCondition: + """Tests for DeviceWithinLimitsCondition.""" + + def test_within_limits_configure(self, connected_connector): + """Test configuration of DeviceWithinLimitsCondition.""" + condition = DeviceWithinLimitsCondition( + name="sample_x_limits", redis_connector=connected_connector + ) + condition.configure(device="sample_x", min_limit=0.0, max_limit=10.0) + + assert condition.device == "sample_x" + assert condition.min_limit == 0.0 + assert condition.max_limit == 10.0 + assert condition.tolerance == 0.1 + + def test_within_limits_configure_custom_tolerance(self, connected_connector): + """Test configuration with custom tolerance.""" + condition = DeviceWithinLimitsCondition( + name="sample_x_limits", redis_connector=connected_connector + ) + condition.configure(device="sample_x", min_limit=0.0, max_limit=10.0, tolerance=0.2) + + assert condition.tolerance == 0.2 + + def test_within_limits_value_inside(self, connected_connector): + """Test evaluation when value is within limits.""" + condition = DeviceWithinLimitsCondition( + name="sample_x_limits", redis_connector=connected_connector + ) + condition.configure(device="sample_x", min_limit=0.0, max_limit=10.0) + + msg = messages.DeviceMessage( + signals={"sample_x": {"value": 5.0, "timestamp": 1234567890.0}}, + metadata={"stream": "primary"}, + ) + + result = condition.evaluate(msg) + assert result.status == "normal" + assert result.message == "Positioner sample_x within limits" + + def test_within_limits_value_outside_low(self, connected_connector): + """Test evaluation when value is below minimum limit.""" + condition = DeviceWithinLimitsCondition( + name="sample_x_limits", redis_connector=connected_connector + ) + condition.configure(device="sample_x", min_limit=0.0, max_limit=10.0) + + msg = messages.DeviceMessage( + signals={"sample_x": {"value": -1.0, "timestamp": 1234567890.0}}, + metadata={"stream": "primary"}, + ) + + result = condition.evaluate(msg) + assert result.status == "alarm" + assert result.message == "Positioner sample_x out of limits" + + def test_within_limits_value_outside_high(self, connected_connector): + """Test evaluation when value is above maximum limit.""" + condition = DeviceWithinLimitsCondition( + name="sample_x_limits", redis_connector=connected_connector + ) + condition.configure(device="sample_x", min_limit=0.0, max_limit=10.0) + + msg = messages.DeviceMessage( + signals={"sample_x": {"value": 11.0, "timestamp": 1234567890.0}}, + metadata={"stream": "primary"}, + ) + + result = condition.evaluate(msg) + assert result.status == "alarm" + assert result.message == "Positioner sample_x out of limits" + + def test_within_limits_value_near_min(self, connected_connector): + """Test evaluation when value is near minimum limit (within tolerance).""" + condition = DeviceWithinLimitsCondition( + name="sample_x_limits", redis_connector=connected_connector + ) + condition.configure(device="sample_x", min_limit=0.0, max_limit=10.0, tolerance=0.1) + + # 10% of (10 - 0) = 1.0, so near min is < 1.0 + msg = messages.DeviceMessage( + signals={"sample_x": {"value": 0.5, "timestamp": 1234567890.0}}, + metadata={"stream": "primary"}, + ) + + result = condition.evaluate(msg) + assert result.status == "warning" + assert result.message == "Positioner sample_x near limits" + + def test_within_limits_value_near_max(self, connected_connector): + """Test evaluation when value is near maximum limit (within tolerance).""" + condition = DeviceWithinLimitsCondition( + name="sample_x_limits", redis_connector=connected_connector + ) + condition.configure(device="sample_x", min_limit=0.0, max_limit=10.0, tolerance=0.1) + + # 10% of (10 - 0) = 1.0, so near max is > 9.0 + msg = messages.DeviceMessage( + signals={"sample_x": {"value": 9.5, "timestamp": 1234567890.0}}, + metadata={"stream": "primary"}, + ) + + result = condition.evaluate(msg) + assert result.status == "warning" + assert result.message == "Positioner sample_x near limits" + + def test_within_limits_missing_value(self, connected_connector): + """Test evaluation when value is missing.""" + condition = DeviceWithinLimitsCondition( + name="sample_x_limits", redis_connector=connected_connector + ) + condition.configure(device="sample_x", min_limit=0.0, max_limit=10.0) + + msg = messages.DeviceMessage( + signals={"sample_x": {"timestamp": 1234567890.0}}, metadata={"stream": "primary"} + ) + + result = condition.evaluate(msg) + assert result.status == "alarm" + assert "value not found" in result.message + + def test_within_limits_parameters(self, connected_connector): + """Test that parameters includes all configuration.""" + condition = DeviceWithinLimitsCondition( + name="sample_x_limits", redis_connector=connected_connector + ) + condition.configure(device="sample_x", min_limit=0.0, max_limit=10.0, signal="x_readback") + + params = condition.parameters() + assert params["device"] == "sample_x" + assert params["min_limit"] == 0.0 + assert params["max_limit"] == 10.0 + assert params["tolerance"] == 0.1 + assert params["signal"] == "x_readback" + + +# ============================================================================ +# BeamlineConditionConfig tests +# ============================================================================ + + +class TestBeamlineConditionConfig: + """Tests for BeamlineConditionConfig manager.""" + + def test_add_condition(self, beamline_config): + """Test adding a condition.""" + condition = ShutterCondition(name="shutter_open", title="Shutter Open") + condition.configure(device="shutter1") + + # Setup device manager mock - the signal should match the device name when no signal is provided + beamline_config._client.device_manager.devices = {"shutter1": mock.MagicMock()} + beamline_config._client.device_manager.devices["shutter1"].read.return_value = { + "shutter1": {"value": "open"} + } + + beamline_config.add(condition) + # Check that the condition was added + assert any(c.name == "shutter_open" for c in beamline_config._conditions) + + def test_add_condition_already_exists(self, beamline_config): + """Test that adding a duplicate condition is ignored.""" + condition = ShutterCondition(name="shutter_open", title="Shutter Open") + condition.configure(device="shutter1") + + # Setup device manager mock + beamline_config._client.device_manager.devices = {"shutter1": mock.MagicMock()} + beamline_config._client.device_manager.devices["shutter1"].read.return_value = { + "shutter1": {"value": "open"} + } + + # Add the condition once + beamline_config.add(condition) + initial_count = len(beamline_config._conditions) + + # Add the same condition again + beamline_config.add(condition) + # Count should not increase + assert len(beamline_config._conditions) == initial_count + + def test_add_condition_device_not_found(self, beamline_config): + """Test that adding a condition with invalid device raises RuntimeError.""" + condition = ShutterCondition(name="shutter_open", title="Shutter Open") + condition.configure(device="nonexistent_shutter") + + beamline_config._client.device_manager.devices = {} + + with pytest.raises(RuntimeError, match="Device nonexistent_shutter not found"): + beamline_config.add(condition) + + def test_add_condition_signal_not_found(self, beamline_config): + """Test that adding a condition with invalid signal raises RuntimeError.""" + condition = ShutterCondition(name="shutter_open", title="Shutter Open") + + # Setup device manager mock with device but without the signal + mock_device = mock.MagicMock() + mock_device.read.return_value = {"other_signal": {"value": "open"}} + beamline_config._client.device_manager.devices = {"shutter1": mock_device} + + condition.configure(device="shutter1", signal="value") + + with pytest.raises(RuntimeError, match="Signal value not found in device shutter1"): + beamline_config.add(condition) + + def test_remove_condition(self, beamline_config): + """Test removing a condition.""" + condition = ShutterCondition(name="shutter_open", title="Shutter Open") + condition.configure(device="shutter1") + + # Setup device manager mock + beamline_config._client.device_manager.devices = {"shutter1": mock.MagicMock()} + beamline_config._client.device_manager.devices["shutter1"].read.return_value = { + "shutter1": {"value": "open"} + } + + # Add and then remove + beamline_config.add(condition) + assert any(c.name == "shutter_open" for c in beamline_config._conditions) + + beamline_config.remove("shutter_open") + assert not any(c.name == "shutter_open" for c in beamline_config._conditions) + + def test_remove_nonexistent_condition(self, beamline_config): + """Test removing a condition that doesn't exist.""" + # Should not raise an error + beamline_config.remove("nonexistent") + assert len(beamline_config._conditions) == 0 + + def test_show_all(self, beamline_config, capsys): + """Test that show_all displays conditions in a table.""" + condition = ShutterCondition(name="shutter_open", title="Shutter Open") + condition.configure(device="shutter1") + + # Setup device manager mock + beamline_config._client.device_manager.devices = {"shutter1": mock.MagicMock()} + beamline_config._client.device_manager.devices["shutter1"].read.return_value = {"shutter1"} + + beamline_config.add(condition) + beamline_config.show_all() + + # The output should be printed (checked via capsys) + captured = capsys.readouterr() + # Check that the condition name appears in the output + assert "shutter_open" in captured.out or "shutter_open" in captured.err + + def test_on_condition_update(self, beamline_config): + """Test that _on_condition_update updates the conditions list.""" + update_entry = messages.BeamlineConditionUpdateEntry( + name="test_condition", + title="Test Condition", + condition_type="ShutterCondition", + parameters={}, + ) + msg = messages.AvailableBeamlineConditionsMessage(conditions=[update_entry]) + + BeamlineConditionConfig._on_condition_update({"data": msg}, parent=beamline_config) + + assert len(beamline_config._conditions) == 1 + assert beamline_config._conditions[0].name == "test_condition" diff --git a/bec_server/bec_server/scan_server/beamline_condition_manager.py b/bec_server/bec_server/scan_server/beamline_condition_manager.py new file mode 100644 index 000000000..825fd5e38 --- /dev/null +++ b/bec_server/bec_server/scan_server/beamline_condition_manager.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from bec_lib import beamline_states as bl_states +from bec_lib import messages +from bec_lib.endpoints import MessageEndpoints +from bec_lib.redis_connector import RedisConnector + + +class BeamlineConditionManager: + """Manager for beamline conditions.""" + + def __init__(self, connector: RedisConnector) -> None: + self.connector = connector + self.conditions: list[bl_states.BeamlineCondition] = [] + self.connector.register( + MessageEndpoints.available_beamline_conditions(), + cb=self._handle_condition_update, + parent=self, + from_start=True, + ) + + @staticmethod + def _handle_condition_update( + msg_dict: dict, *, parent: BeamlineConditionManager, **_kwargs + ) -> None: + + msg: messages.AvailableBeamlineConditionsMessage = msg_dict["data"] # type: ignore ; we know it's a AvailableBeamlineConditionsMessage + parent.update_conditions(msg) + + def update_conditions(self, msg: messages.AvailableBeamlineConditionsMessage) -> None: + """ + Update the beamline conditions based on the received update message. + + Args: + msg (messages.AvailableBeamlineConditionsMessage): The update message containing condition updates. + """ + + # get the conditions that we need to remove + conditions_in_msg = {cond.name for cond in msg.conditions} + current_conditions = {cond.name for cond in self.conditions} + conditions_to_remove = current_conditions - conditions_in_msg + # remove conditions that are no longer needed + for cond_name in conditions_to_remove: + cond = next((c for c in self.conditions if c.name == cond_name), None) + if cond: + cond.stop() + self.conditions.remove(cond) + # filter out existing conditions from the message + new_conditions = [cond for cond in msg.conditions if cond.name not in current_conditions] + + # add new conditions + for cond in new_conditions: + self.conditions.append(self.create_condition_from_message(cond)) + + def create_condition_from_message( + self, cond_info: messages.BeamlineConditionUpdateEntry + ) -> bl_states.BeamlineCondition: + """ + Create a BeamlineCondition instance from a BeamlineConditionUpdateEntry message. + + Args: + cond_info (messages.BeamlineConditionUpdateEntry): The condition update entry message. + Returns: + BeamlineCondition: The created BeamlineCondition instance. + """ + cls = getattr(bl_states, cond_info.condition_type, None) + if cls is None: + raise ValueError( + f"Condition type {cond_info.condition_type} not found in beamline_states." + ) + condition = cls(name=cond_info.name, redis_connector=self.connector, title=cond_info.title) + condition.configure(**cond_info.parameters) + condition.start() + return condition diff --git a/bec_server/bec_server/scan_server/scan_server.py b/bec_server/bec_server/scan_server/scan_server.py index f132ef2d4..c69c190b9 100644 --- a/bec_server/bec_server/scan_server/scan_server.py +++ b/bec_server/bec_server/scan_server/scan_server.py @@ -11,6 +11,7 @@ from bec_lib.scan_number_container import ScanNumberContainer from bec_lib.service_config import ServiceConfig +from .beamline_condition_manager import BeamlineConditionManager from .scan_assembler import ScanAssembler from .scan_guard import ScanGuard from .scan_manager import ScanManager @@ -40,6 +41,8 @@ def __init__(self, config: ServiceConfig, connector_cls: type[RedisConnector]): self._start_alarm_handler() self._reset_scan_number() self.status = messages.BECStatus.RUNNING + self.beamline_conditions = None + self._start_beamline_condition_manager() def _start_device_manager(self): self.wait_for_service("DeviceServer") @@ -58,6 +61,9 @@ def _start_scan_assembler(self): def _start_scan_guard(self): self.scan_guard = ScanGuard(parent=self) + def _start_beamline_condition_manager(self): + self.beamline_conditions = BeamlineConditionManager(self.connector) + def _start_alarm_handler(self): self.connector.register(MessageEndpoints.alarm(), cb=self._alarm_callback, parent=self) From 1d71019f8ff59ad8534f0ad2bd7ff58c4612c492 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Fri, 5 Dec 2025 17:17:45 +0100 Subject: [PATCH 2/3] refactor: deprecate bl_checks --- bec_lib/bec_lib/bl_checks.py | 2 +- bec_lib/bec_lib/bl_conditions.py | 89 ----------------------------- bec_lib/bec_lib/client.py | 6 -- bec_lib/tests/test_bl_conditions.py | 37 ------------ 4 files changed, 1 insertion(+), 133 deletions(-) delete mode 100644 bec_lib/bec_lib/bl_conditions.py delete mode 100644 bec_lib/tests/test_bl_conditions.py diff --git a/bec_lib/bec_lib/bl_checks.py b/bec_lib/bec_lib/bl_checks.py index 8bdc7c3bf..29c32163a 100644 --- a/bec_lib/bec_lib/bl_checks.py +++ b/bec_lib/bec_lib/bl_checks.py @@ -13,7 +13,7 @@ from typeguard import typechecked -from bec_lib.bl_conditions import BeamlineCondition +from bec_lib.beamline_states import BeamlineCondition from bec_lib.logger import bec_logger logger = bec_logger.logger diff --git a/bec_lib/bec_lib/bl_conditions.py b/bec_lib/bec_lib/bl_conditions.py deleted file mode 100644 index d1ae7fb98..000000000 --- a/bec_lib/bec_lib/bl_conditions.py +++ /dev/null @@ -1,89 +0,0 @@ -""" -This module contains classes for beamline checks, used to check the beamline status. -""" - -from __future__ import annotations - -from abc import ABC, abstractmethod -from typing import TYPE_CHECKING - -if TYPE_CHECKING: # pragma: no cover - from bec_lib.device import Device - - -class BeamlineCondition(ABC): - """Abstract base class for beamline checks.""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.enabled = True - - @property - @abstractmethod - def name(self) -> str: - """Return a name for the beamline check.""" - - @abstractmethod - def run(self) -> bool: - """Run the beamline check and return True if the beam is okay, False otherwise.""" - - @abstractmethod - def on_failure_msg(self) -> str: - """Return a message that will be displayed if the beamline check fails.""" - - -class ShutterCondition(BeamlineCondition): - """Check if the shutter is open.""" - - def __init__(self, shutter: Device): - super().__init__() - self.shutter = shutter - - @property - def name(self): - return "shutter" - - def run(self): - shutter_val = self.shutter.read(cached=True) - return shutter_val["value"].lower() == "open" - - def on_failure_msg(self): - return "Check beam failed: Shutter is closed." - - -class LightAvailableCondition(BeamlineCondition): - """Check if the light is available.""" - - def __init__(self, machine_status: Device): - super().__init__() - self.machine_status = machine_status - - @property - def name(self): - return "light_available" - - def run(self): - machine_status = self.machine_status.read(cached=True) - return machine_status["value"] in ["Light Available", "Light-Available"] - - def on_failure_msg(self): - return "Check beam failed: Light not available." - - -class FastOrbitFeedbackCondition(BeamlineCondition): - """Check if the fast orbit feedback is running.""" - - def __init__(self, sls_fast_orbit_feedback: Device): - super().__init__() - self.sls_fast_orbit_feedback = sls_fast_orbit_feedback - - @property - def name(self): - return "fast_orbit_feedback" - - def run(self): - fast_orbit_feedback = self.sls_fast_orbit_feedback.read(cached=True) - return fast_orbit_feedback["value"] == "running" - - def on_failure_msg(self): - return "Check beam failed: Fast orbit feedback is not running." diff --git a/bec_lib/bec_lib/client.py b/bec_lib/bec_lib/client.py index 4c2b17ced..d3e8b0390 100644 --- a/bec_lib/bec_lib/client.py +++ b/bec_lib/bec_lib/client.py @@ -21,7 +21,6 @@ from bec_lib.alarm_handler import AlarmHandler, Alarms from bec_lib.beamline_states import BeamlineConditionConfig from bec_lib.bec_service import BECService -from bec_lib.bl_checks import BeamlineChecks from bec_lib.callback_handler import CallbackHandler, EventType from bec_lib.config_helper import ConfigHelperUser from bec_lib.dap_plugins import DAPPlugins @@ -140,7 +139,6 @@ def __init__( self._live_updates = None self.dap = None self.device_monitor = None - self.bl_checks = None self.scans_namespace = SimpleNamespace() self._hli_funcs = {} self.metadata = {} @@ -225,8 +223,6 @@ def _start_services(self): self.config = ConfigHelperUser(self.device_manager) self.history = ScanHistory(client=self) self.dap = DAPPlugins(self) - self.bl_checks = BeamlineChecks(self) - self.bl_checks.start() self.device_monitor = DeviceMonitorPlugin(self.connector) self._update_username() self.beamline_conditions = BeamlineConditionConfig(client=self) @@ -325,8 +321,6 @@ def shutdown(self): self.queue.shutdown() if self.alarm_handler: self.alarm_handler.shutdown() - if self.bl_checks: - self.bl_checks.stop() if self.history is not None: # pylint: disable=protected-access self.history._shutdown() diff --git a/bec_lib/tests/test_bl_conditions.py b/bec_lib/tests/test_bl_conditions.py deleted file mode 100644 index 85e87df0b..000000000 --- a/bec_lib/tests/test_bl_conditions.py +++ /dev/null @@ -1,37 +0,0 @@ -from unittest import mock - -from bec_lib.bl_conditions import ( - FastOrbitFeedbackCondition, - LightAvailableCondition, - ShutterCondition, -) - - -def test_shutter_condition(): - device = mock.MagicMock() - shutter_condition = ShutterCondition(device) - shutter_condition.run() - device.read.assert_called_once() - assert shutter_condition.on_failure_msg() == "Check beam failed: Shutter is closed." - assert shutter_condition.name == "shutter" - - -def test_light_available_condition(): - device = mock.MagicMock() - light_available_condition = LightAvailableCondition(device) - light_available_condition.run() - device.read.assert_called_once() - assert light_available_condition.on_failure_msg() == "Check beam failed: Light not available." - assert light_available_condition.name == "light_available" - - -def test_fast_orbit_feedback_condition(): - device = mock.MagicMock() - fast_orbit_feedback_condition = FastOrbitFeedbackCondition(device) - fast_orbit_feedback_condition.run() - device.read.assert_called_once() - assert ( - fast_orbit_feedback_condition.on_failure_msg() - == "Check beam failed: Fast orbit feedback is not running." - ) - assert fast_orbit_feedback_condition.name == "fast_orbit_feedback" From ee406b9cb28324503911be2c1e1df3a113d4c049 Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Fri, 5 Dec 2025 17:18:48 +0100 Subject: [PATCH 3/3] refactor: rename beamline_states to bl_conditions --- bec_lib/bec_lib/bl_checks.py | 2 +- bec_lib/bec_lib/{beamline_states.py => bl_conditions.py} | 0 bec_lib/bec_lib/client.py | 2 +- bec_lib/tests/test_beamline_conditions.py | 2 +- .../bec_server/scan_server/beamline_condition_manager.py | 4 ++-- 5 files changed, 5 insertions(+), 5 deletions(-) rename bec_lib/bec_lib/{beamline_states.py => bl_conditions.py} (100%) diff --git a/bec_lib/bec_lib/bl_checks.py b/bec_lib/bec_lib/bl_checks.py index 29c32163a..8bdc7c3bf 100644 --- a/bec_lib/bec_lib/bl_checks.py +++ b/bec_lib/bec_lib/bl_checks.py @@ -13,7 +13,7 @@ from typeguard import typechecked -from bec_lib.beamline_states import BeamlineCondition +from bec_lib.bl_conditions import BeamlineCondition from bec_lib.logger import bec_logger logger = bec_logger.logger diff --git a/bec_lib/bec_lib/beamline_states.py b/bec_lib/bec_lib/bl_conditions.py similarity index 100% rename from bec_lib/bec_lib/beamline_states.py rename to bec_lib/bec_lib/bl_conditions.py diff --git a/bec_lib/bec_lib/client.py b/bec_lib/bec_lib/client.py index d3e8b0390..7325221a7 100644 --- a/bec_lib/bec_lib/client.py +++ b/bec_lib/bec_lib/client.py @@ -19,8 +19,8 @@ from rich.table import Table from bec_lib.alarm_handler import AlarmHandler, Alarms -from bec_lib.beamline_states import BeamlineConditionConfig from bec_lib.bec_service import BECService +from bec_lib.bl_conditions import BeamlineConditionConfig from bec_lib.callback_handler import CallbackHandler, EventType from bec_lib.config_helper import ConfigHelperUser from bec_lib.dap_plugins import DAPPlugins diff --git a/bec_lib/tests/test_beamline_conditions.py b/bec_lib/tests/test_beamline_conditions.py index 775cd7a0a..e453f80c7 100644 --- a/bec_lib/tests/test_beamline_conditions.py +++ b/bec_lib/tests/test_beamline_conditions.py @@ -3,7 +3,7 @@ import pytest from bec_lib import messages -from bec_lib.beamline_states import ( +from bec_lib.bl_conditions import ( BeamlineCondition, BeamlineConditionConfig, DeviceBeamlineCondition, diff --git a/bec_server/bec_server/scan_server/beamline_condition_manager.py b/bec_server/bec_server/scan_server/beamline_condition_manager.py index 825fd5e38..7e159ba3d 100644 --- a/bec_server/bec_server/scan_server/beamline_condition_manager.py +++ b/bec_server/bec_server/scan_server/beamline_condition_manager.py @@ -1,6 +1,6 @@ from __future__ import annotations -from bec_lib import beamline_states as bl_states +from bec_lib import bl_conditions as bl_states from bec_lib import messages from bec_lib.endpoints import MessageEndpoints from bec_lib.redis_connector import RedisConnector @@ -66,7 +66,7 @@ def create_condition_from_message( cls = getattr(bl_states, cond_info.condition_type, None) if cls is None: raise ValueError( - f"Condition type {cond_info.condition_type} not found in beamline_states." + f"Condition type {cond_info.condition_type} not found in beamline conditions." ) condition = cls(name=cond_info.name, redis_connector=self.connector, title=cond_info.title) condition.configure(**cond_info.parameters)