diff --git a/socs/agents/hwp_supervisor/agent.py b/socs/agents/hwp_supervisor/agent.py index bc7cca84d..a161cca3f 100644 --- a/socs/agents/hwp_supervisor/agent.py +++ b/socs/agents/hwp_supervisor/agent.py @@ -915,6 +915,28 @@ def __post_init__(self): "Must be in ['iboot', 'synaccess']" ) + @dataclass + class PowerCycleGripper: + """ + Power Cycle Gripper electronics. + + Attributes + ----------- + driver_power_agent_type: Literal['iboot, synaccess'] + Type of agent used for controlling the gripper power + outlets: List[int] + List of outlets to power cycle in this order + """ + gripper_power_agent_type: Literal['iboot, synaccess'] + outlets: List[int] + + def __post_init__(self): + if self.gripper_power_agent_type not in ['iboot', 'synaccess']: + raise ValueError( + f"Invalid gripper_power_agent_type: {self.gripper_power_agent_type}. " + "Must be in ['iboot', 'synaccess']" + ) + completed_states = (Done, Error, Abort, Idle) @@ -1383,9 +1405,24 @@ def set_outlet_state(outlet: int, outlet_state: bool): else: kw = {'outlet': outlet, 'on': outlet_state} self.run_and_validate(clients.driver_iboot.set_outlet, kwargs=kw) + for outlet in state.outlets: + set_outlet_state(outlet, False) + self.action.set_state(ControlState.Done(success=True)) + return + elif isinstance(state, ControlState.PowerCycleGripper): + def set_outlet_state(outlet: int, outlet_state: bool): + if state.gripper_power_agent_type == 'iboot': + kw = {'outlet': outlet, 'state': 'on' if outlet_state else 'off'} + else: + kw = {'outlet': outlet, 'on': outlet_state} + self.run_and_validate(clients.gripper_iboot.set_outlet, kwargs=kw) for outlet in state.outlets: set_outlet_state(outlet, False) + time.sleep(2) + for outlet in state.outlets: + set_outlet_state(outlet, True) + time.sleep(2) self.action.set_state(ControlState.Done(success=True)) return @@ -1460,6 +1497,8 @@ def __init__(self, agent, args): self.driver_iboot_outlets = args.driver_iboot_outlets self.driver_power_cycle_twice = args.driver_power_cycle_twice self.driver_power_cycle_wait_time = args.driver_power_cycle_wait_time + self.gripper_power_agent_type = args.gripper_power_agent_type + self.gripper_iboot_outlets = args.gripper_iboot_outlets self.shutdown_no_data_timeout = args.shutdown_no_data_timeout self.shutdown_mode = False @@ -2004,6 +2043,34 @@ def disable_driver_board(self, session, params): action.sleep_until_complete(session=session) return action.success, f"Completed with state: {action.cur_state_info.state}" + def power_cycle_gripper(self, session, params): + """power_cycle_gripper() + + **Task** - Power cycle gripper electronics + + Notes + -------- + + Example of ``session.data``:: + + >>> session['data'] + {'action': + {'action_id': 3, + 'completed': True, + 'cur_state': {'class': 'Done', 'msg': None, 'success': True}, + 'state_history': List[ConrolState], + 'success': False} + } + """ + kw = { + 'gripper_power_agent_type': self.gripper_power_agent_type, + 'outlets': self.gripper_iboot_outlets, + } + state = ControlState.PowerCycleGripper(**kw) + action = self.control_state_machine.request_new_action(state) + action.sleep_until_complete(session=session) + return action.success, f"Completed with state: {action.cur_state_info.state}" + def cancel_shutdown(self, session, params): """cancel_shutdown() **Task** - Cancels shutdown mode @@ -2145,6 +2212,7 @@ def main(args=None): agent.register_task('ungrip_hwp', hwp.ungrip_hwp) agent.register_task('enable_driver_board', hwp.enable_driver_board) agent.register_task('disable_driver_board', hwp.disable_driver_board) + agent.register_task('power_cycle_gripper', hwp.power_cycle_gripper) agent.register_task('abort_action', hwp.abort_action) agent.register_task('cancel_shutdown', hwp.cancel_shutdown)