From 9245ca53a09dc0fc90bea564e38a302c350d584c Mon Sep 17 00:00:00 2001 From: PONS Date: Wed, 5 Nov 2025 14:49:53 +0100 Subject: [PATCH 1/9] Added Element array + fix in array handling --- pyaml/arrays/bpm_array.py | 12 ++-- pyaml/arrays/element.py | 15 +++++ pyaml/arrays/element_array.py | 45 +++++++++++++ pyaml/arrays/magnet_array.py | 12 ++-- pyaml/common/element_holder.py | 92 +++++++++++++------------- pyaml/configuration/factory.py | 3 +- pyaml/control/controlsystem.py | 20 +++--- pyaml/lattice/simulator.py | 21 +++--- tests/config/bad_conf_duplicate_1.yaml | 26 ++++++++ tests/config/bad_conf_duplicate_2.yaml | 59 +++++++++++++++++ tests/config/bad_conf_duplicate_3.yaml | 70 ++++++++++++++++++++ tests/config/sr.yaml | 7 ++ tests/test_arrays.py | 5 +- tests/test_errors.py | 48 ++++++++++++++ tests/test_rf.py | 13 +++- 15 files changed, 365 insertions(+), 83 deletions(-) create mode 100644 pyaml/arrays/element.py create mode 100644 pyaml/arrays/element_array.py create mode 100644 tests/config/bad_conf_duplicate_1.yaml create mode 100644 tests/config/bad_conf_duplicate_2.yaml create mode 100644 tests/config/bad_conf_duplicate_3.yaml create mode 100644 tests/test_errors.py diff --git a/pyaml/arrays/bpm_array.py b/pyaml/arrays/bpm_array.py index f47145d8..508b475c 100644 --- a/pyaml/arrays/bpm_array.py +++ b/pyaml/arrays/bpm_array.py @@ -1,7 +1,7 @@ from ..common.abstract import ReadFloatArray from ..bpm.bpm import BPM from ..control.deviceaccesslist import DeviceAccessList -from ..common.exception import PyAMLException +from .element_array import get_peer_from_array import numpy as np @@ -72,13 +72,10 @@ def __init__(self,arrayName:str,bpms:list[BPM],use_aggregator = True): use_aggregator : bool Use aggregator to increase performance by using paralell access to underlying devices. """ - super().__init__(i for i in bpms) - holder = bpms[0]._peer if len(bpms)>0 else None - if holder is None or any([m._peer!=holder for m in bpms]): - raise PyAMLException(f"BPMArray {arrayName} : All elements must be attached to the same instance of either a Simulator or a ControlSystem") - super().__init__(i for i in bpms) self.__name = arrayName + holder = get_peer_from_array(self) + self.__hvpos = RWBPMPosition(arrayName,bpms) self.__hpos = RWBPMSinglePosition(arrayName,bpms,0) self.__vpos = RWBPMSinglePosition(arrayName,bpms,1) @@ -89,6 +86,9 @@ def __init__(self,arrayName:str,bpms:list[BPM],use_aggregator = True): self.__hpos.set_aggregator(aggs[1]) self.__vpos.set_aggregator(aggs[2]) + def get_name(self) -> str: + return self.__name + @property def positions(self) -> RWBPMPosition: """ diff --git a/pyaml/arrays/element.py b/pyaml/arrays/element.py new file mode 100644 index 00000000..0d4ced0f --- /dev/null +++ b/pyaml/arrays/element.py @@ -0,0 +1,15 @@ +from .array import ArrayConfigModel,ArrayConfig +from ..common.element_holder import ElementHolder + +# Define the main class name for this module +PYAMLCLASS = "Element" + +class ConfigModel(ArrayConfigModel):... + +class Element(ArrayConfig): + + def __init__(self, cfg: ArrayConfigModel): + super().__init__(cfg) + + def fill_array(self,holder:ElementHolder): + holder.fill_element_array(self._cfg.name,self._cfg.elements) diff --git a/pyaml/arrays/element_array.py b/pyaml/arrays/element_array.py new file mode 100644 index 00000000..baf9f3f5 --- /dev/null +++ b/pyaml/arrays/element_array.py @@ -0,0 +1,45 @@ +from ..common.element import Element +from ..common.exception import PyAMLException + +def get_peer_from_array(array): + """ + Returns the peer (Simulator or ControlSystem) of an element list + """ + peer = array[0]._peer if len(array)>0 else None + if peer is None or any([m._peer!=peer for m in array]): + raise PyAMLException(f"{array.__class__.__name__} {array.get_name()}: All elements must be attached to the same instance of either a Simulator or a ControlSystem") + return peer + +class ElementArray(list[Element]): + """ + Class that implements access to a magnet array + """ + + def __init__(self,arrayName:str,elements:list[Element]): + """ + Construct an element array + + Parameters + ---------- + arrayName : str + Array name + elements: list[Element] + Element list, all elements must be attached to the same instance of + either a Simulator or a ControlSystem. + """ + super().__init__(i for i in elements) + self.__name = arrayName + holder = get_peer_from_array(self) + self.__name = arrayName + + def get_name(self) -> str: + return self.__name + + def names(self) -> list[str]: + return [e.get_name() for e in self] + + + + + + \ No newline at end of file diff --git a/pyaml/arrays/magnet_array.py b/pyaml/arrays/magnet_array.py index 63540625..288870b1 100644 --- a/pyaml/arrays/magnet_array.py +++ b/pyaml/arrays/magnet_array.py @@ -1,8 +1,7 @@ from ..common.abstract import ReadWriteFloatArray from ..magnet.magnet import Magnet from ..common.abstract_aggregator import ScalarAggregator -from ..common.exception import PyAMLException - +from .element_array import get_peer_from_array import numpy as np class RWMagnetStrength(ReadWriteFloatArray): @@ -97,11 +96,9 @@ def __init__(self,arrayName:str,magnets:list[Magnet],use_aggregator = True): Use aggregator to increase performance by using paralell access to underlying devices. """ super().__init__(i for i in magnets) - holder = magnets[0]._peer if len(magnets)>0 else None - if holder is None or any([m._peer!=holder for m in magnets]): - raise PyAMLException(f"MagnetArray {arrayName} : All elements must be attached to the same instance of either a Simulator or a ControlSystem") - self.__name = arrayName + holder = get_peer_from_array(self) + self.__rwstrengths = RWMagnetStrength(arrayName,magnets) self.__rwhardwares = RWMagnetHardware(arrayName,magnets) @@ -111,6 +108,9 @@ def __init__(self,arrayName:str,magnets:list[Magnet],use_aggregator = True): self.__rwstrengths.set_aggregator(aggs) self.__rwhardwares.set_aggregator(aggh) + def get_name(self) -> str: + return self.__name + @property def strengths(self) -> RWMagnetStrength: """ diff --git a/pyaml/common/element_holder.py b/pyaml/common/element_holder.py index f8aea27b..7aab8460 100644 --- a/pyaml/common/element_holder.py +++ b/pyaml/common/element_holder.py @@ -7,6 +7,7 @@ from ..rf.rf_transmitter import RFTransmitter from ..arrays.magnet_array import MagnetArray from ..arrays.bpm_array import BPMArray +from ..arrays.element_array import ElementArray from ..common.exception import PyAMLException from ..diagnostics.tune_monitor import BetatronTuneMonitor @@ -21,12 +22,13 @@ def __init__(self): self.__BPMS: dict = {} self.__RFPLANT: dict = {} self.__RFTRANSMITTER: dict = {} - self.__OTHERS: dict = {} self.__DIAG: dict = {} + self.__ALL: dict = {} # Array handle self.__MAGNET_ARRAYS: dict = {} self.__BPM_ARRAYS: dict = {} + self.__ELEMENT_ARRAYS: dict = {} def fill_device(self,elements:list[Element]): raise "ElementHolder.fill_device() is not subclassed" @@ -41,80 +43,80 @@ def fill_array(self,arrayName:str,elementNames:list[str],get_func,constructor,AR if m in a: raise PyAMLException(f"{constructor.__name__} {arrayName} : duplicate name {name} @index {len(a)}") from None a.append(m) - ARR[arrayName] = constructor(arrayName,a,self) + ARR[arrayName] = constructor(arrayName,a) + + + def __add(self,array,element:Element): + if element.get_name() in self.__ALL: # Ensure name unicity + raise PyAMLException(f"Duplicate element {element.__class__.__name__} name {element.get_name()}") from None + array[element.get_name()] = element + self.__ALL[element.get_name()] = element + + def __get(self,what,name,array) -> Element: + if name not in array: + raise PyAMLException(f"{what} {name} not defined") + return array[name] + + # Generic elements + def fill_element_array(self,arrayName:str,elementNames:list[str]): + self.fill_array(arrayName,elementNames,self.get_element,ElementArray,self.__ELEMENT_ARRAYS) + + def get_element(self,name:str) -> Element: + return self.__get("Element",name,self.__ALL) + + def get_elemens(self,name:str) -> ElementArray: + return self.__get("Element array",name,self.__ELEMENT_ARRAYS) + + def get_all_elements(self) -> list[Element]: + return [value for key, value in self.__ALL.items()] # Magnets def fill_magnet_array(self,arrayName:str,elementNames:list[str]): - self.fill_array(arrayName,elementNames,self.get_magnet,MagnetArray,self.__MAGNET_ARRAYS) + self.fill_array(arrayName,elementNames,self.get_magnet,MagnetArray,self.__MAGNET_ARRAYS) def get_magnet(self,name:str) -> Magnet: - if name not in self.__MAGNETS: - raise PyAMLException(f"Magnet {name} not defined") - return self.__MAGNETS[name] + return self.__get("Magnet",name,self.__MAGNETS) - def add_magnet(self,name:str,m:Magnet): - if name in self.__MAGNETS: - print(self.__MAGNETS) - raise PyAMLException(f"Duplicate magnet name {name}") from None - self.__MAGNETS[name] = m + def add_magnet(self,m:Magnet): + self.__add(self.__MAGNETS,m) def get_magnets(self,name:str) -> MagnetArray: - if name not in self.__MAGNET_ARRAYS: - raise PyAMLException(f"Magnet array {name} not defined") - return self.__MAGNET_ARRAYS[name] + return self.__get("Magnet array",name,self.__MAGNET_ARRAYS) - def get_all_magnets(self) -> dict: - return self.__MAGNETS - # BPMs def fill_bpm_array(self,arrayName:str,elementNames:list[str]): self.fill_array(arrayName,elementNames,self.get_bpm,BPMArray,self.__BPM_ARRAYS) def get_bpm(self,name:str) -> Element: - if name not in self.__BPMS: - raise PyAMLException(f"BPM {name} not defined") - return self.__BPMS[name] + return self.__get("BPM",name,self.__BPMS) - def add_bpm(self,name:str,bpm:Element): - self.__BPMS[name] = bpm + def add_bpm(self,bpm:Element): + self.__add(self.__BPMS,bpm) def get_bpms(self,name:str) -> BPMArray: - if name not in self.__BPM_ARRAYS: - raise PyAMLException(f"BPM array {name} not defined") - return self.__BPM_ARRAYS[name] + return self.__get("BPM array",name,self.__BPM_ARRAYS) # RF def get_rf_plant(self,name:str) -> RFPlant: - if name not in self.__RFPLANT: - raise PyAMLException(f"RFPlant {name} not defined") - return self.__RFPLANT[name] + return self.__get("RFPlant",name,self.__RFPLANT) - def add_rf_plant(self,name:str,rf:RFPlant): - self.__RFPLANT[name] = rf - - def get_rf_plant(self,name:str) -> RFPlant: - if name not in self.__RFPLANT: - raise PyAMLException(f"RFPlant {name} not defined") - return self.__RFPLANT[name] + def add_rf_plant(self,rf:RFPlant): + self.__add(self.__RFPLANT,rf) - def add_rf_transnmitter(self,name:str,rf:RFTransmitter): - self.__RFTRANSMITTER[name] = rf + def add_rf_transnmitter(self,rf:RFTransmitter): + self.__add(self.__RFTRANSMITTER,rf) def get_rf_trasnmitter(self,name:str) -> RFTransmitter: - if name not in self.__RFTRANSMITTER: - raise PyAMLException(f"RFTransmitter {name} not defined") - return self.__RFTRANSMITTER[name] + return self.__get("RFTransmitter",name,self.__RFTRANSMITTER) # Tune monitor def get_betatron_tune_monitor(self, name:str) -> BetatronTuneMonitor: - if name not in self.__DIAG: - raise Exception(f"Diagnostic devices array does not contain {name}") - return self.__DIAG[name] + return self.__get("Diagnostic",name,self.__DIAG) - def add_betatron_tune_monitor(self, name:str, tune_monitor:Element): - self.__DIAG[name] = tune_monitor + def add_betatron_tune_monitor(self, tune_monitor:Element): + self.__add(self.__DIAG,tune_monitor) diff --git a/pyaml/configuration/factory.py b/pyaml/configuration/factory.py index 7c821147..ee20edc5 100644 --- a/pyaml/configuration/factory.py +++ b/pyaml/configuration/factory.py @@ -116,10 +116,10 @@ def build_object(self, d:dict): try: obj = elem_cls(cfg) + self.register_element(obj) except Exception as e: raise PyAMLConfigException(f"{str(e)} when creating '{type_str}.{cls_name}' {location_str}") - self.register_element(obj) return obj @@ -154,7 +154,6 @@ def register_element(self, elt): if isinstance(elt,Element): name = elt.get_name() if name in self._elements: - print(self._elements) raise PyAMLConfigException(f"element {name} already defined") self._elements[name] = elt diff --git a/pyaml/control/controlsystem.py b/pyaml/control/controlsystem.py index 306d22cc..32346df5 100644 --- a/pyaml/control/controlsystem.py +++ b/pyaml/control/controlsystem.py @@ -86,8 +86,9 @@ def set_energy(self,E:float): E : float Energy in eV """ - for m in self.get_all_magnets().items(): - m[1].set_energy(E) + # Needed by energy dependant element (i.e. magnet coil current calculation) + for m in self.get_all_elements(): + m.set_energy(E) def fill_device(self,elements:list[Element]): """ @@ -104,41 +105,40 @@ def fill_device(self,elements:list[Element]): strength = RWStrengthScalar(e.model) if e.model.has_physics() else None # Create a unique ref for this control system m = e.attach(self,strength, current) - self.add_magnet(m.get_name(),m) + self.add_magnet(m) elif isinstance(e,CombinedFunctionMagnet): - self.add_magnet(e.get_name(),e) + self.add_magnet(e) currents = RWHardwareArray(e.model) if e.model.has_hardware() else None strengths = RWStrengthArray(e.model) if e.model.has_physics() else None # Create unique refs of each function for this control system ms = e.attach(self,strengths,currents) for m in ms: - self.add_magnet(m.get_name(),m) + self.add_magnet(m) elif isinstance(e,BPM): tilt = RWBpmTiltScalar(e.model) offsets = RWBpmOffsetArray(e.model) positions = RBpmArray(e.model) e = e.attach(self,positions, offsets, tilt) - self.add_bpm(e.get_name(),e) + self.add_bpm(e) elif isinstance(e,RFPlant): - self.add_rf_plant(e.get_name(),e) attachedTrans: list[RFTransmitter] = [] for t in e._cfg.transmitters: voltage = RWRFVoltageScalar(t) phase = RWRFPhaseScalar(t) nt = t.attach(self,voltage,phase) - self.add_rf_transnmitter(nt.get_name(),nt) + self.add_rf_transnmitter(nt) attachedTrans.append(nt) frequency = RWRFFrequencyScalar(e) voltage = RWTotalVoltage(attachedTrans) ne = e.attach(self,frequency,voltage) - self.add_rf_plant(ne.get_name(),ne) + self.add_rf_plant(ne) elif isinstance(e,BetatronTuneMonitor): betatron_tune = RBetatronTuneArray(e) e = e.attach(self,betatron_tune) - self.add_betatron_tune_monitor(e.get_name(), e) + self.add_betatron_tune_monitor(e) diff --git a/pyaml/lattice/simulator.py b/pyaml/lattice/simulator.py index 642a19b1..81470d1e 100644 --- a/pyaml/lattice/simulator.py +++ b/pyaml/lattice/simulator.py @@ -64,9 +64,9 @@ def get_lattice(self) -> at.Lattice: def set_energy(self,E:float): self.ring.energy = E - # For current calculation - for m in self.get_all_magnets().items(): - m[1].set_energy(E) + # Needed by energy dependant element (i.e. magnet coil current calculation) + for m in self.get_all_elements(): + m.set_energy(E) def create_magnet_strength_aggregator(self,magnets:list[Magnet]) -> ScalarAggregator: # No magnet aggregator for simulator @@ -95,16 +95,16 @@ def fill_device(self,elements:list[Element]): strength = RWStrengthScalar(self.get_at_elems(e),e.polynom,e.model) if e.model.has_physics() else None # Create a unique ref for this simulator m = e.attach(self,strength,current) - self.add_magnet(m.get_name(),m) + self.add_magnet(m) elif isinstance(e,CombinedFunctionMagnet): - self.add_magnet(e.get_name(),e) + self.add_magnet(e) currents = RWHardwareArray(self.get_at_elems(e),e.polynoms,e.model) if e.model.has_physics() else None strengths = RWStrengthArray(self.get_at_elems(e),e.polynoms,e.model) if e.model.has_physics() else None # Create unique refs of each function for this simulator ms = e.attach(self,strengths,currents) for m in ms: - self.add_magnet(m.get_name(), m) + self.add_magnet(m) elif isinstance(e,BPM): # This assumes unique BPM names in the pyAT lattice @@ -112,10 +112,9 @@ def fill_device(self,elements:list[Element]): offsets = RWBpmOffsetArray(self.get_at_elems(e)[0]) positions = RBpmArray(self.get_at_elems(e)[0],self.ring) e = e.attach(self,positions, offsets, tilt) - self.add_bpm(e.get_name(),e) + self.add_bpm(e) elif isinstance(e,RFPlant): - self.add_rf_plant(e.get_name(),e) cavs: list[at.Element] = [] harmonics: list[float] = [] attachedTrans: list[RFTransmitter] = [] @@ -135,18 +134,18 @@ def fill_device(self,elements:list[Element]): phase = RWRFPhaseScalar(cavsPerTrans,t) nt = t.attach(self,voltage,phase) attachedTrans.append(nt) - self.add_rf_transnmitter(nt.get_name(),nt) + self.add_rf_transnmitter(nt) cavs.extend(cavsPerTrans) frequency = RWRFFrequencyScalar(cavs,harmonics,e) voltage = RWTotalVoltage(attachedTrans) ne = e.attach(self,frequency,voltage) - self.add_rf_plant(ne.get_name(),ne) + self.add_rf_plant(ne) elif isinstance(e, BetatronTuneMonitor): betatron_tune = RBetatronTuneArray(self.ring) e = e.attach(self,betatron_tune) - self.add_betatron_tune_monitor(e.get_name(), e) + self.add_betatron_tune_monitor(e) def get_at_elems(self,element:Element) -> list[at.Element]: diff --git a/tests/config/bad_conf_duplicate_1.yaml b/tests/config/bad_conf_duplicate_1.yaml new file mode 100644 index 00000000..63062260 --- /dev/null +++ b/tests/config/bad_conf_duplicate_1.yaml @@ -0,0 +1,26 @@ +type: pyaml.pyaml +instruments: + - type: pyaml.instrument + name: sr + energy: 6e9 + simulators: + - type: pyaml.lattice.simulator + lattice: sr/lattices/ebs.mat + name: design + data_folder: /data/store + arrays: + - type: pyaml.arrays.magnet + name: HCORR + elements: + - SH1A-C01-H + - SH1A-C02-H + - SH1A-C02-H # duplicate name in array + - type: pyaml.arrays.magnet + name: VCORR + elements: + - SH1A-C01-V + - SH1A-C02-V + devices: + - sr/quadrupoles/QF1AC01.yaml + - sr/correctors/SH1AC01.yaml + - sr/correctors/SH1AC02.yaml diff --git a/tests/config/bad_conf_duplicate_2.yaml b/tests/config/bad_conf_duplicate_2.yaml new file mode 100644 index 00000000..0684a478 --- /dev/null +++ b/tests/config/bad_conf_duplicate_2.yaml @@ -0,0 +1,59 @@ +type: pyaml.pyaml +instruments: + - type: pyaml.instrument + name: sr + energy: 6e9 + simulators: + - type: pyaml.lattice.simulator + lattice: sr/lattices/ebs.mat + name: design + controls: + - type: tango.pyaml.controlsystem + tango_host: ebs-simu-3:10000 + name: live + data_folder: /data/store + arrays: + - type: pyaml.arrays.bpm + name: BPM + elements: + - BPM_C04-04 + - BPM_C04-05 + - BPM_C04-06 + - BPM_C04-06 # duplicate name in array + devices: + - type: pyaml.bpm.bpm + name: BPM_C04-04 + model: + type: pyaml.bpm.bpm_simple_model + x_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-04/SA_HPosition + unit: m + y_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-04/SA_VPosition + unit: m + - type: pyaml.bpm.bpm + name: BPM_C04-05 + model: + type: pyaml.bpm.bpm_simple_model + x_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-05/SA_HPosition + unit: m + y_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-05/SA_VPosition + unit: m + - type: pyaml.bpm.bpm + name: BPM_C04-06 + model: + type: pyaml.bpm.bpm_simple_model + x_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-06/SA_HPosition + unit: m + y_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-06/SA_VPosition + unit: m diff --git a/tests/config/bad_conf_duplicate_3.yaml b/tests/config/bad_conf_duplicate_3.yaml new file mode 100644 index 00000000..50661c47 --- /dev/null +++ b/tests/config/bad_conf_duplicate_3.yaml @@ -0,0 +1,70 @@ +type: pyaml.pyaml +instruments: + - type: pyaml.instrument + name: sr + energy: 6e9 + simulators: + - type: pyaml.lattice.simulator + lattice: sr/lattices/ebs.mat + name: design + controls: + - type: tango.pyaml.controlsystem + tango_host: ebs-simu-3:10000 + name: live + data_folder: /data/store + arrays: + - type: pyaml.arrays.bpm + name: BPM + elements: + - BPM_C04-04 + - BPM_C04-05 + - BPM_C04-06 + devices: + - type: pyaml.bpm.bpm + name: BPM_C04-04 + model: + type: pyaml.bpm.bpm_simple_model + x_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-04/SA_HPosition + unit: m + y_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-04/SA_VPosition + unit: m + - type: pyaml.bpm.bpm + name: BPM_C04-05 + model: + type: pyaml.bpm.bpm_simple_model + x_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-05/SA_HPosition + unit: m + y_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-05/SA_VPosition + unit: m + - type: pyaml.bpm.bpm + name: BPM_C04-06 + model: + type: pyaml.bpm.bpm_simple_model + x_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-06/SA_HPosition + unit: m + y_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-06/SA_VPosition + unit: m + - type: pyaml.bpm.bpm # duplicate device + name: BPM_C04-06 + model: + type: pyaml.bpm.bpm_simple_model + x_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-06/SA_HPosition + unit: m + y_pos: + type: tango.pyaml.attribute_read_only + attribute: srdiag/bpm/c04-06/SA_VPosition + unit: m diff --git a/tests/config/sr.yaml b/tests/config/sr.yaml index 8267dc37..29b83173 100644 --- a/tests/config/sr.yaml +++ b/tests/config/sr.yaml @@ -35,6 +35,13 @@ instruments: elements: - BPM_C04-01 - BPM_C04-02 + - type: pyaml.arrays.element + name: ElArray + elements: + - BPM_C04-01 + - BPM_C04-02 + - SH1A-C01-V + - SH1A-C02-H devices: - sr/quadrupoles/QF1AC01.yaml - sr/correctors/SH1AC01.yaml diff --git a/tests/test_arrays.py b/tests/test_arrays.py index e82d1887..6770d8bb 100644 --- a/tests/test_arrays.py +++ b/tests/test_arrays.py @@ -108,7 +108,6 @@ def test_arrays(install_test_package): assert(np.abs(ps2[1] - 0.06600571179092833)<1e-10) assert(np.abs(ps2[2] + 0.0634854407797858)<1e-10) - # Test BPMs array # Using aggragtor @@ -130,6 +129,10 @@ def test_arrays(install_test_package): assert(np.abs(pos[1][0] - 1.1681211772781844e-04)<1e-10) assert(np.abs(pos[1][1] - 7.072972488250373e-06)<1e-10) + # Radom array + elts = sr.design.get_elemens("ElArray") + print(elts.names()) + Factory.clear() diff --git a/tests/test_errors.py b/tests/test_errors.py new file mode 100644 index 00000000..9a90676d --- /dev/null +++ b/tests/test_errors.py @@ -0,0 +1,48 @@ +from pyaml.pyaml import pyaml,PyAML +from pyaml.common.exception import PyAMLConfigException,PyAMLException +from pyaml.configuration.factory import Factory +from pyaml.instrument import Instrument +from pyaml.arrays.magnet_array import MagnetArray +import pytest + +@pytest.mark.parametrize("install_test_package", [{ + "name": "tango-pyaml", + "path": "tests/dummy_cs/tango-pyaml" +}], indirect=True) +def test_tune(install_test_package): + + with pytest.raises(PyAMLConfigException) as exc: + ml:PyAML = pyaml("tests/config/bad_conf_duplicate_1.yaml") + assert("MagnetArray HCORR : duplicate name SH1A-C02-H @index 2" in str(exc)) + Factory.clear() + + with pytest.raises(PyAMLConfigException) as exc: + ml:PyAML = pyaml("tests/config/bad_conf_duplicate_2.yaml") + assert("BPMArray BPM : duplicate name BPM_C04-06 @index 3" in str(exc)) + Factory.clear() + + with pytest.raises(PyAMLConfigException) as exc: + ml:PyAML = pyaml("tests/config/bad_conf_duplicate_3.yaml") + assert("element BPM_C04-06 already defined" in str(exc)) + assert("line 59, column 7" in str(exc)) + Factory.clear() + + ml:PyAML = pyaml("tests/config/EBSTune.yaml") + sr:Instrument = ml.get('sr') + m1 = sr.live.get_magnet("QF1E-C04") + m2 = sr.design.get_magnet("QF1A-C05") + with pytest.raises(PyAMLException) as exc: + ma = MagnetArray("Test",[m1,m2]) + assert("MagnetArray Test: All elements must be attached to the same instance" in str(exc)) + + with pytest.raises(PyAMLException) as exc: + m2 = sr.design.get_magnet("QF1A-C05XX") + assert("Magnet QF1A-C05XX not defined" in str(exc)) + + with pytest.raises(PyAMLException) as exc: + m2 = sr.design.get_bpm("QF1A-C05XX") + assert("BPM QF1A-C05XX not defined" in str(exc)) + + Factory.clear() + + diff --git a/tests/test_rf.py b/tests/test_rf.py index 34da3164..11caf129 100644 --- a/tests/test_rf.py +++ b/tests/test_rf.py @@ -2,8 +2,13 @@ from pyaml.instrument import Instrument from pyaml.configuration.factory import Factory import numpy as np +import pytest -def test_rf(): +@pytest.mark.parametrize("install_test_package", [{ + "name": "tango-pyaml", + "path": "tests/dummy_cs/tango-pyaml" +}], indirect=True) +def test_rf(install_test_package): ml:PyAML = pyaml("tests/config/EBS_rf.yaml") sr:Instrument = ml.get('sr') @@ -35,7 +40,11 @@ def test_rf(): Factory.clear() -def test_rf_multi(): +@pytest.mark.parametrize("install_test_package", [{ + "name": "tango-pyaml", + "path": "tests/dummy_cs/tango-pyaml" +}], indirect=True) +def test_rf_multi(install_test_package): ml:PyAML = pyaml("tests/config/EBS_rf_multi.yaml") sr:Instrument = ml.get('sr') From 4a11ad5d1ba5b2ce4992068be59991a8fc5c4138 Mon Sep 17 00:00:00 2001 From: PONS Date: Wed, 5 Nov 2025 15:09:07 +0100 Subject: [PATCH 2/9] Fix + tests --- pyaml/common/element_holder.py | 9 ++++++++- pyaml/control/controlsystem.py | 1 - pyaml/lattice/simulator.py | 2 +- tests/test_arrays.py | 14 +++++++++++++- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/pyaml/common/element_holder.py b/pyaml/common/element_holder.py index 7aab8460..9a8647f6 100644 --- a/pyaml/common/element_holder.py +++ b/pyaml/common/element_holder.py @@ -3,6 +3,7 @@ """ from .element import Element from ..magnet.magnet import Magnet +from ..bpm.bpm import BPM from ..rf.rf_plant import RFPlant from ..rf.rf_transmitter import RFTransmitter from ..arrays.magnet_array import MagnetArray @@ -83,6 +84,9 @@ def add_magnet(self,m:Magnet): def get_magnets(self,name:str) -> MagnetArray: return self.__get("Magnet array",name,self.__MAGNET_ARRAYS) + + def get_all_magnets(self) -> list[Magnet]: + return [value for key, value in self.__MAGNETS.items()] # BPMs @@ -92,12 +96,15 @@ def fill_bpm_array(self,arrayName:str,elementNames:list[str]): def get_bpm(self,name:str) -> Element: return self.__get("BPM",name,self.__BPMS) - def add_bpm(self,bpm:Element): + def add_bpm(self,bpm:BPM): self.__add(self.__BPMS,bpm) def get_bpms(self,name:str) -> BPMArray: return self.__get("BPM array",name,self.__BPM_ARRAYS) + def get_all_bpms(self) -> list[BPM]: + return [value for key, value in self.__BPMS.items()] + # RF def get_rf_plant(self,name:str) -> RFPlant: diff --git a/pyaml/control/controlsystem.py b/pyaml/control/controlsystem.py index 32346df5..7e8759e0 100644 --- a/pyaml/control/controlsystem.py +++ b/pyaml/control/controlsystem.py @@ -108,7 +108,6 @@ def fill_device(self,elements:list[Element]): self.add_magnet(m) elif isinstance(e,CombinedFunctionMagnet): - self.add_magnet(e) currents = RWHardwareArray(e.model) if e.model.has_hardware() else None strengths = RWStrengthArray(e.model) if e.model.has_physics() else None # Create unique refs of each function for this control system diff --git a/pyaml/lattice/simulator.py b/pyaml/lattice/simulator.py index 81470d1e..474a984a 100644 --- a/pyaml/lattice/simulator.py +++ b/pyaml/lattice/simulator.py @@ -98,7 +98,7 @@ def fill_device(self,elements:list[Element]): self.add_magnet(m) elif isinstance(e,CombinedFunctionMagnet): - self.add_magnet(e) + #self.add_magnet(e) TODO handle attached combined function manget currents = RWHardwareArray(self.get_at_elems(e),e.polynoms,e.model) if e.model.has_physics() else None strengths = RWStrengthArray(self.get_at_elems(e),e.polynoms,e.model) if e.model.has_physics() else None # Create unique refs of each function for this simulator diff --git a/tests/test_arrays.py b/tests/test_arrays.py index 6770d8bb..f78b3331 100644 --- a/tests/test_arrays.py +++ b/tests/test_arrays.py @@ -1,6 +1,7 @@ from pyaml.pyaml import pyaml,PyAML from pyaml.configuration.factory import Factory from pyaml.instrument import Instrument +from pyaml.arrays.element_array import ElementArray from pyaml.arrays.magnet_array import MagnetArray from pyaml.arrays.bpm_array import BPMArray import importlib @@ -131,7 +132,18 @@ def test_arrays(install_test_package): # Radom array elts = sr.design.get_elemens("ElArray") - print(elts.names()) + + # Create an array that contains all elements + allElts = ElementArray("AllElements",sr.design.get_all_elements()) + assert(len(allElts)==9) + + # Create an array that contains all elements + allMags = MagnetArray("AllMagnets",sr.design.get_all_magnets()) + assert(len(allMags)==7) + + # Create an array that contains all BPM + allBpms = BPMArray("AllBPMs",sr.design.get_all_bpms()) + assert(len(allBpms)==2) Factory.clear() From defbb00ce5818a860e90fa9da0c24332b40642d8 Mon Sep 17 00:00:00 2001 From: PONS Date: Wed, 5 Nov 2025 15:18:39 +0100 Subject: [PATCH 3/9] Removed wrong unattached CombinedFunctionMangettest --- tests/test_ident_models.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_ident_models.py b/tests/test_ident_models.py index 9e68f27f..e09fb3f1 100644 --- a/tests/test_ident_models.py +++ b/tests/test_ident_models.py @@ -21,10 +21,10 @@ def test_cfm_magnets(magnet_file, install_test_package): ml:PyAML = pyaml(magnet_file) sr:Instrument = ml.get('sr') sr.design.get_lattice().disable_6d() - magnet_design = sr.design.get_magnet("SH1A-C01") - magnet_live = sr.live.get_magnet("SH1A-C01") - assert isinstance(magnet_design, CombinedFunctionMagnet) - assert isinstance(magnet_live, CombinedFunctionMagnet) + #magnet_design = sr.design.get_magnet("SH1A-C01") + #magnet_live = sr.live.get_magnet("SH1A-C01") + #assert isinstance(magnet_design, CombinedFunctionMagnet) + #assert isinstance(magnet_live, CombinedFunctionMagnet) magnet_h_design = sr.design.get_magnet("SH1A-C01-H") magnet_v_design = sr.design.get_magnet("SH1A-C01-V") magnet_h_live = sr.live.get_magnet("SH1A-C01-H") From d97f670f369cf99aa90602b9eb872b89b559da80 Mon Sep 17 00:00:00 2001 From: PONS Date: Wed, 5 Nov 2025 22:19:17 +0100 Subject: [PATCH 4/9] Fix repr bug --- pyaml/magnet/magnet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyaml/magnet/magnet.py b/pyaml/magnet/magnet.py index 6559357d..2554dd81 100644 --- a/pyaml/magnet/magnet.py +++ b/pyaml/magnet/magnet.py @@ -88,7 +88,7 @@ def set_model_name(self, name:str): """ self.__modelName = name - def __str__(self): + def __repr__(self): return "%s(peer='%s', name='%s', model='%s', magnet_model=%s)" % ( self.__class__.__name__, self.get_peer(), From cef56cf58769b1be1163397073430fa9b5833ac1 Mon Sep 17 00:00:00 2001 From: PONS Date: Wed, 5 Nov 2025 22:20:12 +0100 Subject: [PATCH 5/9] Added Factory.get_elements_by_name() and Factory.get_elements_by_type() --- pyaml/configuration/factory.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pyaml/configuration/factory.py b/pyaml/configuration/factory.py index ee20edc5..ea517d8f 100644 --- a/pyaml/configuration/factory.py +++ b/pyaml/configuration/factory.py @@ -1,6 +1,7 @@ # PyAML factory (construct AML objects from config files) import importlib from threading import Lock +import fnmatch from ..common.exception import PyAMLConfigException from ..common.element import Element @@ -162,6 +163,12 @@ def get_element(self, name:str): if name not in self._elements: raise PyAMLConfigException(f"element {name} not defined") return self._elements[name] + + def get_elements_by_name(self,wildcard:str) -> list[Element]: + return [e for k,e in self._elements.items() if fnmatch.fnmatch(k, wildcard)] + + def get_elements_by_type(self,type) -> list[Element]: + return [e for k,e in self._elements.items() if isinstance(e,type)] def clear(self): self._elements.clear() From 986d785a9ffe4ec87c23d9976c0d961d22c8dba2 Mon Sep 17 00:00:00 2001 From: PONS Date: Thu, 6 Nov 2025 09:09:46 +0100 Subject: [PATCH 6/9] Renamed private variable --- pyaml/lattice/abstract_impl.py | 137 +++++++++++++++++---------------- 1 file changed, 69 insertions(+), 68 deletions(-) diff --git a/pyaml/lattice/abstract_impl.py b/pyaml/lattice/abstract_impl.py index b998a0fc..29509eea 100644 --- a/pyaml/lattice/abstract_impl.py +++ b/pyaml/lattice/abstract_impl.py @@ -22,24 +22,24 @@ class RWHardwareScalar(abstract.ReadWriteFloatScalar): """ def __init__(self, elements:list[at.Element], poly:PolynomInfo, model:MagnetModel): - self.model = model - self.elements = elements - self.poly = elements[0].__getattribute__(poly.attName) - self.polyIdx = poly.index + self.__model = model + self.__elements = elements + self.__poly = elements[0].__getattribute__(poly.attName) + self.__polyIdx = poly.index def get(self) -> float: - s = self.poly[self.polyIdx] * self.elements[0].Length - return self.model.compute_hardware_values([s])[0] + s = self.__poly[self.__polyIdx] * self.__elements[0].Length + return self.__model.compute_hardware_values([s])[0] def set(self,value:float): - s = self.model.compute_strengths([value])[0] - self.poly[self.polyIdx] = s / self.elements[0].Length + s = self.__model.compute_strengths([value])[0] + self.__poly[self.__polyIdx] = s / self.__elements[0].Length def set_and_wait(self, value:float): raise NotImplementedError("Not implemented yet.") def unit(self) -> str: - return self.model.get_hardware_units()[0] + return self.__model.get_hardware_units()[0] #------------------------------------------------------------------------------ @@ -49,18 +49,18 @@ class RWStrengthScalar(abstract.ReadWriteFloatScalar): """ def __init__(self, elements:list[at.Element], poly:PolynomInfo, model:MagnetModel): - self.unitconv = model - self.elements = elements - self.poly = elements[0].__getattribute__(poly.attName) - self.polyIdx = poly.index + self.__model = model + self.__elements = elements + self.__poly = elements[0].__getattribute__(poly.attName) + self.__polyIdx = poly.index # Gets the value def get(self) -> float: - return self.poly[self.polyIdx] * self.elements[0].Length + return self.__poly[self.__polyIdx] * self.__elements[0].Length # Sets the value def set(self, value:float): - self.poly[self.polyIdx] = value / self.elements[0].Length + self.__poly[self.__polyIdx] = value / self.__elements[0].Length # Sets the value and wait that the read value reach the setpoint def set_and_wait(self, value:float): @@ -68,7 +68,7 @@ def set_and_wait(self, value:float): # Gets the unit of the value def unit(self) -> str: - return self.unitconv.get_strength_units()[0] + return self.__model.get_strength_units()[0] #------------------------------------------------------------------------------ @@ -79,28 +79,28 @@ class RWHardwareArray(abstract.ReadWriteFloatArray): """ def __init__(self, elements:list[at.Element], poly:list[PolynomInfo], model:MagnetModel): - self.elements = elements - self.poly = [] - self.polyIdx = [] - self.model = model + self.__elements = elements + self.__poly = [] + self.__polyIdx = [] + self.__model = model for p in poly: - self.poly.append(elements[0].__getattribute__(p.attName)) - self.polyIdx.append(p.index) + self.__poly.append(elements[0].__getattribute__(p.attName)) + self.__polyIdx.append(p.index) # Gets the value def get(self) -> np.array: - nbStrength = len(self.poly) + nbStrength = len(self.__poly) s = np.zeros(nbStrength) for i in range(nbStrength): - s[i] = self.poly[i][self.polyIdx[i]] * self.elements[0].Length - return self.model.compute_hardware_values(s) + s[i] = self.__poly[i][self.__polyIdx[i]] * self.__elements[0].Length + return self.__model.compute_hardware_values(s) # Sets the value def set(self, value:np.array): - nbStrength = len(self.poly) - s = self.model.compute_strengths(value) + nbStrength = len(self.__poly) + s = self.__model.compute_strengths(value) for i in range(nbStrength): - self.poly[i][self.polyIdx[i]] = s[i] / self.elements[0].Length + self.__poly[i][self.__polyIdx[i]] = s[i] / self.__elements[0].Length # Sets the value and wait that the read value reach the setpoint def set_and_wait(self, value:np.array): @@ -108,7 +108,7 @@ def set_and_wait(self, value:np.array): # Gets the unit of the value def unit(self) -> list[str]: - return self.model.get_hardware_units() + return self.__model.get_hardware_units() #------------------------------------------------------------------------------ @@ -118,28 +118,28 @@ class RWStrengthArray(abstract.ReadWriteFloatArray): """ def __init__(self, elements:list[at.Element], poly:list[PolynomInfo], model:MagnetModel): - self.elements = elements - self.poly = [] - self.polyIdx = [] - self.unitconv = model + self.__elements = elements + self.__poly = [] + self.__polyIdx = [] + self.__model = model for p in poly: - self.poly.append(elements[0].__getattribute__(p.attName)) - self.polyIdx.append(p.index) + self.__poly.append(elements[0].__getattribute__(p.attName)) + self.__polyIdx.append(p.index) # Gets the value def get(self) -> np.array: - nbStrength = len(self.poly) + nbStrength = len(self.__poly) s = np.zeros(nbStrength) for i in range(nbStrength): - s[i] = self.poly[i][self.polyIdx[i]] * self.elements[0].Length + s[i] = self.__poly[i][self.__polyIdx[i]] * self.__elements[0].Length return s # Sets the value def set(self, value:np.array): - nbStrength = len(self.poly) + nbStrength = len(self.__poly) s = np.zeros(nbStrength) for i in range(nbStrength): - self.poly[i][self.polyIdx[i]] = value[i] / self.elements[0].Length + self.__poly[i][self.__polyIdx[i]] = value[i] / self.__elements[0].Length # Sets the value and wait that the read value reach the setpoint def set_and_wait(self, value:np.array): @@ -147,7 +147,8 @@ def set_and_wait(self, value:np.array): # Gets the unit of the value def unit(self) -> list[str]: - return self.unitconv.get_strength_units() + return self.__model.get_strength_units() + #------------------------------------------------------------------------------ @@ -157,11 +158,11 @@ class BPMScalarAggregator(ScalarAggregator): """ def __init__(self, ring:at.Lattice): - self.lattice = ring - self.refpts = [] + self.__lattice = ring + self.__refpts = [] def add_elem(self,elem:at.Element): - self.refpts.append(self.lattice.index(elem)) + self.__refpts.append(self.__lattice.index(elem)) def set(self, value: NDArray[np.float64]): pass @@ -170,7 +171,7 @@ def set_and_wait(self, value: NDArray[np.float64]): pass def get(self) -> np.array: - _, orbit = at.find_orbit(self.lattice, refpts=self.refpts) + _, orbit = at.find_orbit(self.__lattice, refpts=self.__refpts) return orbit[:, [0, 2]].flatten() def readback(self) -> np.array: @@ -187,7 +188,7 @@ class BPMVScalarAggregator(BPMScalarAggregator): """ def get(self) -> np.array: - _, orbit = at.find_orbit(self.lattice, refpts=self.refpts) + _, orbit = at.find_orbit(self.__lattice, refpts=self.__refpts) return orbit[:, 0] #------------------------------------------------------------------------------ @@ -198,7 +199,7 @@ class BPMHScalarAggregator(BPMScalarAggregator): """ def get(self) -> np.array: - _, orbit = at.find_orbit(self.lattice, refpts=self.refpts) + _, orbit = at.find_orbit(self.__lattice, refpts=self.__refpts) return orbit[:, 2] #------------------------------------------------------------------------------ @@ -212,14 +213,14 @@ class RBpmArray(abstract.ReadFloatArray): """ def __init__(self, element: at.Element, lattice: at.Lattice): - self.element = element - self.lattice = lattice + self.__element = element + self.__lattice = lattice # Gets the value def get(self) -> np.array: - index = self.lattice.index(self.element) - _, orbit = at.find_orbit(self.lattice, refpts=index) + index = self.__lattice.index(self.__element) + _, orbit = at.find_orbit(self.__lattice, refpts=index) return orbit[0, [0, 2]] # Gets the unit of the value @@ -235,25 +236,25 @@ class RWBpmOffsetArray(abstract.ReadWriteFloatArray): """ def __init__(self, element:at.Element): - self.element = element + self.__element = element try: - self.offset = element.__getattribute__('Offset') + self.__offset = element.__getattribute__('Offset') except AttributeError: - self.offset = None + self.__offset = None # Gets the value def get(self) -> np.array: - if self.offset is None: + if self.__offset is None: raise PyAMLException("Element does not have an Offset attribute.") - return self.offset + return self.__offset # Sets the value def set(self, value:np.array): - if self.offset is None: + if self.__offset is None: raise PyAMLException("Element does not have an Offset attribute.") if len(value) != 2: raise PyAMLException("BPM offset must be a 2-element array.") - self.offset = value + self.__offset = value # Sets the value and wait that the read value reach the setpoint def set_and_wait(self, value:np.array): @@ -272,22 +273,22 @@ class RWBpmTiltScalar(abstract.ReadWriteFloatScalar): """ def __init__(self, element:at.Element): - self.element = element + self.__element = element try: - self.tilt = element.__getattribute__('Rotation')[0] + self.__tilt = element.__getattribute__('Rotation')[0] except AttributeError: - self.tilt = None + self.__tilt = None # Gets the value def get(self) -> float: - if self.tilt is None: + if self.__tilt is None: raise ValueError("Element does not have a Tilt attribute.") - return self.tilt + return self.__tilt # Sets the value def set(self, value:float, ): - self.tilt = value - self.element.__setattr__('Rotation', [value, None, None]) + self.__tilt = value + self.__element.__setattr__('Rotation', [value, None, None]) # Sets the value and wait that the read value reach the setpoint def set_and_wait(self, value:float): @@ -305,18 +306,18 @@ class RWRFVoltageScalar(abstract.ReadWriteFloatScalar): """ def __init__(self, elements:list[at.Element], transmitter:RFTransmitter): - self.elements = elements + self.__elements = elements self.__transmitter = transmitter def get(self) -> float: sum = 0 - for idx,e in enumerate(self.elements): + for idx,e in enumerate(self.__elements): sum += e.Voltage return sum def set(self,value:float): - v = value / len(self.elements) - for e in self.elements: + v = value / len(self.__elements) + for e in self.__elements: e.Voltage = v def set_and_wait(self, value:float): From 47c116fb161c868d7f67aabfb5d68b06a96f94f2 Mon Sep 17 00:00:00 2001 From: PONS Date: Thu, 6 Nov 2025 09:10:28 +0100 Subject: [PATCH 7/9] Implemented CombinedFunctionMagnet array + test --- pyaml/arrays/cfm_magnet.py | 15 ++++ pyaml/arrays/cfm_magnet_array.py | 128 +++++++++++++++++++++++++++++++ pyaml/common/element_holder.py | 21 +++++ pyaml/control/controlsystem.py | 5 +- pyaml/lattice/simulator.py | 4 +- pyaml/magnet/cfm_magnet.py | 90 +++++++++++++++------- tests/config/sr.yaml | 5 ++ tests/test_arrays.py | 29 ++++++- 8 files changed, 266 insertions(+), 31 deletions(-) create mode 100644 pyaml/arrays/cfm_magnet.py create mode 100644 pyaml/arrays/cfm_magnet_array.py diff --git a/pyaml/arrays/cfm_magnet.py b/pyaml/arrays/cfm_magnet.py new file mode 100644 index 00000000..e0aabab7 --- /dev/null +++ b/pyaml/arrays/cfm_magnet.py @@ -0,0 +1,15 @@ +from .array import ArrayConfigModel,ArrayConfig +from ..common.element_holder import ElementHolder + +# Define the main class name for this module +PYAMLCLASS = "CombinedFunctionMagnet" + +class ConfigModel(ArrayConfigModel):... + +class CombinedFunctionMagnet(ArrayConfig): + + def __init__(self, cfg: ArrayConfigModel): + super().__init__(cfg) + + def fill_array(self,holder:ElementHolder): + holder.fill_cfm_magnet_array(self._cfg.name,self._cfg.elements) diff --git a/pyaml/arrays/cfm_magnet_array.py b/pyaml/arrays/cfm_magnet_array.py new file mode 100644 index 00000000..87435c5b --- /dev/null +++ b/pyaml/arrays/cfm_magnet_array.py @@ -0,0 +1,128 @@ +from ..common.abstract import ReadWriteFloatArray +from ..magnet.cfm_magnet import CombinedFunctionMagnet +from .element_array import get_peer_from_array +import numpy as np + +#TODO handle aggregator for CFM + +class RWMagnetStrengths(ReadWriteFloatArray): + + def __init__(self, name:str, magnets:list[CombinedFunctionMagnet]): + self.__name = name + self.__magnets = magnets + self.__nb = sum(m.nb_multipole() for m in magnets) + + # Gets the values + def get(self) -> np.array: + r = np.zeros(self.__nb) + idx = 0 + for m in self.__magnets: + r[idx:idx+m.nb_multipole()] = m.strengths.get() + idx+=m.nb_multipole() + return r + + # Sets the values + def set(self, value:np.array): + nvalue = np.ones(self.__nb) * value if isinstance(value,float) else value + idx = 0 + for m in self.__magnets: + m.strengths.set(nvalue[idx:idx+m.nb_multipole()]) + idx+=m.nb_multipole() + + # Sets the values and waits that the read values reach their setpoint + def set_and_wait(self, value:np.array): + raise NotImplementedError("Not implemented yet.") + + # Gets the unit of the values + def unit(self) -> list[str]: + r = [] + for m in self.__magnets: + r.extend(m.strengths.unit()) + return r + +class RWMagnetHardwares(ReadWriteFloatArray): + + def __init__(self, name:str, magnets:list[CombinedFunctionMagnet]): + self.__name = name + self.__magnets = magnets + self.__nb = sum(m.nb_multipole() for m in magnets) + + # Gets the values + def get(self) -> np.array: + r = np.zeros(self.__nb) + idx = 0 + for m in self.__magnets: + r[idx:idx+m.nb_multipole()] = m.hardwares.get() + idx+=m.nb_multipole() + return r + + # Sets the values + def set(self, value:np.array): + nvalue = np.ones(self.__nb) * value if isinstance(value,float) else value + for m in self.__magnets: + m.hardwares.set(nvalue[idx:idx+m.nb_multipole()]) + idx+=m.nb_multipole() + + # Sets the values and waits that the read values reach their setpoint + def set_and_wait(self, value:np.array): + raise NotImplementedError("Not implemented yet.") + + # Gets the unit of the values + def unit(self) -> list[str]: + r = [] + for m in self.__magnets: + r.extend(m.hardwares.unit()) + return r + + +class CombinedFunctionMagnetArray(list[CombinedFunctionMagnet]): + """ + Class that implements access to a magnet array + """ + + def __init__(self,arrayName:str,magnets:list[CombinedFunctionMagnet],use_aggregator = False): + """ + Construct a magnet array + + Parameters + ---------- + arrayName : str + Array name + magnets: list[Magnet] + Magnet list, all elements must be attached to the same instance of + either a Simulator or a ControlSystem. + use_aggregator : bool + Use aggregator to increase performance by using paralell access to underlying devices. + """ + super().__init__(i for i in magnets) + self.__name = arrayName + holder = get_peer_from_array(self) + + self.__rwstrengths = RWMagnetStrengths(arrayName,magnets) + self.__rwhardwares = RWMagnetHardwares(arrayName,magnets) + + if use_aggregator: + raise("Aggregator not implemented for CombinedFunctionMagnetArray") + + def get_name(self) -> str: + return self.__name + + @property + def strengths(self) -> RWMagnetStrengths: + """ + Give access to strength of each magnet of this array + """ + return self.__rwstrengths + + @property + def hardwares(self) -> RWMagnetHardwares: + """ + Give access to hardware value of each magnet of this array + """ + return self.__rwhardwares + + + + + + \ No newline at end of file diff --git a/pyaml/common/element_holder.py b/pyaml/common/element_holder.py index 9a8647f6..81b4eaba 100644 --- a/pyaml/common/element_holder.py +++ b/pyaml/common/element_holder.py @@ -3,10 +3,12 @@ """ from .element import Element from ..magnet.magnet import Magnet +from ..magnet.cfm_magnet import CombinedFunctionMagnet from ..bpm.bpm import BPM from ..rf.rf_plant import RFPlant from ..rf.rf_transmitter import RFTransmitter from ..arrays.magnet_array import MagnetArray +from ..arrays.cfm_magnet_array import CombinedFunctionMagnetArray from ..arrays.bpm_array import BPMArray from ..arrays.element_array import ElementArray from ..common.exception import PyAMLException @@ -20,6 +22,7 @@ class ElementHolder(object): def __init__(self): # Device handle self.__MAGNETS: dict = {} + self.__CFM_MAGNETS: dict = {} self.__BPMS: dict = {} self.__RFPLANT: dict = {} self.__RFTRANSMITTER: dict = {} @@ -28,6 +31,7 @@ def __init__(self): # Array handle self.__MAGNET_ARRAYS: dict = {} + self.__CFM_MAGNET_ARRAYS: dict = {} self.__BPM_ARRAYS: dict = {} self.__ELEMENT_ARRAYS: dict = {} @@ -87,6 +91,23 @@ def get_magnets(self,name:str) -> MagnetArray: def get_all_magnets(self) -> list[Magnet]: return [value for key, value in self.__MAGNETS.items()] + + # Combined Function Magnets + + def fill_cfm_magnet_array(self,arrayName:str,elementNames:list[str]): + self.fill_array(arrayName,elementNames,self.get_cfm_magnet,CombinedFunctionMagnetArray,self.__CFM_MAGNET_ARRAYS) + + def get_cfm_magnet(self,name:str) -> Magnet: + return self.__get("CombinedFunctionMagnet",name,self.__CFM_MAGNETS) + + def add_cfm_magnet(self,m:Magnet): + self.__add(self.__CFM_MAGNETS,m) + + def get_cfm_magnets(self,name:str) -> CombinedFunctionMagnetArray: + return self.__get("CombinedFunctionMagnet array",name,self.__CFM_MAGNET_ARRAYS) + + def get_all_cfm_magnets(self) -> list[CombinedFunctionMagnet]: + return [value for key, value in self.__CFM_MAGNET_ARRAYS.items()] # BPMs diff --git a/pyaml/control/controlsystem.py b/pyaml/control/controlsystem.py index 7e8759e0..cb851c0b 100644 --- a/pyaml/control/controlsystem.py +++ b/pyaml/control/controlsystem.py @@ -110,9 +110,10 @@ def fill_device(self,elements:list[Element]): elif isinstance(e,CombinedFunctionMagnet): currents = RWHardwareArray(e.model) if e.model.has_hardware() else None strengths = RWStrengthArray(e.model) if e.model.has_physics() else None - # Create unique refs of each function for this control system + # Create unique refs the cfm and each of its function for this control system ms = e.attach(self,strengths,currents) - for m in ms: + self.add_cfm_magnet(ms[0]) + for m in ms[1:]: self.add_magnet(m) elif isinstance(e,BPM): diff --git a/pyaml/lattice/simulator.py b/pyaml/lattice/simulator.py index 474a984a..908f240e 100644 --- a/pyaml/lattice/simulator.py +++ b/pyaml/lattice/simulator.py @@ -98,12 +98,12 @@ def fill_device(self,elements:list[Element]): self.add_magnet(m) elif isinstance(e,CombinedFunctionMagnet): - #self.add_magnet(e) TODO handle attached combined function manget currents = RWHardwareArray(self.get_at_elems(e),e.polynoms,e.model) if e.model.has_physics() else None strengths = RWStrengthArray(self.get_at_elems(e),e.polynoms,e.model) if e.model.has_physics() else None # Create unique refs of each function for this simulator ms = e.attach(self,strengths,currents) - for m in ms: + self.add_cfm_magnet(ms[0]) + for m in ms[1:]: self.add_magnet(m) elif isinstance(e,BPM): diff --git a/pyaml/magnet/cfm_magnet.py b/pyaml/magnet/cfm_magnet.py index 100499aa..393cd617 100644 --- a/pyaml/magnet/cfm_magnet.py +++ b/pyaml/magnet/cfm_magnet.py @@ -41,47 +41,85 @@ class ConfigModel(ElementConfigModel): class CombinedFunctionMagnet(Element): """CombinedFunctionMagnet class""" - def __init__(self, cfg: ConfigModel): + def __init__(self, cfg: ConfigModel, peer = None): super().__init__(cfg.name) self._cfg = cfg self.model = cfg.model - self.virtuals:list[Magnet] = [] - - if self.model is not None and not hasattr(self.model._cfg,"multipoles"): - raise PyAMLException(f"{cfg.name} model: mutipoles field required for combined function magnet") - - idx = 0 - self.polynoms = [] - for idx,m in enumerate(cfg.mapping): - # Check mapping validity - if len(m)!=2: - raise PyAMLException("Invalid CombinedFunctionMagnet mapping for {m}") - if not m[0] in _fmap: - raise PyAMLException(m[0] + " not implemented for combined function magnet") - if m[0] not in self.model._cfg.multipoles: - raise PyAMLException(m[0] + " not found in underlying magnet model") - self.polynoms.append(_fmap[m[0]].polynom) - # Create the virtual magnet for the correspoding multipole - vm = self.create_virutal_manget(m[1],m[0]) - self.virtuals.append(vm) - # Register the virtual element in the factory to have a coherent factory and improve error reporting - Factory.register_element(vm) - - def create_virutal_manget(self,name:str,idx:int) -> Magnet: + self.__virtuals:list[Magnet] = [] + self.__strengths:abstract.ReadWriteFloatArray = None + self.__hardwares:abstract.ReadWriteFloatArray = None + + if peer is None: + + # Configuration part + if self.model is not None and not hasattr(self.model._cfg,"multipoles"): + raise PyAMLException(f"{cfg.name} model: mutipoles field required for combined function magnet") + + idx = 0 + self.polynoms = [] + for idx,m in enumerate(cfg.mapping): + # Check mapping validity + if len(m)!=2: + raise PyAMLException("Invalid CombinedFunctionMagnet mapping for {m}") + if not m[0] in _fmap: + raise PyAMLException(m[0] + " not implemented for combined function magnet") + if m[0] not in self.model._cfg.multipoles: + raise PyAMLException(m[0] + " not found in underlying magnet model") + self.polynoms.append(_fmap[m[0]].polynom) + # Create the virtual magnet for the correspoding multipole + vm = self.__create_virutal_manget(m[1],m[0]) + self.__virtuals.append(vm) + # Register the virtual element in the factory to have a coherent factory and improve error reporting + Factory.register_element(vm) + + else: + + # Attach + self._peer = peer + + def __create_virutal_manget(self,name:str,idx:int) -> Magnet: args = {"name":name,"model":self.model} mVirtual:Magnet = _fmap[idx](MagnetConfigModel(**args)) mVirtual.set_model_name(self.get_name()) return mVirtual + + def nb_multipole(self) -> int: + return len(self._cfg.mapping) def attach(self, peer, strengths: abstract.ReadWriteFloatArray, hardwares: abstract.ReadWriteFloatArray) -> list[Magnet]: - # Construct a single function magnet for each multipole of this combined function magnet l = [] + # Attached the CombinedFunctionMagnet itself + nCFM = CombinedFunctionMagnet(self._cfg,peer) + nCFM.__strengths = strengths + nCFM.__hardwares = hardwares + l.append(nCFM) + # Construct a single function magnet for each multipole of this combined function magnet for idx,m in enumerate(self._cfg.mapping): strength = RWMapper(strengths,idx) hardware = RWMapper(hardwares,idx) if self.model.has_hardware() else None - l.append(self.virtuals[idx].attach(peer,strength,hardware)) + l.append(self.__virtuals[idx].attach(peer,strength,hardware)) return l + @property + def strengths(self) -> abstract.ReadWriteFloatScalar: + """ + Gives access to the strengths of this combined function magnet in physics unit + """ + self.check_peer() + if self.__strengths is None: + raise PyAMLException(f"{str(self)} has no model that supports physics units") + return self.__strengths + + @property + def hardwares(self) -> abstract.ReadWriteFloatScalar: + """ + Gives access to the strengths of this combined function magnet in hardware unit when possible + """ + self.check_peer() + if self.__hardwares is None: + raise PyAMLException(f"{str(self)} has no model that supports hardware units") + return self.__hardwares + def set_energy(self,E:float): if(self.model is not None): self.model.set_magnet_rigidity(E/speed_of_light) diff --git a/tests/config/sr.yaml b/tests/config/sr.yaml index 29b83173..b6751ee3 100644 --- a/tests/config/sr.yaml +++ b/tests/config/sr.yaml @@ -30,6 +30,11 @@ instruments: - SH1A-C02-H - SH1A-C01-V - SH1A-C02-V + - type: pyaml.arrays.cfm_magnet + name: CFM + elements: + - SH1A-C01 + - SH1A-C02 - type: pyaml.arrays.bpm name: BPMS elements: diff --git a/tests/test_arrays.py b/tests/test_arrays.py index f78b3331..58a860f5 100644 --- a/tests/test_arrays.py +++ b/tests/test_arrays.py @@ -4,6 +4,7 @@ from pyaml.arrays.element_array import ElementArray from pyaml.arrays.magnet_array import MagnetArray from pyaml.arrays.bpm_array import BPMArray +from pyaml.arrays.cfm_magnet_array import CombinedFunctionMagnet import importlib import numpy as np @@ -135,7 +136,7 @@ def test_arrays(install_test_package): # Create an array that contains all elements allElts = ElementArray("AllElements",sr.design.get_all_elements()) - assert(len(allElts)==9) + assert(len(allElts)==11) # Create an array that contains all elements allMags = MagnetArray("AllMagnets",sr.design.get_all_magnets()) @@ -145,6 +146,32 @@ def test_arrays(install_test_package): allBpms = BPMArray("AllBPMs",sr.design.get_all_bpms()) assert(len(allBpms)==2) + cfm = sr.design.get_cfm_magnets("CFM") + strHVSQ = cfm.strengths.get() + assert(np.abs(strHVSQ[0] - 0.000010)<1e-10) # H + assert(np.abs(strHVSQ[1] - 0.000015)<1e-10) # V + assert(np.abs(strHVSQ[2] + 0.0)<1e-10) # SQ + assert(np.abs(strHVSQ[3] + 0.000008)<1e-10) # H + assert(np.abs(strHVSQ[4] + 0.000017)<1e-10) # V + assert(np.abs(strHVSQ[5] + 0.0)<1e-10) # SQ + + strHVSQu = cfm.strengths.unit() + assert(strHVSQu[0] == "rad") + assert(strHVSQu[1] == "rad") + assert(strHVSQu[2] == "m-1") + assert(strHVSQu[3] == "rad") + assert(strHVSQu[4] == "rad") + assert(strHVSQu[5] == "m-1") + + cfm.strengths.set([.000010,0.000015,1e-6,-0.000008,-0.000017,1e-6]) + strHVSQ = cfm.strengths.get() + assert(np.abs(strHVSQ[0] - 0.000010)<1e-10) # H + assert(np.abs(strHVSQ[1] - 0.000015)<1e-10) # V + assert(np.abs(strHVSQ[2] - 1.e-6)<1e-10) # SQ + assert(np.abs(strHVSQ[3] + 0.000008)<1e-10) # H + assert(np.abs(strHVSQ[4] + 0.000017)<1e-10) # V + assert(np.abs(strHVSQ[5] - 1e-6)<1e-10) # SQ + Factory.clear() From c1bf72b1cff294ea7722048068b41067a5c27116 Mon Sep 17 00:00:00 2001 From: PONS Date: Thu, 6 Nov 2025 09:14:10 +0100 Subject: [PATCH 8/9] Updated CombinedFunctionMagnet attachement test --- tests/test_load_quad.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_load_quad.py b/tests/test_load_quad.py index d7278d94..1cc27570 100644 --- a/tests/test_load_quad.py +++ b/tests/test_load_quad.py @@ -100,9 +100,9 @@ def test_combined_function_magnets(magnet_file, config_root_dir): assert( sUnits[0] == 'rad' and sUnits[1] == 'rad' and sUnits[2] == 'm-1' ) assert( hUnits[0] == 'A' and hUnits[1] == 'A' and hUnits[2] == 'A') ms = sh.attach(DummyPeer(),strengths,currents) - hCorr = ms[0] - vCorr = ms[1] - sqCorr = ms[2] + hCorr = ms[1] + vCorr = ms[2] + sqCorr = ms[3] hCorr.strength.set(0.000020) vCorr.strength.set(-0.000015) sqCorr.strength.set(0.000100) From 1011bd9dcf55aebd16d458db330b902523a2f926 Mon Sep 17 00:00:00 2001 From: PONS Date: Thu, 6 Nov 2025 09:17:21 +0100 Subject: [PATCH 9/9] Fix for hardwares unit --- pyaml/arrays/cfm_magnet_array.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyaml/arrays/cfm_magnet_array.py b/pyaml/arrays/cfm_magnet_array.py index 87435c5b..b43eab42 100644 --- a/pyaml/arrays/cfm_magnet_array.py +++ b/pyaml/arrays/cfm_magnet_array.py @@ -58,7 +58,8 @@ def get(self) -> np.array: # Sets the values def set(self, value:np.array): - nvalue = np.ones(self.__nb) * value if isinstance(value,float) else value + nvalue = np.ones(self.__nb) * value if isinstance(value,float) else value + idx = 0 for m in self.__magnets: m.hardwares.set(nvalue[idx:idx+m.nb_multipole()]) idx+=m.nb_multipole()