Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pyaml/arrays/bpm_array.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ..common.abstract import ReadFloatArray
from ..bpm.bpm import BPM
from ..control.deviceaccesslist import DeviceAccessList
from ..common.exception import PyAMLException
from .element_array import get_peer_from_array

import numpy as np

Expand Down Expand Up @@ -72,13 +72,10 @@ def __init__(self,arrayName:str,bpms:list[BPM],use_aggregator = True):
use_aggregator : bool
Use aggregator to increase performance by using paralell access to underlying devices.
"""
super().__init__(i for i in bpms)
holder = bpms[0]._peer if len(bpms)>0 else None
if holder is None or any([m._peer!=holder for m in bpms]):
raise PyAMLException(f"BPMArray {arrayName} : All elements must be attached to the same instance of either a Simulator or a ControlSystem")

super().__init__(i for i in bpms)
self.__name = arrayName
holder = get_peer_from_array(self)

self.__hvpos = RWBPMPosition(arrayName,bpms)
self.__hpos = RWBPMSinglePosition(arrayName,bpms,0)
self.__vpos = RWBPMSinglePosition(arrayName,bpms,1)
Expand All @@ -89,6 +86,9 @@ def __init__(self,arrayName:str,bpms:list[BPM],use_aggregator = True):
self.__hpos.set_aggregator(aggs[1])
self.__vpos.set_aggregator(aggs[2])

def get_name(self) -> str:
return self.__name

@property
def positions(self) -> RWBPMPosition:
"""
Expand Down
15 changes: 15 additions & 0 deletions pyaml/arrays/cfm_magnet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from .array import ArrayConfigModel,ArrayConfig
from ..common.element_holder import ElementHolder

# Define the main class name for this module
PYAMLCLASS = "CombinedFunctionMagnet"

class ConfigModel(ArrayConfigModel):...

class CombinedFunctionMagnet(ArrayConfig):

def __init__(self, cfg: ArrayConfigModel):
super().__init__(cfg)

def fill_array(self,holder:ElementHolder):
holder.fill_cfm_magnet_array(self._cfg.name,self._cfg.elements)
129 changes: 129 additions & 0 deletions pyaml/arrays/cfm_magnet_array.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from ..common.abstract import ReadWriteFloatArray
from ..magnet.cfm_magnet import CombinedFunctionMagnet
from .element_array import get_peer_from_array
import numpy as np

#TODO handle aggregator for CFM

class RWMagnetStrengths(ReadWriteFloatArray):

def __init__(self, name:str, magnets:list[CombinedFunctionMagnet]):
self.__name = name
self.__magnets = magnets
self.__nb = sum(m.nb_multipole() for m in magnets)

# Gets the values
def get(self) -> np.array:
r = np.zeros(self.__nb)
idx = 0
for m in self.__magnets:
r[idx:idx+m.nb_multipole()] = m.strengths.get()
idx+=m.nb_multipole()
return r

# Sets the values
def set(self, value:np.array):
nvalue = np.ones(self.__nb) * value if isinstance(value,float) else value
idx = 0
for m in self.__magnets:
m.strengths.set(nvalue[idx:idx+m.nb_multipole()])
idx+=m.nb_multipole()

# Sets the values and waits that the read values reach their setpoint
def set_and_wait(self, value:np.array):
raise NotImplementedError("Not implemented yet.")

# Gets the unit of the values
def unit(self) -> list[str]:
r = []
for m in self.__magnets:
r.extend(m.strengths.unit())
return r

class RWMagnetHardwares(ReadWriteFloatArray):

def __init__(self, name:str, magnets:list[CombinedFunctionMagnet]):
self.__name = name
self.__magnets = magnets
self.__nb = sum(m.nb_multipole() for m in magnets)

# Gets the values
def get(self) -> np.array:
r = np.zeros(self.__nb)
idx = 0
for m in self.__magnets:
r[idx:idx+m.nb_multipole()] = m.hardwares.get()
idx+=m.nb_multipole()
return r

# Sets the values
def set(self, value:np.array):
nvalue = np.ones(self.__nb) * value if isinstance(value,float) else value
idx = 0
for m in self.__magnets:
m.hardwares.set(nvalue[idx:idx+m.nb_multipole()])
idx+=m.nb_multipole()

# Sets the values and waits that the read values reach their setpoint
def set_and_wait(self, value:np.array):
raise NotImplementedError("Not implemented yet.")

# Gets the unit of the values
def unit(self) -> list[str]:
r = []
for m in self.__magnets:
r.extend(m.hardwares.unit())
return r


class CombinedFunctionMagnetArray(list[CombinedFunctionMagnet]):
"""
Class that implements access to a magnet array
"""

def __init__(self,arrayName:str,magnets:list[CombinedFunctionMagnet],use_aggregator = False):
"""
Construct a magnet array

Parameters
----------
arrayName : str
Array name
magnets: list[Magnet]
Magnet list, all elements must be attached to the same instance of
either a Simulator or a ControlSystem.
use_aggregator : bool
Use aggregator to increase performance by using paralell access to underlying devices.
"""
super().__init__(i for i in magnets)
self.__name = arrayName
holder = get_peer_from_array(self)

self.__rwstrengths = RWMagnetStrengths(arrayName,magnets)
self.__rwhardwares = RWMagnetHardwares(arrayName,magnets)

if use_aggregator:
raise("Aggregator not implemented for CombinedFunctionMagnetArray")

def get_name(self) -> str:
return self.__name

@property
def strengths(self) -> RWMagnetStrengths:
"""
Give access to strength of each magnet of this array
"""
return self.__rwstrengths

@property
def hardwares(self) -> RWMagnetHardwares:
"""
Give access to hardware value of each magnet of this array
"""
return self.__rwhardwares






15 changes: 15 additions & 0 deletions pyaml/arrays/element.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from .array import ArrayConfigModel,ArrayConfig
from ..common.element_holder import ElementHolder

# Define the main class name for this module
PYAMLCLASS = "Element"

class ConfigModel(ArrayConfigModel):...

class Element(ArrayConfig):

def __init__(self, cfg: ArrayConfigModel):
super().__init__(cfg)

def fill_array(self,holder:ElementHolder):
holder.fill_element_array(self._cfg.name,self._cfg.elements)
45 changes: 45 additions & 0 deletions pyaml/arrays/element_array.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from ..common.element import Element
from ..common.exception import PyAMLException

def get_peer_from_array(array):
"""
Returns the peer (Simulator or ControlSystem) of an element list
"""
peer = array[0]._peer if len(array)>0 else None
if peer is None or any([m._peer!=peer for m in array]):
raise PyAMLException(f"{array.__class__.__name__} {array.get_name()}: All elements must be attached to the same instance of either a Simulator or a ControlSystem")
return peer

class ElementArray(list[Element]):
"""
Class that implements access to a magnet array
"""

def __init__(self,arrayName:str,elements:list[Element]):
"""
Construct an element array

Parameters
----------
arrayName : str
Array name
elements: list[Element]
Element list, all elements must be attached to the same instance of
either a Simulator or a ControlSystem.
"""
super().__init__(i for i in elements)
self.__name = arrayName
holder = get_peer_from_array(self)
self.__name = arrayName

def get_name(self) -> str:
return self.__name

def names(self) -> list[str]:
return [e.get_name() for e in self]






12 changes: 6 additions & 6 deletions pyaml/arrays/magnet_array.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from ..common.abstract import ReadWriteFloatArray
from ..magnet.magnet import Magnet
from ..common.abstract_aggregator import ScalarAggregator
from ..common.exception import PyAMLException

from .element_array import get_peer_from_array
import numpy as np

class RWMagnetStrength(ReadWriteFloatArray):
Expand Down Expand Up @@ -97,11 +96,9 @@ def __init__(self,arrayName:str,magnets:list[Magnet],use_aggregator = True):
Use aggregator to increase performance by using paralell access to underlying devices.
"""
super().__init__(i for i in magnets)
holder = magnets[0]._peer if len(magnets)>0 else None
if holder is None or any([m._peer!=holder for m in magnets]):
raise PyAMLException(f"MagnetArray {arrayName} : All elements must be attached to the same instance of either a Simulator or a ControlSystem")

self.__name = arrayName
holder = get_peer_from_array(self)

self.__rwstrengths = RWMagnetStrength(arrayName,magnets)
self.__rwhardwares = RWMagnetHardware(arrayName,magnets)

Expand All @@ -111,6 +108,9 @@ def __init__(self,arrayName:str,magnets:list[Magnet],use_aggregator = True):
self.__rwstrengths.set_aggregator(aggs)
self.__rwhardwares.set_aggregator(aggh)

def get_name(self) -> str:
return self.__name

@property
def strengths(self) -> RWMagnetStrength:
"""
Expand Down
Loading
Loading