Skip to content

Attachment of orbit correction to simulator/control system and generic ResponseMatrix pyAML object#92

Merged
JeanLucPons merged 16 commits intomainfrom
feature/config-orbit-corr
Dec 12, 2025
Merged

Attachment of orbit correction to simulator/control system and generic ResponseMatrix pyAML object#92
JeanLucPons merged 16 commits intomainfrom
feature/config-orbit-corr

Conversation

@kparasch
Copy link
Copy Markdown
Contributor

No description provided.

return self.__get("Orbit tuning tool", name, self.__TUNING_TOOLS)

def add_orbit_tuning(self, orbit: Element):
orbit.element_holder = self
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JeanLucPons do you think it is ok to pass the element_holder like this?

@JeanLucPons
Copy link
Copy Markdown
Contributor

JeanLucPons commented Dec 11, 2025

I did like you for the attachement process.
I put here my orbit.py which is working.
I let you check that I didn't make wrong assumption concerning hv corrector order.
It also use the fileloader patch for the json file.

Usage:

from pyaml.accelerator import Accelerator
sr = Accelerator.load("tests/config/EBSOrbit.yaml",use_fast_loader=False)
orbit = sr.live.get_orbit_tuning("ORBIT")
orbit.correct()

I added to EBSOrbit.yaml:

- type: pyaml.tuning_tools.orbit
  name: ORBIT
  bpm_array_name: BPM
  hcorr_array_name: HCorr
  vcorr_array_name: VCorr
  singular_values: 162
  response_matrix_file: ideal_orm_disp.json
import logging
from pathlib import Path
from typing import Literal, Optional
from typing import TYPE_CHECKING

from ..common.element import Element,ElementConfigModel
from ..external.pySC.pySC import ResponseMatrix
from ..external.pySC.pySC.apps import orbit_correction
from ..external.pySC_interface import pySCInterface
from ..configuration import get_root_folder
from ..arrays.magnet_array import MagnetArray

if TYPE_CHECKING:
    from ..common.element_holder import ElementHolder

try:
    from typing import Self  # Python 3.11+
except ImportError:
    from typing_extensions import Self  # Python 3.10 and earlier

logger = logging.getLogger(__name__)
logging.getLogger("pyaml.external.pySC").setLevel(logging.WARNING)

PYAMLCLASS = "Orbit"


class ConfigModel(ElementConfigModel):
    """
    Orbit tuning tool configuration

    Parameters
    ----------

    bpm_array_name: str
        Name of BPM array used to get the orbit
    hcorr_array_name: str
        Name of horizontal steerer array used to correct the orbit
    vcorr_array_name: str
        Name of vertical steerer array used to correct the orbit
    singular_values: int
        Number of singular value used for orbit correction
    response_matrix_file: str
        Orbit response matrix file to load    
    """

    bpm_array_name: str
    hcorr_array_name: str
    vcorr_array_name: str
    singular_values: int
    response_matrix_file: str


class Orbit(Element):

    def __init__(self, cfg: ConfigModel):

        super().__init__(cfg.name)
        self._cfg = cfg
        rm_file_name: Path = get_root_folder() / cfg.response_matrix_file
        self.response_matrix = ResponseMatrix.from_json(rm_file_name)
        self._hcorr:"MagnetArray" = None
        self._vcorr:"MagnetArray" = None
        self._hvcorr:"MagnetArray" = None

    def correct(
        self,
        reference=None,
        gain: float = 1.0,
        plane: Optional[Literal["H", "V"]] = None,
    ):        
        self.check_peer()

        # Lazilly attach magnet arrays
        if self._hcorr is None:
            self._hcorr = self._peer.get_magnets(self._cfg.hcorr_array_name)
            self._vcorr = self._peer.get_magnets(self._cfg.vcorr_array_name)
            hvElts = []
            hvElts.extend(self._hcorr)
            hvElts.extend(self._vcorr)
            self._hvcorr = MagnetArray("HVCorr",hvElts)


        interface = pySCInterface(
            element_holder=self._peer,
            bpm_array_name=self._cfg.bpm_array_name,
            hcorr_array_name=self._cfg.hcorr_array_name,
            vcorr_array_name=self._cfg.vcorr_array_name,
        )

        if plane is None or plane == "H":
            trims_h = orbit_correction(
                interface=interface,
                response_matrix=self.response_matrix,
                method="svd_values",
                parameter=self._cfg.singular_values,
                zerosum=True,
                apply=False,
                plane="H",
                reference=reference,
            )

        if plane is None or plane == "V":
            trims_v = orbit_correction(
                interface=interface,
                response_matrix=self.response_matrix,
                method="svd_values",
                parameter=self._cfg.singular_values,
                zerosum=False,
                apply=False,
                plane="V",
                reference=reference,
            )

        if plane is None:
            data_to_send_hv = self._hvcorr.strengths.get()
            for idx,name in enumerate(trims_h.keys()):
                data_to_send_hv[idx] += trims_h[name] * gain
            l = len(trims_h.keys())
            for idx,name in enumerate(trims_v.keys()):
                data_to_send_hv[idx+l] += trims_v[name] * gain
            self._hvcorr.strengths.set(data_to_send_hv)
        elif plane == "H":
            data_to_send_h = self._hcorr.strengths.get()
            for idx,name in enumerate(trims_h.keys()):
                data_to_send_h[idx] += trims_h[name] * gain
            self._hcorr.strengths.set(data_to_send_h)
        elif plane == "V":
            data_to_send_v = self._vcorr.strengths.get()
            for idx,name in enumerate(trims_v.keys()):
                data_to_send_v[idx] += trims_v[name] * gain
            self._vcorr.strengths.set(data_to_send_v)
    
    def attach(
        self,
        peer: "ElementHolder",
    ) -> Self:
        """
        Create a new reference to attach this tune object to a simulator
        or a control system.
        """
        obj = self.__class__(self._cfg)
        obj._peer = peer
        obj.response_matrix = self.response_matrix # Copy only ref
        return obj

@kparasch
Copy link
Copy Markdown
Contributor Author

kparasch commented Dec 11, 2025

@JeanLucPons
The assumption on the ordering will "usually" work depending how the response matrix was built, because the keys in trims comes from response_matrix.input_names.

A more robust way would be to iterate over names of the array:

# join h and v
trims = {}
for key in trims_h.keys():
    trims[key] = trims_h[key]
for key in trims_h.keys():
    trims[key] = trims_v[key]


arr_names = self._hvcorr.names()
data_to_send_hv = self._hvcorr.strengths.get()
for idx,name in enumerate(arr_names):
     data_to_send_hv[idx] += trims[name] * gain

and we should probably also make a check that all correctors from trims will be set in the arrays.

for name in trims.keys():
    assert name in arr_names

self._hvcorr.strengths.set(data_to_send_hv)

Do you want to merge the branches and one of us continues?

@JeanLucPons JeanLucPons force-pushed the feature/config-orbit-corr branch from 6ada92f to 27d4b01 Compare December 11, 2025 10:03
@JeanLucPons
Copy link
Copy Markdown
Contributor

I rebased and merged my mods.
I let you optimize the join of h and v.
The sample below is working:

from pyaml.accelerator import Accelerator
sr = Accelerator.load("tests/config/EBSOrbit.yaml",use_fast_loader=False)
sr.live.orbit.correct()

@kparasch
Copy link
Copy Markdown
Contributor Author

@JeanLucPons This is basically finished except one thing:

    def post_init(self):
        self._hcorr = self._peer.get_magnets(self._cfg.hcorr_array_name)
        self._vcorr = self._peer.get_magnets(self._cfg.vcorr_array_name)
        hvElts = []
        hvElts.extend(self._hcorr)
        hvElts.extend(self._vcorr)
        self._hvcorr = MagnetArray("HVCorr", hvElts)

Here we create a magnet family with a hardcoded name "HVCorr". I think we should not do this so that it doesn't break if someone decides to define a magnet family called "HVCorr".
I don't have any good solution.

@kparasch kparasch marked this pull request as ready for review December 11, 2025 17:20
@JeanLucPons
Copy link
Copy Markdown
Contributor

@JeanLucPons This is basically finished except one thing:

    def post_init(self):
        self._hcorr = self._peer.get_magnets(self._cfg.hcorr_array_name)
        self._vcorr = self._peer.get_magnets(self._cfg.vcorr_array_name)
        hvElts = []
        hvElts.extend(self._hcorr)
        hvElts.extend(self._vcorr)
        self._hvcorr = MagnetArray("HVCorr", hvElts)

Here we create a magnet family with a hardcoded name "HVCorr". I think we should not do this so that it doesn't break if someone decides to define a magnet family called "HVCorr". I don't have any good solution.

In fact this name does not conflict as this array is not added in the holder. This is as for dynamic array which have no name for the moment. You can also use "" if you prefer.

@kparasch
Copy link
Copy Markdown
Contributor Author

Ok great! I have then used "" since anyway the name is not supposed to be seen by anyone I think.

The PR is ready to be merged.

We can now do:

sr.live.orbit.correct()
sr.design.orbit.correct()

@JeanLucPons JeanLucPons self-requested a review December 12, 2025 09:21
Copy link
Copy Markdown
Contributor

@JeanLucPons JeanLucPons left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Kostas.

@JeanLucPons JeanLucPons merged commit 2770f81 into main Dec 12, 2025
3 checks passed
@JeanLucPons JeanLucPons deleted the feature/config-orbit-corr branch December 12, 2025 09:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants