From e03770b5b730ea95280417c1939dca934077c73e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=BClz?= Date: Mon, 27 Nov 2023 17:23:31 +0100 Subject: [PATCH 01/10] Adds a simple pyplot visualizer for kinematic chains --- src/pytorch_kinematics/chain.py | 7 ++++ src/pytorch_kinematics/visualize.py | 63 +++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 src/pytorch_kinematics/visualize.py diff --git a/src/pytorch_kinematics/chain.py b/src/pytorch_kinematics/chain.py index d837567..2902f01 100644 --- a/src/pytorch_kinematics/chain.py +++ b/src/pytorch_kinematics/chain.py @@ -505,3 +505,10 @@ def convert_serial_inputs_to_chain_inputs(self, th, end_only: bool): if frame.joint.joint_type != 'fixed': th[..., jnt_idx] = partial_th_i return frame_indices, th + + def visualize(self, th: torch.Tensor): + """Visualize the robot chain in joint configuration th.""" + from pytorch_kinematics.visualize import visualize + fk = self.forward_kinematics(th, end_only=False) + arr = np.vstack([fk[f.name].get_matrix().cpu().detach().numpy() for f in self._serial_frames]) + visualize(arr) diff --git a/src/pytorch_kinematics/visualize.py b/src/pytorch_kinematics/visualize.py new file mode 100644 index 0000000..e512cc4 --- /dev/null +++ b/src/pytorch_kinematics/visualize.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +# Author: Jonathan Külz +# Date: 27.11.23 +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import axes3d +import numpy as np + + +def visualize(frames: np.ndarray): + """ + Visualizes a sequence of frames. + Args: + frames: An nx4x4 array of frame placements. Assumes the first frame is the base frame (no implicit world frame + is assumed). + + Returns: None + """ + axis_size = np.max(frames[:, :3, 3]) - np.min(frames[:, :3, 3]) * .8 + center = np.mean(frames[:, :3, 3], axis=0) + frame_scale = axis_size / 15 + ax = plt.figure(figsize=(12, 12)).add_subplot(projection='3d') + frames = frames.reshape(-1, 4, 4) + num_frames = frames.shape[0] + for i, frame in enumerate(frames): + draw_frame(ax, frame, scale=frame_scale) + if i < num_frames - 1: + draw_link(ax, frame[:3, 3], frames[i + 1][:3, 3]) + ax.set_xlim(center[0] - axis_size, center[0] + axis_size) + ax.set_ylim(center[1] - axis_size, center[1] + axis_size) + ax.set_zlim(center[2] - axis_size, center[2] + axis_size) + plt.show() + + +def draw_frame(ax: axes3d.Axes3D, frame: np.ndarray, scale: float = .1): + """ + Draws a frame. + Args: + ax: The axis to draw on. + frame: The frame placement. + scale: The length of the frame axes. + + Returns: None + """ + origin = frame[:3, 3] + x = frame[:3, 0] + y = frame[:3, 1] + z = frame[:3, 2] + ax.quiver(origin[0], origin[1], origin[2], x[0], x[1], x[2], length=scale, color='r') + ax.quiver(origin[0], origin[1], origin[2], y[0], y[1], y[2], length=scale, color='g') + ax.quiver(origin[0], origin[1], origin[2], z[0], z[1], z[2], length=scale, color='b') + + +def draw_link(ax: axes3d.Axes3D, p0: np.ndarray, p1: np.ndarray): + """ + Draws a line from p0 to p1. + Args: + ax: The axis to draw on. + p0: The start point. + p1: The end point. + + Returns: None + """ + ax.plot([p0[0], p1[0]], [p0[1], p1[1]], [p0[2], p1[2]], color='black', linewidth=3) From 53da836291973348b767c125b1b62b0134a1030c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=BClz?= Date: Tue, 28 Nov 2023 18:26:29 +0100 Subject: [PATCH 02/10] add axis labels --- src/pytorch_kinematics/visualize.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/pytorch_kinematics/visualize.py b/src/pytorch_kinematics/visualize.py index e512cc4..9762831 100644 --- a/src/pytorch_kinematics/visualize.py +++ b/src/pytorch_kinematics/visualize.py @@ -6,12 +6,13 @@ import numpy as np -def visualize(frames: np.ndarray): +def visualize(frames: np.ndarray, show: bool = False) -> plt.Axes: """ Visualizes a sequence of frames. Args: frames: An nx4x4 array of frame placements. Assumes the first frame is the base frame (no implicit world frame is assumed). + show: Whether to show the plot. Returns: None """ @@ -28,7 +29,12 @@ def visualize(frames: np.ndarray): ax.set_xlim(center[0] - axis_size, center[0] + axis_size) ax.set_ylim(center[1] - axis_size, center[1] + axis_size) ax.set_zlim(center[2] - axis_size, center[2] + axis_size) - plt.show() + ax.set_xlabel('x') + ax.set_ylabel('y') + ax.set_zlabel('z') + if show: + plt.show() + return ax def draw_frame(ax: axes3d.Axes3D, frame: np.ndarray, scale: float = .1): From 2c635d1ac51486009433107cf9593e6ecaa85029 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=BClz?= Date: Tue, 28 Nov 2023 18:27:09 +0100 Subject: [PATCH 03/10] Implements a first draft of a Diffchain robot --- src/pytorch_kinematics/chain.py | 30 ++-- src/pytorch_kinematics/diffchain.py | 89 +++++++++ .../transforms/parameter_conversions.py | 55 ++++++ .../transforms/parameterized_transform.py | 170 ++++++++++++++++++ .../transforms/transform3d.py | 4 +- 5 files changed, 335 insertions(+), 13 deletions(-) create mode 100644 src/pytorch_kinematics/diffchain.py create mode 100644 src/pytorch_kinematics/transforms/parameter_conversions.py create mode 100644 src/pytorch_kinematics/transforms/parameterized_transform.py diff --git a/src/pytorch_kinematics/chain.py b/src/pytorch_kinematics/chain.py index d837567..7ea9a66 100644 --- a/src/pytorch_kinematics/chain.py +++ b/src/pytorch_kinematics/chain.py @@ -1,5 +1,5 @@ from functools import lru_cache -from typing import Optional, Sequence +from typing import Optional, Sequence, Tuple import numpy as np import torch @@ -189,7 +189,7 @@ def _find_link_recursive(name, frame) -> Optional[Link]: @staticmethod def _get_joints(frame, exclude_fixed=True): joints = [] - if exclude_fixed and frame.joint.joint_type != "fixed": + if not exclude_fixed or frame.joint.joint_type != "fixed": joints.append(frame.joint) for child in frame.children: joints.extend(Chain._get_joints(child)) @@ -285,6 +285,9 @@ def _get_link_names(frame): link_names.extend(Chain._get_link_names(child)) return link_names + def get_joint_offset(self, chain_idx) -> Optional[torch.Tensor]: + return self.joint_offsets[chain_idx] + def get_link_names(self): names = self._get_link_names(self._root) return sorted(set(names), key=names.index) @@ -293,6 +296,18 @@ def get_link_names(self): def get_frame_indices(self, *frame_names): return torch.tensor([self.frame_to_idx[n] for n in frame_names], dtype=torch.long, device=self.device) + def _get_jnt_transform(self, th) -> Tuple[torch.Tensor, torch.Tensor]: + """ + compute all joint transforms at once first in order to handle multiple joint types without branching, + we create all possible transforms for all joint types and then select the appropriate one for each joint. + Args: + th: The joint configuration to use + + Returns: A tuple of revolute, prismatic joint transforms + """ + axes_expanded = self.axes.unsqueeze(0).repeat(th.shape[0], 1, 1) + return axis_and_angle_to_matrix_44(axes_expanded, th), axis_and_d_to_pris_matrix(axes_expanded, th) + def forward_kinematics(self, th, frame_indices: Optional = None): """ Compute forward kinematics for the given joint values. @@ -312,14 +327,7 @@ def forward_kinematics(self, th, frame_indices: Optional = None): th = self.ensure_tensor(th) th = torch.atleast_2d(th) - b = th.shape[0] - axes_expanded = self.axes.unsqueeze(0).repeat(b, 1, 1) - - # compute all joint transforms at once first - # in order to handle multiple joint types without branching, we create all possible transforms - # for all joint types and then select the appropriate one for each joint. - rev_jnt_transform = axis_and_angle_to_matrix_44(axes_expanded, th) - pris_jnt_transform = axis_and_d_to_pris_matrix(axes_expanded, th) + rev_jnt_transform, pris_jnt_transform = self._get_jnt_transform(th) frame_transforms = {} b = th.shape[0] @@ -335,7 +343,7 @@ def forward_kinematics(self, th, frame_indices: Optional = None): if link_offset_i is not None: frame_transform = frame_transform @ link_offset_i - joint_offset_i = self.joint_offsets[chain_idx] + joint_offset_i = self.get_joint_offset(chain_idx) if joint_offset_i is not None: frame_transform = frame_transform @ joint_offset_i diff --git a/src/pytorch_kinematics/diffchain.py b/src/pytorch_kinematics/diffchain.py new file mode 100644 index 0000000..ff74223 --- /dev/null +++ b/src/pytorch_kinematics/diffchain.py @@ -0,0 +1,89 @@ +# Author: Jonathan Külz +# Date: 23.11.23 +from typing import List, Optional, Tuple, Union + +import numpy as np +import torch + +from pytorch_kinematics import SerialChain, Transform3d, build_chain_from_urdf +from pytorch_kinematics.transforms.parameterized_transform import (CONVENTION_IMPLEMENTATIONS, MDHTransform, + ParameterConvention, ParameterizedTransform) + + +class DiffChain(SerialChain): + """ + A serial kinematic chain, but with all joint offsets created from differentiable parameters. + """ + + def __init__(self, + chain: SerialChain, + end_frame_name: str, + root_frame_name: str = "", + batch_size: int = 1, + parameter_convention: Union[str, ParameterConvention] = ParameterConvention.MDH, + **kwargs): + """Initialize the DiffChain by providing a serial chain.""" + super().__init__(chain, end_frame_name, root_frame_name, **kwargs) + self.batch_size = batch_size + parameter_batch_size = (batch_size, self.n_joints) + self._convention: ParameterConvention = ParameterConvention(parameter_convention) + self._num_parameters = CONVENTION_IMPLEMENTATIONS[self._convention].get_num_parameters() + self.parameterized_offsets: ParameterizedTransform = \ + CONVENTION_IMPLEMENTATIONS[self._convention](default_batch_size=parameter_batch_size) + self.__setup() + + @classmethod + def from_serial_chain(cls, c: SerialChain): + """Creates a DiffChain from a SerialChain.""" + return cls(c, c._serial_frames[-1].name, c._serial_frames[0].name) + + @property + def parameters(self) -> torch.Tensor: + """Returns the underlying differentiable parameters""" + return self.parameterized_offsets.parameters + + def _get_jnt_transform(self, th) -> Tuple[torch.Tensor, torch.Tensor]: + """Override the serial chain method: The joint transform is contained in the joint offset already!""" + T = torch.eye(4, dtype=self.dtype, device=self.device).expand(self.batch_size, self.n_joints, 4, 4) + return T, T + + def get_joint_offset(self, chain_idx) -> Optional[torch.Tensor]: + """Returns the parameterized joint offset""" + joint_idx = self.joint_indices[chain_idx] + if joint_idx == -1: # This is not a parameterized, moving joint + return self.joint_offsets[chain_idx] + return self.parameterized_offsets[:, joint_idx, :, :].get_matrix() + + def forward_kinematics(self, th, end_only: bool = True) -> Transform3d: + """Overrides the SerialChain method to ensure using the parameters of this chain.""" + th = th.reshape((self.batch_size, self.n_joints)) + self.set_joint_parameters(th) + return super().forward_kinematics(th, end_only) + + def set_joint_parameters(self, th: torch.Tensor): + """ + Overrides the parameters that describe the current joint configuration. + """ + types = np.repeat(np.array([joint.joint_type for joint in self.get_joints()]).reshape(1, -1), self.batch_size, axis=0) + self.parameterized_offsets.update_joint_parameters(th, types) + + def __setup(self): + """Ensure that all parameters are differentiable -- create a single leaf node containing all parameters.""" + offset_matrix = torch.empty((self.n_joints, 4, 4)) + for i, frame in enumerate(self._serial_frames): + if frame.joint.joint_type == 'fixed': + continue + joint_idx = self.joint_indices[i] + offset_matrix[joint_idx, :, :] = frame.joint.offset.get_matrix() + + offset_matrix = torch.stack([offset_matrix] * self.batch_size, dim=0) + self.parameterized_offsets = MDHTransform.from_homogeneous(offset_matrix, + dtype=self.dtype, + device=self.device, + requires_grad=True) + + +def build_diff_chain_from_urdf(data, end_link_name, root_link_name="", **kwargs): + """Pipes pk function to build a DiffChain from URDF data.""" + urdf_chain = build_chain_from_urdf(data) + return DiffChain(urdf_chain, end_link_name, "" if root_link_name == "" else root_link_name, **kwargs) diff --git a/src/pytorch_kinematics/transforms/parameter_conversions.py b/src/pytorch_kinematics/transforms/parameter_conversions.py new file mode 100644 index 0000000..fb0f163 --- /dev/null +++ b/src/pytorch_kinematics/transforms/parameter_conversions.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +# Author: Jonathan Külz +# Date: 23.11.23 +import torch + + +def mdh_to_homogeneous(mdh_parameters: torch.Tensor) -> torch.Tensor: + """ + Converts a set of MDH parameters to a homogeneous transformation matrix. + + Follows Craig, Introduction to Robotics, 2005, p. 75. + :param mdh_parameters: The MDH parameters, ordered as alpha, a, d, theta. + :return: The homogeneous transformation matrix. + """ + alpha = mdh_parameters[..., 0] + a = mdh_parameters[..., 1] + d = mdh_parameters[..., 2] + theta = mdh_parameters[..., 3] + + ct = torch.cos(theta) + st = torch.sin(theta) + ca = torch.cos(alpha) + sa = torch.sin(alpha) + zeros = torch.zeros_like(theta) + return torch.stack([ + torch.stack([ct, -st, zeros, a], dim=-1), + torch.stack([st * ca, ct * ca, -sa, -d * sa], dim=-1), + torch.stack([st * sa, ct * sa, ca, d * ca], dim=-1), + torch.stack([zeros, zeros, zeros, torch.ones_like(theta)], + dim=-1) + ], dim=-2) + + +def homogeneous_to_mdh(T: torch.Tensor) -> torch.Tensor: + """ + Converts a homogeneous transformation matrix to a set of MDH parameters. + + Attention, this method is expensive due to an internal sanity check. + Follows Craig, Introduction to Robotics, 2005, p. 75. + :param T: The homogeneous transformation matrix. + :return: The MDH parameters. + """ + a = T[..., 0, 3] + theta = torch.atan2(-T[..., 0, 1], T[..., 0, 0]) + alpha = torch.atan2(-T[..., 1, 2], T[..., 2, 2]) + d = torch.empty_like(a) + use_cos = torch.isclose(torch.sin(alpha), torch.zeros_like(alpha)) + d[~use_cos] = -T[~use_cos][:, 1, 3] / torch.sin(alpha[~use_cos]) + d[use_cos] = T[use_cos][:, 2, 3] / torch.cos(alpha[use_cos]) + + parameters = torch.stack([alpha, a, d, theta], dim=-1) + if not torch.allclose(mdh_to_homogeneous(parameters), T, atol=1e-3): + raise ValueError('The given transformation is not MDH.') + + return parameters diff --git a/src/pytorch_kinematics/transforms/parameterized_transform.py b/src/pytorch_kinematics/transforms/parameterized_transform.py new file mode 100644 index 0000000..fcac08e --- /dev/null +++ b/src/pytorch_kinematics/transforms/parameterized_transform.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from enum import Enum +from typing import Collection, Optional, Tuple, Union + +import numpy as np +import torch + +from .transform3d import Transform3d +from .parameter_conversions import mdh_to_homogeneous, homogeneous_to_mdh + + +class ParameterConvention(Enum): + """A parameter convention for a kinematic chain.""" + + MDH = 1 # Modified Denavit-Hartenberg convention after Craig + + +class ParameterizedTransform(Transform3d, ABC): + """ + A 3d transform which is made from a set of (differentiable) parameters. + + The ParameterizedTransform class supports two levels of batching: The first level is the batch dimension of the + parameters for a single robot, the second level is the batch dimension of multiple robots. + """ + + convention: ParameterConvention # Implement this in subclasses + parameter_names: Tuple[str] # Implement this in subclasses + + def __init__( + self, + parameters: Optional[torch.Tensor] = None, + dtype: torch.dtype = torch.float32, + device: str = 'cpu', + requires_grad: bool = True, + matrix: Optional[torch.Tensor] = None, + default_batch_size: Tuple[int, int] = (1, 1) + ): + """Initialize a ParameterizedTransform.""" + super().__init__(dtype=dtype, device=device) + + if parameters is None: + parameters = torch.zeros(*default_batch_size, self.get_num_parameters(), dtype=dtype, device=device) + if parameters.ndim == 1: + parameters = parameters.unsqueeze(0) + if parameters.ndim == 2: + parameters = parameters.unsqueeze(0) + + self.requires_grad: bool = requires_grad + self.parameters: torch.Tensor = parameters + if matrix is None: + matrix = self._create_matrix().to(dtype=dtype, device=device) + self._matrix: torch.Tensor = matrix + + @abstractmethod + def _create_matrix(self) -> torch.Tensor: + """Returns the matrix representation of the transform.""" + + @abstractmethod + def update_joint_parameters(self, th: torch.Tensor, joint_types: np.array): + """Updates the parameters of the parameters according to joint configuration th.""" + + def clone(self) -> ParameterizedTransform: + """ + Deep copy of ParameterizedTransform object. All internal tensors are cloned individually. + + Returns: + new ParameterizedTransform object. + """ + other = self.__class__(dtype=self.dtype, device=self.device, requires_grad=self.requires_grad) + other._matrix = self._matrix.clone() + other._parameters = self._parameters.clone() + if self._lu is not None: + other._lu = [elem.clone() for elem in self._lu] + return other + + def stack(self, *others): + """ + Stacks multiple ParameterizedTransform objects together. + + Args: + others: ParameterizedTransform objects to stack. + + Returns: + new ParameterizedTransform object. + """ + other = self.__class__(dtype=self.dtype, device=self.device, requires_grad=self.requires_grad) + transforms = [self] + list(others) + matrix = torch.cat([t._matrix for t in transforms], dim=0) + parameters = torch.cat([t._parameters for t in transforms], dim=0) + other._matrix = matrix + other._parameters = parameters + return other + + @property + def parameters(self) -> torch.Tensor: + """Returns the joint parameters""" + return self._parameters + + @parameters.setter + def parameters(self, parameters: torch.Tensor): + """Sets the joint parameters""" + parameters.requires_grad = self.requires_grad + self._parameters = parameters + + @classmethod + def get_num_parameters(cls) -> int: + """Returns the number of parameters""" + return len(cls.parameter_names) + + def __repr__(self) -> str: + """Returns a string representation of the transform.""" + info = ', '.join([f'{name}={self.parameters[:, i]}' for i, name in enumerate(self.parameter_names)]) + return f"{self.__class__}({info})".replace('\n ', '') + + +class MDHTransform(ParameterizedTransform): + """A transformation derived from Denavit-Hartenberg parameters.""" + + convention: ParameterConvention = ParameterConvention.MDH + parameter_names: Tuple[str, str, str, str] = ('alpha', 'a', 'd', 'theta') + + @property + def theta(self): + """Returns the joint angle.""" + return self.parameters[:, 3] + + @property + def d(self): + """Returns the joint offset.""" + return self.parameters[:, 2] + + @property + def a(self): + """Returns the link length.""" + return self.parameters[:, 1] + + @property + def alpha(self): + """Returns the link twist.""" + return self.parameters[:, 0] + + def _create_matrix(self) -> torch.Tensor: + """Returns the matrix representation of the transform.""" + return mdh_to_homogeneous(self.parameters) + + def update_joint_parameters(self, th: torch.Tensor, joint_types: np.array): + """Updates the parameters of the parameters according to joint configuration th.""" + is_revolute = joint_types == 'revolute' + is_prismatic = joint_types == 'prismatic' + assert np.all(np.logical_xor(is_revolute, is_prismatic)) + self.parameters[is_revolute.nonzero()][:, 3] = th[is_revolute.nonzero()] + self.parameters[is_prismatic.nonzero()][:, 2] = th[is_prismatic.nonzero()] + + @classmethod + def from_homogeneous(cls, + homogeneous: torch.Tensor, + dtype: torch.dtype = torch.float32, + device: str = 'cpu', + requires_grad: bool = True, + ): + """Creates a MDHTransform from a homogeneous transformation matrix.""" + parameters = homogeneous_to_mdh(homogeneous) + return cls(parameters, dtype, device, requires_grad) + + +CONVENTION_IMPLEMENTATIONS = { + ParameterConvention.MDH: MDHTransform, +} diff --git a/src/pytorch_kinematics/transforms/transform3d.py b/src/pytorch_kinematics/transforms/transform3d.py index f0f9cc5..712d994 100644 --- a/src/pytorch_kinematics/transforms/transform3d.py +++ b/src/pytorch_kinematics/transforms/transform3d.py @@ -221,7 +221,7 @@ def __len__(self): return self.get_matrix().shape[0] def __getitem__(self, item): - return Transform3d(matrix=self.get_matrix()[item]) + return self.__class__(matrix=self.get_matrix()[item]) def __repr__(self): m = self.get_matrix() @@ -343,7 +343,7 @@ def inverse(self, invert_composed: bool = False): def stack(self, *others): transforms = [self] + list(others) matrix = torch.cat([t._matrix for t in transforms], dim=0) - out = Transform3d() + out = self.__class__() out._matrix = matrix return out From d88438eb9e7fc605f061fd125e1ffb15f996e33b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=BClz?= Date: Wed, 29 Nov 2023 20:37:16 +0100 Subject: [PATCH 04/10] recompute matrix for param. transform --- src/pytorch_kinematics/chain.py | 6 ++--- src/pytorch_kinematics/diffchain.py | 26 +++++++++---------- .../transforms/parameterized_transform.py | 18 ++++++------- .../transforms/transform3d.py | 2 +- src/pytorch_kinematics/visualize.py | 11 ++++---- 5 files changed, 32 insertions(+), 31 deletions(-) diff --git a/src/pytorch_kinematics/chain.py b/src/pytorch_kinematics/chain.py index edcf199..4e8da7d 100644 --- a/src/pytorch_kinematics/chain.py +++ b/src/pytorch_kinematics/chain.py @@ -514,9 +514,9 @@ def convert_serial_inputs_to_chain_inputs(self, th, end_only: bool): th[..., jnt_idx] = partial_th_i return frame_indices, th - def visualize(self, th: torch.Tensor): + def visualize(self, **kwargs): """Visualize the robot chain in joint configuration th.""" from pytorch_kinematics.visualize import visualize - fk = self.forward_kinematics(th, end_only=False) + fk = self.forward_kinematics(end_only=False) arr = np.vstack([fk[f.name].get_matrix().cpu().detach().numpy() for f in self._serial_frames]) - visualize(arr) + visualize(arr, **kwargs) diff --git a/src/pytorch_kinematics/diffchain.py b/src/pytorch_kinematics/diffchain.py index ff74223..cd23295 100644 --- a/src/pytorch_kinematics/diffchain.py +++ b/src/pytorch_kinematics/diffchain.py @@ -1,6 +1,6 @@ # Author: Jonathan Külz # Date: 23.11.23 -from typing import List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union import numpy as np import torch @@ -52,20 +52,20 @@ def get_joint_offset(self, chain_idx) -> Optional[torch.Tensor]: joint_idx = self.joint_indices[chain_idx] if joint_idx == -1: # This is not a parameterized, moving joint return self.joint_offsets[chain_idx] - return self.parameterized_offsets[:, joint_idx, :, :].get_matrix() + return self.parameterized_offsets[:, joint_idx, :, :].get_matrix().reshape(self.batch_size, 4, 4) - def forward_kinematics(self, th, end_only: bool = True) -> Transform3d: + def forward_kinematics(self, end_only: bool = True) -> Union[Transform3d, Dict[str, Transform3d]]: """Overrides the SerialChain method to ensure using the parameters of this chain.""" - th = th.reshape((self.batch_size, self.n_joints)) - self.set_joint_parameters(th) - return super().forward_kinematics(th, end_only) - - def set_joint_parameters(self, th: torch.Tensor): - """ - Overrides the parameters that describe the current joint configuration. - """ - types = np.repeat(np.array([joint.joint_type for joint in self.get_joints()]).reshape(1, -1), self.batch_size, axis=0) - self.parameterized_offsets.update_joint_parameters(th, types) + transform = torch.eye(4).unsqueeze(0).repeat(self.batch_size, 1, 1) + ret = {} + for i, f in enumerate(self._serial_frames): + offset = self.get_joint_offset(i) + if offset is not None: + transform = torch.matmul(transform, offset) + ret[f.name] = Transform3d(matrix=transform) + if end_only: + return ret[self._serial_frames[-1].name] + return ret def __setup(self): """Ensure that all parameters are differentiable -- create a single leaf node containing all parameters.""" diff --git a/src/pytorch_kinematics/transforms/parameterized_transform.py b/src/pytorch_kinematics/transforms/parameterized_transform.py index fcac08e..1c2bce5 100644 --- a/src/pytorch_kinematics/transforms/parameterized_transform.py +++ b/src/pytorch_kinematics/transforms/parameterized_transform.py @@ -2,6 +2,7 @@ from abc import ABC, abstractmethod from enum import Enum +from functools import lru_cache from typing import Collection, Optional, Tuple, Union import numpy as np @@ -38,7 +39,7 @@ def __init__( default_batch_size: Tuple[int, int] = (1, 1) ): """Initialize a ParameterizedTransform.""" - super().__init__(dtype=dtype, device=device) + super().__init__(dtype=dtype, device=device, matrix=matrix) if parameters is None: parameters = torch.zeros(*default_batch_size, self.get_num_parameters(), dtype=dtype, device=device) @@ -49,12 +50,10 @@ def __init__( self.requires_grad: bool = requires_grad self.parameters: torch.Tensor = parameters - if matrix is None: - matrix = self._create_matrix().to(dtype=dtype, device=device) - self._matrix: torch.Tensor = matrix + self._matrix = None @abstractmethod - def _create_matrix(self) -> torch.Tensor: + def get_matrix(self) -> torch.Tensor: """Returns the matrix representation of the transform.""" @abstractmethod @@ -111,7 +110,7 @@ def get_num_parameters(cls) -> int: def __repr__(self) -> str: """Returns a string representation of the transform.""" - info = ', '.join([f'{name}={self.parameters[:, i]}' for i, name in enumerate(self.parameter_names)]) + info = ', '.join([f'{name}={self.parameters[..., i]}' for i, name in enumerate(self.parameter_names)]) return f"{self.__class__}({info})".replace('\n ', '') @@ -141,9 +140,10 @@ def alpha(self): """Returns the link twist.""" return self.parameters[:, 0] - def _create_matrix(self) -> torch.Tensor: - """Returns the matrix representation of the transform.""" - return mdh_to_homogeneous(self.parameters) + def get_matrix(self) -> torch.Tensor: + """Returns the matrix representation of the transform. Redos the computation on every call""" + self._matrix = mdh_to_homogeneous(self.parameters) + return self._matrix def update_joint_parameters(self, th: torch.Tensor, joint_types: np.array): """Updates the parameters of the parameters according to joint configuration th.""" diff --git a/src/pytorch_kinematics/transforms/transform3d.py b/src/pytorch_kinematics/transforms/transform3d.py index 712d994..8ed3cbf 100644 --- a/src/pytorch_kinematics/transforms/transform3d.py +++ b/src/pytorch_kinematics/transforms/transform3d.py @@ -221,7 +221,7 @@ def __len__(self): return self.get_matrix().shape[0] def __getitem__(self, item): - return self.__class__(matrix=self.get_matrix()[item]) + return Transform3d(matrix=self.get_matrix()[item]) def __repr__(self): m = self.get_matrix() diff --git a/src/pytorch_kinematics/visualize.py b/src/pytorch_kinematics/visualize.py index 9762831..1c126c9 100644 --- a/src/pytorch_kinematics/visualize.py +++ b/src/pytorch_kinematics/visualize.py @@ -6,7 +6,7 @@ import numpy as np -def visualize(frames: np.ndarray, show: bool = False) -> plt.Axes: +def visualize(frames: np.ndarray, show: bool = False, **kwargs) -> plt.Axes: """ Visualizes a sequence of frames. Args: @@ -23,9 +23,9 @@ def visualize(frames: np.ndarray, show: bool = False) -> plt.Axes: frames = frames.reshape(-1, 4, 4) num_frames = frames.shape[0] for i, frame in enumerate(frames): - draw_frame(ax, frame, scale=frame_scale) + draw_frame(ax, frame, scale=frame_scale, **kwargs) if i < num_frames - 1: - draw_link(ax, frame[:3, 3], frames[i + 1][:3, 3]) + draw_link(ax, frame[:3, 3], frames[i + 1][:3, 3], **kwargs) ax.set_xlim(center[0] - axis_size, center[0] + axis_size) ax.set_ylim(center[1] - axis_size, center[1] + axis_size) ax.set_zlim(center[2] - axis_size, center[2] + axis_size) @@ -56,7 +56,7 @@ def draw_frame(ax: axes3d.Axes3D, frame: np.ndarray, scale: float = .1): ax.quiver(origin[0], origin[1], origin[2], z[0], z[1], z[2], length=scale, color='b') -def draw_link(ax: axes3d.Axes3D, p0: np.ndarray, p1: np.ndarray): +def draw_link(ax: axes3d.Axes3D, p0: np.ndarray, p1: np.ndarray, **kwargs): """ Draws a line from p0 to p1. Args: @@ -66,4 +66,5 @@ def draw_link(ax: axes3d.Axes3D, p0: np.ndarray, p1: np.ndarray): Returns: None """ - ax.plot([p0[0], p1[0]], [p0[1], p1[1]], [p0[2], p1[2]], color='black', linewidth=3) + kwargs.setdefault('color', 'black') + ax.plot([p0[0], p1[0]], [p0[1], p1[1]], [p0[2], p1[2]], linewidth=3, **kwargs) From f1a73e7761d20a860e6b9a607f580f41d1f4ff8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=BClz?= Date: Fri, 8 Dec 2023 20:10:14 +0100 Subject: [PATCH 05/10] Allow parameterized fk --- src/pytorch_kinematics/chain.py | 57 ++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/src/pytorch_kinematics/chain.py b/src/pytorch_kinematics/chain.py index 4e8da7d..310992e 100644 --- a/src/pytorch_kinematics/chain.py +++ b/src/pytorch_kinematics/chain.py @@ -7,6 +7,7 @@ import pytorch_kinematics.transforms as tf from pytorch_kinematics import jacobian from pytorch_kinematics.frame import Frame, Link, Joint +from pytorch_kinematics.transforms.parameterized_transform import ParameterizedTransform from pytorch_kinematics.transforms.rotation_conversions import axis_and_angle_to_matrix_44, axis_and_d_to_pris_matrix @@ -23,6 +24,8 @@ def get_n_joints(th): return th.shape[-1] elif isinstance(th, list) or isinstance(th, dict): return len(th) + elif isinstance(th, ParameterizedTransform): + return th.parameters.shape[1] else: raise NotImplementedError(f"Unsupported type {type(th)}") @@ -36,6 +39,8 @@ def get_batch_size(th): elif isinstance(th, list): # Lists cannot be batched. We don't allow lists of lists. return 1 + elif isinstance(th, ParameterizedTransform): + return th.parameters.shape[0] else: raise NotImplementedError(f"Unsupported type {type(th)}") @@ -285,9 +290,6 @@ def _get_link_names(frame): link_names.extend(Chain._get_link_names(child)) return link_names - def get_joint_offset(self, chain_idx) -> Optional[torch.Tensor]: - return self.joint_offsets[chain_idx] - def get_link_names(self): names = self._get_link_names(self._root) return sorted(set(names), key=names.index) @@ -324,15 +326,21 @@ def forward_kinematics(self, th, frame_indices: Optional = None): if frame_indices is None: frame_indices = self.get_all_frame_indices() - th = self.ensure_tensor(th) - th = torch.atleast_2d(th) - - rev_jnt_transform, pris_jnt_transform = self._get_jnt_transform(th) + if isinstance(th, ParameterizedTransform): + diff_offsets = True + b = th.parameters.shape[0] + device = th.parameters.device + else: + diff_offsets = False + th = self.ensure_tensor(th) + th = torch.atleast_2d(th) + rev_jnt_transform, pris_jnt_transform = self._get_jnt_transform(th) + b = th.shape[0] + device = th.device frame_transforms = {} - b = th.shape[0] for frame_idx in frame_indices: - frame_transform = torch.eye(4).to(th).unsqueeze(0).repeat(b, 1, 1) + frame_transform = torch.eye(4).to(device).unsqueeze(0).repeat(b, 1, 1) # iterate down the list and compose the transform for chain_idx in self.parents_indices[frame_idx.item()]: @@ -343,20 +351,25 @@ def forward_kinematics(self, th, frame_indices: Optional = None): if link_offset_i is not None: frame_transform = frame_transform @ link_offset_i - joint_offset_i = self.get_joint_offset(chain_idx) + if diff_offsets: + jnt_idx = self.joint_indices[chain_idx] + joint_offset_i = th.get_matrix()[:, jnt_idx, :, :] + else: + joint_offset_i = self.joint_offsets[chain_idx] if joint_offset_i is not None: frame_transform = frame_transform @ joint_offset_i - jnt_idx = self.joint_indices[chain_idx] - jnt_type = self.joint_type_indices[chain_idx] - if jnt_type == 0: - pass - elif jnt_type == 1: - jnt_transform_i = rev_jnt_transform[:, jnt_idx] - frame_transform = frame_transform @ jnt_transform_i - elif jnt_type == 2: - jnt_transform_i = pris_jnt_transform[:, jnt_idx] - frame_transform = frame_transform @ jnt_transform_i + if not diff_offsets: + jnt_idx = self.joint_indices[chain_idx] + jnt_type = self.joint_type_indices[chain_idx] + if jnt_type == 0: + pass + elif jnt_type == 1: + jnt_transform_i = rev_jnt_transform[:, jnt_idx] + frame_transform = frame_transform @ jnt_transform_i + elif jnt_type == 2: + jnt_transform_i = pris_jnt_transform[:, jnt_idx] + frame_transform = frame_transform @ jnt_transform_i frame_transforms[frame_idx.item()] = frame_transform @@ -514,9 +527,9 @@ def convert_serial_inputs_to_chain_inputs(self, th, end_only: bool): th[..., jnt_idx] = partial_th_i return frame_indices, th - def visualize(self, **kwargs): + def visualize(self, th, **kwargs): """Visualize the robot chain in joint configuration th.""" from pytorch_kinematics.visualize import visualize - fk = self.forward_kinematics(end_only=False) + fk = self.forward_kinematics(th, end_only=False) arr = np.vstack([fk[f.name].get_matrix().cpu().detach().numpy() for f in self._serial_frames]) visualize(arr, **kwargs) From d30c53039f588a3e5e71f3ef54ef863d67a8db6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=BClz?= Date: Tue, 12 Dec 2023 17:17:59 +0100 Subject: [PATCH 06/10] create chain from MDH --- src/pytorch_kinematics/chain.py | 117 +++++++++++++++--- .../transforms/parameterized_transform.py | 2 +- 2 files changed, 99 insertions(+), 20 deletions(-) diff --git a/src/pytorch_kinematics/chain.py b/src/pytorch_kinematics/chain.py index 4e8da7d..f8893d1 100644 --- a/src/pytorch_kinematics/chain.py +++ b/src/pytorch_kinematics/chain.py @@ -1,5 +1,5 @@ from functools import lru_cache -from typing import Optional, Sequence, Tuple +from typing import Collection, Optional, Sequence, Tuple, Union import numpy as np import torch @@ -308,12 +308,20 @@ def _get_jnt_transform(self, th) -> Tuple[torch.Tensor, torch.Tensor]: axes_expanded = self.axes.unsqueeze(0).repeat(th.shape[0], 1, 1) return axis_and_angle_to_matrix_44(axes_expanded, th), axis_and_d_to_pris_matrix(axes_expanded, th) - def forward_kinematics(self, th, frame_indices: Optional = None): + def forward_kinematics(self, + th: Optional[torch.Tensor] = None, + joint_offsets: Optional[Union[torch.Tensor, tf.Transform3d]] = None, + link_offsets: Optional[Union[torch.Tensor, tf.Transform3d]] = None, + frame_indices: Optional = None): """ - Compute forward kinematics for the given joint values. + Compute forward kinematics for the given any combination of joint values, joint offsets, and link offsets. Args: th: A dict, list, numpy array, or torch tensor of joints values. Possibly batched. + joint_offsets: A Transform3d object or a tensor of shape (N, 4, 4) where N is the number of joints. + If provided, overrides the joint offsets in the chain. + link_offsets: A Transform3d object or a tensor of shape (N, 4, 4) where N is the number of joints. + If provided, overrides the link offsets in the chain. frame_indices: A list of frame indices to compute transforms for. If None, all frames are computed. Use `get_frame_indices` to convert from frame names to frame indices. @@ -324,13 +332,33 @@ def forward_kinematics(self, th, frame_indices: Optional = None): if frame_indices is None: frame_indices = self.get_all_frame_indices() - th = self.ensure_tensor(th) - th = torch.atleast_2d(th) + if isinstance(joint_offsets, tf.Transform3d): + joint_offsets = joint_offsets.get_matrix() + if isinstance(link_offsets, tf.Transform3d): + link_offsets = link_offsets.get_matrix() + + if th is joint_offsets is link_offsets is None: + raise ValueError("Must provide at least one of th, joint_offsets, or link_offsets.") + if th is not None: + b = th.shape[0] + th = self.ensure_tensor(th) + th = torch.atleast_2d(th) + elif joint_offsets is not None: + b = joint_offsets.shape[0] + else: + b = link_offsets.shape[0] + + # initialize default values + if th is None: + th = torch.zeros([b, self.n_joints], device=self.device, dtype=self.dtype) + if joint_offsets is None: + joint_offsets = {chain_idx: self.get_joint_offset(chain_idx) for chain_idx in range(len(self.joint_offsets))} + if link_offsets is None: + link_offsets = self.link_offsets rev_jnt_transform, pris_jnt_transform = self._get_jnt_transform(th) frame_transforms = {} - b = th.shape[0] for frame_idx in frame_indices: frame_transform = torch.eye(4).to(th).unsqueeze(0).repeat(b, 1, 1) @@ -339,11 +367,11 @@ def forward_kinematics(self, th, frame_indices: Optional = None): if chain_idx.item() in frame_transforms: frame_transform = frame_transforms[chain_idx.item()] else: - link_offset_i = self.link_offsets[chain_idx] + link_offset_i = link_offsets[chain_idx] if link_offset_i is not None: frame_transform = frame_transform @ link_offset_i - joint_offset_i = self.get_joint_offset(chain_idx) + joint_offset_i = joint_offsets[chain_idx.item()] if joint_offset_i is not None: frame_transform = frame_transform @ joint_offset_i @@ -466,35 +494,86 @@ def _generate_serial_chain_recurse(root_frame, end_frame_name): return [child] + frames return None + @classmethod + def from_joint_transforms(cls, + transforms: tf.Transform3d, + link_offsets: Optional[tf.Transform3d] = None, + joint_names: Optional[Collection[str]] = None, + link_names: Optional[Collection[str]] = None, + joint_types: Optional[Collection[str]] = None, + **kwargs + ): + """ + Create a serial chain with zero link offsets and joint offsets according to the input. + + Assumes that frame 0 is the root frame that contains an empty link and a fixed "world" joint. Accordingly, frame + i is aligned with joint i, which moves. All joint axes are aligned with the z-axis of the joint frame. + Args: + transforms: A transform that represents a matrix of shape (N, 4, 4) where N is the number of joints. + link_offsets: A transform that represents a matrix of shape (N, 4, 4) where N is the number of joints. + Optional. If None, all link offsets are assumed to be zero. + joint_names: The names of the joints. If None, the joints are named "joint_0", "joint_1", etc. + link_names: The names of the links. If None, the links are named "link_0", "link_1", etc. + joint_types: The types of the joints. If None, the joints are assumed to be revolute. + """ + joint_offsets = transforms.get_matrix() + n = joint_offsets.shape[0] + if link_offsets is None: + link_offsets = [None] * n + if joint_names is None: + joint_names = [f"joint_{i+1}" for i in range(n)] + if link_names is None: + link_names = [f"link_{i+1}" for i in range(n)] + if joint_types is None: + joint_types = ['revolute' for _ in range(n)] + root_frame = Frame(name="world") + root_frame.link = Link(name="world") + root_frame.joint = Joint(name="world", joint_type="fixed") + children = [] + for (i, link, joint, joint_type) in reversed(list(zip(range(n), link_names, joint_names, joint_types))): + frame = Frame(name=f"{link}") + frame.link = Link(name=link, offset=link_offsets[i]) + frame.joint = Joint(name=joint, offset=transforms[i], joint_type=joint_type) + frame.children = children + children = [frame] + root_frame.children = children + return cls(Chain(root_frame, **kwargs), link_names[-1], root_frame_name="root") + def jacobian(self, th, locations=None): if locations is not None: locations = tf.Transform3d(pos=locations) return jacobian.calc_jacobian(self, th, tool=locations) - def forward_kinematics(self, th, end_only: bool = True): + def forward_kinematics(self, + th: Optional[torch.Tensor] = None, + joint_offsets: Optional[torch.Tensor] = None, + link_offsets: Optional[torch.Tensor] = None, + end_only: bool = True): """ Like the base class, except `th` only needs to contain the joints in the SerialChain, not all joints. """ - frame_indices, th = self.convert_serial_inputs_to_chain_inputs(th, end_only) + if end_only: + frame_indices = self.get_frame_indices(self._serial_frames[-1].name) + else: + # pass through default behavior for frame indices being None, which is currently + # to return all frames. + frame_indices = None + if th is not None: + th = self.convert_serial_inputs_to_chain_inputs(th) - mat = super().forward_kinematics(th, frame_indices) + mat = super().forward_kinematics(th, joint_offsets=joint_offsets, link_offsets=link_offsets, + frame_indices=frame_indices) if end_only: return mat[self._serial_frames[-1].name] else: return mat - def convert_serial_inputs_to_chain_inputs(self, th, end_only: bool): + def convert_serial_inputs_to_chain_inputs(self, th: torch.Tensor): # th = self.ensure_tensor(th) th_b = get_batch_size(th) th_n_joints = get_n_joints(th) if isinstance(th, list): th = torch.tensor(th, device=self.device, dtype=self.dtype) - if end_only: - frame_indices = self.get_frame_indices(self._serial_frames[-1].name) - else: - # pass through default behavior for frame indices being None, which is currently - # to return all frames. - frame_indices = None if th_n_joints < self.n_joints: # if th is only a partial list of joints, assume it's a list of joints for only the serial chain. partial_th = th @@ -512,7 +591,7 @@ def convert_serial_inputs_to_chain_inputs(self, th, end_only: bool): jnt_idx = self.joint_indices[k] if frame.joint.joint_type != 'fixed': th[..., jnt_idx] = partial_th_i - return frame_indices, th + return th def visualize(self, **kwargs): """Visualize the robot chain in joint configuration th.""" diff --git a/src/pytorch_kinematics/transforms/parameterized_transform.py b/src/pytorch_kinematics/transforms/parameterized_transform.py index 1c2bce5..b420bf3 100644 --- a/src/pytorch_kinematics/transforms/parameterized_transform.py +++ b/src/pytorch_kinematics/transforms/parameterized_transform.py @@ -142,7 +142,7 @@ def alpha(self): def get_matrix(self) -> torch.Tensor: """Returns the matrix representation of the transform. Redos the computation on every call""" - self._matrix = mdh_to_homogeneous(self.parameters) + self._matrix = mdh_to_homogeneous(self.parameters).view(-1, 4, 4) return self._matrix def update_joint_parameters(self, th: torch.Tensor, joint_types: np.array): From 4a4c9a1027659cc30bd796ed9434c0c181646bc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=BClz?= Date: Tue, 12 Dec 2023 19:04:51 +0100 Subject: [PATCH 07/10] more towards batching --- src/pytorch_kinematics/chain.py | 26 +++++++------------ .../transforms/parameterized_transform.py | 17 ++++++------ src/pytorch_kinematics/visualize.py | 26 ++++++++++++++----- 3 files changed, 38 insertions(+), 31 deletions(-) diff --git a/src/pytorch_kinematics/chain.py b/src/pytorch_kinematics/chain.py index 6528cf8..93ef008 100644 --- a/src/pytorch_kinematics/chain.py +++ b/src/pytorch_kinematics/chain.py @@ -24,8 +24,6 @@ def get_n_joints(th): return th.shape[-1] elif isinstance(th, list) or isinstance(th, dict): return len(th) - elif isinstance(th, ParameterizedTransform): - return th.parameters.shape[1] else: raise NotImplementedError(f"Unsupported type {type(th)}") @@ -39,8 +37,6 @@ def get_batch_size(th): elif isinstance(th, list): # Lists cannot be batched. We don't allow lists of lists. return 1 - elif isinstance(th, ParameterizedTransform): - return th.parameters.shape[0] else: raise NotImplementedError(f"Unsupported type {type(th)}") @@ -331,20 +327,25 @@ def forward_kinematics(self, A dict of Transform3d objects for each frame. """ + def get_ith_transform(offset, i): + if isinstance(offset, torch.Tensor): + return offset[:, i, ...] + return offset[i] + if frame_indices is None: frame_indices = self.get_all_frame_indices() if isinstance(joint_offsets, tf.Transform3d): - joint_offsets = joint_offsets.get_matrix() + joint_offsets = joint_offsets.get_matrix().view(-1, len(self.joint_offsets), 4, 4) if isinstance(link_offsets, tf.Transform3d): - link_offsets = link_offsets.get_matrix() + link_offsets = link_offsets.get_matrix().view(-1, len(self.link_offsets), 4, 4) if th is joint_offsets is link_offsets is None: raise ValueError("Must provide at least one of th, joint_offsets, or link_offsets.") if th is not None: - b = th.shape[0] th = self.ensure_tensor(th) th = torch.atleast_2d(th) + b = th.shape[0] to_this = th elif joint_offsets is not None: b = joint_offsets.shape[0] @@ -372,11 +373,11 @@ def forward_kinematics(self, if chain_idx.item() in frame_transforms: frame_transform = frame_transforms[chain_idx.item()] else: - link_offset_i = link_offsets[chain_idx] + link_offset_i = get_ith_transform(link_offsets, chain_idx) if link_offset_i is not None: frame_transform = frame_transform @ link_offset_i - joint_offset_i = joint_offsets[chain_idx.item()] + joint_offset_i = get_ith_transform(joint_offsets, chain_idx) if joint_offset_i is not None: frame_transform = frame_transform @ joint_offset_i @@ -603,10 +604,3 @@ def convert_serial_inputs_to_chain_inputs(self, th: torch.Tensor): if frame.joint.joint_type != 'fixed': th[..., jnt_idx] = partial_th_i return th - - def visualize(self, th, **kwargs): - """Visualize the robot chain in joint configuration th.""" - from pytorch_kinematics.visualize import visualize - fk = self.forward_kinematics(th, end_only=False) - arr = np.vstack([fk[f.name].get_matrix().cpu().detach().numpy() for f in self._serial_frames]) - visualize(arr, **kwargs) diff --git a/src/pytorch_kinematics/transforms/parameterized_transform.py b/src/pytorch_kinematics/transforms/parameterized_transform.py index 4b2f9c8..ea40504 100644 --- a/src/pytorch_kinematics/transforms/parameterized_transform.py +++ b/src/pytorch_kinematics/transforms/parameterized_transform.py @@ -68,27 +68,26 @@ def clone(self) -> ParameterizedTransform: new ParameterizedTransform object. """ other = self.__class__(dtype=self.dtype, device=self.device, requires_grad=self.requires_grad) - other._matrix = self._matrix.clone() + other._matrix = self.get_matrix() other._parameters = self._parameters.clone() if self._lu is not None: other._lu = [elem.clone() for elem in self._lu] return other - def stack(self, *others): + def stack(self, *others, dim=0): """ Stacks multiple ParameterizedTransform objects together. Args: others: ParameterizedTransform objects to stack. + dim: Dimension along which to stack. Can be 0 for batch and 1 for joints. Returns: new ParameterizedTransform object. """ other = self.__class__(dtype=self.dtype, device=self.device, requires_grad=self.requires_grad) transforms = [self] + list(others) - matrix = torch.cat([t._matrix for t in transforms], dim=0) - parameters = torch.cat([t._parameters for t in transforms], dim=0) - other._matrix = matrix + parameters = torch.cat([t._parameters for t in transforms], dim=dim) other._parameters = parameters return other @@ -132,22 +131,22 @@ class MDHTransform(ParameterizedTransform): @property def theta(self): """Returns the joint angle.""" - return self.parameters[:, 3] + return self.parameters[..., 3] @property def d(self): """Returns the joint offset.""" - return self.parameters[:, 2] + return self.parameters[..., 2] @property def a(self): """Returns the link length.""" - return self.parameters[:, 1] + return self.parameters[..., 1] @property def alpha(self): """Returns the link twist.""" - return self.parameters[:, 0] + return self.parameters[..., 0] def get_matrix(self) -> torch.Tensor: """Returns the matrix representation of the transform. Redos the computation on every call""" diff --git a/src/pytorch_kinematics/visualize.py b/src/pytorch_kinematics/visualize.py index 1c126c9..fa70e35 100644 --- a/src/pytorch_kinematics/visualize.py +++ b/src/pytorch_kinematics/visualize.py @@ -1,27 +1,32 @@ #!/usr/bin/env python3 # Author: Jonathan Külz # Date: 27.11.23 +from typing import Collection + import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import axes3d import numpy as np +from pytorch_kinematics import Transform3d + -def visualize(frames: np.ndarray, show: bool = False, **kwargs) -> plt.Axes: +def visualize(frames: Transform3d, show: bool = False, **kwargs) -> plt.Axes: """ Visualizes a sequence of frames. Args: - frames: An nx4x4 array of frame placements. Assumes the first frame is the base frame (no implicit world frame - is assumed). + frames: The forward kinematics transforms of a single robot. show: Whether to show the plot. Returns: None """ + frames = np.vstack([f.get_matrix().cpu().detach().numpy() for f in frames]) axis_size = np.max(frames[:, :3, 3]) - np.min(frames[:, :3, 3]) * .8 center = np.mean(frames[:, :3, 3], axis=0) frame_scale = axis_size / 15 ax = plt.figure(figsize=(12, 12)).add_subplot(projection='3d') frames = frames.reshape(-1, 4, 4) num_frames = frames.shape[0] + draw_base(ax, scale=frame_scale, **kwargs) for i, frame in enumerate(frames): draw_frame(ax, frame, scale=frame_scale, **kwargs) if i < num_frames - 1: @@ -37,6 +42,15 @@ def visualize(frames: np.ndarray, show: bool = False, **kwargs) -> plt.Axes: return ax +def draw_base(ax: axes3d.Axes3D, scale: float = .1, **kwargs): + """Draw a sphere of radius scale at the origin.""" + u, v = np.mgrid[0:2 * np.pi:20j, 0:np.pi:10j] + x = scale * np.cos(u) * np.sin(v) + y = scale * np.sin(u) * np.sin(v) + z = scale * np.cos(v) + ax.plot_wireframe(x, y, z, color='gray') + + def draw_frame(ax: axes3d.Axes3D, frame: np.ndarray, scale: float = .1): """ Draws a frame. @@ -51,9 +65,9 @@ def draw_frame(ax: axes3d.Axes3D, frame: np.ndarray, scale: float = .1): x = frame[:3, 0] y = frame[:3, 1] z = frame[:3, 2] - ax.quiver(origin[0], origin[1], origin[2], x[0], x[1], x[2], length=scale, color='r') - ax.quiver(origin[0], origin[1], origin[2], y[0], y[1], y[2], length=scale, color='g') - ax.quiver(origin[0], origin[1], origin[2], z[0], z[1], z[2], length=scale, color='b') + ax.quiver(origin[0], origin[1], origin[2], x[0], x[1], x[2], length=scale, color='r', linewidths=3) + ax.quiver(origin[0], origin[1], origin[2], y[0], y[1], y[2], length=scale, color='g', linewidths=3) + ax.quiver(origin[0], origin[1], origin[2], z[0], z[1], z[2], length=scale, color='b', linewidths=3) def draw_link(ax: axes3d.Axes3D, p0: np.ndarray, p1: np.ndarray, **kwargs): From 16e128a6c958fee0045e65287b473afd387c0990 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=BClz?= Date: Wed, 13 Dec 2023 11:32:00 +0100 Subject: [PATCH 08/10] enable batching --- src/pytorch_kinematics/transforms/parameterized_transform.py | 4 +++- src/pytorch_kinematics/transforms/transform3d.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/pytorch_kinematics/transforms/parameterized_transform.py b/src/pytorch_kinematics/transforms/parameterized_transform.py index ea40504..0c1a04b 100644 --- a/src/pytorch_kinematics/transforms/parameterized_transform.py +++ b/src/pytorch_kinematics/transforms/parameterized_transform.py @@ -114,6 +114,7 @@ def get_num_parameters(cls) -> int: return len(cls.parameter_names) def __getitem__(self, item): + item = [i if not isinstance(i, int) else slice(i, i + 1) for i in item] # Make sure to not loose dimensions return self.__class__(parameters=self.parameters[item], dtype=self.dtype, device=self.device) def __repr__(self) -> str: @@ -150,7 +151,8 @@ def alpha(self): def get_matrix(self) -> torch.Tensor: """Returns the matrix representation of the transform. Redos the computation on every call""" - self._matrix = mdh_to_homogeneous(self.parameters).view(-1, 4, 4) + b = self.parameters.shape[0] + self._matrix = torch.squeeze(mdh_to_homogeneous(self.parameters).view(b, -1, 4, 4)) return self._matrix def update_joint_parameters(self, th: torch.Tensor, joint_types: np.array): diff --git a/src/pytorch_kinematics/transforms/transform3d.py b/src/pytorch_kinematics/transforms/transform3d.py index 1ac196c..2f71cd9 100644 --- a/src/pytorch_kinematics/transforms/transform3d.py +++ b/src/pytorch_kinematics/transforms/transform3d.py @@ -293,7 +293,7 @@ def inverse(self, invert_composed: bool = False): def stack(self, *others): transforms = [self] + list(others) - matrix = torch.cat([t._matrix for t in transforms], dim=0).to(self) + matrix = torch.cat([t._matrix for t in transforms], dim=0).to(self.device, self.dtype) out = self.__class__(device=self.device, dtype=self.dtype) out._matrix = matrix return out From 9a8b55c216db666346837bb262b807ded5f596c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=BClz?= Date: Wed, 13 Dec 2023 17:02:44 +0100 Subject: [PATCH 09/10] fix device bugs --- src/pytorch_kinematics/chain.py | 14 ++++--- .../transforms/parameterized_transform.py | 42 +++++++++++++------ 2 files changed, 38 insertions(+), 18 deletions(-) diff --git a/src/pytorch_kinematics/chain.py b/src/pytorch_kinematics/chain.py index 93ef008..e0900c1 100644 --- a/src/pytorch_kinematics/chain.py +++ b/src/pytorch_kinematics/chain.py @@ -303,7 +303,7 @@ def _get_jnt_transform(self, th) -> Tuple[torch.Tensor, torch.Tensor]: Returns: A tuple of revolute, prismatic joint transforms """ - axes_expanded = self.axes.unsqueeze(0).repeat(th.shape[0], 1, 1) + axes_expanded = self.axes.unsqueeze(0).repeat(th.shape[0], 1, 1).to(th) return axis_and_angle_to_matrix_44(axes_expanded, th), axis_and_d_to_pris_matrix(axes_expanded, th) def forward_kinematics(self, @@ -356,7 +356,7 @@ def get_ith_transform(offset, i): # initialize default values if th is None: - th = torch.zeros([b, self.n_joints], device=self.device, dtype=self.dtype) + th = torch.zeros([b, self.n_joints]).to(to_this) if joint_offsets is None: joint_offsets = self.joint_offsets if link_offsets is None: @@ -523,15 +523,17 @@ def from_joint_transforms(cls, link_names: The names of the links. If None, the links are named "link_0", "link_1", etc. joint_types: The types of the joints. If None, the joints are assumed to be revolute. """ - if isinstance(transforms, tf.parameterized_transform.ParameterizedTransform): - transforms = transforms.toTransform3d() - if isinstance(link_offsets, tf.parameterized_transform.ParameterizedTransform): - link_offsets = link_offsets.toTransform3d() + device = kwargs.get('device', transforms.device) + dtype = kwargs.get('dtype', transforms.dtype) + transforms = transforms.to(device=device, dtype=dtype) joint_offsets = transforms.get_matrix() + assert len(joint_offsets.shape) == 3, "Expected a 3D matrix of shape (N, 4, 4)." n = joint_offsets.shape[0] if link_offsets is None: link_offsets = [None] * n + else: + link_offsets = link_offsets.to(device=device, dtype=dtype) if joint_names is None: joint_names = [f"joint_{i+1}" for i in range(n)] if link_names is None: diff --git a/src/pytorch_kinematics/transforms/parameterized_transform.py b/src/pytorch_kinematics/transforms/parameterized_transform.py index 0c1a04b..5caae67 100644 --- a/src/pytorch_kinematics/transforms/parameterized_transform.py +++ b/src/pytorch_kinematics/transforms/parameterized_transform.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod from enum import Enum from functools import lru_cache -from typing import Collection, Optional, Tuple, Union +from typing import Collection, Iterable, Optional, Tuple, Union import numpy as np import torch @@ -36,20 +36,20 @@ def __init__( device: str = 'cpu', requires_grad: bool = True, matrix: Optional[torch.Tensor] = None, - default_batch_size: Tuple[int, int] = (1, 1) + default_batch_size: Union[Tuple[int], Tuple[int, int]] = (1, 1) ): """Initialize a ParameterizedTransform.""" super().__init__(dtype=dtype, device=device, matrix=matrix) + assert len(default_batch_size) in (1, 2), "default_batch_size must be a tuple of length 1 or 2" if parameters is None: - parameters = torch.zeros(*default_batch_size, self.get_num_parameters(), dtype=dtype, device=device) - if parameters.ndim == 1: - parameters = parameters.unsqueeze(0) - if parameters.ndim == 2: + parameters = torch.zeros(*default_batch_size, self.get_num_parameters()) + while parameters.ndim < 1 + len(default_batch_size): parameters = parameters.unsqueeze(0) + self.default_batch_size: Union[Tuple[int], Tuple[int, int]] = default_batch_size self.requires_grad: bool = requires_grad - self.parameters: torch.Tensor = parameters + self.parameters: torch.Tensor = parameters.to(self.device, self.dtype) self._matrix = None @abstractmethod @@ -69,7 +69,7 @@ def clone(self) -> ParameterizedTransform: """ other = self.__class__(dtype=self.dtype, device=self.device, requires_grad=self.requires_grad) other._matrix = self.get_matrix() - other._parameters = self._parameters.clone() + other.parameters = self._parameters.detach().clone() if self._lu is not None: other._lu = [elem.clone() for elem in self._lu] return other @@ -87,14 +87,27 @@ def stack(self, *others, dim=0): """ other = self.__class__(dtype=self.dtype, device=self.device, requires_grad=self.requires_grad) transforms = [self] + list(others) - parameters = torch.cat([t._parameters for t in transforms], dim=dim) + parameters = torch.cat([t._parameters for t in transforms], dim=dim).to(self.device, dtype=self.dtype) other._parameters = parameters return other + def to(self, device, copy: bool = False, dtype=None): + """Makes sure to also set parameters to the correct device.""" + other = super().to(device, copy, dtype) + if other is self and other._parameters.device == device and (dtype is None or dtype == other._parameters.dtype): + return self + other.parameters = self._parameters.detach().to(device, dtype) + return other + def toTransform3d(self): """Returns a Transform3d object with the same matrix as this ParameterizedTransform.""" return Transform3d(matrix=self.get_matrix(), dtype=self.dtype, device=self.device) + @property + def num_batch_levels(self) -> int: + """Returns the number of batch levels.""" + return len(self.default_batch_size) + @property def parameters(self) -> torch.Tensor: """Returns the joint parameters""" @@ -114,8 +127,10 @@ def get_num_parameters(cls) -> int: return len(cls.parameter_names) def __getitem__(self, item): - item = [i if not isinstance(i, int) else slice(i, i + 1) for i in item] # Make sure to not loose dimensions - return self.__class__(parameters=self.parameters[item], dtype=self.dtype, device=self.device) + if isinstance(item, Iterable): + item = [i if not isinstance(i, int) else slice(i, i + 1) for i in item] # Make sure to not lose dimensions + return self.__class__(parameters=self.parameters[item], dtype=self.dtype, device=self.device, + default_batch_size=self.default_batch_size) def __repr__(self) -> str: """Returns a string representation of the transform.""" @@ -152,7 +167,10 @@ def alpha(self): def get_matrix(self) -> torch.Tensor: """Returns the matrix representation of the transform. Redos the computation on every call""" b = self.parameters.shape[0] - self._matrix = torch.squeeze(mdh_to_homogeneous(self.parameters).view(b, -1, 4, 4)) + if self.num_batch_levels == 1: + self._matrix = mdh_to_homogeneous(self.parameters).view(b, 4, 4) + else: + self._matrix = mdh_to_homogeneous(self.parameters).view(b, -1, 4, 4) return self._matrix def update_joint_parameters(self, th: torch.Tensor, joint_types: np.array): From de33e5a0ef6c99b6f094eb917875eb71397ae78f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonathan=20K=C3=BClz?= Date: Fri, 15 Dec 2023 18:16:01 +0100 Subject: [PATCH 10/10] new viz feature --- src/pytorch_kinematics/visualize.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/pytorch_kinematics/visualize.py b/src/pytorch_kinematics/visualize.py index fa70e35..564937c 100644 --- a/src/pytorch_kinematics/visualize.py +++ b/src/pytorch_kinematics/visualize.py @@ -1,20 +1,22 @@ #!/usr/bin/env python3 # Author: Jonathan Külz # Date: 27.11.23 -from typing import Collection +from typing import Collection, Optional import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import axes3d import numpy as np +import torch from pytorch_kinematics import Transform3d -def visualize(frames: Transform3d, show: bool = False, **kwargs) -> plt.Axes: +def visualize(frames: Transform3d, goal: Optional[torch.Tensor] = None, show: bool = False, **kwargs) -> plt.Axes: """ Visualizes a sequence of frames. Args: frames: The forward kinematics transforms of a single robot. + goal: The goal to visualize. show: Whether to show the plot. Returns: None @@ -27,6 +29,8 @@ def visualize(frames: Transform3d, show: bool = False, **kwargs) -> plt.Axes: frames = frames.reshape(-1, 4, 4) num_frames = frames.shape[0] draw_base(ax, scale=frame_scale, **kwargs) + if goal is not None: + draw_goal(ax, goal, scale=frame_scale, **kwargs) for i, frame in enumerate(frames): draw_frame(ax, frame, scale=frame_scale, **kwargs) if i < num_frames - 1: @@ -51,6 +55,16 @@ def draw_base(ax: axes3d.Axes3D, scale: float = .1, **kwargs): ax.plot_wireframe(x, y, z, color='gray') +def draw_goal(ax: axes3d.Axes3D, goal: torch.Tensor, scale: float = .1, **kwargs): + """Draw a sphere of radius scale at the origin.""" + u, v = np.mgrid[0:2 * np.pi:10j, 0:np.pi:5j] + goal = np.squeeze(goal.cpu().detach().numpy()) + x = goal[0] + scale * np.cos(u) * np.sin(v) + y = goal[1] + scale * np.sin(u) * np.sin(v) + z = goal[2] + scale * np.cos(v) + ax.plot_wireframe(x, y, z, color='red') + + def draw_frame(ax: axes3d.Axes3D, frame: np.ndarray, scale: float = .1): """ Draws a frame.