Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions socs/agents/hwp_supervisor/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,28 @@ def __post_init__(self):
"Must be in ['iboot', 'synaccess']"
)

@dataclass
class PowerCycleGripper:
"""
Power Cycle Gripper electronics.

Attributes
-----------
driver_power_agent_type: Literal['iboot, synaccess']
Type of agent used for controlling the gripper power
outlets: List[int]
List of outlets to power cycle in this order
"""
gripper_power_agent_type: Literal['iboot, synaccess']
outlets: List[int]

def __post_init__(self):
if self.gripper_power_agent_type not in ['iboot', 'synaccess']:
raise ValueError(
f"Invalid gripper_power_agent_type: {self.gripper_power_agent_type}. "
"Must be in ['iboot', 'synaccess']"
)

completed_states = (Done, Error, Abort, Idle)


Expand Down Expand Up @@ -1383,9 +1405,24 @@ def set_outlet_state(outlet: int, outlet_state: bool):
else:
kw = {'outlet': outlet, 'on': outlet_state}
self.run_and_validate(clients.driver_iboot.set_outlet, kwargs=kw)
for outlet in state.outlets:
set_outlet_state(outlet, False)
self.action.set_state(ControlState.Done(success=True))
return

elif isinstance(state, ControlState.PowerCycleGripper):
def set_outlet_state(outlet: int, outlet_state: bool):
if state.gripper_power_agent_type == 'iboot':
kw = {'outlet': outlet, 'state': 'on' if outlet_state else 'off'}
else:
kw = {'outlet': outlet, 'on': outlet_state}
self.run_and_validate(clients.gripper_iboot.set_outlet, kwargs=kw)
for outlet in state.outlets:
set_outlet_state(outlet, False)
time.sleep(2)
for outlet in state.outlets:
set_outlet_state(outlet, True)
time.sleep(2)
self.action.set_state(ControlState.Done(success=True))
return

Expand Down Expand Up @@ -1460,6 +1497,8 @@ def __init__(self, agent, args):
self.driver_iboot_outlets = args.driver_iboot_outlets
self.driver_power_cycle_twice = args.driver_power_cycle_twice
self.driver_power_cycle_wait_time = args.driver_power_cycle_wait_time
self.gripper_power_agent_type = args.gripper_power_agent_type
self.gripper_iboot_outlets = args.gripper_iboot_outlets

self.shutdown_no_data_timeout = args.shutdown_no_data_timeout
self.shutdown_mode = False
Expand Down Expand Up @@ -2004,6 +2043,34 @@ def disable_driver_board(self, session, params):
action.sleep_until_complete(session=session)
return action.success, f"Completed with state: {action.cur_state_info.state}"

def power_cycle_gripper(self, session, params):
"""power_cycle_gripper()

**Task** - Power cycle gripper electronics

Notes
--------

Example of ``session.data``::

>>> session['data']
{'action':
{'action_id': 3,
'completed': True,
'cur_state': {'class': 'Done', 'msg': None, 'success': True},
'state_history': List[ConrolState],
'success': False}
}
"""
kw = {
'gripper_power_agent_type': self.gripper_power_agent_type,
'outlets': self.gripper_iboot_outlets,
}
state = ControlState.PowerCycleGripper(**kw)
action = self.control_state_machine.request_new_action(state)
action.sleep_until_complete(session=session)
return action.success, f"Completed with state: {action.cur_state_info.state}"

def cancel_shutdown(self, session, params):
"""cancel_shutdown()
**Task** - Cancels shutdown mode
Expand Down Expand Up @@ -2145,6 +2212,7 @@ def main(args=None):
agent.register_task('ungrip_hwp', hwp.ungrip_hwp)
agent.register_task('enable_driver_board', hwp.enable_driver_board)
agent.register_task('disable_driver_board', hwp.disable_driver_board)
agent.register_task('power_cycle_gripper', hwp.power_cycle_gripper)
agent.register_task('abort_action', hwp.abort_action)
agent.register_task('cancel_shutdown', hwp.cancel_shutdown)

Expand Down