Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions apsuite/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""."""
import logging as _log
import sys as _sys
import os as _os
from copy import deepcopy as _dcopy
from threading import Event as _Event

Expand Down Expand Up @@ -153,6 +154,85 @@ def wait_for_connection(self, timeout=None):
return False
return True

def _get_equipments_state(self, equipments):
"""."""
equipments_state = dict()
for name, equip in equipments.items():
state = {
attr: getattr(equip, attr) for attr in dir(equip) if
(
not attr.startswith("_")
and not callable(getattr(equip, attr))
)
}
equipments_state[name] = state
return equipments_state

def get_machine_state(self):
"""."""
if not self.isonline:
return
devs_state = self._get_equipments_state(self.devices)
pvs_state = self._get_equipments_state(self.pvs)
machine_state = {"devices": devs_state,
"pvs" : pvs_state}
return machine_state

def save_machine_state(self, fname):
"""."""
state = self.get_machine_state()
_save(data=state, fname=fname, overwrite=False)

def set_machine_state(self, machine_state):
"""."""
if not self.isonline:
print("Failed. Not online.")
return

for key, entries in machine_state.items():
if key == "devices":
equipments = self.devices
elif key == "pvs":
equipments = self.pvs
else:
raise ValueError(f"Unexpected key in saved_dict: {key}")
# Update attributes for each device or PV
for equip_name, attributes in entries.items():
equipment = equipments[equip_name] # Retrieve device/PV
for attr_name, attr_value in attributes.items():
try:
setattr(equipment, attr_name, attr_value)
except AttributeError:
# no writing access PV
pass

def compare_states(self, state1, state2):
"""."""
if state1 == state2:
return True
# TODO: if not equal, return the diff
raise NotImplementedError

def _finish(self, fname):
try:
init_state = _load(fname)
except FileNotFoundError:
print(f"File {fname} does not exist.")
except Exception as e:
print(f"An unexpected error occurred: {e}")
current_state = self.get_machine_state()
res = self.compare_states(current_state, init_state)
if not res:
print("Restoring init state.")
self.set_machine_state(init_state)
# TODO: wait and checking methods
try:
_os.remove(fname)
print(f"File '{fname}' successfully deleted.")
except OSError as e:
print(f"Error: {e.strerror} - Unable to delete file '{fname}'.")
return


class ThreadedMeasBaseClass(MeasBaseClass):
"""."""
Expand Down