Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyaml/arrays/magnet_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def unit(self) -> list[str]:
def set_aggregator(self,agg:DeviceAccessList):
self.aggregator = agg
for m in self.__magnets:
self.hasHardwareMapping |= m.model.hasHardwareMapping()
self.hasHardwareMapping |= m.model.has_hardware()

class MagnetArray(list[Magnet]):
"""
Expand Down
44 changes: 25 additions & 19 deletions pyaml/control/abstract_impl.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
from numpy import double

from pyaml.control import abstract
from pyaml.magnet.model import MagnetModel
import numpy as np

#------------------------------------------------------------------------------

class RWHardwareScalar(abstract.ReadFloatScalar):
class RWHardwareScalar(abstract.ReadWriteFloatScalar):
"""
Class providing read write access to a magnet of a control system (in hardware units)
"""

def __init__(self, model:MagnetModel):
self.model = model
self.__model = model

def get(self) -> float:
return self.model.read_hardware_values()[0]
return self.__model.read_hardware_values()[0]

def set(self, value:float):
self.model.send_harware_values([value])

self.__model.send_hardware_values(np.array([value]))

def set_and_wait(self, value: double):
raise NotImplementedError("Not implemented yet.")

def unit(self) -> str:
return self.model.get_hardware_units()[0]
return self.__model.get_hardware_units()[0]

def index(self) -> int:
return 0
Expand All @@ -36,12 +42,12 @@ def __init__(self, model:MagnetModel):
# Gets the value
def get(self) -> float:
currents = self.__model.read_hardware_values()
return self.__model.compute_strengths(currents)[0]
return self.__model.compute_strengths(currents)

# Sets the value
def set(self, value:float):
current = self.__model.compute_hardware_values([value])
self.__model.send_harware_values(current)
self.__model.send_hardware_values(current)

# Sets the value and wait that the read value reach the setpoint
def set_and_wait(self, value:float):
Expand All @@ -61,15 +67,15 @@ class RWHardwareArray(abstract.ReadWriteFloatArray):
Class providing read write access to a magnet array of a control system (in hardware units)
"""
def __init__(self, model:MagnetModel):
self.model = model
self.__model = model

# Gets the value
def get(self) -> np.array:
return self.model.read_hardware_values()
return self.__model.read_hardware_values()

# Sets the value
def set(self, value:np.array):
self.model.send_harware_values(value)
self.__model.send_hardware_values(value)

# Sets the value and waits that the read value reach the setpoint
def set_and_wait(self, value:np.array):
Expand All @@ -78,7 +84,7 @@ def set_and_wait(self, value:np.array):

# Gets the unit of the value
def unit(self) -> list[str]:
return self.model.get_hardware_units()
return self.__model.get_hardware_units()

#------------------------------------------------------------------------------

Expand All @@ -87,26 +93,26 @@ class RWStrengthArray(abstract.ReadWriteFloatArray):
Class providing read write access to magnet strengths of a control system
"""
def __init__(self, model:MagnetModel):
self.model = model
self.__model = model

# Gets the value
def get(self) -> np.array:
r = self.model.read_hardware_values()
str = self.model.compute_strengths(r)
r = self.__model.read_hardware_values()
str = self.__model.compute_strengths(r)
return str

# Sets the value
def set(self, value:np.array):
cur = self.model.compute_hardware_values(value)
self.model.send_harware_values(cur)
cur = self.__model.compute_hardware_values(value)
self.__model.send_hardware_values(cur)

# Sets the value and waits that the read value reach the setpoint
def set_and_wait(self, value:np.array):
raise NotImplementedError("Not implemented yet.")

# Gets the unit of the value
def unit(self) -> list[str]:
return self.model.get_strength_units()
return self.__model.get_strength_units()



Expand Down
10 changes: 5 additions & 5 deletions pyaml/control/controlsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ def fill_device(self,elements:list[Element]):
"""
for e in elements:
if isinstance(e,Magnet):
current = RWHardwareScalar(e.model)
strength = RWStrengthScalar(e.model)
current = RWHardwareScalar(e.model) if e.model.has_hardware() else None
strength = RWStrengthScalar(e.model) if e.model.has_physics() else None
# Create a unique ref for this control system
m = e.attach(strength,current)
m = e.attach(strength, current)
self.add_magnet(str(m),m)
elif isinstance(e,CombinedFunctionMagnet):
self.add_magnet(str(e),e)
currents = RWHardwareArray(e.model)
strengths = RWStrengthArray(e.model)
currents = RWHardwareArray(e.model) if e.model.has_hardware() else None
strengths = RWStrengthArray(e.model) if e.model.has_physics() else None
# Create unique refs of each function for this control system
ms = e.attach(strengths,currents)
for m in ms:
Expand Down
9 changes: 5 additions & 4 deletions pyaml/lattice/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,17 @@ def set_energy(self,E:float):

def fill_device(self,elements:list[Element]):
for e in elements:
# Need conversion to physics unit to work with simulator
if isinstance(e,Magnet):
current = RWHardwareScalar(self.get_at_elems(e.name),e.polynom,e.model)
strength = RWStrengthScalar(self.get_at_elems(e.name),e.polynom,e.model)
current = RWHardwareScalar(self.get_at_elems(e.name),e.polynom,e.model) if e.model.has_physics() else None
strength = RWStrengthScalar(self.get_at_elems(e.name),e.polynom,e.model) if e.model.has_physics() else None
# Create a unique ref for this simulator
m = e.attach(strength,current)
self.add_magnet(str(m),m)
elif isinstance(e,CombinedFunctionMagnet):
self.add_magnet(str(e),e)
currents = RWHardwareArray(self.get_at_elems(e.name),e.polynoms,e.model)
strengths = RWStrengthArray(self.get_at_elems(e.name),e.polynoms,e.model)
currents = RWHardwareArray(self.get_at_elems(e.name),e.polynoms,e.model) if e.model.has_physics() else None
strengths = RWStrengthArray(self.get_at_elems(e.name),e.polynoms,e.model) if e.model.has_physics() else None
# Create unique refs of each function for this simulator
ms = e.attach(strengths,currents)
for m in ms:
Expand Down
2 changes: 1 addition & 1 deletion pyaml/magnet/cfm_magnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def attach(self, strengths: abstract.ReadWriteFloatArray, hardwares: abstract.Re
args = {"name":m[1]}
mclass:Magnet = _fmap[m[0]](ElementConfigModel(**args))
strength = RWMapper(strengths,idx)
hardware = RWMapper(hardwares,idx) if self.model.hasHardwareMapping() else None
hardware = RWMapper(hardwares,idx) if self.model.has_hardware() else None
l.append(mclass.attach(strength,hardware))
return l

Expand Down
1 change: 0 additions & 1 deletion pyaml/magnet/hcorrector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ class HCorrector(Magnet):
def __init__(self, cfg: ConfigModel):
super().__init__(
cfg.name,
cfg.hardware if hasattr(cfg, "hardware") else None,
cfg.model if hasattr(cfg, "model") else None,
)
self._cfg = cfg
Expand Down
89 changes: 89 additions & 0 deletions pyaml/magnet/identity_cfm_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import numpy as np
from pydantic import BaseModel,ConfigDict

from .model import MagnetModel
from .. import PyAMLException
from ..configuration.curve import Curve
from ..control.deviceaccess import DeviceAccess

# Define the main class name for this module
PYAMLCLASS = "IdentityCFMagnetModel"

class ConfigModel(BaseModel):

model_config = ConfigDict(arbitrary_types_allowed=True,extra="forbid")

multipoles: list[str]
"""List of supported functions: A0,B0,A1,B1,etc (i.e. [B0,A1,B2])"""
powerconverters: list[DeviceAccess] | None = None
"""Power converter device to apply current"""
physics: list[DeviceAccess] | None = None
"""Magnet device to apply strength"""
units: list[str]
"""List of strength unit (i.e. ['rad','m-1','m-2'])"""

class IdentityCFMagnetModel(MagnetModel):
"""
Class that map values to underlying devices without conversion
"""

def __init__(self, cfg: ConfigModel):
self._cfg = cfg

# Check config
self.__nbFunction: int = len(cfg.multipoles)

if cfg.physics is None and cfg.powerconverter is None:
raise Exception("Invalid IdentityCFMagnetModel configuration, physics or powerconverters device required")
if cfg.physics is not None and cfg.powerconverter is not None:
raise Exception("Invalid IdentityCFMagnetModel configuration, physics or powerconverters device required but not both")
if cfg.physics:
self.__devices = cfg.physics
else:
self.__devices = cfg.powerconverter

self.__nbDev: int = len(self.__devices)

self.__check_len(cfg.units,"units",self.__nbFunction)

def __check_len(self,obj,name,expected_len):
lgth = len(obj)
if lgth != expected_len:
raise Exception(
f"{name} does not have the expected "
f"number of items ({expected_len} items expected but got {lgth})"
)

def compute_hardware_values(self, strengths: np.array) -> np.array:
return strengths

def compute_strengths(self, currents: np.array) -> np.array:
return currents

def get_strength_units(self) -> list[str]:
return self._cfg.units

def get_hardware_units(self) -> list[str]:
return self._cfg.units

def read_hardware_values(self) -> np.array:
return np.array([p.get() for p in self.__devices])

def readback_hardware_values(self) -> np.array:
return np.array([p.readback() for p in self.__devices])

def send_hardware_values(self, currents: np.array):
for idx, p in enumerate(self.__devices):
p.set(currents[idx])

def get_devices(self) -> list[DeviceAccess]:
return self.__devices

def set_magnet_rigidity(self, brho: np.double):
pass

def has_physics(self) -> bool:
return self._cfg.physics is not None

def has_hardware(self) -> bool:
return self._cfg.powerconverters is not None
77 changes: 77 additions & 0 deletions pyaml/magnet/identity_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import numpy as np
from pydantic import BaseModel,ConfigDict

from .model import MagnetModel
from .. import PyAMLException
from ..configuration.curve import Curve
from ..control.deviceaccess import DeviceAccess

# Define the main class name for this module
PYAMLCLASS = "IdentityMagnetModel"

class ConfigModel(BaseModel):

model_config = ConfigDict(arbitrary_types_allowed=True,extra="forbid")

powerconverter: DeviceAccess|None = None
"""Power converter device to apply current"""
physics: DeviceAccess|None = None
"""Magnet device to apply strength"""
unit: str
"""Unit of the strength (i.e. 1/m or m-1)"""

class IdentityMagnetModel(MagnetModel):
"""
Class that map value to underlying device without conversion
"""

def __init__(self, cfg: ConfigModel):
self._cfg = cfg
self.__unit = cfg.unit
if cfg.physics is None and cfg.powerconverter is None:
raise Exception("Invalid IdentityMagnetModel configuration, physics or powerconverter device required")
if cfg.physics is not None and cfg.powerconverter is not None:
raise Exception("Invalid IdentityMagnetModel configuration, physics or powerconverter device required but not both")
if cfg.physics:
self.__device = cfg.physics
else:
self.__device = cfg.powerconverter

def compute_hardware_values(self, strengths: np.array) -> np.array:
return strengths

def compute_strengths(self, currents: np.array) -> np.array:
return currents

def get_strength_units(self) -> list[str]:
return [self.__unit]

def get_hardware_units(self) -> list[str]:
return [self.__unit]

def read_hardware_values(self) -> np.array:
return [self.__device.get()]

def readback_hardware_values(self) -> np.array:
return [self.__device.readback()]

def send_hardware_values(self, currents: np.array):
self.__device.set(currents[0])

def get_devices(self) -> list[DeviceAccess]:
return [self.__device]

def set_magnet_rigidity(self, brho: np.double):
pass

def has_physics(self) -> bool:
return self._cfg.physics is not None

def has_hardware(self) -> bool:
return self._cfg.powerconverter is not None

def __repr__(self):
return "%s(unit=%s)" % (
self.__class__.__name__,
self.__unit,
)
4 changes: 2 additions & 2 deletions pyaml/magnet/linear_cfm_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def read_hardware_values(self) -> np.array:
def readback_hardware_values(self) -> np.array:
return np.array([p.readback() for p in self._cfg.powerconverters])

def send_harware_values(self, currents: np.array):
def send_hardware_values(self, currents: np.array):
for idx, p in enumerate(self._cfg.powerconverters):
p.set(currents[idx])

Expand All @@ -159,6 +159,6 @@ def get_devices(self) -> list[DeviceAccess]:
def set_magnet_rigidity(self, brho: np.double):
self._brho = brho

def hasHardwareMapping(self) -> bool:
def has_hardware(self) -> bool:
return (self.__nbPS == self.__nbFunction) and np.allclose(self.__matrix, np.eye(self.__nbFunction))

2 changes: 1 addition & 1 deletion pyaml/magnet/linear_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def read_hardware_values(self) -> np.array:
def readback_hardware_values(self) -> np.array:
return [self.__ps.readback()]

def send_harware_values(self, currents: np.array):
def send_hardware_values(self, currents: np.array):
self.__ps.set(currents[0])

def get_devices(self) -> list[DeviceAccess]:
Expand Down
Loading
Loading