From 248780876ce98cced63d5f6a1edee3d6065a092f Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Fri, 29 May 2026 22:26:08 +0800 Subject: [PATCH 01/16] feat: add neural ik --- embodichain/lab/sim/solvers/__init__.py | 1 + .../lab/sim/solvers/neural_ik_solver.py | 228 ++++++++++++++++++ examples/sim/solvers/neural_ik_solver.py | 178 ++++++++++++++ 3 files changed, 407 insertions(+) create mode 100644 embodichain/lab/sim/solvers/neural_ik_solver.py create mode 100644 examples/sim/solvers/neural_ik_solver.py diff --git a/embodichain/lab/sim/solvers/__init__.py b/embodichain/lab/sim/solvers/__init__.py index 901ab401..25a932ba 100644 --- a/embodichain/lab/sim/solvers/__init__.py +++ b/embodichain/lab/sim/solvers/__init__.py @@ -21,3 +21,4 @@ from .pink_solver import PinkSolverCfg, PinkSolver from .opw_solver import OPWSolverCfg, OPWSolver from .srs_solver import SRSSolverCfg, SRSSolver +from .neural_ik_solver import NeuralIKSolverCfg, NeuralIKSolver diff --git a/embodichain/lab/sim/solvers/neural_ik_solver.py b/embodichain/lab/sim/solvers/neural_ik_solver.py new file mode 100644 index 00000000..949b0534 --- /dev/null +++ b/embodichain/lab/sim/solvers/neural_ik_solver.py @@ -0,0 +1,228 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- +from __future__ import annotations + +import torch +import torch.nn as nn + +from embodichain.utils import configclass +from embodichain.utils.math import ( + convert_quat, + quat_error_magnitude, + quat_from_matrix, +) +from embodichain.lab.sim.solvers import SolverCfg, BaseSolver + +__all__ = ["NeuralIKSolverCfg", "NeuralIKSolver"] + + +@configclass +class NeuralIKSolverCfg(SolverCfg): + """Configuration for the neural network IK solver.""" + + class_type: str = "NeuralIKSolver" + + checkpoint_path: str = "" + """Path to the trained policy checkpoint (.pt file).""" + + max_steps: int = 30 + """Number of policy inference iterations per IK solve.""" + + action_scale: float = 0.2 + """Action scaling factor (radians).""" + + obs_dim: int | None = None + """Observation dimension. If None, auto-computed as ``2 * num_arm_joints + 14``.""" + + num_arm_joints: int = 7 + """Number of arm joints (policy only controls arm, not fingers).""" + + hidden_dims: list[int] = [256, 256] + """Hidden layer dimensions for the MLP policy network.""" + + pos_eps: float = 0.01 + """Position convergence tolerance (meters) for success check.""" + + rot_eps: float = 0.1 + """Rotation convergence tolerance (radians) for success check.""" + + def init_solver( + self, device: torch.device = torch.device("cpu"), **kwargs + ) -> NeuralIKSolver: + if self.obs_dim is None: + self.obs_dim = 2 * self.num_arm_joints + 14 + solver = NeuralIKSolver(cfg=self, device=device, **kwargs) + solver.set_tcp(self._get_tcp_as_numpy()) + return solver + + +def _build_mlp(obs_dim: int, hidden_dims: list[int], action_dim: int) -> nn.Sequential: + """Build an MLP with Tanh activations between hidden layers.""" + layers = [] + in_dim = obs_dim + for h in hidden_dims: + layers.append(nn.Linear(in_dim, h)) + layers.append(nn.Tanh()) + in_dim = h + layers.append(nn.Linear(in_dim, action_dim)) + return nn.Sequential(*layers) + + +class NeuralIKSolver(BaseSolver): + """IK solver using a trained neural network policy. + + Loads a checkpoint containing actor_mean weights and obs_normalizer stats, + then runs iterative inference to solve IK queries. + """ + + def __init__(self, cfg: NeuralIKSolverCfg, device=None, **kwargs): + super().__init__(cfg=cfg, device=device, **kwargs) + + self._max_steps = cfg.max_steps + self._action_scale = cfg.action_scale + self._num_arm_joints = cfg.num_arm_joints + self._pos_eps = cfg.pos_eps + self._rot_eps = cfg.rot_eps + + ckpt = torch.load( + cfg.checkpoint_path, map_location=self.device, weights_only=False + ) + + if "agent" not in ckpt: + raise KeyError( + f"Checkpoint at '{cfg.checkpoint_path}' is missing 'agent' key. " + f"Available keys: {list(ckpt.keys())}. " + f"Expected a checkpoint from the analytic_policy_gradients training pipeline." + ) + actor_keys = [k for k in ckpt["agent"] if k.startswith("actor_mean.")] + if not actor_keys: + raise KeyError( + f"Checkpoint 'agent' has no 'actor_mean.*' keys. " + f"Available: {list(ckpt['agent'].keys())}." + ) + if "obs_normalizer" not in ckpt: + raise KeyError( + f"Checkpoint at '{cfg.checkpoint_path}' is missing 'obs_normalizer'. " + f"Available keys: {list(ckpt.keys())}." + ) + for subkey in ("mean", "var"): + if subkey not in ckpt["obs_normalizer"]: + raise KeyError( + f"Checkpoint 'obs_normalizer' is missing '{subkey}'. " + f"Available: {list(ckpt['obs_normalizer'].keys())}." + ) + + self.mlp = _build_mlp(cfg.obs_dim, cfg.hidden_dims, cfg.num_arm_joints) + + state_dict = { + k.replace("actor_mean.", ""): v + for k, v in ckpt["agent"].items() + if k.startswith("actor_mean.") + } + self.mlp.load_state_dict(state_dict) + self.mlp.to(self.device).eval() + + self._obs_mean = ckpt["obs_normalizer"]["mean"].to(self.device) + self._obs_var = ckpt["obs_normalizer"]["var"].to(self.device) + + def _normalize_obs(self, obs: torch.Tensor) -> torch.Tensor: + """Normalize observations using stored running mean/var.""" + return (obs - self._obs_mean) / (self._obs_var.sqrt() + 1e-8) + + def _build_obs( + self, + qpos: torch.Tensor, + ee_pos: torch.Tensor, + ee_quat: torch.Tensor, + target_pos: torch.Tensor, + target_quat: torch.Tensor, + last_action: torch.Tensor, + ) -> torch.Tensor: + """Build observation vector: [joint_pos(N), ee_pose(7), target_pose(7), last_action(N)].""" + return torch.cat( + [qpos[:, : self._num_arm_joints], ee_pos, ee_quat, target_pos, target_quat, last_action], + dim=-1, + ) + + def get_ik( + self, + target_xpos: torch.Tensor, + qpos_seed: torch.Tensor | None = None, + num_samples: int | None = None, + **kwargs, + ) -> tuple[torch.Tensor, torch.Tensor]: + """Solve IK using the trained neural policy. + + Args: + target_xpos: Target pose as 4x4 matrix, shape (4,4) or (B,4,4). + qpos_seed: Initial joint positions, shape (dof,) or (B,dof). + num_samples: Ignored (policy is deterministic). + + Returns: + Tuple of (success [B], target_joints [B,1,dof]). + """ + target_xpos = torch.as_tensor( + target_xpos, device=self.device, dtype=torch.float32 + ) + if target_xpos.dim() == 2: + target_xpos = target_xpos.unsqueeze(0) + B = target_xpos.shape[0] + + target_pos = target_xpos[:, :3, 3] + target_quat = convert_quat( + quat_from_matrix(target_xpos[:, :3, :3]), to="xyzw" + ) + + if qpos_seed is None: + qpos = torch.zeros(B, self.dof, device=self.device) + else: + qpos = torch.as_tensor(qpos_seed, device=self.device, dtype=torch.float32) + if qpos.dim() == 1: + qpos = qpos.unsqueeze(0).expand(B, -1) + qpos = qpos.clone() + + last_action = torch.zeros(B, self._num_arm_joints, device=self.device) + + with torch.no_grad(): + for _ in range(self._max_steps): + ee_xpos = self.get_fk(qpos) + ee_pos = ee_xpos[:, :3, 3] + ee_quat = convert_quat( + quat_from_matrix(ee_xpos[:, :3, :3]), to="xyzw" + ) + + obs = self._build_obs( + qpos, ee_pos, ee_quat, target_pos, target_quat, last_action + ) + action = self.mlp(self._normalize_obs(obs)).clamp(-1.0, 1.0) + + qpos[:, : self._num_arm_joints] += action * self._action_scale + qpos[:, : self._num_arm_joints] = torch.clamp( + qpos[:, : self._num_arm_joints], + self.lower_qpos_limits[: self._num_arm_joints], + self.upper_qpos_limits[: self._num_arm_joints], + ) + last_action = action + + # Convergence check + ik_xpos = self.get_fk(qpos) + pos_err = (ik_xpos[:, :3, 3] - target_pos).norm(dim=-1) + ik_quat_wxyz = quat_from_matrix(ik_xpos[:, :3, :3]) + target_quat_wxyz = quat_from_matrix(target_xpos[:, :3, :3]) + rot_err = quat_error_magnitude(target_quat_wxyz, ik_quat_wxyz) + success = (pos_err < self._pos_eps) & (rot_err < self._rot_eps) + + return success, qpos.unsqueeze(1) diff --git a/examples/sim/solvers/neural_ik_solver.py b/examples/sim/solvers/neural_ik_solver.py new file mode 100644 index 00000000..fee6058d --- /dev/null +++ b/examples/sim/solvers/neural_ik_solver.py @@ -0,0 +1,178 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- +import math +import os +import time + +import numpy as np +import torch +from IPython import embed + +from embodichain.data import get_data_path +from embodichain.lab.sim.cfg import MarkerCfg, RobotCfg +from embodichain.lab.sim.objects import Robot +from embodichain.lab.sim import SimulationManager, SimulationManagerCfg + + +def main(): + np.set_printoptions(precision=5, suppress=True) + torch.set_printoptions(precision=5, sci_mode=False) + + sim_device = "cpu" + config = SimulationManagerCfg(headless=False, sim_device=sim_device) + sim = SimulationManager(config) + sim.set_manual_update(False) + + # Franka FR3 URDF from EmbodiChain data + urdf = get_data_path("Franka/FR3/fr3.urdf") + assert os.path.isfile(urdf) + + checkpoint_path = os.path.expanduser( + "~/文档/Research/analytic_policy_gradients/checkpoints/" + "FrankaReach-v0__rl__1__1779942193/best.pt" + ) + assert os.path.isfile(checkpoint_path), f"Checkpoint not found: {checkpoint_path}" + c = math.cos(-math.pi / 4) + s = math.sin(-math.pi / 4) + tcp = [ + [c, -s, 0.0, 0.0], + [s, c, 0.0, 0.0], + [0.0, 0.0, 1.0, 0.1034], + [0.0, 0.0, 0.0, 1.0], + ] + joint_names = [ + "fr3_joint1", "fr3_joint2", "fr3_joint3", "fr3_joint4", + "fr3_joint5", "fr3_joint6", "fr3_joint7", + ] + + + cfg_dict = { + "fpath": urdf, + "control_parts": { + "main_arm": joint_names, + }, + "solver_cfg": { + "main_arm": { + "class_type": "NeuralIKSolver", + "end_link_name": "fr3_link8", + "root_link_name": "fr3_link0", + "tcp": tcp, + "checkpoint_path": checkpoint_path, + "num_arm_joints": 7, + "max_steps": 30, + "action_scale": 0.2, + "hidden_dims": [256, 256], + }, + }, + } + import ipdb; ipdb.set_trace() + robot: Robot = sim.add_robot(cfg=RobotCfg.from_dict(cfg_dict)) + ipdb.set_trace() + arm_name = "main_arm" + + # Set home position and also set it as the joint target for PD control + qpos = torch.tensor( + [0.0, -0.7854, 0.0, -2.3562, 0.0, 1.5708, 0.7854], + dtype=torch.float32, + device=sim_device, + ).unsqueeze(0) + robot.set_qpos(qpos=qpos, joint_ids=robot.get_joint_ids(arm_name)) + robot.set_qpos_target(qpos=qpos, joint_ids=robot.get_joint_ids(arm_name)) + + # Wait for physics to settle + time.sleep(2.0) + + # Compute FK to get current EE pose (with TCP applied) + fk_xpos = robot.compute_fk(qpos=qpos, name=arm_name, to_matrix=True) + print(f"Current EE pose (TCP):\n{fk_xpos}") + + # Define target offset from current pose + start_pose = fk_xpos.clone()[0] + end_pose = fk_xpos.clone()[0] + end_pose[:3, 3] += torch.tensor([0.1, 0.2, -0.1], device=sim_device) + + # Interpolate between start and end + num_steps = 50 + interpolated_poses = [ + torch.lerp(start_pose, end_pose, t) for t in np.linspace(0, 1, num_steps) + ] + + ik_qpos = qpos + + # Solve IK for the end pose and measure time + t0 = time.time() + res, ik_qpos = robot.compute_ik( + pose=end_pose, joint_seed=ik_qpos, name=arm_name + ) + t1 = time.time() + print(f"End-pose IK: success={res}, time={t1 - t0:.4f}s") + + if ik_qpos.dim() == 3: + ik_xpos = robot.compute_fk( + qpos=ik_qpos[0][0], name=arm_name, to_matrix=True + ) + else: + ik_xpos = robot.compute_fk(qpos=ik_qpos, name=arm_name, to_matrix=True) + pos_err = (ik_xpos[:, :3, 3] - end_pose[:3, 3].unsqueeze(0)).norm().item() + print(f"Position error at end pose: {pos_err:.6f} m") + + # Draw markers for target and IK result + sim.draw_marker( + cfg=MarkerCfg( + name="target", + marker_type="axis", + axis_xpos=np.array(end_pose.tolist()), + axis_size=0.002, + axis_len=0.005, + ) + ) + sim.draw_marker( + cfg=MarkerCfg( + name="ik_result", + marker_type="axis", + axis_xpos=np.array(ik_xpos.tolist()), + axis_size=0.002, + axis_len=0.005, + ) + ) + + # Step through interpolated poses + for i, pose in enumerate(interpolated_poses): + t_start = time.time() + res, ik_qpos = robot.compute_ik( + pose=pose, joint_seed=ik_qpos, name=arm_name + ) + t_end = time.time() + + if not res: + print(f"Step {i}: IK failed (pos_err shown in solver)") + continue + + if ik_qpos.dim() == 3: + q = ik_qpos[0][0] + else: + q = ik_qpos + robot.set_qpos(qpos=q, joint_ids=robot.get_joint_ids(arm_name)) + robot.set_qpos_target(qpos=q, joint_ids=robot.get_joint_ids(arm_name)) + + print(f"Step {i}: time={t_end - t_start:.4f}s") + time.sleep(0.01) + + embed(header="NeuralIKSolver example. Press Ctrl+D to exit.") + + +if __name__ == "__main__": + main() From b8bc8d5f08066813d6648e8551afa44852976312 Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Sat, 30 May 2026 00:00:05 +0800 Subject: [PATCH 02/16] fix: use panda with hand --- examples/sim/solvers/neural_ik_solver.py | 40 +++++++++--------------- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/examples/sim/solvers/neural_ik_solver.py b/examples/sim/solvers/neural_ik_solver.py index fee6058d..baa446c8 100644 --- a/examples/sim/solvers/neural_ik_solver.py +++ b/examples/sim/solvers/neural_ik_solver.py @@ -34,10 +34,8 @@ def main(): sim_device = "cpu" config = SimulationManagerCfg(headless=False, sim_device=sim_device) sim = SimulationManager(config) - sim.set_manual_update(False) - # Franka FR3 URDF from EmbodiChain data - urdf = get_data_path("Franka/FR3/fr3.urdf") + urdf = get_data_path("Franka/Panda/PandaWithHand.urdf") assert os.path.isfile(urdf) checkpoint_path = os.path.expanduser( @@ -54,8 +52,8 @@ def main(): [0.0, 0.0, 0.0, 1.0], ] joint_names = [ - "fr3_joint1", "fr3_joint2", "fr3_joint3", "fr3_joint4", - "fr3_joint5", "fr3_joint6", "fr3_joint7", + "Joint1", "Joint2", "Joint3", "Joint4", + "Joint5", "Joint6", "Joint7", ] @@ -67,8 +65,8 @@ def main(): "solver_cfg": { "main_arm": { "class_type": "NeuralIKSolver", - "end_link_name": "fr3_link8", - "root_link_name": "fr3_link0", + "end_link_name": "ee_link", + "root_link_name": "base_link", "tcp": tcp, "checkpoint_path": checkpoint_path, "num_arm_joints": 7, @@ -78,23 +76,18 @@ def main(): }, }, } - import ipdb; ipdb.set_trace() robot: Robot = sim.add_robot(cfg=RobotCfg.from_dict(cfg_dict)) - ipdb.set_trace() arm_name = "main_arm" - # Set home position and also set it as the joint target for PD control qpos = torch.tensor( - [0.0, -0.7854, 0.0, -2.3562, 0.0, 1.5708, 0.7854], + [0.0, -np.pi/4, 0.0, -3*np.pi/4, 0.0, np.pi/2, np.pi/4], dtype=torch.float32, device=sim_device, ).unsqueeze(0) - robot.set_qpos(qpos=qpos, joint_ids=robot.get_joint_ids(arm_name)) - robot.set_qpos_target(qpos=qpos, joint_ids=robot.get_joint_ids(arm_name)) - - # Wait for physics to settle + robot.set_qpos(qpos=qpos, joint_ids=robot.get_joint_ids(arm_name), target=False) + robot.set_qpos(qpos=qpos, joint_ids=robot.get_joint_ids(arm_name), target=True) + sim.set_manual_update(False) time.sleep(2.0) - # Compute FK to get current EE pose (with TCP applied) fk_xpos = robot.compute_fk(qpos=qpos, name=arm_name, to_matrix=True) print(f"Current EE pose (TCP):\n{fk_xpos}") @@ -102,7 +95,7 @@ def main(): # Define target offset from current pose start_pose = fk_xpos.clone()[0] end_pose = fk_xpos.clone()[0] - end_pose[:3, 3] += torch.tensor([0.1, 0.2, -0.1], device=sim_device) + end_pose[:3, 3] += torch.tensor([0.3, 0.4, -0.2], device=sim_device) # Interpolate between start and end num_steps = 50 @@ -157,19 +150,16 @@ def main(): ) t_end = time.time() - if not res: - print(f"Step {i}: IK failed (pos_err shown in solver)") - continue - if ik_qpos.dim() == 3: q = ik_qpos[0][0] else: q = ik_qpos - robot.set_qpos(qpos=q, joint_ids=robot.get_joint_ids(arm_name)) - robot.set_qpos_target(qpos=q, joint_ids=robot.get_joint_ids(arm_name)) + robot.set_qpos(qpos=q, joint_ids=robot.get_joint_ids(arm_name), target=False) + robot.set_qpos(qpos=q, joint_ids=robot.get_joint_ids(arm_name), target=True) - print(f"Step {i}: time={t_end - t_start:.4f}s") - time.sleep(0.01) + status = "OK" if res.item() else "FAIL" + print(f"Step {i}: [{status}] time={t_end - t_start:.4f}s") + time.sleep(0.02) embed(header="NeuralIKSolver example. Press Ctrl+D to exit.") From ff895d8fe7cda25682791fafb1ca1a8bc98ae4f6 Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Sat, 30 May 2026 16:03:46 +0800 Subject: [PATCH 03/16] visualization --- examples/sim/solvers/neural_ik_solver.py | 108 ++++++++++++++--------- 1 file changed, 65 insertions(+), 43 deletions(-) diff --git a/examples/sim/solvers/neural_ik_solver.py b/examples/sim/solvers/neural_ik_solver.py index baa446c8..d461ab83 100644 --- a/examples/sim/solvers/neural_ik_solver.py +++ b/examples/sim/solvers/neural_ik_solver.py @@ -31,36 +31,45 @@ def main(): np.set_printoptions(precision=5, suppress=True) torch.set_printoptions(precision=5, sci_mode=False) + # Set up simulation with specified device (CPU or CUDA) sim_device = "cpu" config = SimulationManagerCfg(headless=False, sim_device=sim_device) sim = SimulationManager(config) + sim.set_manual_update(False) + # Load robot URDF file urdf = get_data_path("Franka/Panda/PandaWithHand.urdf") assert os.path.isfile(urdf) + # Load neural IK checkpoint checkpoint_path = os.path.expanduser( "~/文档/Research/analytic_policy_gradients/checkpoints/" "FrankaReach-v0__rl__1__1779942193/best.pt" ) assert os.path.isfile(checkpoint_path), f"Checkpoint not found: {checkpoint_path}" + + # TCP offset (rotation of -pi/4 around Z, translation along Z) c = math.cos(-math.pi / 4) s = math.sin(-math.pi / 4) tcp = [ [c, -s, 0.0, 0.0], - [s, c, 0.0, 0.0], + [s, c, 0.0, 0.0], [0.0, 0.0, 1.0, 0.1034], [0.0, 0.0, 0.0, 1.0], ] - joint_names = [ - "Joint1", "Joint2", "Joint3", "Joint4", - "Joint5", "Joint6", "Joint7", - ] - cfg_dict = { "fpath": urdf, "control_parts": { - "main_arm": joint_names, + "main_arm": [ + "Joint1", + "Joint2", + "Joint3", + "Joint4", + "Joint5", + "Joint6", + "Joint7", + ], }, "solver_cfg": { "main_arm": { @@ -73,32 +82,34 @@ def main(): "max_steps": 30, "action_scale": 0.2, "hidden_dims": [256, 256], + "pos_eps": 0.1, + "rot_eps": 0.5, }, }, } + robot: Robot = sim.add_robot(cfg=RobotCfg.from_dict(cfg_dict)) arm_name = "main_arm" + # Set initial joint positions qpos = torch.tensor( - [0.0, -np.pi/4, 0.0, -3*np.pi/4, 0.0, np.pi/2, np.pi/4], + [0.0, -np.pi / 4, 0.0, -3 * np.pi / 4, 0.0, np.pi / 2, np.pi / 4], dtype=torch.float32, device=sim_device, ).unsqueeze(0) - robot.set_qpos(qpos=qpos, joint_ids=robot.get_joint_ids(arm_name), target=False) - robot.set_qpos(qpos=qpos, joint_ids=robot.get_joint_ids(arm_name), target=True) - sim.set_manual_update(False) - time.sleep(2.0) + robot.set_qpos(qpos=qpos, joint_ids=robot.get_joint_ids(arm_name)) + time.sleep(3.0) + # Compute FK to get current EE pose (with TCP applied) fk_xpos = robot.compute_fk(qpos=qpos, name=arm_name, to_matrix=True) - print(f"Current EE pose (TCP):\n{fk_xpos}") - - # Define target offset from current pose + print(f"fk_xpos: {fk_xpos}") start_pose = fk_xpos.clone()[0] end_pose = fk_xpos.clone()[0] end_pose[:3, 3] += torch.tensor([0.3, 0.4, -0.2], device=sim_device) - # Interpolate between start and end num_steps = 50 + + # Interpolate poses between start and end interpolated_poses = [ torch.lerp(start_pose, end_pose, t) for t in np.linspace(0, 1, num_steps) ] @@ -106,35 +117,29 @@ def main(): ik_qpos = qpos # Solve IK for the end pose and measure time - t0 = time.time() - res, ik_qpos = robot.compute_ik( - pose=end_pose, joint_seed=ik_qpos, name=arm_name - ) - t1 = time.time() - print(f"End-pose IK: success={res}, time={t1 - t0:.4f}s") + start_time = time.time() + res, ik_qpos = robot.compute_ik(pose=end_pose, joint_seed=ik_qpos, name=arm_name) + end_time = time.time() + print(f"End-pose IK: success={res}, time={end_time - start_time:.4f}s") if ik_qpos.dim() == 3: - ik_xpos = robot.compute_fk( - qpos=ik_qpos[0][0], name=arm_name, to_matrix=True - ) + ik_xpos = robot.compute_fk(qpos=ik_qpos[0][0], name=arm_name, to_matrix=True) else: ik_xpos = robot.compute_fk(qpos=ik_qpos, name=arm_name, to_matrix=True) - pos_err = (ik_xpos[:, :3, 3] - end_pose[:3, 3].unsqueeze(0)).norm().item() - print(f"Position error at end pose: {pos_err:.6f} m") - # Draw markers for target and IK result sim.draw_marker( cfg=MarkerCfg( - name="target", + name="fk_xpos", marker_type="axis", axis_xpos=np.array(end_pose.tolist()), axis_size=0.002, axis_len=0.005, ) ) + sim.draw_marker( cfg=MarkerCfg( - name="ik_result", + name="ik_xpos", marker_type="axis", axis_xpos=np.array(ik_xpos.tolist()), axis_size=0.002, @@ -142,23 +147,40 @@ def main(): ) ) - # Step through interpolated poses for i, pose in enumerate(interpolated_poses): - t_start = time.time() - res, ik_qpos = robot.compute_ik( - pose=pose, joint_seed=ik_qpos, name=arm_name - ) - t_end = time.time() - + print(f"Step {i}: Moving to pose:\n{pose}") + start_time = time.time() + res, ik_qpos = robot.compute_ik(pose=pose, joint_seed=ik_qpos, name=arm_name) + end_time = time.time() + compute_time = end_time - start_time + print(f"Step {i}: IK computation time: {compute_time:.6f} seconds") + + print(f"IK result: {res}, ik_qpos: {ik_qpos}") + if not res: + print(f"Step {i}: IK failed for pose:\n{pose}") + continue + + # Set robot joint positions if ik_qpos.dim() == 3: - q = ik_qpos[0][0] + robot.set_qpos( + qpos=ik_qpos[0][0], joint_ids=robot.get_joint_ids(arm_name) + ) else: - q = ik_qpos - robot.set_qpos(qpos=q, joint_ids=robot.get_joint_ids(arm_name), target=False) - robot.set_qpos(qpos=q, joint_ids=robot.get_joint_ids(arm_name), target=True) + robot.set_qpos(qpos=ik_qpos, joint_ids=robot.get_joint_ids(arm_name)) + + # Visualize current pose + ik_xpos = robot.compute_fk(qpos=ik_qpos, name=arm_name, to_matrix=True) + + sim.draw_marker( + cfg=MarkerCfg( + name=f"ik_xpos_step_{i}", + marker_type="axis", + axis_xpos=np.array(ik_xpos.tolist()), + axis_size=0.002, + axis_len=0.005, + ) + ) - status = "OK" if res.item() else "FAIL" - print(f"Step {i}: [{status}] time={t_end - t_start:.4f}s") time.sleep(0.02) embed(header="NeuralIKSolver example. Press Ctrl+D to exit.") From 77f511518923576b0b00a2156d842b25658d5024 Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Sat, 30 May 2026 16:32:50 +0800 Subject: [PATCH 04/16] add tests for neural ik solver --- tests/sim/solvers/test_neural_ik_solver.py | 208 +++++++++++++++++++++ 1 file changed, 208 insertions(+) create mode 100644 tests/sim/solvers/test_neural_ik_solver.py diff --git a/tests/sim/solvers/test_neural_ik_solver.py b/tests/sim/solvers/test_neural_ik_solver.py new file mode 100644 index 00000000..43cbcb8c --- /dev/null +++ b/tests/sim/solvers/test_neural_ik_solver.py @@ -0,0 +1,208 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- +from __future__ import annotations + +import math +import os + +import numpy as np +import pytest +import torch + +from embodichain.data import get_data_path +from embodichain.lab.sim import SimulationManager, SimulationManagerCfg +from embodichain.lab.sim.cfg import RobotCfg +from embodichain.lab.sim.objects import Robot +from embodichain.utils.utility import reset_all_seeds + +CHECKPOINT_PATH = os.path.expanduser( + "~/文档/Research/analytic_policy_gradients/checkpoints/" + "FrankaReach-v0__rl__1__1779942193/best.pt" +) + +IK_POS_ATOL = 0.05 +IK_ROT_ATOL = 0.55 + +_c = math.cos(-math.pi / 4) +_s = math.sin(-math.pi / 4) +TCP = [ + [_c, -_s, 0.0, 0.0], + [_s, _c, 0.0, 0.0], + [0.0, 0.0, 1.0, 0.1034], + [0.0, 0.0, 0.0, 1.0], +] + + +def grid_sample_qpos_from_limits( + qpos_limits: torch.Tensor, + steps_per_joint: int = 4, + device=None, + max_samples: int = 4096, +) -> torch.Tensor: + if device is None: + device = qpos_limits.device + + limits = qpos_limits.squeeze(0) if qpos_limits.dim() == 3 else qpos_limits + lows = limits[:, 0].to(device) + 1e-2 + highs = limits[:, 1].to(device) - 1e-2 + + # create per-joint linspaces + grids = [ + torch.linspace(l.item(), h.item(), steps_per_joint, device=device) + for l, h in zip(lows, highs) + ] + + # meshgrid and stack + mesh = torch.meshgrid(*grids, indexing="ij") + stacked = torch.stack([m.reshape(-1) for m in mesh], dim=1) + + if stacked.shape[0] > max_samples: + return stacked[:max_samples] + return stacked + + +# Skip entire module if checkpoint is not available. +pytestmark = pytest.mark.skipif( + not os.path.isfile(CHECKPOINT_PATH), + reason=f"NeuralIK checkpoint not found: {CHECKPOINT_PATH}", +) + + +class BaseSolverTest: + sim = None + + def setup_simulation(self, solver_type: str): + config = SimulationManagerCfg(headless=True, sim_device="cpu") + self.sim = SimulationManager(config) + + urdf = get_data_path("Franka/Panda/PandaWithHand.urdf") + assert os.path.isfile(urdf) + + cfg_dict = { + "fpath": urdf, + "control_parts": { + "main_arm": [ + "Joint1", + "Joint2", + "Joint3", + "Joint4", + "Joint5", + "Joint6", + "Joint7", + ], + }, + "solver_cfg": { + "main_arm": { + "class_type": solver_type, + "end_link_name": "ee_link", + "root_link_name": "base_link", + "tcp": TCP, + "checkpoint_path": CHECKPOINT_PATH, + "num_arm_joints": 7, + "max_steps": 30, + "action_scale": 0.2, + "hidden_dims": [256, 256], + "pos_eps": 0.1, + "rot_eps": 0.5, + }, + }, + } + + self.robot: Robot = self.sim.add_robot(cfg=RobotCfg.from_dict(cfg_dict)) + self.sim.update(step=100) + + def test_ik(self): + reset_all_seeds(0) + arm_name = "main_arm" + qpos_limit = torch.tensor( + [ + [0.2, 0.8], + [0.2, 0.8], + [0.2, 0.8], + [0.2, 0.8], + [0.2, 0.8], + [0.2, 0.8], + [0.2, 0.8], + ] + ) + sample_qpos = grid_sample_qpos_from_limits( + qpos_limit, steps_per_joint=3, device=self.robot.device, max_samples=200 + ) + sample_qpos = sample_qpos[None, :, :] + + fk_xpos = self.robot.compute_batch_fk( + qpos=sample_qpos, name=arm_name, to_matrix=True + ) + fk_xpos_xyzquat = self.robot.compute_batch_fk( + qpos=sample_qpos, name=arm_name, to_matrix=False + ) + + res, ik_qpos = self.robot.compute_batch_ik( + pose=fk_xpos, joint_seed=sample_qpos, name=arm_name + ) + + res, ik_qpos_xyzquat = self.robot.compute_batch_ik( + pose=fk_xpos_xyzquat, joint_seed=sample_qpos, name=arm_name + ) + + ik_xpos = self.robot.compute_batch_fk( + qpos=ik_qpos_xyzquat, name=arm_name, to_matrix=True + ) + + pos_err = (fk_xpos[:, :, :3, 3] - ik_xpos[:, :, :3, 3]).norm(dim=-1) + assert torch.all( + pos_err < IK_POS_ATOL + ), f"Position error too large: max {pos_err.max().item():.4f} m > {IK_POS_ATOL} m" + + rot_err = (fk_xpos[:, :, :3, :3] - ik_xpos[:, :, :3, :3]).norm(dim=(-2, -1)) + assert torch.all( + rot_err < IK_ROT_ATOL + ), f"Rotation error too large: max {rot_err.max().item():.4f} > {IK_ROT_ATOL}" + + # test for failed xpos + invalid_pose = torch.tensor( + [ + [ + [1.0, 0.0, 0.0, 10.0], + [0.0, 1.0, 0.0, 10.0], + [0.0, 0.0, 1.0, 10.0], + [0.0, 0.0, 0.0, 1.0], + ] + ], + dtype=torch.float32, + device=self.robot.device, + ) + res, ik_qpos = self.robot.compute_ik( + pose=invalid_pose, joint_seed=ik_qpos[:, 0, :], name=arm_name + ) + dof = ik_qpos.shape[-1] + assert res[0].item() == False + assert ik_qpos.shape == (1, dof) + + def teardown_method(self): + """Clean up resources after each test method.""" + self.sim.destroy() + + +class TestNeuralIKSolver(BaseSolverTest): + def setup_method(self): + self.setup_simulation(solver_type="NeuralIKSolver") + + +if __name__ == "__main__": + np.set_printoptions(precision=5, suppress=True) + test_solver = TestNeuralIKSolver() + test_solver.setup_method() From 3fcb6d231381a3bc222684784ab43dc7cd4824b9 Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Sat, 30 May 2026 17:07:27 +0800 Subject: [PATCH 05/16] download checkpoint from hugging face --- examples/sim/solvers/neural_ik_solver.py | 10 ++++------ tests/sim/solvers/test_neural_ik_solver.py | 18 ++++++++++-------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/examples/sim/solvers/neural_ik_solver.py b/examples/sim/solvers/neural_ik_solver.py index d461ab83..a369b253 100644 --- a/examples/sim/solvers/neural_ik_solver.py +++ b/examples/sim/solvers/neural_ik_solver.py @@ -19,6 +19,7 @@ import numpy as np import torch +from huggingface_hub import hf_hub_download from IPython import embed from embodichain.data import get_data_path @@ -40,13 +41,10 @@ def main(): # Load robot URDF file urdf = get_data_path("Franka/Panda/PandaWithHand.urdf") assert os.path.isfile(urdf) - - # Load neural IK checkpoint - checkpoint_path = os.path.expanduser( - "~/文档/Research/analytic_policy_gradients/checkpoints/" - "FrankaReach-v0__rl__1__1779942193/best.pt" + + checkpoint_path = hf_hub_download( + repo_id="dexforce/neural_ik_solver", filename="franka.pt" ) - assert os.path.isfile(checkpoint_path), f"Checkpoint not found: {checkpoint_path}" # TCP offset (rotation of -pi/4 around Z, translation along Z) c = math.cos(-math.pi / 4) diff --git a/tests/sim/solvers/test_neural_ik_solver.py b/tests/sim/solvers/test_neural_ik_solver.py index 43cbcb8c..ecb9e3f6 100644 --- a/tests/sim/solvers/test_neural_ik_solver.py +++ b/tests/sim/solvers/test_neural_ik_solver.py @@ -21,6 +21,7 @@ import numpy as np import pytest import torch +from huggingface_hub import hf_hub_download from embodichain.data import get_data_path from embodichain.lab.sim import SimulationManager, SimulationManagerCfg @@ -28,10 +29,8 @@ from embodichain.lab.sim.objects import Robot from embodichain.utils.utility import reset_all_seeds -CHECKPOINT_PATH = os.path.expanduser( - "~/文档/Research/analytic_policy_gradients/checkpoints/" - "FrankaReach-v0__rl__1__1779942193/best.pt" -) +CHECKPOINT_REPO = "dexforce/neural_ik_solver" +CHECKPOINT_FILE = "franka.pt" IK_POS_ATOL = 0.05 IK_ROT_ATOL = 0.55 @@ -74,10 +73,10 @@ def grid_sample_qpos_from_limits( return stacked -# Skip entire module if checkpoint is not available. +# Skip tests if huggingface_hub cannot reach the checkpoint repo. pytestmark = pytest.mark.skipif( - not os.path.isfile(CHECKPOINT_PATH), - reason=f"NeuralIK checkpoint not found: {CHECKPOINT_PATH}", + os.environ.get("NEURAL_IK_OFFLINE") is not None, + reason="NEURAL_IK_OFFLINE is set, skipping NeuralIK tests", ) @@ -90,6 +89,9 @@ def setup_simulation(self, solver_type: str): urdf = get_data_path("Franka/Panda/PandaWithHand.urdf") assert os.path.isfile(urdf) + checkpoint_path = hf_hub_download( + repo_id=CHECKPOINT_REPO, filename=CHECKPOINT_FILE + ) cfg_dict = { "fpath": urdf, @@ -110,7 +112,7 @@ def setup_simulation(self, solver_type: str): "end_link_name": "ee_link", "root_link_name": "base_link", "tcp": TCP, - "checkpoint_path": CHECKPOINT_PATH, + "checkpoint_path": checkpoint_path, "num_arm_joints": 7, "max_steps": 30, "action_scale": 0.2, From ba1f916b806553971a5481933cff2a2f6cdf4361 Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Sat, 30 May 2026 17:10:21 +0800 Subject: [PATCH 06/16] reformat files --- embodichain/lab/sim/solvers/neural_ik_solver.py | 17 ++++++++++------- examples/sim/solvers/neural_ik_solver.py | 6 ++---- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/embodichain/lab/sim/solvers/neural_ik_solver.py b/embodichain/lab/sim/solvers/neural_ik_solver.py index 949b0534..c5c0207a 100644 --- a/embodichain/lab/sim/solvers/neural_ik_solver.py +++ b/embodichain/lab/sim/solvers/neural_ik_solver.py @@ -153,7 +153,14 @@ def _build_obs( ) -> torch.Tensor: """Build observation vector: [joint_pos(N), ee_pose(7), target_pose(7), last_action(N)].""" return torch.cat( - [qpos[:, : self._num_arm_joints], ee_pos, ee_quat, target_pos, target_quat, last_action], + [ + qpos[:, : self._num_arm_joints], + ee_pos, + ee_quat, + target_pos, + target_quat, + last_action, + ], dim=-1, ) @@ -182,9 +189,7 @@ def get_ik( B = target_xpos.shape[0] target_pos = target_xpos[:, :3, 3] - target_quat = convert_quat( - quat_from_matrix(target_xpos[:, :3, :3]), to="xyzw" - ) + target_quat = convert_quat(quat_from_matrix(target_xpos[:, :3, :3]), to="xyzw") if qpos_seed is None: qpos = torch.zeros(B, self.dof, device=self.device) @@ -200,9 +205,7 @@ def get_ik( for _ in range(self._max_steps): ee_xpos = self.get_fk(qpos) ee_pos = ee_xpos[:, :3, 3] - ee_quat = convert_quat( - quat_from_matrix(ee_xpos[:, :3, :3]), to="xyzw" - ) + ee_quat = convert_quat(quat_from_matrix(ee_xpos[:, :3, :3]), to="xyzw") obs = self._build_obs( qpos, ee_pos, ee_quat, target_pos, target_quat, last_action diff --git a/examples/sim/solvers/neural_ik_solver.py b/examples/sim/solvers/neural_ik_solver.py index a369b253..34fdc499 100644 --- a/examples/sim/solvers/neural_ik_solver.py +++ b/examples/sim/solvers/neural_ik_solver.py @@ -41,7 +41,7 @@ def main(): # Load robot URDF file urdf = get_data_path("Franka/Panda/PandaWithHand.urdf") assert os.path.isfile(urdf) - + checkpoint_path = hf_hub_download( repo_id="dexforce/neural_ik_solver", filename="franka.pt" ) @@ -160,9 +160,7 @@ def main(): # Set robot joint positions if ik_qpos.dim() == 3: - robot.set_qpos( - qpos=ik_qpos[0][0], joint_ids=robot.get_joint_ids(arm_name) - ) + robot.set_qpos(qpos=ik_qpos[0][0], joint_ids=robot.get_joint_ids(arm_name)) else: robot.set_qpos(qpos=ik_qpos, joint_ids=robot.get_joint_ids(arm_name)) From c7ffb4ed7e7acd195cd5f9f38fbe5af88f1c93c4 Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Mon, 1 Jun 2026 00:44:12 +0800 Subject: [PATCH 07/16] fix: download checkpoint from hugging face --- embodichain/data/assets/solver_assets.py | 33 ++++++++++++++++++++++ examples/sim/solvers/neural_ik_solver.py | 6 ++-- tests/sim/solvers/test_neural_ik_solver.py | 9 ++---- 3 files changed, 37 insertions(+), 11 deletions(-) create mode 100644 embodichain/data/assets/solver_assets.py diff --git a/embodichain/data/assets/solver_assets.py b/embodichain/data/assets/solver_assets.py new file mode 100644 index 00000000..09ecf1e1 --- /dev/null +++ b/embodichain/data/assets/solver_assets.py @@ -0,0 +1,33 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- +from __future__ import annotations + +from huggingface_hub import hf_hub_download + + +def download_neural_ik_checkpoint( + repo_id: str = "dexforce/neural_ik_solver", + filename: str = "franka.pt", +) -> str: + """Download a neural IK solver checkpoint from HuggingFace. + + Returns: + str: Local path to the downloaded checkpoint file. + """ + return hf_hub_download( + repo_id=repo_id, + filename=filename, + ) diff --git a/examples/sim/solvers/neural_ik_solver.py b/examples/sim/solvers/neural_ik_solver.py index 34fdc499..31d0b03f 100644 --- a/examples/sim/solvers/neural_ik_solver.py +++ b/examples/sim/solvers/neural_ik_solver.py @@ -19,10 +19,10 @@ import numpy as np import torch -from huggingface_hub import hf_hub_download from IPython import embed from embodichain.data import get_data_path +from embodichain.data.assets.solver_assets import download_neural_ik_checkpoint from embodichain.lab.sim.cfg import MarkerCfg, RobotCfg from embodichain.lab.sim.objects import Robot from embodichain.lab.sim import SimulationManager, SimulationManagerCfg @@ -42,9 +42,7 @@ def main(): urdf = get_data_path("Franka/Panda/PandaWithHand.urdf") assert os.path.isfile(urdf) - checkpoint_path = hf_hub_download( - repo_id="dexforce/neural_ik_solver", filename="franka.pt" - ) + checkpoint_path = download_neural_ik_checkpoint() # TCP offset (rotation of -pi/4 around Z, translation along Z) c = math.cos(-math.pi / 4) diff --git a/tests/sim/solvers/test_neural_ik_solver.py b/tests/sim/solvers/test_neural_ik_solver.py index ecb9e3f6..0f5a2cdd 100644 --- a/tests/sim/solvers/test_neural_ik_solver.py +++ b/tests/sim/solvers/test_neural_ik_solver.py @@ -21,17 +21,14 @@ import numpy as np import pytest import torch -from huggingface_hub import hf_hub_download from embodichain.data import get_data_path +from embodichain.data.assets.solver_assets import download_neural_ik_checkpoint from embodichain.lab.sim import SimulationManager, SimulationManagerCfg from embodichain.lab.sim.cfg import RobotCfg from embodichain.lab.sim.objects import Robot from embodichain.utils.utility import reset_all_seeds -CHECKPOINT_REPO = "dexforce/neural_ik_solver" -CHECKPOINT_FILE = "franka.pt" - IK_POS_ATOL = 0.05 IK_ROT_ATOL = 0.55 @@ -89,9 +86,7 @@ def setup_simulation(self, solver_type: str): urdf = get_data_path("Franka/Panda/PandaWithHand.urdf") assert os.path.isfile(urdf) - checkpoint_path = hf_hub_download( - repo_id=CHECKPOINT_REPO, filename=CHECKPOINT_FILE - ) + checkpoint_path = download_neural_ik_checkpoint() cfg_dict = { "fpath": urdf, From ca2e1469a6ac9b15fc57c624ad3803c98d2870bd Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Mon, 1 Jun 2026 13:54:37 +0800 Subject: [PATCH 08/16] support multiple sampled init qpos --- .../lab/sim/solvers/neural_ik_solver.py | 131 ++++++++++++++---- tests/sim/solvers/test_neural_ik_solver.py | 46 ++++++ 2 files changed, 148 insertions(+), 29 deletions(-) diff --git a/embodichain/lab/sim/solvers/neural_ik_solver.py b/embodichain/lab/sim/solvers/neural_ik_solver.py index c5c0207a..35a0ba86 100644 --- a/embodichain/lab/sim/solvers/neural_ik_solver.py +++ b/embodichain/lab/sim/solvers/neural_ik_solver.py @@ -25,6 +25,7 @@ quat_from_matrix, ) from embodichain.lab.sim.solvers import SolverCfg, BaseSolver +from embodichain.lab.sim.solvers.qpos_seed_sampler import QposSeedSampler __all__ = ["NeuralIKSolverCfg", "NeuralIKSolver"] @@ -59,6 +60,9 @@ class NeuralIKSolverCfg(SolverCfg): rot_eps: float = 0.1 """Rotation convergence tolerance (radians) for success check.""" + num_samples: int = 1 + """Number of random initial qpos seeds to sample per target pose.""" + def init_solver( self, device: torch.device = torch.device("cpu"), **kwargs ) -> NeuralIKSolver: @@ -96,6 +100,7 @@ def __init__(self, cfg: NeuralIKSolverCfg, device=None, **kwargs): self._num_arm_joints = cfg.num_arm_joints self._pos_eps = cfg.pos_eps self._rot_eps = cfg.rot_eps + self._num_samples = cfg.num_samples ckpt = torch.load( cfg.checkpoint_path, map_location=self.device, weights_only=False @@ -164,48 +169,34 @@ def _build_obs( dim=-1, ) - def get_ik( + def _run_policy( self, + qpos: torch.Tensor, target_xpos: torch.Tensor, - qpos_seed: torch.Tensor | None = None, - num_samples: int | None = None, - **kwargs, + target_pos: torch.Tensor, + target_quat: torch.Tensor, ) -> tuple[torch.Tensor, torch.Tensor]: - """Solve IK using the trained neural policy. + """Run the iterative neural policy loop and check convergence. Args: - target_xpos: Target pose as 4x4 matrix, shape (4,4) or (B,4,4). - qpos_seed: Initial joint positions, shape (dof,) or (B,dof). - num_samples: Ignored (policy is deterministic). + qpos: Joint positions, shape (B, dof). Modified in-place. + target_xpos: Target poses, shape (B, 4, 4). + target_pos: Target positions, shape (B, 3). + target_quat: Target quaternions (xyzw), shape (B, 4). Returns: - Tuple of (success [B], target_joints [B,1,dof]). + Tuple of (success [B], ik_qpos [B, dof]). """ - target_xpos = torch.as_tensor( - target_xpos, device=self.device, dtype=torch.float32 - ) - if target_xpos.dim() == 2: - target_xpos = target_xpos.unsqueeze(0) - B = target_xpos.shape[0] - - target_pos = target_xpos[:, :3, 3] - target_quat = convert_quat(quat_from_matrix(target_xpos[:, :3, :3]), to="xyzw") - - if qpos_seed is None: - qpos = torch.zeros(B, self.dof, device=self.device) - else: - qpos = torch.as_tensor(qpos_seed, device=self.device, dtype=torch.float32) - if qpos.dim() == 1: - qpos = qpos.unsqueeze(0).expand(B, -1) - qpos = qpos.clone() - + B = qpos.shape[0] last_action = torch.zeros(B, self._num_arm_joints, device=self.device) with torch.no_grad(): for _ in range(self._max_steps): ee_xpos = self.get_fk(qpos) ee_pos = ee_xpos[:, :3, 3] - ee_quat = convert_quat(quat_from_matrix(ee_xpos[:, :3, :3]), to="xyzw") + ee_quat = convert_quat( + quat_from_matrix(ee_xpos[:, :3, :3]), to="xyzw" + ) obs = self._build_obs( qpos, ee_pos, ee_quat, target_pos, target_quat, last_action @@ -228,4 +219,86 @@ def get_ik( rot_err = quat_error_magnitude(target_quat_wxyz, ik_quat_wxyz) success = (pos_err < self._pos_eps) & (rot_err < self._rot_eps) - return success, qpos.unsqueeze(1) + return success, qpos + + def get_ik( + self, + target_xpos: torch.Tensor, + qpos_seed: torch.Tensor | None = None, + num_samples: int | None = None, + **kwargs, + ) -> tuple[torch.Tensor, torch.Tensor]: + """Solve IK using the trained neural policy. + + Args: + target_xpos: Target pose as 4x4 matrix, shape (4,4) or (B,4,4). + qpos_seed: Initial joint positions, shape (dof,) or (B,dof). + num_samples: Number of random initial seeds per target pose. + Defaults to ``cfg.num_samples`` (1). When > 1, generates + multiple random seeds within joint limits and returns the + solution closest to ``qpos_seed``. + return_all_solutions: If True, return all sampled solutions + with shape (B, num_samples, dof) instead of the closest. + + Returns: + Tuple of (success [B], target_joints [B,1,dof] or [B,num_samples,dof]). + """ + return_all_solutions = kwargs.get("return_all_solutions", False) + + n = num_samples if num_samples is not None else self._num_samples + + target_xpos = torch.as_tensor( + target_xpos, device=self.device, dtype=torch.float32 + ) + if target_xpos.dim() == 2: + target_xpos = target_xpos.unsqueeze(0) + B = target_xpos.shape[0] + + target_pos = target_xpos[:, :3, 3] + target_quat = convert_quat(quat_from_matrix(target_xpos[:, :3, :3]), to="xyzw") + + if qpos_seed is None: + qpos_seed = torch.zeros(B, self.dof, device=self.device) + else: + qpos_seed = torch.as_tensor( + qpos_seed, device=self.device, dtype=torch.float32 + ) + if qpos_seed.dim() == 1: + qpos_seed = qpos_seed.unsqueeze(0).expand(B, -1) + qpos_seed = qpos_seed.clone() + + # Single sample: run directly without QposSeedSampler overhead. + if n <= 1: + success, ik_qpos = self._run_policy( + qpos_seed, target_xpos, target_pos, target_quat + ) + return success, ik_qpos.unsqueeze(1) + + # Multiple samples: use QposSeedSampler for random seeds. + sampler = QposSeedSampler(num_samples=n, dof=self.dof, device=self.device) + all_seeds = sampler.sample( + qpos_seed, self.lower_qpos_limits, self.upper_qpos_limits, B + ) + target_xpos_repeated = sampler.repeat_target_xpos(target_xpos, n) + target_pos_rep = target_xpos_repeated[:, :3, 3] + target_quat_rep = convert_quat( + quat_from_matrix(target_xpos_repeated[:, :3, :3]), to="xyzw" + ) + + success_flat, ik_qpos_flat = self._run_policy( + all_seeds, target_xpos_repeated, target_pos_rep, target_quat_rep + ) + + all_success = success_flat.reshape(B, n) + all_results = ik_qpos_flat.reshape(B, n, self.dof) + + if return_all_solutions: + return all_success.any(dim=1), all_results + + # Pick solution closest to seed. + seed_repeat = qpos_seed.unsqueeze(1).repeat(1, n, 1) + dist = (all_results - seed_repeat).norm(dim=-1) + dist[~all_success] = float("inf") + closest_idx = torch.argmin(dist, dim=1) + closest_qpos = all_results[torch.arange(B, device=self.device), closest_idx] + return all_success.any(dim=1), closest_qpos[:, None, :] diff --git a/tests/sim/solvers/test_neural_ik_solver.py b/tests/sim/solvers/test_neural_ik_solver.py index 0f5a2cdd..dbe495b8 100644 --- a/tests/sim/solvers/test_neural_ik_solver.py +++ b/tests/sim/solvers/test_neural_ik_solver.py @@ -198,6 +198,52 @@ class TestNeuralIKSolver(BaseSolverTest): def setup_method(self): self.setup_simulation(solver_type="NeuralIKSolver") + def _make_solver_input(self): + """Create a standard qpos and its FK target for solver tests.""" + arm_name = "main_arm" + qpos = torch.tensor( + [0.0, -np.pi / 4, 0.0, -3 * np.pi / 4, 0.0, np.pi / 2, np.pi / 4], + dtype=torch.float32, + device=self.robot.device, + ).unsqueeze(0) + target_xpos = self.robot.compute_fk( + qpos=qpos, name=arm_name, to_matrix=True + ) + solver = self.robot.get_solver(arm_name) + return solver, qpos, target_xpos + + def test_multi_sample_shape(self): + """Verify output shape when using multiple samples.""" + reset_all_seeds(0) + solver, qpos, target_xpos = self._make_solver_input() + + success, ik_qpos = solver.get_ik( + target_xpos=target_xpos, + qpos_seed=qpos, + num_samples=5, + ) + + dof = qpos.shape[-1] + assert success.shape == (1,) + assert ik_qpos.shape == (1, 1, dof) + + def test_multi_sample_return_all(self): + """Verify return_all_solutions returns all sampled solutions.""" + reset_all_seeds(0) + solver, qpos, target_xpos = self._make_solver_input() + num_samples = 5 + + success, ik_qpos = solver.get_ik( + target_xpos=target_xpos, + qpos_seed=qpos, + num_samples=num_samples, + return_all_solutions=True, + ) + + dof = qpos.shape[-1] + assert success.shape == (1,) + assert ik_qpos.shape == (1, num_samples, dof) + if __name__ == "__main__": np.set_printoptions(precision=5, suppress=True) From 081c1073ff5b97554b0da0854e1c24d6934c22fd Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Mon, 1 Jun 2026 13:56:53 +0800 Subject: [PATCH 09/16] reformat files --- embodichain/lab/sim/solvers/neural_ik_solver.py | 4 +--- tests/sim/solvers/test_neural_ik_solver.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/embodichain/lab/sim/solvers/neural_ik_solver.py b/embodichain/lab/sim/solvers/neural_ik_solver.py index 35a0ba86..7f1cb1d1 100644 --- a/embodichain/lab/sim/solvers/neural_ik_solver.py +++ b/embodichain/lab/sim/solvers/neural_ik_solver.py @@ -194,9 +194,7 @@ def _run_policy( for _ in range(self._max_steps): ee_xpos = self.get_fk(qpos) ee_pos = ee_xpos[:, :3, 3] - ee_quat = convert_quat( - quat_from_matrix(ee_xpos[:, :3, :3]), to="xyzw" - ) + ee_quat = convert_quat(quat_from_matrix(ee_xpos[:, :3, :3]), to="xyzw") obs = self._build_obs( qpos, ee_pos, ee_quat, target_pos, target_quat, last_action diff --git a/tests/sim/solvers/test_neural_ik_solver.py b/tests/sim/solvers/test_neural_ik_solver.py index dbe495b8..7d814e21 100644 --- a/tests/sim/solvers/test_neural_ik_solver.py +++ b/tests/sim/solvers/test_neural_ik_solver.py @@ -206,9 +206,7 @@ def _make_solver_input(self): dtype=torch.float32, device=self.robot.device, ).unsqueeze(0) - target_xpos = self.robot.compute_fk( - qpos=qpos, name=arm_name, to_matrix=True - ) + target_xpos = self.robot.compute_fk(qpos=qpos, name=arm_name, to_matrix=True) solver = self.robot.get_solver(arm_name) return solver, qpos, target_xpos From 798f9bf1908eb7ce95c71186f49ebb191556b2b8 Mon Sep 17 00:00:00 2001 From: yuecideng Date: Tue, 2 Jun 2026 14:14:18 +0800 Subject: [PATCH 10/16] wip --- embodichain/data/assets/solver_assets.py | 64 +++++- embodichain/lab/sim/objects/articulation.py | 8 + embodichain/lab/sim/sim_manager.py | 14 +- examples/sim/solvers/neural_ik_solver.py | 227 ++++++++++++++------ 4 files changed, 231 insertions(+), 82 deletions(-) diff --git a/embodichain/data/assets/solver_assets.py b/embodichain/data/assets/solver_assets.py index 09ecf1e1..a4fb2e65 100644 --- a/embodichain/data/assets/solver_assets.py +++ b/embodichain/data/assets/solver_assets.py @@ -15,19 +15,75 @@ # ---------------------------------------------------------------------------- from __future__ import annotations +import os + from huggingface_hub import hf_hub_download +# HuggingFace endpoint. Mirrors (e.g. hf-mirror.com) often redirect to the +# real hub without forwarding the required commit-hash response headers, so we +# default to the canonical endpoint and rely on the system proxy when needed. +_HF_ENDPOINT = "https://huggingface.co" + def download_neural_ik_checkpoint( repo_id: str = "dexforce/neural_ik_solver", filename: str = "franka.pt", + token: str | None = None, + endpoint: str = _HF_ENDPOINT, ) -> str: """Download a neural IK solver checkpoint from HuggingFace. + The repository is gated. Either set the ``HF_TOKEN`` environment variable or + run ``huggingface-cli login`` before calling this function. + + If your network requires an HTTP proxy, set ``HTTPS_PROXY`` or + ``https_proxy`` in the environment before launching Python. + + Args: + repo_id: HuggingFace repository ID. + filename: Checkpoint filename to download. + token: HuggingFace API token. Falls back to the ``HF_TOKEN`` + environment variable or the cached token from + ``huggingface-cli login``. + endpoint: HuggingFace-compatible endpoint URL. Defaults to + ``https://huggingface.co``. Mirrors that merely redirect to the + real hub are not supported. + Returns: str: Local path to the downloaded checkpoint file. + + Raises: + RuntimeError: If the download fails, with authentication instructions. """ - return hf_hub_download( - repo_id=repo_id, - filename=filename, - ) + # Normalize proxy env vars: the ``requests`` library on Linux requires the + # lowercase form (``https_proxy``), but users typically export the uppercase + # form (``HTTPS_PROXY``). + https_proxy = os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy") + if https_proxy: + os.environ.setdefault("https_proxy", https_proxy) + os.environ.setdefault("HTTPS_PROXY", https_proxy) + + # Allow callers to pass the token explicitly; otherwise fall back to + # HF_TOKEN (huggingface_hub also reads this automatically, but being + # explicit makes the fallback order transparent). + if token is None: + token = os.environ.get("HF_TOKEN") or None + + try: + return hf_hub_download( + repo_id=repo_id, + filename=filename, + token=token, + endpoint=endpoint, + ) + except Exception as exc: + raise RuntimeError( + f"Failed to download '{filename}' from '{repo_id}'.\n" + "The repository is gated and requires an authenticated HuggingFace account.\n" + "To fix this:\n" + " 1. Accept the model license at https://huggingface.co/dexforce/neural_ik_solver\n" + " 2. Create an access token at https://huggingface.co/settings/tokens\n" + " 3. Export the token: export HF_TOKEN=\n" + " or run: huggingface-cli login\n" + f"Original error: {exc}" + ) from exc diff --git a/embodichain/lab/sim/objects/articulation.py b/embodichain/lab/sim/objects/articulation.py index 6d995e85..15d377b7 100644 --- a/embodichain/lab/sim/objects/articulation.py +++ b/embodichain/lab/sim/objects/articulation.py @@ -1105,6 +1105,10 @@ def set_qpos( else: qpos_set = self.body_data._qpos + if not isinstance(local_env_ids, torch.Tensor): + local_env_ids = torch.as_tensor( + local_env_ids, dtype=torch.long, device=self.device + ) indices = self.body_data.gpu_indices[local_env_ids] qpos_set[local_env_ids[:, None], local_joint_ids] = qpos self._ps.gpu_apply_joint_data( @@ -1181,6 +1185,10 @@ def set_qvel( else: qvel_set = self.body_data._qvel + if not isinstance(local_env_ids, torch.Tensor): + local_env_ids = torch.as_tensor( + local_env_ids, dtype=torch.long, device=self.device + ) indices = self.body_data.gpu_indices[local_env_ids] qvel_set[local_env_ids[:, None], local_joint_ids] = qvel self._ps.gpu_apply_joint_data( diff --git a/embodichain/lab/sim/sim_manager.py b/embodichain/lab/sim/sim_manager.py index 1998192d..757c83c3 100644 --- a/embodichain/lab/sim/sim_manager.py +++ b/embodichain/lab/sim/sim_manager.py @@ -307,7 +307,7 @@ def __init__( if sim_config.headless is False: self._window = self._world.get_windows() - # self._register_default_window_control() + self._register_default_window_control() @classmethod def get_instance(cls, instance_id: int = 0) -> SimulationManager: @@ -550,12 +550,12 @@ def open_window(self) -> None: self._window = self._world.get_windows() # TODO: will open these features after fix the related blocking issues. - # self._register_default_window_control() - # if ( - # self._window_record_hotkey_cfg is not None - # and self._window_record_input_control is None - # ): - # self.enable_window_record_hotkey(**self._window_record_hotkey_cfg) + self._register_default_window_control() + if ( + self._window_record_hotkey_cfg is not None + and self._window_record_input_control is None + ): + self.enable_window_record_hotkey(**self._window_record_hotkey_cfg) self.is_window_opened = True def close_window(self) -> None: diff --git a/examples/sim/solvers/neural_ik_solver.py b/examples/sim/solvers/neural_ik_solver.py index 31d0b03f..39c59dab 100644 --- a/examples/sim/solvers/neural_ik_solver.py +++ b/examples/sim/solvers/neural_ik_solver.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ---------------------------------------------------------------------------- +import argparse import math import os import time @@ -28,23 +29,79 @@ from embodichain.lab.sim import SimulationManager, SimulationManagerCfg +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="NeuralIKSolver example") + parser.add_argument( + "--device", + type=str, + default="cpu", + choices=["cpu", "cuda"], + help="Compute device for tensors and the neural IK solver (default: cpu).", + ) + parser.add_argument( + "--num-envs", + type=int, + default=1, + help="Number of parallel environments to simulate. IK is solved for all " + "environments simultaneously at each step (default: 1).", + ) + return parser.parse_args() + + +def _resolve_device(device: str) -> str: + if device == "cuda" and not torch.cuda.is_available(): + raise RuntimeError( + "CUDA was requested but is not available. Use --device cpu or install " + "a CUDA-enabled PyTorch build." + ) + return device + + +def _squeeze_ik_qpos(ik_qpos: torch.Tensor) -> torch.Tensor: + """Normalize IK output to (num_envs, dof).""" + if ik_qpos.dim() == 3: + return ik_qpos[:, 0, :] + return ik_qpos + + +def _pose_with_arena_offset( + pose: torch.Tensor | np.ndarray, arena_offset: torch.Tensor +) -> np.ndarray: + """Convert arena-local 4x4 pose to world frame by adding arena translation.""" + if isinstance(pose, torch.Tensor): + xpos = pose.detach().cpu().numpy() + else: + xpos = np.asarray(pose) + xpos = np.array(xpos, copy=True, dtype=np.float64) + offset = arena_offset.detach().cpu().numpy().reshape(3) + if xpos.ndim == 2: + xpos[:3, 3] += offset + elif xpos.ndim == 3: + xpos[:, :3, 3] += offset + return xpos + + def main(): + args = parse_args() np.set_printoptions(precision=5, suppress=True) torch.set_printoptions(precision=5, sci_mode=False) - # Set up simulation with specified device (CPU or CUDA) - sim_device = "cpu" - config = SimulationManagerCfg(headless=False, sim_device=sim_device) + sim_device = _resolve_device(args.device) + num_envs = args.num_envs + + config = SimulationManagerCfg( + headless=True, + sim_device=sim_device, + num_envs=num_envs, + arena_space=2.0, + ) sim = SimulationManager(config) - sim.set_manual_update(False) - # Load robot URDF file urdf = get_data_path("Franka/Panda/PandaWithHand.urdf") assert os.path.isfile(urdf) checkpoint_path = download_neural_ik_checkpoint() - # TCP offset (rotation of -pi/4 around Z, translation along Z) c = math.cos(-math.pi / 4) s = math.sin(-math.pi / 4) tcp = [ @@ -85,97 +142,125 @@ def main(): } robot: Robot = sim.add_robot(cfg=RobotCfg.from_dict(cfg_dict)) + + sim.open_window() + arm_name = "main_arm" + device = robot.device - # Set initial joint positions - qpos = torch.tensor( + seed_qpos = torch.tensor( [0.0, -np.pi / 4, 0.0, -3 * np.pi / 4, 0.0, np.pi / 2, np.pi / 4], dtype=torch.float32, - device=sim_device, - ).unsqueeze(0) + device=device, + ) + qpos = seed_qpos.unsqueeze(0).expand(num_envs, -1).clone() robot.set_qpos(qpos=qpos, joint_ids=robot.get_joint_ids(arm_name)) time.sleep(3.0) - # Compute FK to get current EE pose (with TCP applied) fk_xpos = robot.compute_fk(qpos=qpos, name=arm_name, to_matrix=True) - print(f"fk_xpos: {fk_xpos}") - start_pose = fk_xpos.clone()[0] - end_pose = fk_xpos.clone()[0] - end_pose[:3, 3] += torch.tensor([0.3, 0.4, -0.2], device=sim_device) + print(f"fk_xpos shape: {tuple(fk_xpos.shape)}") - num_steps = 50 + start_pose = fk_xpos.clone() + end_pose = fk_xpos.clone() - # Interpolate poses between start and end - interpolated_poses = [ - torch.lerp(start_pose, end_pose, t) for t in np.linspace(0, 1, num_steps) - ] + # Per-environment target offsets (cycle if num_envs exceeds preset count) + move_vecs = torch.tensor( + [ + [0.3, 0.4, -0.2], + [0.2, 0.0, 0.0], + [0.0, 0.2, 0.0], + [0.0, -0.2, -0.1], + [-0.2, 0.0, 0.0], + [0.0, -0.2, 0.0], + [0.0, 0.0, -0.15], + [-0.2, 0.2, 0.0], + [0.0, 0.2, -0.15], + ], + dtype=torch.float32, + device=device, + ) + for env_id in range(num_envs): + end_pose[env_id, :3, 3] += move_vecs[env_id % move_vecs.shape[0]] - ik_qpos = qpos + num_steps = 50 + interpolated_poses = torch.stack( + [ + torch.lerp(start_pose, end_pose, t) + for t in torch.linspace(0.0, 1.0, num_steps, device=device) + ], + dim=1, + ) - # Solve IK for the end pose and measure time - start_time = time.time() - res, ik_qpos = robot.compute_ik(pose=end_pose, joint_seed=ik_qpos, name=arm_name) - end_time = time.time() - print(f"End-pose IK: success={res}, time={end_time - start_time:.4f}s") + ik_qpos = qpos.clone() + ik_qpos_results: list[torch.Tensor] = [] + ik_success_flags: list[torch.Tensor] = [] - if ik_qpos.dim() == 3: - ik_xpos = robot.compute_fk(qpos=ik_qpos[0][0], name=arm_name, to_matrix=True) - else: - ik_xpos = robot.compute_fk(qpos=ik_qpos, name=arm_name, to_matrix=True) - - sim.draw_marker( - cfg=MarkerCfg( - name="fk_xpos", - marker_type="axis", - axis_xpos=np.array(end_pose.tolist()), - axis_size=0.002, - axis_len=0.005, - ) + print( + f"\nRunning {num_steps} batch IK steps: num_envs={num_envs}, device='{sim_device}' ..." ) - - sim.draw_marker( - cfg=MarkerCfg( - name="ik_xpos", - marker_type="axis", - axis_xpos=np.array(ik_xpos.tolist()), - axis_size=0.002, - axis_len=0.005, + ik_compute_begin = time.time() + for step in range(num_steps): + poses = interpolated_poses[:, step, :, :] + res, ik_qpos_new = robot.compute_ik( + pose=poses, joint_seed=ik_qpos, name=arm_name ) + ik_qpos = _squeeze_ik_qpos(ik_qpos_new) + ik_qpos_results.append(ik_qpos.clone()) + ik_success_flags.append(res) + ik_compute_end = time.time() + print( + f"IK compute time for {num_steps} steps and {num_envs} envs: " + f"{ik_compute_end - ik_compute_begin:.4f}s" ) - for i, pose in enumerate(interpolated_poses): - print(f"Step {i}: Moving to pose:\n{pose}") - start_time = time.time() - res, ik_qpos = robot.compute_ik(pose=pose, joint_seed=ik_qpos, name=arm_name) - end_time = time.time() - compute_time = end_time - start_time - print(f"Step {i}: IK computation time: {compute_time:.6f} seconds") - - print(f"IK result: {res}, ik_qpos: {ik_qpos}") - if not res: - print(f"Step {i}: IK failed for pose:\n{pose}") - continue - - # Set robot joint positions - if ik_qpos.dim() == 3: - robot.set_qpos(qpos=ik_qpos[0][0], joint_ids=robot.get_joint_ids(arm_name)) - else: - robot.set_qpos(qpos=ik_qpos, joint_ids=robot.get_joint_ids(arm_name)) - - # Visualize current pose - ik_xpos = robot.compute_fk(qpos=ik_qpos, name=arm_name, to_matrix=True) + # Draw target and achieved EE axes for each environment (final step) + final_step = num_steps - 1 + final_ik_qpos = ik_qpos_results[final_step] + final_res = ik_success_flags[final_step] + ik_xpos_all = robot.compute_fk(qpos=final_ik_qpos, name=arm_name, to_matrix=True) + arena_offsets = sim.arena_offsets + for env_id in range(num_envs): + target_axis = _pose_with_arena_offset(end_pose[env_id], arena_offsets[env_id]) sim.draw_marker( cfg=MarkerCfg( - name=f"ik_xpos_step_{i}", + name=f"fk_target_env{env_id}", marker_type="axis", - axis_xpos=np.array(ik_xpos.tolist()), + axis_xpos=target_axis, axis_size=0.002, axis_len=0.005, + arena_index=-1, ) ) - time.sleep(0.02) + if final_res[env_id]: + ik_axis = _pose_with_arena_offset( + ik_xpos_all[env_id], arena_offsets[env_id] + ) + sim.draw_marker( + cfg=MarkerCfg( + name=f"ik_result_env{env_id}", + marker_type="axis", + axis_xpos=ik_axis, + axis_size=0.002, + axis_len=0.005, + arena_index=-1, + ) + ) + + # Animate: batch-apply IK qpos for successful envs, then step simulation + joint_ids = robot.get_joint_ids(arm_name) + for step in range(num_steps): + ik_qpos_step = ik_qpos_results[step] + res = ik_success_flags[step] + if res.any(): + success_ids = res.nonzero(as_tuple=True)[0] + robot.set_qpos( + qpos=ik_qpos_step[success_ids], + joint_ids=joint_ids, + env_ids=success_ids, + ) + sim.update(step=5) embed(header="NeuralIKSolver example. Press Ctrl+D to exit.") From 15dab08f3fa8877718befae4c8cc16e891c5d2d6 Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Tue, 2 Jun 2026 17:00:58 +0800 Subject: [PATCH 11/16] add docs for neural ik solver --- docs/source/overview/sim/solvers/index.rst | 5 +- .../overview/sim/solvers/neural_ik_solver.md | 71 +++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) create mode 100644 docs/source/overview/sim/solvers/neural_ik_solver.md diff --git a/docs/source/overview/sim/solvers/index.rst b/docs/source/overview/sim/solvers/index.rst index 8ffa5570..30ad1043 100644 --- a/docs/source/overview/sim/solvers/index.rst +++ b/docs/source/overview/sim/solvers/index.rst @@ -80,7 +80,9 @@ Choosing a solver - Use analytic solvers (OPW for 6-DOF arms or SRS for 7-DOF arms) when available for speed and determinism. - Use numerical solvers (PyTorch/optimization, Differential) when you need - flexibility.. + flexibility. +- Use the neural IK solver (experimental) when you have a trained checkpoint and need + fast batch inference on a supported robot. See also -------- @@ -94,3 +96,4 @@ See also pinocchio_solver.md opw_solver.md srs_solver.md + neural_ik_solver.md diff --git a/docs/source/overview/sim/solvers/neural_ik_solver.md b/docs/source/overview/sim/solvers/neural_ik_solver.md new file mode 100644 index 00000000..f2690d01 --- /dev/null +++ b/docs/source/overview/sim/solvers/neural_ik_solver.md @@ -0,0 +1,71 @@ +# NeuralIKSolver + +````{admonition} Experimental +:class: warning + +`NeuralIKSolver` is an **experimental** feature. The API, checkpoint format, +and default parameters may change without a deprecation cycle. It is currently +only validated on the **Franka Panda** robot. +```` + +`NeuralIKSolver` is a learning-based inverse kinematics (IK) solver that uses a +trained neural network policy to iteratively solve IK queries. It requires a +pre-trained checkpoint and supports batch processing. + +## Key Features + +* Iterative neural policy inference for IK solving +* Batch processing for multiple target poses simultaneously +* Multi-seed sampling: generate several random initial guesses and return the best solution +* Joint limit enforcement at every iteration +* PyTorch-based — supports both CPU and CUDA devices + +## Configuration + +The solver is configured using the `NeuralIKSolverCfg` class. Pre-trained +checkpoints are hosted on HuggingFace and can be downloaded with +`download_neural_ik_checkpoint()` (requires `HF_TOKEN` environment variable). + +```python +from embodichain.data.assets.solver_assets import download_neural_ik_checkpoint +from embodichain.lab.sim.solvers.neural_ik_solver import NeuralIKSolverCfg + +checkpoint_path = download_neural_ik_checkpoint() + +cfg = NeuralIKSolverCfg( + checkpoint_path=checkpoint_path, + num_arm_joints=7, + max_steps=30, + action_scale=0.2, + hidden_dims=[256, 256], + pos_eps=0.01, + rot_eps=0.1, + num_samples=1, +) +``` + +## Main Methods + +* `get_ik(self, target_xpos, qpos_seed=None, num_samples=None, **kwargs)` + Solve IK for the given target end-effector pose(s). + + **Parameters:** + + `target_xpos` (`torch.Tensor`): Target pose(s) as 4x4 matrix, shape `(4, 4)` or `(B, 4, 4)`. + + `qpos_seed` (`torch.Tensor`, optional): Initial joint positions, shape `(dof,)` or `(B, dof)`. + + `num_samples` (`int`, optional): Override `cfg.num_samples` for this call. + + `return_all_solutions` (`bool`): If `True`, return all sampled solutions with shape `(B, num_samples, dof)`. + + **Returns:** + + `Tuple[torch.Tensor, torch.Tensor]`: + - Success flags, shape `(B,)`. + - Joint positions, shape `(B, 1, dof)` or `(B, num_samples, dof)`. + + **Example:** + +```python + import torch + success, ik_qpos = solver.get_ik(target_xpos=target_pose, qpos_seed=qpos_seed) + print("Success:", success) + print("IK solution:", ik_qpos) +``` + From 2a386fda1e12a7a025ac8b220d3ab166f2937d0c Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Tue, 2 Jun 2026 20:30:00 +0800 Subject: [PATCH 12/16] use fake policy for test --- tests/sim/solvers/test_neural_ik_solver.py | 177 ++++++++------------- 1 file changed, 63 insertions(+), 114 deletions(-) diff --git a/tests/sim/solvers/test_neural_ik_solver.py b/tests/sim/solvers/test_neural_ik_solver.py index 7d814e21..67c8b37d 100644 --- a/tests/sim/solvers/test_neural_ik_solver.py +++ b/tests/sim/solvers/test_neural_ik_solver.py @@ -23,15 +23,12 @@ import torch from embodichain.data import get_data_path -from embodichain.data.assets.solver_assets import download_neural_ik_checkpoint from embodichain.lab.sim import SimulationManager, SimulationManagerCfg from embodichain.lab.sim.cfg import RobotCfg from embodichain.lab.sim.objects import Robot +from embodichain.lab.sim.solvers.neural_ik_solver import _build_mlp from embodichain.utils.utility import reset_all_seeds -IK_POS_ATOL = 0.05 -IK_ROT_ATOL = 0.55 - _c = math.cos(-math.pi / 4) _s = math.sin(-math.pi / 4) TCP = [ @@ -41,52 +38,37 @@ [0.0, 0.0, 0.0, 1.0], ] +NUM_ARM_JOINTS = 7 +OBS_DIM = 2 * NUM_ARM_JOINTS + 14 # 28 +HIDDEN_DIMS = [256, 256] -def grid_sample_qpos_from_limits( - qpos_limits: torch.Tensor, - steps_per_joint: int = 4, - device=None, - max_samples: int = 4096, -) -> torch.Tensor: - if device is None: - device = qpos_limits.device - - limits = qpos_limits.squeeze(0) if qpos_limits.dim() == 3 else qpos_limits - lows = limits[:, 0].to(device) + 1e-2 - highs = limits[:, 1].to(device) - 1e-2 - - # create per-joint linspaces - grids = [ - torch.linspace(l.item(), h.item(), steps_per_joint, device=device) - for l, h in zip(lows, highs) - ] - - # meshgrid and stack - mesh = torch.meshgrid(*grids, indexing="ij") - stacked = torch.stack([m.reshape(-1) for m in mesh], dim=1) - if stacked.shape[0] > max_samples: - return stacked[:max_samples] - return stacked +def _create_fake_checkpoint(tmp_path) -> str: + """Create a minimal fake checkpoint for testing the solver interface.""" + mlp = _build_mlp(OBS_DIM, HIDDEN_DIMS, NUM_ARM_JOINTS) + ckpt = { + "agent": {f"actor_mean.{k}": v for k, v in mlp.state_dict().items()}, + "obs_normalizer": { + "mean": torch.zeros(OBS_DIM), + "var": torch.ones(OBS_DIM), + }, + } + ckpt_path = str(tmp_path / "fake_neural_ik.pt") + torch.save(ckpt, ckpt_path) + return ckpt_path -# Skip tests if huggingface_hub cannot reach the checkpoint repo. -pytestmark = pytest.mark.skipif( - os.environ.get("NEURAL_IK_OFFLINE") is not None, - reason="NEURAL_IK_OFFLINE is set, skipping NeuralIK tests", -) +class TestNeuralIKSolver: + sim: SimulationManager | None = None + robot: Robot | None = None - -class BaseSolverTest: - sim = None - - def setup_simulation(self, solver_type: str): + def _setup(self, tmp_path): + checkpoint_path = _create_fake_checkpoint(tmp_path) config = SimulationManagerCfg(headless=True, sim_device="cpu") self.sim = SimulationManager(config) urdf = get_data_path("Franka/Panda/PandaWithHand.urdf") assert os.path.isfile(urdf) - checkpoint_path = download_neural_ik_checkpoint() cfg_dict = { "fpath": urdf, @@ -103,15 +85,15 @@ def setup_simulation(self, solver_type: str): }, "solver_cfg": { "main_arm": { - "class_type": solver_type, + "class_type": "NeuralIKSolver", "end_link_name": "ee_link", "root_link_name": "base_link", "tcp": TCP, "checkpoint_path": checkpoint_path, - "num_arm_joints": 7, + "num_arm_joints": NUM_ARM_JOINTS, "max_steps": 30, "action_scale": 0.2, - "hidden_dims": [256, 256], + "hidden_dims": HIDDEN_DIMS, "pos_eps": 0.1, "rot_eps": 0.5, }, @@ -121,55 +103,45 @@ def setup_simulation(self, solver_type: str): self.robot: Robot = self.sim.add_robot(cfg=RobotCfg.from_dict(cfg_dict)) self.sim.update(step=100) - def test_ik(self): - reset_all_seeds(0) - arm_name = "main_arm" - qpos_limit = torch.tensor( - [ - [0.2, 0.8], - [0.2, 0.8], - [0.2, 0.8], - [0.2, 0.8], - [0.2, 0.8], - [0.2, 0.8], - [0.2, 0.8], - ] - ) - sample_qpos = grid_sample_qpos_from_limits( - qpos_limit, steps_per_joint=3, device=self.robot.device, max_samples=200 - ) - sample_qpos = sample_qpos[None, :, :] + def teardown_method(self): + if self.sim is not None: + self.sim.destroy() - fk_xpos = self.robot.compute_batch_fk( - qpos=sample_qpos, name=arm_name, to_matrix=True - ) - fk_xpos_xyzquat = self.robot.compute_batch_fk( - qpos=sample_qpos, name=arm_name, to_matrix=False - ) + def _make_solver_input(self): + """Create a standard qpos and its FK target for solver tests.""" + arm_name = "main_arm" + qpos = torch.tensor( + [0.0, -np.pi / 4, 0.0, -3 * np.pi / 4, 0.0, np.pi / 2, np.pi / 4], + dtype=torch.float32, + device=self.robot.device, + ).unsqueeze(0) + target_xpos = self.robot.compute_fk(qpos=qpos, name=arm_name, to_matrix=True) + solver = self.robot.get_solver(arm_name) + return solver, qpos, target_xpos - res, ik_qpos = self.robot.compute_batch_ik( - pose=fk_xpos, joint_seed=sample_qpos, name=arm_name - ) + def test_ik_interface(self, tmp_path): + """Verify compute_ik returns correct shapes and types.""" + reset_all_seeds(0) + self._setup(tmp_path) + arm_name = "main_arm" - res, ik_qpos_xyzquat = self.robot.compute_batch_ik( - pose=fk_xpos_xyzquat, joint_seed=sample_qpos, name=arm_name - ) + qpos = torch.tensor( + [0.0, -np.pi / 4, 0.0, -3 * np.pi / 4, 0.0, np.pi / 2, np.pi / 4], + dtype=torch.float32, + device=self.robot.device, + ).unsqueeze(0) + target_xpos = self.robot.compute_fk(qpos=qpos, name=arm_name, to_matrix=True) - ik_xpos = self.robot.compute_batch_fk( - qpos=ik_qpos_xyzquat, name=arm_name, to_matrix=True + res, ik_qpos = self.robot.compute_ik( + pose=target_xpos, joint_seed=qpos, name=arm_name ) - pos_err = (fk_xpos[:, :, :3, 3] - ik_xpos[:, :, :3, 3]).norm(dim=-1) - assert torch.all( - pos_err < IK_POS_ATOL - ), f"Position error too large: max {pos_err.max().item():.4f} m > {IK_POS_ATOL} m" - - rot_err = (fk_xpos[:, :, :3, :3] - ik_xpos[:, :, :3, :3]).norm(dim=(-2, -1)) - assert torch.all( - rot_err < IK_ROT_ATOL - ), f"Rotation error too large: max {rot_err.max().item():.4f} > {IK_ROT_ATOL}" + assert res.shape == (1,) + assert res.dtype == torch.bool + dof = qpos.shape[-1] + assert ik_qpos.shape[-1] == dof - # test for failed xpos + # test for unreachable pose invalid_pose = torch.tensor( [ [ @@ -183,36 +155,14 @@ def test_ik(self): device=self.robot.device, ) res, ik_qpos = self.robot.compute_ik( - pose=invalid_pose, joint_seed=ik_qpos[:, 0, :], name=arm_name + pose=invalid_pose, joint_seed=qpos, name=arm_name ) - dof = ik_qpos.shape[-1] - assert res[0].item() == False - assert ik_qpos.shape == (1, dof) - - def teardown_method(self): - """Clean up resources after each test method.""" - self.sim.destroy() - - -class TestNeuralIKSolver(BaseSolverTest): - def setup_method(self): - self.setup_simulation(solver_type="NeuralIKSolver") - - def _make_solver_input(self): - """Create a standard qpos and its FK target for solver tests.""" - arm_name = "main_arm" - qpos = torch.tensor( - [0.0, -np.pi / 4, 0.0, -3 * np.pi / 4, 0.0, np.pi / 2, np.pi / 4], - dtype=torch.float32, - device=self.robot.device, - ).unsqueeze(0) - target_xpos = self.robot.compute_fk(qpos=qpos, name=arm_name, to_matrix=True) - solver = self.robot.get_solver(arm_name) - return solver, qpos, target_xpos + assert res[0].item() is False - def test_multi_sample_shape(self): + def test_multi_sample_shape(self, tmp_path): """Verify output shape when using multiple samples.""" reset_all_seeds(0) + self._setup(tmp_path) solver, qpos, target_xpos = self._make_solver_input() success, ik_qpos = solver.get_ik( @@ -225,9 +175,10 @@ def test_multi_sample_shape(self): assert success.shape == (1,) assert ik_qpos.shape == (1, 1, dof) - def test_multi_sample_return_all(self): + def test_multi_sample_return_all(self, tmp_path): """Verify return_all_solutions returns all sampled solutions.""" reset_all_seeds(0) + self._setup(tmp_path) solver, qpos, target_xpos = self._make_solver_input() num_samples = 5 @@ -245,5 +196,3 @@ def test_multi_sample_return_all(self): if __name__ == "__main__": np.set_printoptions(precision=5, suppress=True) - test_solver = TestNeuralIKSolver() - test_solver.setup_method() From 18273018fadd39aa54f9340b413261ef05474913 Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Tue, 2 Jun 2026 23:42:28 +0800 Subject: [PATCH 13/16] wip --- embodichain/lab/sim/solvers/srs_solver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/embodichain/lab/sim/solvers/srs_solver.py b/embodichain/lab/sim/solvers/srs_solver.py index d68f470b..82dd0071 100644 --- a/embodichain/lab/sim/solvers/srs_solver.py +++ b/embodichain/lab/sim/solvers/srs_solver.py @@ -1175,7 +1175,7 @@ def __init__(self, cfg: SRSSolverCfg, num_envs: int, device: str, **kwargs): # Compute root base transform fk_dict = self.pk_serial_chain.forward_kinematics( - th=np.zeros(7), end_only=False + th=np.zeros(7, dtype=torch.float32, device=self.device), end_only=False ) root_tf = fk_dict[list(fk_dict.keys())[0]] self.root_base_xpos = root_tf.get_matrix().cpu().numpy() From b7d922ac8fc858fb5134b8d8b6fd2a2668c5fb0f Mon Sep 17 00:00:00 2001 From: yangchen73 <122090643@link.cuhk.edu.cn> Date: Wed, 3 Jun 2026 12:39:52 +0800 Subject: [PATCH 14/16] wip --- embodichain/lab/sim/solvers/srs_solver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/embodichain/lab/sim/solvers/srs_solver.py b/embodichain/lab/sim/solvers/srs_solver.py index 82dd0071..967e8ec7 100644 --- a/embodichain/lab/sim/solvers/srs_solver.py +++ b/embodichain/lab/sim/solvers/srs_solver.py @@ -1175,7 +1175,7 @@ def __init__(self, cfg: SRSSolverCfg, num_envs: int, device: str, **kwargs): # Compute root base transform fk_dict = self.pk_serial_chain.forward_kinematics( - th=np.zeros(7, dtype=torch.float32, device=self.device), end_only=False + th=torch.zeros(7, dtype=torch.float32, device=self.device), end_only=False ) root_tf = fk_dict[list(fk_dict.keys())[0]] self.root_base_xpos = root_tf.get_matrix().cpu().numpy() From 022047f95b829c9df1d8c4a3b16792995cc4c401 Mon Sep 17 00:00:00 2001 From: yuecideng Date: Wed, 3 Jun 2026 14:18:55 +0800 Subject: [PATCH 15/16] try fix ci issue --- embodichain/lab/sim/solvers/base_solver.py | 8 ++++++++ tests/sim/solvers/test_srs_solver.py | 1 + 2 files changed, 9 insertions(+) diff --git a/embodichain/lab/sim/solvers/base_solver.py b/embodichain/lab/sim/solvers/base_solver.py index ae04cb41..ed69d9c5 100644 --- a/embodichain/lab/sim/solvers/base_solver.py +++ b/embodichain/lab/sim/solvers/base_solver.py @@ -177,6 +177,14 @@ def __init__(self, cfg: SolverCfg = None, device: str = None, **kwargs): fullgraph=True, dynamic=True, ) + # Warm up on the solver device so Dynamo guards match CUDA/CPU at init + # instead of on the first get_fk call (avoids recompile_limit hits in CI). + if self.dof > 0: + with torch.no_grad(): + warmup_qpos = torch.zeros( + 1, self.dof, device=self.device, dtype=torch.float32 + ) + self.compiled_fk(warmup_qpos) self._init_qpos_limits() diff --git a/tests/sim/solvers/test_srs_solver.py b/tests/sim/solvers/test_srs_solver.py index cfd970e0..0cdad765 100644 --- a/tests/sim/solvers/test_srs_solver.py +++ b/tests/sim/solvers/test_srs_solver.py @@ -16,6 +16,7 @@ import os import torch +torch._dynamo.config.cache_size_limit = 128 # recompile_limit import pytest import numpy as np From 783881a22a0a3fd84f6f3f4a764d8acb17854f7a Mon Sep 17 00:00:00 2001 From: yuecideng Date: Wed, 3 Jun 2026 15:34:21 +0800 Subject: [PATCH 16/16] wip --- tests/sim/solvers/test_srs_solver.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/sim/solvers/test_srs_solver.py b/tests/sim/solvers/test_srs_solver.py index 0cdad765..ada04e84 100644 --- a/tests/sim/solvers/test_srs_solver.py +++ b/tests/sim/solvers/test_srs_solver.py @@ -16,6 +16,7 @@ import os import torch + torch._dynamo.config.cache_size_limit = 128 # recompile_limit import pytest import numpy as np