Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def __init__(self, robot_description : Union[str, Path]):
# compute main trees of the robot model
self.compute_subtrees()


# array to store all analyzed points
self.analyzed_points = np.array([], dtype=object)

# array to store all configurations for the robot model
self.configurations = np.array([], dtype=object)
Expand Down Expand Up @@ -422,11 +423,17 @@ def compute_all_configurations(self, range : int, resolution : int, end_joint_id

# Create an array to store all configurations
configurations = []

# restore analyzed points array
self.analyzed_points = np.array([], dtype=object)

# Iterate over the range to compute all configurations
for x in np.arange(-range, range , resolution):
for y in np.arange(-range, range , resolution):
for z in np.arange(-range/2, range , resolution):

self.analyzed_points = np.append(self.analyzed_points, {"position" : [x, y, z]})

target_position = pin.SE3(np.eye(3), np.array([x, y, z]))
new_q = self.compute_inverse_kinematics(q, target_position, end_joint_id)

Expand Down Expand Up @@ -873,6 +880,15 @@ def get_zero_acceleration(self) -> np.ndarray:
return a0


def get_analyzed_points(self) -> np.ndarray:
"""
Get the analyzed points during the computation of all configurations.

:return: Array of analyzed points.
"""

return self.analyzed_points

def get_random_configuration(self) -> tuple[np.ndarray, np.ndarray]:
"""
Get a random configuration for configuration and velocity vectors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ def __init__(self):
self.publisher_rviz_torque = self.create_publisher(MarkerArray, '/torque_visualization', 10)

# Pusblisher for point cloud workspace area
self.publisher_workspace_area = self.create_publisher(MarkerArray, '/workspace_area', 10)
self.publisher_workspace_area = self.create_publisher(MarkerArray, '/workspace_area', qos_profile=rclpy.qos.QoSProfile( durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth = 1))

# Pusblisher for point cloud of maximum payloads in the workspace area
self.publisher_maximum_payloads = self.create_publisher(MarkerArray, '/maximum_payloads', 10)
self.publisher_maximum_payloads = self.create_publisher(MarkerArray, '/maximum_payloads', qos_profile=rclpy.qos.QoSProfile( durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth = 1))

# Publisher for point cloud of all analyzed points
self.publisher_analyzed_points = self.create_publisher(MarkerArray, '/analyzed_points', qos_profile=rclpy.qos.QoSProfile( durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth = 1))

# subscription to joint states
self.joint_states_subscription = self.create_subscription(JointState, '/joint_states', self.joint_states_callback, 10)
Expand Down Expand Up @@ -103,6 +106,9 @@ def __init__(self):
# variable to store if there is a selected configuration from the workspace menu to visualize
self.selected_configuration = None

# variable to store analyzed points of the workspace area
self.marker_analyzed_points = None

# timer to compute the valid workspace area
self.timer_workspace_calculation = self.create_timer(1.0, self.workspace_calculation)
# timer to publish the selected configuration in joint states
Expand Down Expand Up @@ -161,6 +167,53 @@ def update_payload_selection(self):

self.links = links # Update the links to the current ones


def publisher_analyzed_point_markers(self):
"""
Publish the analyzed points in the workspace area.
This will publish all the points in the workspace area where the robot can reach with the current configuration.
"""
if self.menu is not None:

if len(self.robot_handler.get_analyzed_points()) == 0:
return
else:
if self.marker_analyzed_points is None:

self.marker_analyzed_points = MarkerArray()

# publish the analyzed points
for i, analyzed_point in enumerate(self.robot_handler.get_analyzed_points()):
point = Marker()
point.header.frame_id = self.robot_handler.get_root_name()
point.header.stamp = Time()
point.ns = f"analyzed_points_ns"
point.id = i
point.type = Marker.SPHERE
point.action = Marker.ADD
point.scale.x = 0.01
point.scale.y = 0.01
point.scale.z = 0.01
point.pose.position.x = analyzed_point['position'][0]
point.pose.position.y = analyzed_point['position'][1]
point.pose.position.z = analyzed_point['position'][2]
point.pose.orientation.w = 1.0
point.color.a = 1.0 # Alpha
point.color.r = 1.0 # Red
point.color.g = 1.0 # Green
point.color.b = 1.0 # Blue

self.marker_analyzed_points.markers.append(point)

self.publisher_analyzed_points.publish(self.marker_analyzed_points)

else:
# update with current time
for marker in self.marker_analyzed_points.markers:
marker.header.stamp = Time()

self.publisher_analyzed_points.publish(self.marker_analyzed_points)

def publish_payload_force(self):
"""
Publish the gravity force on the frame with id `id_force`.
Expand Down Expand Up @@ -227,6 +280,9 @@ def workspace_calculation(self):

# compute the maximum payloads for the valid configurations
self.valid_configurations = self.robot_handler.compute_maximum_payloads(self.valid_configurations)

# publish the analyzed points in the workspace area
self.publisher_analyzed_point_markers()

# insert the valid configurations in the menu
self.menu.insert_dropdown_configuration(self.valid_configurations)
Expand Down