From feaee388be5bb134da51707f400d8083684e47c4 Mon Sep 17 00:00:00 2001 From: guillaumepichon Date: Fri, 26 Sep 2025 17:04:53 +0200 Subject: [PATCH 1/6] First implementation for direct access to hardware values in control system --- pyaml/control/abstract_impl.py | 20 +++-- pyaml/control/controlsystem.py | 2 +- pyaml/magnet/hcorrector.py | 1 - pyaml/magnet/identity_cfm_model.py | 84 +++++++++++++++++++ pyaml/magnet/identity_model.py | 82 ++++++++++++++++++ pyaml/magnet/linear_cfm_model.py | 2 +- pyaml/magnet/linear_model.py | 2 +- pyaml/magnet/magnet.py | 30 +++---- pyaml/magnet/model.py | 2 +- pyaml/magnet/octupole.py | 1 - pyaml/magnet/quadrupole.py | 1 - pyaml/magnet/sextupole.py | 1 - pyaml/magnet/skewoctu.py | 1 - pyaml/magnet/skewquad.py | 1 - pyaml/magnet/skewsext.py | 1 - pyaml/magnet/spline_model.py | 2 +- pyaml/magnet/vcorrector.py | 1 - .../pyaml_external/external_magnet_model.py | 2 +- 18 files changed, 197 insertions(+), 39 deletions(-) create mode 100644 pyaml/magnet/identity_cfm_model.py create mode 100644 pyaml/magnet/identity_model.py diff --git a/pyaml/control/abstract_impl.py b/pyaml/control/abstract_impl.py index 237096ba..8db19439 100644 --- a/pyaml/control/abstract_impl.py +++ b/pyaml/control/abstract_impl.py @@ -1,13 +1,16 @@ +from numpy import double + from pyaml.control import abstract from pyaml.magnet.model import MagnetModel import numpy as np #------------------------------------------------------------------------------ -class RWHardwareScalar(abstract.ReadFloatScalar): +class RWHardwareScalar(abstract.ReadWriteFloatScalar): """ Class providing read write access to a magnet of a control system (in hardware units) """ + def __init__(self, model:MagnetModel): self.model = model @@ -15,8 +18,11 @@ def get(self) -> float: return self.model.read_hardware_values()[0] def set(self, value:float): - self.model.send_harware_values([value]) - + self.model.send_hardware_values([value]) + + def set_and_wait(self, value: double): + raise NotImplementedError("Not implemented yet.") + def unit(self) -> str: return self.model.get_hardware_units()[0] @@ -40,8 +46,8 @@ def get(self) -> float: # Sets the value def set(self, value:float): - current = self.__model.compute_hardware_values([value]) - self.__model.send_harware_values(current) + current = self.__model.compute_hardware_values(np.array(value)) + self.__model.send_hardware_values(current) # Sets the value and wait that the read value reach the setpoint def set_and_wait(self, value:float): @@ -69,7 +75,7 @@ def get(self) -> np.array: # Sets the value def set(self, value:np.array): - self.model.send_harware_values(value) + self.model.send_hardware_values(value) # Sets the value and waits that the read value reach the setpoint def set_and_wait(self, value:np.array): @@ -98,7 +104,7 @@ def get(self) -> np.array: # Sets the value def set(self, value:np.array): cur = self.model.compute_hardware_values(value) - self.model.send_harware_values(cur) + self.model.send_hardware_values(cur) # Sets the value and waits that the read value reach the setpoint def set_and_wait(self, value:np.array): diff --git a/pyaml/control/controlsystem.py b/pyaml/control/controlsystem.py index da2de346..5cbbc8f9 100644 --- a/pyaml/control/controlsystem.py +++ b/pyaml/control/controlsystem.py @@ -49,7 +49,7 @@ def fill_device(self,elements:list[Element]): current = RWHardwareScalar(e.model) strength = RWStrengthScalar(e.model) # Create a unique ref for this control system - m = e.attach(strength,current) + m = e.attach(strength, current) self.add_magnet(str(m),m) elif isinstance(e,CombinedFunctionMagnet): self.add_magnet(str(e),e) diff --git a/pyaml/magnet/hcorrector.py b/pyaml/magnet/hcorrector.py index 0f98e8ae..ae792cdb 100644 --- a/pyaml/magnet/hcorrector.py +++ b/pyaml/magnet/hcorrector.py @@ -13,7 +13,6 @@ class HCorrector(Magnet): def __init__(self, cfg: ConfigModel): super().__init__( cfg.name, - cfg.hardware if hasattr(cfg, "hardware") else None, cfg.model if hasattr(cfg, "model") else None, ) self._cfg = cfg diff --git a/pyaml/magnet/identity_cfm_model.py b/pyaml/magnet/identity_cfm_model.py new file mode 100644 index 00000000..a859a063 --- /dev/null +++ b/pyaml/magnet/identity_cfm_model.py @@ -0,0 +1,84 @@ +import numpy as np +from pydantic import BaseModel,ConfigDict + +from .model import MagnetModel +from .. import PyAMLException +from ..configuration.curve import Curve +from ..control.deviceaccess import DeviceAccess + +# Define the main class name for this module +PYAMLCLASS = "IdentityCFMagnetModel" + +class ConfigModel(BaseModel): + + model_config = ConfigDict(arbitrary_types_allowed=True,extra="forbid") + + multipoles: list[str] + """List of supported functions: A0,B0,A1,B1,etc (i.e. [B0,A1,B2])""" + powerconverters: list[DeviceAccess]|None = None + """Power converter device to apply current""" + magnets: list[DeviceAccess]|None = None + """Magnet device to apply strength""" + units: list[str] + """List of strength unit (i.e. ['rad','m-1','m-2'])""" + +class IdentityCFMagnetModel(MagnetModel): + """ + Class that handle magnet current/strength direct access for a single function magnet + """ + + def __init__(self, cfg: ConfigModel): + self._cfg = cfg + self.__strength_unit = cfg.unit + self.__ps = None + self.__hardware_unit = None + self.__magnets = None + self.__magnet_unit = None + if cfg.powerconverters is not None: + self.__ps = cfg.powerconverters + self.__hardware_unit = [powerconverter.unit() for powerconverter in cfg.powerconverters] + if cfg.magnets is not None: + self.__magnets = cfg.magnets + self.__magnet_unit = [magnet.unit() for magnet in cfg.magnets] + + def compute_hardware_values(self, strengths: np.array) -> np.array: + raise PyAMLException("The identity model does not support computation") + + def compute_strengths(self, currents: np.array) -> np.array: + raise PyAMLException("The identity model does not support computation") + + def get_strength_units(self) -> list[str]: + return [self.__magnet_unit] if self.__magnet_unit is not None else [""] + + def get_hardware_units(self) -> list[str]: + return [p.unit() for p in self._cfg.powerconverters] if self.__hardware_unit is not None else [""] + + def read_hardware_values(self) -> np.array: + if self.__ps is None: + raise PyAMLException(f"{str(self)} does not supports physics values") + return [self.__ps.get()] + + def readback_hardware_values(self) -> np.array: + if self.__ps is None: + raise PyAMLException(f"{str(self)} does not supports hardware values") + return [ps.readback() for ps in self.__ps] + + def send_hardware_values(self, currents: np.array): + if self.__ps is None: + raise PyAMLException(f"{str(self)} does not hardware physics values") + [ps.set(currents) for ps in self.__ps] + + def get_devices(self) -> list[DeviceAccess]: + devices = [] + if self.__ps is not None: + devices.extend(self.__ps) + if self.__magnets is not None: + devices.extend(self.__magnets) + return devices + + def __repr__(self): + return f"{self.__class__.__name__}(identity {("Magnet" if self.__magnets is not None else "Power supply")}, unit={self.__strength_unit})" + + def set_magnet_rigidity(self, brho: np.double): + pass + diff --git a/pyaml/magnet/identity_model.py b/pyaml/magnet/identity_model.py new file mode 100644 index 00000000..0f103b7d --- /dev/null +++ b/pyaml/magnet/identity_model.py @@ -0,0 +1,82 @@ +import numpy as np +from pydantic import BaseModel,ConfigDict + +from .model import MagnetModel +from .. import PyAMLException +from ..configuration.curve import Curve +from ..control.deviceaccess import DeviceAccess + +# Define the main class name for this module +PYAMLCLASS = "IdentityMagnetModel" + +class ConfigModel(BaseModel): + + model_config = ConfigDict(arbitrary_types_allowed=True,extra="forbid") + + powerconverter: DeviceAccess|None = None + """Power converter device to apply current""" + magnet: DeviceAccess|None = None + """Magnet device to apply strength""" + unit: str + """Unit of the strength (i.e. 1/m or m-1)""" + +class IdentityMagnetModel(MagnetModel): + """ + Class that handle magnet current/strength direct access for a single function magnet + """ + + def __init__(self, cfg: ConfigModel): + self._cfg = cfg + self.__strength_unit = cfg.unit + self.__ps = None + self.__hardware_unit = None + self.__magnet = None + self.__magnet_unit = None + if cfg.powerconverter is not None: + self.__ps = cfg.powerconverter + self.__hardware_unit = cfg.powerconverter.unit() + if cfg.magnet is not None: + self.__magnet = cfg.magnet + self.__magnet_unit = cfg.magnet.unit() + + def compute_hardware_values(self, strengths: np.array) -> np.array: + raise PyAMLException("The identity model does not support computation") + + def compute_strengths(self, currents: np.array) -> np.array: + raise PyAMLException("The identity model does not support computation") + + def get_strength_units(self) -> list[str]: + return [self.__magnet_unit] if self.__magnet_unit is not None else [""] + + def get_hardware_units(self) -> list[str]: + return [self.__hardware_unit] if self.__hardware_unit is not None else [""] + + def read_hardware_values(self) -> np.array: + if self.__ps is None: + raise PyAMLException(f"{str(self)} does not supports physics values") + return [self.__ps.get()] + + def readback_hardware_values(self) -> np.array: + if self.__ps is None: + raise PyAMLException(f"{str(self)} does not supports hardware values") + return [self.__ps.readback()] + + def send_hardware_values(self, currents: np.array): + if self.__ps is None: + raise PyAMLException(f"{str(self)} does not hardware physics values") + self.__ps.set(currents[0]) + + def get_devices(self) -> list[DeviceAccess]: + devices = [] + if self.__ps is not None: + devices.append(self.__ps) + if self.__magnet is not None: + devices.append(self.__magnet) + return devices + + def __repr__(self): + return f"{self.__class__.__name__}(identity {("Magnet" if self.__magnet is not None else "Power supply")}, unit={self.__strength_unit})" + + def set_magnet_rigidity(self, brho: np.double): + pass + diff --git a/pyaml/magnet/linear_cfm_model.py b/pyaml/magnet/linear_cfm_model.py index a5fa28be..d3e78933 100644 --- a/pyaml/magnet/linear_cfm_model.py +++ b/pyaml/magnet/linear_cfm_model.py @@ -149,7 +149,7 @@ def read_hardware_values(self) -> np.array: def readback_hardware_values(self) -> np.array: return np.array([p.readback() for p in self._cfg.powerconverters]) - def send_harware_values(self, currents: np.array): + def send_hardware_values(self, currents: np.array): for idx, p in enumerate(self._cfg.powerconverters): p.set(currents[idx]) diff --git a/pyaml/magnet/linear_model.py b/pyaml/magnet/linear_model.py index 67a1c45a..d5414529 100644 --- a/pyaml/magnet/linear_model.py +++ b/pyaml/magnet/linear_model.py @@ -66,7 +66,7 @@ def read_hardware_values(self) -> np.array: def readback_hardware_values(self) -> np.array: return [self.__ps.readback()] - def send_harware_values(self, currents: np.array): + def send_hardware_values(self, currents: np.array): self.__ps.set(currents[0]) def get_devices(self) -> list[DeviceAccess]: diff --git a/pyaml/magnet/magnet.py b/pyaml/magnet/magnet.py index ff37aacc..de60017e 100644 --- a/pyaml/magnet/magnet.py +++ b/pyaml/magnet/magnet.py @@ -1,14 +1,14 @@ from pyaml.lattice.element import Element,ElementConfigModel +from .. import PyAMLException from ..control.deviceaccess import DeviceAccess from ..control import abstract from .model import MagnetModel from scipy.constants import speed_of_light from typing import Self +import numpy as np class MagnetConfigModel(ElementConfigModel): - hardware: DeviceAccess | None = None - """Direct access to a magnet device that provides strength/current conversion""" model: MagnetModel | None = None """Object in charge of converting magnet strenghts to power supply values""" @@ -17,7 +17,7 @@ class Magnet(Element): Class providing access to one magnet of a physical or simulated lattice """ - def __init__(self, name:str, hardware:DeviceAccess = None, model:MagnetModel = None): + def __init__(self, name:str, model:MagnetModel = None): """ Construct a magnet @@ -25,30 +25,24 @@ def __init__(self, name:str, hardware:DeviceAccess = None, model:MagnetModel = N ---------- name : str Element name - hardware : DeviceAccess - Direct access to a hardware (bypass the magnet model) model : MagnetModel - Magnet model in charge of comutping coil(s) current + Magnet model in charge of computing coil(s) current """ super().__init__(name) self.__model = model self.__strength = None self.__hardware = None - if hardware is not None: - # TODO - # Direct access to a magnet device that supports strength/current conversion - raise Exception( - " %s, hardware access not implemented" % (self.__class__.__name__,name) - ) - + @property def strength(self) -> abstract.ReadWriteFloatScalar: + if self.__strength is None: + raise PyAMLException(f"{str(self)} has no model that supports physics units") return self.__strength @property def hardware(self) -> abstract.ReadWriteFloatScalar: if self.__hardware is None: - raise Exception(f"{str(self)} has no model that supports hardware units") + raise PyAMLException(f"{str(self)} has no model that supports hardware units") return self.__hardware @property @@ -56,12 +50,12 @@ def model(self) -> MagnetModel: return self.__model def attach(self, strength: abstract.ReadWriteFloatScalar, hardware: abstract.ReadWriteFloatScalar) -> Self: - # Attach strengh and current attribute and returns a new reference + # Attach strength and current attribute and returns a new reference obj = self.__class__(self._cfg) obj.__strength = strength obj.__hardware = hardware return obj - def set_energy(self,E:float): - if(self.__model is not None): - self.__model.set_magnet_rigidity(E/speed_of_light) + def set_energy(self, energy:float): + if self.__model is not None: + self.__model.set_magnet_rigidity(np.double(energy / speed_of_light)) diff --git a/pyaml/magnet/model.py b/pyaml/magnet/model.py index 56323391..98a27199 100644 --- a/pyaml/magnet/model.py +++ b/pyaml/magnet/model.py @@ -91,7 +91,7 @@ def readback_hardware_values(self) -> npt.NDArray[np.float64]: pass @abstractmethod - def send_harware_values(self, hardware_values: npt.NDArray[np.float64]): + def send_hardware_values(self, hardware_values: npt.NDArray[np.float64]): """ Send power supply value(s) to control system diff --git a/pyaml/magnet/octupole.py b/pyaml/magnet/octupole.py index 14fbad0c..49ec78f4 100644 --- a/pyaml/magnet/octupole.py +++ b/pyaml/magnet/octupole.py @@ -13,7 +13,6 @@ class Octupole(Magnet): def __init__(self, cfg: ConfigModel): super().__init__( cfg.name, - cfg.hardware if hasattr(cfg, "hardware") else None, cfg.model if hasattr(cfg, "model") else None, ) self._cfg = cfg diff --git a/pyaml/magnet/quadrupole.py b/pyaml/magnet/quadrupole.py index 08c1edb5..063ba226 100644 --- a/pyaml/magnet/quadrupole.py +++ b/pyaml/magnet/quadrupole.py @@ -13,7 +13,6 @@ class Quadrupole(Magnet): def __init__(self, cfg: ConfigModel): super().__init__( cfg.name, - cfg.hardware if hasattr(cfg, "hardware") else None, cfg.model if hasattr(cfg, "model") else None, ) self._cfg = cfg diff --git a/pyaml/magnet/sextupole.py b/pyaml/magnet/sextupole.py index 1c2e2b85..826ec0f9 100644 --- a/pyaml/magnet/sextupole.py +++ b/pyaml/magnet/sextupole.py @@ -13,7 +13,6 @@ class Sextupole(Magnet): def __init__(self, cfg: ConfigModel): super().__init__( cfg.name, - cfg.hardware if hasattr(cfg, "hardware") else None, cfg.model if hasattr(cfg, "model") else None, ) self._cfg = cfg diff --git a/pyaml/magnet/skewoctu.py b/pyaml/magnet/skewoctu.py index 985e6431..4d0cb375 100644 --- a/pyaml/magnet/skewoctu.py +++ b/pyaml/magnet/skewoctu.py @@ -13,7 +13,6 @@ class SkewOctu(Magnet): def __init__(self, cfg: ConfigModel): super().__init__( cfg.name, - cfg.hardware if hasattr(cfg, "hardware") else None, cfg.model if hasattr(cfg, "model") else None, ) self._cfg = cfg diff --git a/pyaml/magnet/skewquad.py b/pyaml/magnet/skewquad.py index 57cefb27..6c51ef94 100644 --- a/pyaml/magnet/skewquad.py +++ b/pyaml/magnet/skewquad.py @@ -13,7 +13,6 @@ class SkewQuad(Magnet): def __init__(self, cfg: ConfigModel): super().__init__( cfg.name, - cfg.hardware if hasattr(cfg, "hardware") else None, cfg.model if hasattr(cfg, "model") else None, ) self._cfg = cfg diff --git a/pyaml/magnet/skewsext.py b/pyaml/magnet/skewsext.py index eb00835d..e3f90c95 100644 --- a/pyaml/magnet/skewsext.py +++ b/pyaml/magnet/skewsext.py @@ -13,7 +13,6 @@ class SkewSext(Magnet): def __init__(self, cfg: ConfigModel): super().__init__( cfg.name, - cfg.hardware if hasattr(cfg, "hardware") else None, cfg.model if hasattr(cfg, "model") else None, ) self._cfg = cfg diff --git a/pyaml/magnet/spline_model.py b/pyaml/magnet/spline_model.py index 28ba9b6f..0b0f6c48 100644 --- a/pyaml/magnet/spline_model.py +++ b/pyaml/magnet/spline_model.py @@ -67,7 +67,7 @@ def read_hardware_values(self) -> np.array: def readback_hardware_values(self) -> np.array: return [self.__ps.readback()] - def send_harware_values(self, currents: np.array): + def send_hardware_values(self, currents: np.array): self.__ps.set(currents[0]) def get_devices(self) -> list[DeviceAccess]: diff --git a/pyaml/magnet/vcorrector.py b/pyaml/magnet/vcorrector.py index bd2213bc..ea4b1d60 100644 --- a/pyaml/magnet/vcorrector.py +++ b/pyaml/magnet/vcorrector.py @@ -13,7 +13,6 @@ class VCorrector(Magnet): def __init__(self, cfg: ConfigModel): super().__init__( cfg.name, - cfg.hardware if hasattr(cfg, "hardware") else None, cfg.model if hasattr(cfg, "model") else None, ) self._cfg = cfg diff --git a/tests/external/pyaml_external/external_magnet_model.py b/tests/external/pyaml_external/external_magnet_model.py index 2f5aa86f..69ba69e9 100644 --- a/tests/external/pyaml_external/external_magnet_model.py +++ b/tests/external/pyaml_external/external_magnet_model.py @@ -66,7 +66,7 @@ def readback_hardware_values(self) -> np.array: pass # Send power supply current(s) to control system - def send_harware_values(self,currents:np.array): + def send_hardware_values(self, currents:np.array): self._ps.set(currents) pass From 530b6147fe9d4c3c4fdd181ee5aa5a43191c8eb5 Mon Sep 17 00:00:00 2001 From: guillaumepichon Date: Mon, 29 Sep 2025 16:47:55 +0200 Subject: [PATCH 2/6] Implementation for direct access to hardware values in control system. First tests but not complete yet. --- pyaml/control/abstract_impl.py | 17 ++++------ pyaml/magnet/identity_cfm_model.py | 34 ++++++++++++------- pyaml/magnet/identity_model.py | 33 +++++++++++------- pyaml/magnet/model.py | 10 ++++++ .../config/sr/quadrupoles/QF1AC01-IDENT.yaml | 10 ++++++ tests/conftest.py | 4 +++ tests/test_load_quad.py | 15 +++++--- 7 files changed, 83 insertions(+), 40 deletions(-) create mode 100644 tests/config/sr/quadrupoles/QF1AC01-IDENT.yaml diff --git a/pyaml/control/abstract_impl.py b/pyaml/control/abstract_impl.py index 8db19439..8d78da2a 100644 --- a/pyaml/control/abstract_impl.py +++ b/pyaml/control/abstract_impl.py @@ -18,7 +18,7 @@ def get(self) -> float: return self.model.read_hardware_values()[0] def set(self, value:float): - self.model.send_hardware_values([value]) + self.model.send_hardware_values(np.array([value])) def set_and_wait(self, value: double): raise NotImplementedError("Not implemented yet.") @@ -41,13 +41,11 @@ def __init__(self, model:MagnetModel): # Gets the value def get(self) -> float: - currents = self.__model.read_hardware_values() - return self.__model.compute_strengths(currents)[0] + return self.__model.get_strengths()[0] # Sets the value def set(self, value:float): - current = self.__model.compute_hardware_values(np.array(value)) - self.__model.send_hardware_values(current) + self.__model.set_strengths([value]) # Sets the value and wait that the read value reach the setpoint def set_and_wait(self, value:float): @@ -97,15 +95,12 @@ def __init__(self, model:MagnetModel): # Gets the value def get(self) -> np.array: - r = self.model.read_hardware_values() - str = self.model.compute_strengths(r) - return str + return self.model.get_strengths() # Sets the value def set(self, value:np.array): - cur = self.model.compute_hardware_values(value) - self.model.send_hardware_values(cur) - + self.model.set_strengths(value) + # Sets the value and waits that the read value reach the setpoint def set_and_wait(self, value:np.array): raise NotImplementedError("Not implemented yet.") diff --git a/pyaml/magnet/identity_cfm_model.py b/pyaml/magnet/identity_cfm_model.py index a859a063..518fd26a 100644 --- a/pyaml/magnet/identity_cfm_model.py +++ b/pyaml/magnet/identity_cfm_model.py @@ -17,7 +17,7 @@ class ConfigModel(BaseModel): """List of supported functions: A0,B0,A1,B1,etc (i.e. [B0,A1,B2])""" powerconverters: list[DeviceAccess]|None = None """Power converter device to apply current""" - magnets: list[DeviceAccess]|None = None + physics: list[DeviceAccess] | None = None """Magnet device to apply strength""" units: list[str] """List of strength unit (i.e. ['rad','m-1','m-2'])""" @@ -32,14 +32,14 @@ def __init__(self, cfg: ConfigModel): self.__strength_unit = cfg.unit self.__ps = None self.__hardware_unit = None - self.__magnets = None - self.__magnet_unit = None + self.__physics = None + self.__physics_unit = None if cfg.powerconverters is not None: self.__ps = cfg.powerconverters self.__hardware_unit = [powerconverter.unit() for powerconverter in cfg.powerconverters] - if cfg.magnets is not None: - self.__magnets = cfg.magnets - self.__magnet_unit = [magnet.unit() for magnet in cfg.magnets] + if cfg.physics is not None: + self.__physics = cfg.physics + self.__physics_unit = [magnet.unit() for magnet in cfg.physics] def compute_hardware_values(self, strengths: np.array) -> np.array: raise PyAMLException("The identity model does not support computation") @@ -48,14 +48,14 @@ def compute_strengths(self, currents: np.array) -> np.array: raise PyAMLException("The identity model does not support computation") def get_strength_units(self) -> list[str]: - return [self.__magnet_unit] if self.__magnet_unit is not None else [""] + return [self.__physics_unit] if self.__physics_unit is not None else [""] def get_hardware_units(self) -> list[str]: return [p.unit() for p in self._cfg.powerconverters] if self.__hardware_unit is not None else [""] def read_hardware_values(self) -> np.array: if self.__ps is None: - raise PyAMLException(f"{str(self)} does not supports physics values") + raise PyAMLException(f"{str(self)} does not supports hardware values") return [self.__ps.get()] def readback_hardware_values(self) -> np.array: @@ -65,20 +65,30 @@ def readback_hardware_values(self) -> np.array: def send_hardware_values(self, currents: np.array): if self.__ps is None: - raise PyAMLException(f"{str(self)} does not hardware physics values") + raise PyAMLException(f"{str(self)} does not hardware hardware values") [ps.set(currents) for ps in self.__ps] def get_devices(self) -> list[DeviceAccess]: devices = [] if self.__ps is not None: devices.extend(self.__ps) - if self.__magnets is not None: - devices.extend(self.__magnets) + if self.__physics is not None: + devices.extend(self.__physics) return devices def __repr__(self): - return f"{self.__class__.__name__}(identity {("Magnet" if self.__magnets is not None else "Power supply")}, unit={self.__strength_unit})" + return f"{self.__class__.__name__}(identity {("Magnet" if self.__physics is not None else "Power supply")}, unit={self.__strength_unit})" def set_magnet_rigidity(self, brho: np.double): pass + def get_strengths(self) -> np.array: + if self.__physics is None: + raise PyAMLException(f"{str(self)} does not supports physics values") + return np.array([np.float64(magnet.get()) for magnet in self.__physics]) + + def set_strengths(self, values:list[float]): + if self.__physics is None: + raise PyAMLException(f"{str(self)} does not supports physics values") + for value, magnet in zip(values, self.__physics): + magnet.set(value) diff --git a/pyaml/magnet/identity_model.py b/pyaml/magnet/identity_model.py index 0f103b7d..3ef33f97 100644 --- a/pyaml/magnet/identity_model.py +++ b/pyaml/magnet/identity_model.py @@ -15,7 +15,7 @@ class ConfigModel(BaseModel): powerconverter: DeviceAccess|None = None """Power converter device to apply current""" - magnet: DeviceAccess|None = None + physics: DeviceAccess|None = None """Magnet device to apply strength""" unit: str """Unit of the strength (i.e. 1/m or m-1)""" @@ -30,14 +30,14 @@ def __init__(self, cfg: ConfigModel): self.__strength_unit = cfg.unit self.__ps = None self.__hardware_unit = None - self.__magnet = None - self.__magnet_unit = None + self.__physics = None + self.__physics_unit = None if cfg.powerconverter is not None: self.__ps = cfg.powerconverter self.__hardware_unit = cfg.powerconverter.unit() - if cfg.magnet is not None: - self.__magnet = cfg.magnet - self.__magnet_unit = cfg.magnet.unit() + if cfg.physics is not None: + self.__physics = cfg.physics + self.__physics_unit = cfg.physics.unit() def compute_hardware_values(self, strengths: np.array) -> np.array: raise PyAMLException("The identity model does not support computation") @@ -46,14 +46,14 @@ def compute_strengths(self, currents: np.array) -> np.array: raise PyAMLException("The identity model does not support computation") def get_strength_units(self) -> list[str]: - return [self.__magnet_unit] if self.__magnet_unit is not None else [""] + return [self.__physics_unit] if self.__physics_unit is not None else [""] def get_hardware_units(self) -> list[str]: return [self.__hardware_unit] if self.__hardware_unit is not None else [""] def read_hardware_values(self) -> np.array: if self.__ps is None: - raise PyAMLException(f"{str(self)} does not supports physics values") + raise PyAMLException(f"{str(self)} does not supports hardware values") return [self.__ps.get()] def readback_hardware_values(self) -> np.array: @@ -63,20 +63,29 @@ def readback_hardware_values(self) -> np.array: def send_hardware_values(self, currents: np.array): if self.__ps is None: - raise PyAMLException(f"{str(self)} does not hardware physics values") + raise PyAMLException(f"{str(self)} does not supports hardware values") self.__ps.set(currents[0]) def get_devices(self) -> list[DeviceAccess]: devices = [] if self.__ps is not None: devices.append(self.__ps) - if self.__magnet is not None: - devices.append(self.__magnet) + if self.__physics is not None: + devices.append(self.__physics) return devices def __repr__(self): - return f"{self.__class__.__name__}(identity {("Magnet" if self.__magnet is not None else "Power supply")}, unit={self.__strength_unit})" + return f"{self.__class__.__name__}(identity {("Magnet" if self.__physics is not None else "Power supply")}, unit={self.__strength_unit})" def set_magnet_rigidity(self, brho: np.double): pass + def get_strengths(self) -> np.array: + if self.__physics is None: + raise PyAMLException(f"{str(self)} does not supports physics values") + return np.array([np.float64(self.__physics.get())]) + + def set_strengths(self, values:list[float]): + if self.__physics is None: + raise PyAMLException(f"{str(self)} does not supports physics values") + self.__physics.set(values[0]) diff --git a/pyaml/magnet/model.py b/pyaml/magnet/model.py index 98a27199..a1eb6644 100644 --- a/pyaml/magnet/model.py +++ b/pyaml/magnet/model.py @@ -137,3 +137,13 @@ def hasHardwareMapping(self) -> bool: True if the model supports hardware unit """ return True + + # Gets the value + def get_strengths(self) -> npt.NDArray[np.float64]: + currents = self.read_hardware_values() + return self.compute_strengths(currents) + + # Sets the value + def set_strengths(self, values:list[float]): + current = self.compute_hardware_values(np.array(values)) + self.send_hardware_values(current) diff --git a/tests/config/sr/quadrupoles/QF1AC01-IDENT.yaml b/tests/config/sr/quadrupoles/QF1AC01-IDENT.yaml new file mode 100644 index 00000000..eb6a1512 --- /dev/null +++ b/tests/config/sr/quadrupoles/QF1AC01-IDENT.yaml @@ -0,0 +1,10 @@ +type: pyaml.magnet.quadrupole +name: QF1A-C01 +model: + type: pyaml.magnet.identity_model + unit: 1/m + physics: + type: pyaml.control.device + setpoint: sr/ps-qf1/c01-a/current + readback: sr/ps-qf1/c01-a/current + unit: A diff --git a/tests/conftest.py b/tests/conftest.py index e07c6420..6265f020 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,7 +27,11 @@ def install_test_package(request): def test_x(install_test_package): ... """ + info = request.param + if info is None: + yield None + return package_name = info["name"] package_path = None if info["path"] is not None: diff --git a/tests/test_load_quad.py b/tests/test_load_quad.py index 28dc45c6..0ad81438 100644 --- a/tests/test_load_quad.py +++ b/tests/test_load_quad.py @@ -30,11 +30,16 @@ def test_quad_external_model(install_test_package, config_root_dir): print(ref_corr.strength.get()) Factory.clear() -@pytest.mark.parametrize("magnet_file", [ - "sr/quadrupoles/QF1AC01.yaml", - "sr/quadrupoles/QF1AC01.json", -]) -def test_quad_linear(magnet_file, config_root_dir): +@pytest.mark.parametrize( + ("magnet_file", "install_test_package"), + [ + ("sr/quadrupoles/QF1AC01.yaml", None), + ("sr/quadrupoles/QF1AC01-IDENT.yaml", {"name": "tango", "path": "tests/dummy_cs/tango"}), + ("sr/quadrupoles/QF1AC01.json", None), + ], + indirect=["install_test_package"], +) +def test_quad_linear(magnet_file, install_test_package, config_root_dir): set_root_folder(config_root_dir) cfg_quad = load(magnet_file) print(f"Current file: {config_root_dir}/{magnet_file}") From 7f82a9e71aa320a899fc0f584a0d7589d92676f1 Mon Sep 17 00:00:00 2001 From: guillaumepichon Date: Wed, 1 Oct 2025 10:20:04 +0200 Subject: [PATCH 3/6] Test for identity model with a computation outside pyaml. --- .../sr/quadrupoles/QF1AC01-IDENT-HW.yaml | 17 +++ .../config/sr/quadrupoles/QF1AC01-IDENT.yaml | 6 +- ...ith_tango_powersupply_mocking_behaviour.py | 144 ++++++++++++++++++ tests/test_load_quad.py | 32 ++-- 4 files changed, 185 insertions(+), 14 deletions(-) create mode 100644 tests/config/sr/quadrupoles/QF1AC01-IDENT-HW.yaml create mode 100644 tests/dummy_cs/tango/tango/pyaml/attribute_with_tango_powersupply_mocking_behaviour.py diff --git a/tests/config/sr/quadrupoles/QF1AC01-IDENT-HW.yaml b/tests/config/sr/quadrupoles/QF1AC01-IDENT-HW.yaml new file mode 100644 index 00000000..a4885bcd --- /dev/null +++ b/tests/config/sr/quadrupoles/QF1AC01-IDENT-HW.yaml @@ -0,0 +1,17 @@ +type: pyaml.magnet.quadrupole +name: QF1A-C01 +model: + type: pyaml.magnet.identity_model + unit: 1/m + physics: + type: tango.pyaml.attribute_with_tango_powersupply_mocking_behaviour + attribute: sr/ps-qf1/c01-a/strength + unit: 1/m + powerconverter: + type: tango.pyaml.attribute_with_tango_powersupply_mocking_behaviour + attribute: sr/ps-qf1/c01-a/current + calibration_factor: 1.00504 + calibration_offset: 0.0 + magnet_energy: 6e9 + curve: sr/magnet_models/QF1_strength.csv + unit: A diff --git a/tests/config/sr/quadrupoles/QF1AC01-IDENT.yaml b/tests/config/sr/quadrupoles/QF1AC01-IDENT.yaml index eb6a1512..ae0de2bf 100644 --- a/tests/config/sr/quadrupoles/QF1AC01-IDENT.yaml +++ b/tests/config/sr/quadrupoles/QF1AC01-IDENT.yaml @@ -5,6 +5,6 @@ model: unit: 1/m physics: type: pyaml.control.device - setpoint: sr/ps-qf1/c01-a/current - readback: sr/ps-qf1/c01-a/current - unit: A + setpoint: sr/ps-qf1/c01-a/strength + readback: sr/ps-qf1/c01-a/strength + unit: 1/m diff --git a/tests/dummy_cs/tango/tango/pyaml/attribute_with_tango_powersupply_mocking_behaviour.py b/tests/dummy_cs/tango/tango/pyaml/attribute_with_tango_powersupply_mocking_behaviour.py new file mode 100644 index 00000000..846a1e1e --- /dev/null +++ b/tests/dummy_cs/tango/tango/pyaml/attribute_with_tango_powersupply_mocking_behaviour.py @@ -0,0 +1,144 @@ +from pathlib import Path + +import numpy as np +from pydantic import BaseModel,ConfigDict +from scipy.constants import speed_of_light + +from pyaml.configuration.curve import Curve +from pyaml.control.deviceaccess import DeviceAccess +from pyaml.control.readback_value import Value +from pyaml.configuration import get_root_folder + +PYAMLCLASS : str = "AttributeWithTangoMockingBehaviour" + +class ConfigModel(BaseModel): + + model_config = ConfigDict(arbitrary_types_allowed=True,extra="forbid") + + attribute: str + calibration_factor: float = 1.0 + """Correction factor applied to the curve""" + calibration_offset: float = 0.0 + """Correction offset applied to the curve")""" + crosstalk: float = 1.0 + """Crosstalk factor""" + magnet_energy: float = 0.0 + curve: str = None + unit: str = "" + +class TangoDevice: + def __init__(self, name: str): + self.name = name + self._current:float = 0.0 + self._strength:float = 0.0 + self._attributes:dict[str, float] = {} + self.__calibration_factor: float = 1.0 + self.__calibration_offset: float = 0.0 + self.__brho: float = np.nan + self._curve_file: str = "" + self.__curve = None + self.__rcurve = None + + def read(self, attr_name: str) -> float: + if attr_name == "current": + return self._current + elif attr_name == "strength": + return self._strength + else: + if attr_name in self._attributes: + return self._attributes[attr_name] + else: + return 0.0 + + def write(self, attr_name: str, value: float): + if attr_name == "current": + self._current = value + self.__compute_strength() + elif attr_name == "strength": + self._strength = value + self.__compute_current() + else: + self._attributes[attr_name] = value + + def __compute_strength(self): + if self.__curve is not None: + self._strength = self.compute_strengths(np.array([self._current]))[0] + + def __compute_current(self): + if self.__curve is not None: + self._current = self.compute_hardware_values(np.array([self._strength]))[0] + + def compute_hardware_values(self, strengths: np.array) -> np.array: + _current = np.interp( + strengths[0] * self.__brho, self.__rcurve[:, 0], self.__rcurve[:, 1] + ) + return np.array([_current]) + + def compute_strengths(self, currents: np.array) -> np.array: + _strength = ( + np.interp(currents[0], self.__curve[:, 0], self.__curve[:, 1]) / self.__brho + ) + return np.array([_strength]) + + def set_magnet_model_data(self: float, calibration_factor, calibration_offset: float, crosstalk: float, magnet_energy: float, curve_file: str): + self.__calibration_factor = calibration_factor + self.__calibration_offset = calibration_offset + self.__brho = magnet_energy / speed_of_light + self._curve_file = curve_file + path:Path = get_root_folder() / curve_file + self.__curve = np.genfromtxt(path, delimiter=",", dtype=float) + _s = np.shape(self.__curve) + if len(_s) != 2 or _s[1] != 2: + raise Exception(curve_file + " wrong dimension") + self.__curve[:, 1] = ( + self.__curve[:, 1] * calibration_factor * crosstalk + calibration_offset + ) + self.__rcurve = Curve.inverse(self.__curve) + + +TANGO_DEVICES:dict[str, TangoDevice] = {} + +def get_device(name: str) -> TangoDevice: + if name in TANGO_DEVICES: + device = TANGO_DEVICES[name] + else: + device = TangoDevice(name) + TANGO_DEVICES[name] = device + return device + +class AttributeWithTangoMockingBehaviour(DeviceAccess): + """ + Class that implements a default device class that just prints out + values (Debugging purpose) + """ + def __init__(self, cfg: ConfigModel): + super().__init__() + self._cfg = cfg + self._setpoint = cfg.attribute + self._readback = cfg.attribute + self._unit = cfg.unit + self._device_name, self._attribute_name = cfg.attribute.rsplit("/", 1) + self._device = get_device(self._device_name) + if cfg.curve is not None: + self._device.set_magnet_model_data(cfg.calibration_factor, cfg.calibration_offset, cfg.crosstalk, cfg.magnet_energy, cfg.curve) + + def name(self) -> str: + return self._setpoint + + def measure_name(self) -> str: + return self._readback + + def set(self, value: float): + self._device.write(self._attribute_name, value) + + def set_and_wait(self, value: float): + self._device.write(self._attribute_name, value) + + def get(self) -> float: + return self._device.read(self._attribute_name) + + def readback(self) -> Value: + return Value(self.get()) + + def unit(self) -> str: + return self._unit diff --git a/tests/test_load_quad.py b/tests/test_load_quad.py index 0ad81438..490ef5db 100644 --- a/tests/test_load_quad.py +++ b/tests/test_load_quad.py @@ -3,6 +3,7 @@ import numpy as np from scipy.constants import speed_of_light +from pyaml import PyAMLException from pyaml.configuration import load,set_root_folder from pyaml.configuration import Factory from pyaml.magnet.hcorrector import HCorrector @@ -31,15 +32,16 @@ def test_quad_external_model(install_test_package, config_root_dir): Factory.clear() @pytest.mark.parametrize( - ("magnet_file", "install_test_package"), + ("magnet_file", "test_hardware", "compute_hardware", "install_test_package"), [ - ("sr/quadrupoles/QF1AC01.yaml", None), - ("sr/quadrupoles/QF1AC01-IDENT.yaml", {"name": "tango", "path": "tests/dummy_cs/tango"}), - ("sr/quadrupoles/QF1AC01.json", None), + ("sr/quadrupoles/QF1AC01.yaml", True, True, None), + ("sr/quadrupoles/QF1AC01-IDENT.yaml", False, False, {"name": "tango", "path": "tests/dummy_cs/tango"}), + ("sr/quadrupoles/QF1AC01-IDENT-HW.yaml", True, False, {"name": "tango", "path": "tests/dummy_cs/tango"}), + ("sr/quadrupoles/QF1AC01.json", True, True, None), ], indirect=["install_test_package"], ) -def test_quad_linear(magnet_file, install_test_package, config_root_dir): +def test_quad_linear(magnet_file, test_hardware, compute_hardware, install_test_package, config_root_dir): set_root_folder(config_root_dir) cfg_quad = load(magnet_file) print(f"Current file: {config_root_dir}/{magnet_file}") @@ -49,14 +51,22 @@ def test_quad_linear(magnet_file, install_test_package, config_root_dir): ref_quad = quad.attach(strength,hardware) ref_quad.model.set_magnet_rigidity(6e9 / speed_of_light) ref_quad.strength.set(0.7962) - current = ref_quad.hardware.get() - assert( np.abs(current-80.423276) < 1e-4 ) + if test_hardware: + current = ref_quad.hardware.get() + print(current) + assert( np.abs(current-80.423276) < 1e-4 ) + hunit = ref_quad.hardware.unit() + assert( hunit == "A" ) + if compute_hardware: + strength = ref_quad.model.compute_strengths([current]) + else: + strength = ref_quad.model.get_strengths() + else: + strength = ref_quad.model.get_strengths() + sunit = ref_quad.strength.unit() - hunit = ref_quad.hardware.unit() assert( sunit == "1/m" ) - assert( hunit == "A" ) - str = ref_quad.model.compute_strengths([current]) - assert( np.abs(str-0.7962) < 1e-6 ) + assert( np.abs(strength-0.7962) < 1e-6 ) Factory.clear() @pytest.mark.parametrize("magnet_file", [ From 3fb4ed008dddaf25216b8769bb83f127d50eaea8 Mon Sep 17 00:00:00 2001 From: guillaumepichon Date: Thu, 2 Oct 2025 15:56:19 +0200 Subject: [PATCH 4/6] New tests. For now, they don't end well... --- tests/config/sr-ident-hw-only.yaml | 21 ++++++++ tests/config/sr-ident-strgth-only.yaml | 21 ++++++++ ...T-HW.yaml => QF1AC01-IDENT-HW-STRGTH.yaml} | 0 ...1-IDENT.yaml => QF1AC01-IDENT-STRGTH.yaml} | 0 tests/test_load_quad.py | 54 +++++++++++++++++-- 5 files changed, 93 insertions(+), 3 deletions(-) create mode 100644 tests/config/sr-ident-hw-only.yaml create mode 100644 tests/config/sr-ident-strgth-only.yaml rename tests/config/sr/quadrupoles/{QF1AC01-IDENT-HW.yaml => QF1AC01-IDENT-HW-STRGTH.yaml} (100%) rename tests/config/sr/quadrupoles/{QF1AC01-IDENT.yaml => QF1AC01-IDENT-STRGTH.yaml} (100%) diff --git a/tests/config/sr-ident-hw-only.yaml b/tests/config/sr-ident-hw-only.yaml new file mode 100644 index 00000000..8e5ae9f8 --- /dev/null +++ b/tests/config/sr-ident-hw-only.yaml @@ -0,0 +1,21 @@ +type: pyaml.pyaml +instruments: + - type: pyaml.instrument + name: sr + energy: 6e9 + simulators: + - type: pyaml.lattice.simulator + lattice: sr/lattices/ebs.mat + name: design + data_folder: /data/store + devices: + - type: pyaml.magnet.quadrupole + name: QF1A-C01 + model: + type: pyaml.magnet.identity_model + unit: A + powerconverter: + type: pyaml.control.device + setpoint: srmag/vps-qf1/c05-a/current + readback: srmag/vps-qf1/c05-a/current + unit: A diff --git a/tests/config/sr-ident-strgth-only.yaml b/tests/config/sr-ident-strgth-only.yaml new file mode 100644 index 00000000..365f81db --- /dev/null +++ b/tests/config/sr-ident-strgth-only.yaml @@ -0,0 +1,21 @@ +type: pyaml.pyaml +instruments: + - type: pyaml.instrument + name: sr + energy: 6e9 + simulators: + - type: pyaml.lattice.simulator + lattice: sr/lattices/ebs.mat + name: design + data_folder: /data/store + devices: + - type: pyaml.magnet.quadrupole + name: QF1A-C01 + model: + type: pyaml.magnet.identity_model + unit: 1/m + physics: + type: pyaml.control.device + setpoint: srmag/m-qf1/c05-a/strength + readback: srmag/m-qf1/c05-a/strength + unit: 1/m diff --git a/tests/config/sr/quadrupoles/QF1AC01-IDENT-HW.yaml b/tests/config/sr/quadrupoles/QF1AC01-IDENT-HW-STRGTH.yaml similarity index 100% rename from tests/config/sr/quadrupoles/QF1AC01-IDENT-HW.yaml rename to tests/config/sr/quadrupoles/QF1AC01-IDENT-HW-STRGTH.yaml diff --git a/tests/config/sr/quadrupoles/QF1AC01-IDENT.yaml b/tests/config/sr/quadrupoles/QF1AC01-IDENT-STRGTH.yaml similarity index 100% rename from tests/config/sr/quadrupoles/QF1AC01-IDENT.yaml rename to tests/config/sr/quadrupoles/QF1AC01-IDENT-STRGTH.yaml diff --git a/tests/test_load_quad.py b/tests/test_load_quad.py index 490ef5db..ff8220c7 100644 --- a/tests/test_load_quad.py +++ b/tests/test_load_quad.py @@ -4,13 +4,16 @@ from scipy.constants import speed_of_light from pyaml import PyAMLException -from pyaml.configuration import load,set_root_folder +from pyaml.configuration import load, set_root_folder, get_root_folder from pyaml.configuration import Factory +from pyaml.lattice.element_holder import MagnetType from pyaml.magnet.hcorrector import HCorrector from pyaml.magnet.quadrupole import Quadrupole from pyaml.magnet.quadrupole import ConfigModel as QuadrupoleConfigModel from pyaml.magnet.cfm_magnet import CombinedFunctionMagnet from pyaml.control.abstract_impl import RWHardwareScalar,RWStrengthScalar,RWHardwareArray,RWStrengthArray +from pyaml.pyaml import pyaml,PyAML +from pyaml.instrument import Instrument # TODO: Generate JSON pydantic schema for MetaConfigurator #def test_json(): @@ -35,8 +38,8 @@ def test_quad_external_model(install_test_package, config_root_dir): ("magnet_file", "test_hardware", "compute_hardware", "install_test_package"), [ ("sr/quadrupoles/QF1AC01.yaml", True, True, None), - ("sr/quadrupoles/QF1AC01-IDENT.yaml", False, False, {"name": "tango", "path": "tests/dummy_cs/tango"}), - ("sr/quadrupoles/QF1AC01-IDENT-HW.yaml", True, False, {"name": "tango", "path": "tests/dummy_cs/tango"}), + ("sr/quadrupoles/QF1AC01-IDENT-STRGTH.yaml", False, False, {"name": "tango", "path": "tests/dummy_cs/tango"}), + ("sr/quadrupoles/QF1AC01-IDENT-HW-STRGTH.yaml", True, False, {"name": "tango", "path": "tests/dummy_cs/tango"}), ("sr/quadrupoles/QF1AC01.json", True, True, None), ], indirect=["install_test_package"], @@ -69,6 +72,51 @@ def test_quad_linear(magnet_file, test_hardware, compute_hardware, install_test_ assert( np.abs(strength-0.7962) < 1e-6 ) Factory.clear() + +@pytest.mark.parametrize( + "magnet_file", + [ + "sr-ident-hw-only.yaml", + ]) +def test_quad_ident_hardware_only(magnet_file, config_root_dir): + set_root_folder(config_root_dir) + ml:PyAML = pyaml(get_root_folder() / magnet_file) + sr:Instrument = ml.get('sr') + with pytest.raises(PyAMLException): + sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").strength.set(0.000010) # throw an exception + with pytest.raises(PyAMLException): + sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").hardware.set(82) # throw an exception + + with pytest.raises(PyAMLException): + sr.live.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").strength.set(0.000010) # throw an exception + sr.live.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").hardware.set(82) # Write 82A + + with pytest.raises(PyAMLException): + sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1E-C04").strength.unit() # throw an exception + assert sr.live.get_magnet(MagnetType.QUADRUPOLE, "QF1E-C04").hardware.unit() == "A" # A + + +@pytest.mark.parametrize( + "magnet_file", + [ + "sr-ident-strgth-only.yaml", + ]) +def test_quad_ident_strength_only(magnet_file, config_root_dir): + set_root_folder(config_root_dir) + ml:PyAML = pyaml(get_root_folder() / magnet_file) + sr:Instrument = ml.get('sr') + # 2 following lines are equivalent + sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").strength.set(0.000010) + sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").hardware.set(0.000010) + + # 2 following lines are equivalent + sr.live.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").strength.set(0.000010) + sr.live.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").hardware.set(0.000010) + + assert sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1E-C04").strength.unit() == "1/m" + assert sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1E-C04").hardware.unit() == "1/m" + + @pytest.mark.parametrize("magnet_file", [ "sr/correctors/SH1AC01.yaml", ]) From 88374c8777387df7a465d70c19e9df1acd149e61 Mon Sep 17 00:00:00 2001 From: PONS Date: Thu, 9 Oct 2025 17:39:51 +0200 Subject: [PATCH 5/6] Mods for identity magnet models --- pyaml/arrays/magnet_array.py | 2 +- pyaml/control/abstract_impl.py | 33 ++++--- pyaml/control/controlsystem.py | 8 +- pyaml/lattice/simulator.py | 9 +- pyaml/magnet/cfm_magnet.py | 2 +- pyaml/magnet/identity_cfm_model.py | 83 ++++++++--------- pyaml/magnet/identity_model.py | 70 ++++++--------- pyaml/magnet/linear_cfm_model.py | 2 +- pyaml/magnet/model.py | 21 ++--- .../quadrupoles/QF1AC01-IDENT-HW-STRGTH.yaml | 17 ---- .../sr/quadrupoles/QF1AC01-IDENT-HW.yaml | 11 +++ .../pyaml_external/external_magnet_model.py | 2 +- tests/test_load_quad.py | 89 ++++++------------- 13 files changed, 147 insertions(+), 202 deletions(-) delete mode 100644 tests/config/sr/quadrupoles/QF1AC01-IDENT-HW-STRGTH.yaml create mode 100644 tests/config/sr/quadrupoles/QF1AC01-IDENT-HW.yaml diff --git a/pyaml/arrays/magnet_array.py b/pyaml/arrays/magnet_array.py index b3fbfc82..bd4c009a 100644 --- a/pyaml/arrays/magnet_array.py +++ b/pyaml/arrays/magnet_array.py @@ -102,7 +102,7 @@ def unit(self) -> list[str]: def set_aggregator(self,agg:DeviceAccessList): self.aggregator = agg for m in self.__magnets: - self.hasHardwareMapping |= m.model.hasHardwareMapping() + self.hasHardwareMapping |= m.model.has_hardware() class MagnetArray(list[Magnet]): """ diff --git a/pyaml/control/abstract_impl.py b/pyaml/control/abstract_impl.py index 8d78da2a..77df4b5c 100644 --- a/pyaml/control/abstract_impl.py +++ b/pyaml/control/abstract_impl.py @@ -12,19 +12,19 @@ class RWHardwareScalar(abstract.ReadWriteFloatScalar): """ def __init__(self, model:MagnetModel): - self.model = model + self.__model = model def get(self) -> float: - return self.model.read_hardware_values()[0] + return self.__model.read_hardware_values()[0] def set(self, value:float): - self.model.send_hardware_values(np.array([value])) + self.__model.send_hardware_values(np.array([value])) def set_and_wait(self, value: double): raise NotImplementedError("Not implemented yet.") def unit(self) -> str: - return self.model.get_hardware_units()[0] + return self.__model.get_hardware_units()[0] def index(self) -> int: return 0 @@ -41,11 +41,13 @@ def __init__(self, model:MagnetModel): # Gets the value def get(self) -> float: - return self.__model.get_strengths()[0] + currents = self.__model.read_hardware_values() + return self.__model.compute_strengths(currents) # Sets the value def set(self, value:float): - self.__model.set_strengths([value]) + current = self.__model.compute_hardware_values([value]) + self.__model.send_hardware_values(current) # Sets the value and wait that the read value reach the setpoint def set_and_wait(self, value:float): @@ -65,15 +67,15 @@ class RWHardwareArray(abstract.ReadWriteFloatArray): Class providing read write access to a magnet array of a control system (in hardware units) """ def __init__(self, model:MagnetModel): - self.model = model + self.__model = model # Gets the value def get(self) -> np.array: - return self.model.read_hardware_values() + return self.__model.read_hardware_values() # Sets the value def set(self, value:np.array): - self.model.send_hardware_values(value) + self.__model.send_hardware_values(value) # Sets the value and waits that the read value reach the setpoint def set_and_wait(self, value:np.array): @@ -82,7 +84,7 @@ def set_and_wait(self, value:np.array): # Gets the unit of the value def unit(self) -> list[str]: - return self.model.get_hardware_units() + return self.__model.get_hardware_units() #------------------------------------------------------------------------------ @@ -91,15 +93,18 @@ class RWStrengthArray(abstract.ReadWriteFloatArray): Class providing read write access to magnet strengths of a control system """ def __init__(self, model:MagnetModel): - self.model = model + self.__model = model # Gets the value def get(self) -> np.array: - return self.model.get_strengths() + r = self.__model.read_hardware_values() + str = self.__model.compute_strengths(r) + return str # Sets the value def set(self, value:np.array): - self.model.set_strengths(value) + cur = self.__model.compute_hardware_values(value) + self.__model.send_hardware_values(cur) # Sets the value and waits that the read value reach the setpoint def set_and_wait(self, value:np.array): @@ -107,7 +112,7 @@ def set_and_wait(self, value:np.array): # Gets the unit of the value def unit(self) -> list[str]: - return self.model.get_strength_units() + return self.__model.get_strength_units() diff --git a/pyaml/control/controlsystem.py b/pyaml/control/controlsystem.py index 5cbbc8f9..6ec5605a 100644 --- a/pyaml/control/controlsystem.py +++ b/pyaml/control/controlsystem.py @@ -46,15 +46,15 @@ def fill_device(self,elements:list[Element]): """ for e in elements: if isinstance(e,Magnet): - current = RWHardwareScalar(e.model) - strength = RWStrengthScalar(e.model) + current = RWHardwareScalar(e.model) if e.model.has_hardware() else None + strength = RWStrengthScalar(e.model) if e.model.has_physics() else None # Create a unique ref for this control system m = e.attach(strength, current) self.add_magnet(str(m),m) elif isinstance(e,CombinedFunctionMagnet): self.add_magnet(str(e),e) - currents = RWHardwareArray(e.model) - strengths = RWStrengthArray(e.model) + currents = RWHardwareArray(e.model) if e.model.has_hardware() else None + strengths = RWStrengthArray(e.model) if e.model.has_physics() else None # Create unique refs of each function for this control system ms = e.attach(strengths,currents) for m in ms: diff --git a/pyaml/lattice/simulator.py b/pyaml/lattice/simulator.py index 4056a534..33bad69c 100644 --- a/pyaml/lattice/simulator.py +++ b/pyaml/lattice/simulator.py @@ -52,16 +52,17 @@ def set_energy(self,E:float): def fill_device(self,elements:list[Element]): for e in elements: + # Need conversion to physics unit to work with simulator if isinstance(e,Magnet): - current = RWHardwareScalar(self.get_at_elems(e.name),e.polynom,e.model) - strength = RWStrengthScalar(self.get_at_elems(e.name),e.polynom,e.model) + current = RWHardwareScalar(self.get_at_elems(e.name),e.polynom,e.model) if e.model.has_physics() else None + strength = RWStrengthScalar(self.get_at_elems(e.name),e.polynom,e.model) if e.model.has_physics() else None # Create a unique ref for this simulator m = e.attach(strength,current) self.add_magnet(str(m),m) elif isinstance(e,CombinedFunctionMagnet): self.add_magnet(str(e),e) - currents = RWHardwareArray(self.get_at_elems(e.name),e.polynoms,e.model) - strengths = RWStrengthArray(self.get_at_elems(e.name),e.polynoms,e.model) + currents = RWHardwareArray(self.get_at_elems(e.name),e.polynoms,e.model) if e.model.has_physics() else None + strengths = RWStrengthArray(self.get_at_elems(e.name),e.polynoms,e.model) if e.model.has_physics() else None # Create unique refs of each function for this simulator ms = e.attach(strengths,currents) for m in ms: diff --git a/pyaml/magnet/cfm_magnet.py b/pyaml/magnet/cfm_magnet.py index 67f325bc..d279fd34 100644 --- a/pyaml/magnet/cfm_magnet.py +++ b/pyaml/magnet/cfm_magnet.py @@ -68,7 +68,7 @@ def attach(self, strengths: abstract.ReadWriteFloatArray, hardwares: abstract.Re args = {"name":m[1]} mclass:Magnet = _fmap[m[0]](ElementConfigModel(**args)) strength = RWMapper(strengths,idx) - hardware = RWMapper(hardwares,idx) if self.model.hasHardwareMapping() else None + hardware = RWMapper(hardwares,idx) if self.model.has_hardware() else None l.append(mclass.attach(strength,hardware)) return l diff --git a/pyaml/magnet/identity_cfm_model.py b/pyaml/magnet/identity_cfm_model.py index 518fd26a..6583993b 100644 --- a/pyaml/magnet/identity_cfm_model.py +++ b/pyaml/magnet/identity_cfm_model.py @@ -15,7 +15,7 @@ class ConfigModel(BaseModel): multipoles: list[str] """List of supported functions: A0,B0,A1,B1,etc (i.e. [B0,A1,B2])""" - powerconverters: list[DeviceAccess]|None = None + powerconverters: list[DeviceAccess] | None = None """Power converter device to apply current""" physics: list[DeviceAccess] | None = None """Magnet device to apply strength""" @@ -24,71 +24,66 @@ class ConfigModel(BaseModel): class IdentityCFMagnetModel(MagnetModel): """ - Class that handle magnet current/strength direct access for a single function magnet + Class that map values to underlying devices without conversion """ def __init__(self, cfg: ConfigModel): self._cfg = cfg - self.__strength_unit = cfg.unit - self.__ps = None - self.__hardware_unit = None - self.__physics = None - self.__physics_unit = None - if cfg.powerconverters is not None: - self.__ps = cfg.powerconverters - self.__hardware_unit = [powerconverter.unit() for powerconverter in cfg.powerconverters] - if cfg.physics is not None: - self.__physics = cfg.physics - self.__physics_unit = [magnet.unit() for magnet in cfg.physics] + + # Check config + self.__nbFunction: int = len(cfg.multipoles) + + if cfg.physics is None and cfg.powerconverter is None: + raise Exception("Invalid IdentityCFMagnetModel configuration, physics or powerconverters device required") + if cfg.physics is not None and cfg.powerconverter is not None: + raise Exception("Invalid IdentityCFMagnetModel configuration, physics or powerconverters device required but not both") + if cfg.physics: + self.__devices = cfg.physics + else: + self.__devices = cfg.powerconverter + + self.__nbDev: int = len(self.__devices) + + self.__check_len(cfg.units,"units",self.__nbFunction) + + def __check_len(self,obj,name,expected_len): + lgth = len(obj) + if lgth != expected_len: + raise Exception( + f"{name} does not have the expected " + f"number of items ({expected_len} items expected but got {lgth})" + ) def compute_hardware_values(self, strengths: np.array) -> np.array: - raise PyAMLException("The identity model does not support computation") + return strengths def compute_strengths(self, currents: np.array) -> np.array: - raise PyAMLException("The identity model does not support computation") + return currents def get_strength_units(self) -> list[str]: - return [self.__physics_unit] if self.__physics_unit is not None else [""] + return self._cfg.units def get_hardware_units(self) -> list[str]: - return [p.unit() for p in self._cfg.powerconverters] if self.__hardware_unit is not None else [""] + return self._cfg.units def read_hardware_values(self) -> np.array: - if self.__ps is None: - raise PyAMLException(f"{str(self)} does not supports hardware values") - return [self.__ps.get()] + return np.array([p.get() for p in self.__devices]) def readback_hardware_values(self) -> np.array: - if self.__ps is None: - raise PyAMLException(f"{str(self)} does not supports hardware values") - return [ps.readback() for ps in self.__ps] + return np.array([p.readback() for p in self.__devices]) def send_hardware_values(self, currents: np.array): - if self.__ps is None: - raise PyAMLException(f"{str(self)} does not hardware hardware values") - [ps.set(currents) for ps in self.__ps] + for idx, p in enumerate(self.__devices): + p.set(currents[idx]) def get_devices(self) -> list[DeviceAccess]: - devices = [] - if self.__ps is not None: - devices.extend(self.__ps) - if self.__physics is not None: - devices.extend(self.__physics) - return devices - - def __repr__(self): - return f"{self.__class__.__name__}(identity {("Magnet" if self.__physics is not None else "Power supply")}, unit={self.__strength_unit})" + return self.__devices def set_magnet_rigidity(self, brho: np.double): pass - def get_strengths(self) -> np.array: - if self.__physics is None: - raise PyAMLException(f"{str(self)} does not supports physics values") - return np.array([np.float64(magnet.get()) for magnet in self.__physics]) + def has_physics_mapping(self) -> bool: + return self._cfg.physics is not None - def set_strengths(self, values:list[float]): - if self.__physics is None: - raise PyAMLException(f"{str(self)} does not supports physics values") - for value, magnet in zip(values, self.__physics): - magnet.set(value) + def has_hardware(self) -> bool: + return self._cfg.powerconverters is not None diff --git a/pyaml/magnet/identity_model.py b/pyaml/magnet/identity_model.py index 3ef33f97..599d446a 100644 --- a/pyaml/magnet/identity_model.py +++ b/pyaml/magnet/identity_model.py @@ -22,70 +22,56 @@ class ConfigModel(BaseModel): class IdentityMagnetModel(MagnetModel): """ - Class that handle magnet current/strength direct access for a single function magnet + Class that map value to underlying device without conversion """ def __init__(self, cfg: ConfigModel): self._cfg = cfg - self.__strength_unit = cfg.unit - self.__ps = None - self.__hardware_unit = None - self.__physics = None - self.__physics_unit = None - if cfg.powerconverter is not None: - self.__ps = cfg.powerconverter - self.__hardware_unit = cfg.powerconverter.unit() - if cfg.physics is not None: - self.__physics = cfg.physics - self.__physics_unit = cfg.physics.unit() + self.__unit = cfg.unit + if cfg.physics is None and cfg.powerconverter is None: + raise Exception("Invalid IdentityMagnetModel configuration, physics or powerconverter device required") + if cfg.physics is not None and cfg.powerconverter is not None: + raise Exception("Invalid IdentityMagnetModel configuration, physics or powerconverter device required but not both") + if cfg.physics: + self.__device = cfg.physics + else: + self.__device = cfg.powerconverter def compute_hardware_values(self, strengths: np.array) -> np.array: - raise PyAMLException("The identity model does not support computation") + return strengths def compute_strengths(self, currents: np.array) -> np.array: - raise PyAMLException("The identity model does not support computation") + return currents def get_strength_units(self) -> list[str]: - return [self.__physics_unit] if self.__physics_unit is not None else [""] + return [self.__unit] def get_hardware_units(self) -> list[str]: - return [self.__hardware_unit] if self.__hardware_unit is not None else [""] + return [self.__unit] def read_hardware_values(self) -> np.array: - if self.__ps is None: - raise PyAMLException(f"{str(self)} does not supports hardware values") - return [self.__ps.get()] + return [self.__device.get()] def readback_hardware_values(self) -> np.array: - if self.__ps is None: - raise PyAMLException(f"{str(self)} does not supports hardware values") - return [self.__ps.readback()] + return [self.__device.readback()] def send_hardware_values(self, currents: np.array): - if self.__ps is None: - raise PyAMLException(f"{str(self)} does not supports hardware values") - self.__ps.set(currents[0]) + self.__device.set(currents[0]) def get_devices(self) -> list[DeviceAccess]: - devices = [] - if self.__ps is not None: - devices.append(self.__ps) - if self.__physics is not None: - devices.append(self.__physics) - return devices - - def __repr__(self): - return f"{self.__class__.__name__}(identity {("Magnet" if self.__physics is not None else "Power supply")}, unit={self.__strength_unit})" + return [self.__device] def set_magnet_rigidity(self, brho: np.double): pass - def get_strengths(self) -> np.array: - if self.__physics is None: - raise PyAMLException(f"{str(self)} does not supports physics values") - return np.array([np.float64(self.__physics.get())]) + def has_physics(self) -> bool: + return self._cfg.physics is not None - def set_strengths(self, values:list[float]): - if self.__physics is None: - raise PyAMLException(f"{str(self)} does not supports physics values") - self.__physics.set(values[0]) + def has_hardware(self) -> bool: + return self._cfg.powerconverter is not None + + def __repr__(self): + return "%s(unit=%s)" % ( + self.__class__.__name__, + self.__unit, + ) diff --git a/pyaml/magnet/linear_cfm_model.py b/pyaml/magnet/linear_cfm_model.py index d3e78933..6daf3b1a 100644 --- a/pyaml/magnet/linear_cfm_model.py +++ b/pyaml/magnet/linear_cfm_model.py @@ -159,6 +159,6 @@ def get_devices(self) -> list[DeviceAccess]: def set_magnet_rigidity(self, brho: np.double): self._brho = brho - def hasHardwareMapping(self) -> bool: + def has_hardware(self) -> bool: return (self.__nbPS == self.__nbFunction) and np.allclose(self.__matrix, np.eye(self.__nbFunction)) diff --git a/pyaml/magnet/model.py b/pyaml/magnet/model.py index a1eb6644..4df7192b 100644 --- a/pyaml/magnet/model.py +++ b/pyaml/magnet/model.py @@ -127,7 +127,7 @@ def set_magnet_rigidity(self, brho: np.double): pass - def hasHardwareMapping(self) -> bool: + def has_hardware(self) -> bool: """ Tells if the model allows to work in hardware unit. @@ -137,13 +137,14 @@ def hasHardwareMapping(self) -> bool: True if the model supports hardware unit """ return True + + def has_physics(self) -> bool: + """ + Tells if the model allows to work in physics unit. - # Gets the value - def get_strengths(self) -> npt.NDArray[np.float64]: - currents = self.read_hardware_values() - return self.compute_strengths(currents) - - # Sets the value - def set_strengths(self, values:list[float]): - current = self.compute_hardware_values(np.array(values)) - self.send_hardware_values(current) + Returns + ---------- + bool + True if the model supports physics unit + """ + return True \ No newline at end of file diff --git a/tests/config/sr/quadrupoles/QF1AC01-IDENT-HW-STRGTH.yaml b/tests/config/sr/quadrupoles/QF1AC01-IDENT-HW-STRGTH.yaml deleted file mode 100644 index a4885bcd..00000000 --- a/tests/config/sr/quadrupoles/QF1AC01-IDENT-HW-STRGTH.yaml +++ /dev/null @@ -1,17 +0,0 @@ -type: pyaml.magnet.quadrupole -name: QF1A-C01 -model: - type: pyaml.magnet.identity_model - unit: 1/m - physics: - type: tango.pyaml.attribute_with_tango_powersupply_mocking_behaviour - attribute: sr/ps-qf1/c01-a/strength - unit: 1/m - powerconverter: - type: tango.pyaml.attribute_with_tango_powersupply_mocking_behaviour - attribute: sr/ps-qf1/c01-a/current - calibration_factor: 1.00504 - calibration_offset: 0.0 - magnet_energy: 6e9 - curve: sr/magnet_models/QF1_strength.csv - unit: A diff --git a/tests/config/sr/quadrupoles/QF1AC01-IDENT-HW.yaml b/tests/config/sr/quadrupoles/QF1AC01-IDENT-HW.yaml new file mode 100644 index 00000000..0532f794 --- /dev/null +++ b/tests/config/sr/quadrupoles/QF1AC01-IDENT-HW.yaml @@ -0,0 +1,11 @@ +type: pyaml.magnet.quadrupole +name: QF1A-C01 +model: + type: pyaml.magnet.identity_model + unit: A + powerconverter: + type: pyaml.control.device + setpoint: sr/ps-qf1/c01-a/strength + readback: sr/ps-qf1/c01-a/strength + unit: 1/m + diff --git a/tests/external/pyaml_external/external_magnet_model.py b/tests/external/pyaml_external/external_magnet_model.py index 69ba69e9..162221ec 100644 --- a/tests/external/pyaml_external/external_magnet_model.py +++ b/tests/external/pyaml_external/external_magnet_model.py @@ -73,7 +73,7 @@ def send_hardware_values(self, currents:np.array): def get_devices(self) -> list[DeviceAccess]: return [self._ps,self.id] - def hasHardwareMapping(self) -> bool: + def has_hardware(self) -> bool: # No trivial conversion between strength and hardware unit return False diff --git a/tests/test_load_quad.py b/tests/test_load_quad.py index ff8220c7..d6ccfa06 100644 --- a/tests/test_load_quad.py +++ b/tests/test_load_quad.py @@ -10,6 +10,7 @@ from pyaml.magnet.hcorrector import HCorrector from pyaml.magnet.quadrupole import Quadrupole from pyaml.magnet.quadrupole import ConfigModel as QuadrupoleConfigModel +from pyaml.magnet.identity_model import IdentityMagnetModel from pyaml.magnet.cfm_magnet import CombinedFunctionMagnet from pyaml.control.abstract_impl import RWHardwareScalar,RWStrengthScalar,RWHardwareArray,RWStrengthArray from pyaml.pyaml import pyaml,PyAML @@ -35,88 +36,50 @@ def test_quad_external_model(install_test_package, config_root_dir): Factory.clear() @pytest.mark.parametrize( - ("magnet_file", "test_hardware", "compute_hardware", "install_test_package"), + ("magnet_file", "install_test_package"), [ - ("sr/quadrupoles/QF1AC01.yaml", True, True, None), - ("sr/quadrupoles/QF1AC01-IDENT-STRGTH.yaml", False, False, {"name": "tango", "path": "tests/dummy_cs/tango"}), - ("sr/quadrupoles/QF1AC01-IDENT-HW-STRGTH.yaml", True, False, {"name": "tango", "path": "tests/dummy_cs/tango"}), - ("sr/quadrupoles/QF1AC01.json", True, True, None), + ("sr/quadrupoles/QF1AC01.yaml", None), + ("sr/quadrupoles/QF1AC01-IDENT-STRGTH.yaml", {"name": "tango", "path": "tests/dummy_cs/tango"}), + ("sr/quadrupoles/QF1AC01-IDENT-HW.yaml", {"name": "tango", "path": "tests/dummy_cs/tango"}), + ("sr/quadrupoles/QF1AC01.json", None), ], indirect=["install_test_package"], ) -def test_quad_linear(magnet_file, test_hardware, compute_hardware, install_test_package, config_root_dir): +def test_quad_linear(magnet_file, install_test_package, config_root_dir): set_root_folder(config_root_dir) cfg_quad = load(magnet_file) print(f"Current file: {config_root_dir}/{magnet_file}") quad:Quadrupole = Factory.depth_first_build(cfg_quad) - strength = RWStrengthScalar(quad.model) - hardware = RWHardwareScalar(quad.model) + hardware = RWHardwareScalar(quad.model) if quad.model.has_hardware() else None + strength = RWStrengthScalar(quad.model) if quad.model.has_physics() else None ref_quad = quad.attach(strength,hardware) ref_quad.model.set_magnet_rigidity(6e9 / speed_of_light) - ref_quad.strength.set(0.7962) - if test_hardware: + + try: + ref_quad.strength.set(0.7962) + sunit = ref_quad.strength.unit() + assert( sunit == "1/m" ) + except Exception as ex: + if not quad.model.has_physics(): + assert( "has no model that supports physics units" in str(ex) ) + ref_quad.hardware.set(80.423276) + else: + raise ex + + try: current = ref_quad.hardware.get() - print(current) assert( np.abs(current-80.423276) < 1e-4 ) hunit = ref_quad.hardware.unit() assert( hunit == "A" ) - if compute_hardware: - strength = ref_quad.model.compute_strengths([current]) + except Exception as ex: + if not quad.model.has_hardware(): + assert( "has no model that supports hardware units" in str(ex) ) else: - strength = ref_quad.model.get_strengths() - else: - strength = ref_quad.model.get_strengths() + raise ex - sunit = ref_quad.strength.unit() - assert( sunit == "1/m" ) - assert( np.abs(strength-0.7962) < 1e-6 ) Factory.clear() -@pytest.mark.parametrize( - "magnet_file", - [ - "sr-ident-hw-only.yaml", - ]) -def test_quad_ident_hardware_only(magnet_file, config_root_dir): - set_root_folder(config_root_dir) - ml:PyAML = pyaml(get_root_folder() / magnet_file) - sr:Instrument = ml.get('sr') - with pytest.raises(PyAMLException): - sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").strength.set(0.000010) # throw an exception - with pytest.raises(PyAMLException): - sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").hardware.set(82) # throw an exception - - with pytest.raises(PyAMLException): - sr.live.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").strength.set(0.000010) # throw an exception - sr.live.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").hardware.set(82) # Write 82A - - with pytest.raises(PyAMLException): - sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1E-C04").strength.unit() # throw an exception - assert sr.live.get_magnet(MagnetType.QUADRUPOLE, "QF1E-C04").hardware.unit() == "A" # A - - -@pytest.mark.parametrize( - "magnet_file", - [ - "sr-ident-strgth-only.yaml", - ]) -def test_quad_ident_strength_only(magnet_file, config_root_dir): - set_root_folder(config_root_dir) - ml:PyAML = pyaml(get_root_folder() / magnet_file) - sr:Instrument = ml.get('sr') - # 2 following lines are equivalent - sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").strength.set(0.000010) - sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").hardware.set(0.000010) - - # 2 following lines are equivalent - sr.live.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").strength.set(0.000010) - sr.live.get_magnet(MagnetType.QUADRUPOLE, "QF1A-C01").hardware.set(0.000010) - - assert sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1E-C04").strength.unit() == "1/m" - assert sr.design.get_magnet(MagnetType.QUADRUPOLE, "QF1E-C04").hardware.unit() == "1/m" - - @pytest.mark.parametrize("magnet_file", [ "sr/correctors/SH1AC01.yaml", ]) From 126032e5f6ecc0ab1ea7dc33d50586c3d7427a8b Mon Sep 17 00:00:00 2001 From: PONS Date: Thu, 9 Oct 2025 17:55:29 +0200 Subject: [PATCH 6/6] Typo --- pyaml/magnet/identity_cfm_model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyaml/magnet/identity_cfm_model.py b/pyaml/magnet/identity_cfm_model.py index 6583993b..7328f4b2 100644 --- a/pyaml/magnet/identity_cfm_model.py +++ b/pyaml/magnet/identity_cfm_model.py @@ -82,7 +82,7 @@ def get_devices(self) -> list[DeviceAccess]: def set_magnet_rigidity(self, brho: np.double): pass - def has_physics_mapping(self) -> bool: + def has_physics(self) -> bool: return self._cfg.physics is not None def has_hardware(self) -> bool: