From 83732afa5e39b9406b6176854c1e5694ef2db776 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Tue, 24 Feb 2026 19:24:32 -0800 Subject: [PATCH 1/6] Resolve annotations using namespaces from the full MRO chain --- dimos/core/blueprints.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/dimos/core/blueprints.py b/dimos/core/blueprints.py index 605517e6c..17475e903 100644 --- a/dimos/core/blueprints.py +++ b/dimos/core/blueprints.py @@ -63,9 +63,16 @@ def create( streams: list[StreamRef] = [] module_refs: list[ModuleRef] = [] - # Use get_type_hints() to properly resolve string annotations. + # Resolve annotations using namespaces from the full MRO chain so that + # In/Out behind TYPE_CHECKING + `from __future__ import annotations` work. + # Iterate reversed MRO so the most specific class's namespace wins when + # parent modules shadow names (e.g. spec.perception.Image vs sensor_msgs.Image). + globalns: dict[str, Any] = {} + for c in reversed(module.__mro__): + if c.__module__ in sys.modules: + globalns.update(sys.modules[c.__module__].__dict__) try: - all_annotations = get_type_hints(module) + all_annotations = get_type_hints(module, globalns=globalns) except Exception: # Fallback to raw annotations if get_type_hints fails. all_annotations = {} From 2ff79b94a7dacccdb5a8d4a4f5d1a356409a41a3 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Tue, 24 Feb 2026 19:25:20 -0800 Subject: [PATCH 2/6] refactored manipulation module to make it generic --- dimos/manipulation/manipulation_module.py | 592 +++------------------- 1 file changed, 64 insertions(+), 528 deletions(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 310b77d76..93f25281b 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -14,28 +14,25 @@ """Manipulation Module - Motion planning with ControlCoordinator execution. -Interface layers: +Base module providing core manipulation infrastructure: - @rpc: Low-level building blocks (plan_to_pose, plan_to_joints, preview_path, execute) -- @skill (short-horizon): Single-step actions (move_to_pose, open_gripper, scan_objects, go_init) -- @skill (long-horizon): Multi-step composed behaviors (pick, place, place_back, pick_and_place) +- @skill (short-horizon): Single-step actions (move_to_pose, open_gripper, go_home, go_init) + +Subclass PickAndPlaceModule (pick_and_place_module.py) adds perception integration +(scan_objects, get_scene_info) and long-horizon skills (pick, place, pick_and_place). """ from __future__ import annotations from dataclasses import dataclass, field from enum import Enum -import math -from pathlib import Path import threading import time from typing import TYPE_CHECKING, Any, TypeAlias from dimos.agents.annotation import skill -from dimos.constants import DIMOS_PROJECT_ROOT from dimos.core import In, Module, rpc -from dimos.core.docker_runner import DockerModule as DockerRunner from dimos.core.module import ModuleConfig -from dimos.manipulation.grasping.graspgen_module import GraspGenModule from dimos.manipulation.planning import ( JointPath, JointTrajectoryGenerator, @@ -55,15 +52,10 @@ # These must be imported at runtime (not TYPE_CHECKING) for In/Out port creation from dimos.msgs.sensor_msgs import JointState from dimos.msgs.trajectory_msgs import JointTrajectory -from dimos.perception.detection.type.detection3d.object import Object as DetObject -from dimos.utils.data import get_data from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: from dimos.core.rpc_client import RPCClient - from dimos.msgs.geometry_msgs import PoseArray - from dimos.msgs.sensor_msgs import PointCloud2 - from dimos.perception.detection.type.detection3d.object import Object as DetObject logger = setup_logger() @@ -80,10 +72,6 @@ PlannedTrajectories: TypeAlias = dict[RobotName, JointTrajectory] """Maps robot_name -> planned trajectory""" -# The host-side path (graspgen_visualization_output_path) is volume-mounted here. -_GRASPGEN_VIZ_CONTAINER_DIR = "/output/graspgen" -_GRASPGEN_VIZ_CONTAINER_PATH = f"{_GRASPGEN_VIZ_CONTAINER_DIR}/visualization.json" - class ManipulationState(Enum): """State machine for manipulation module.""" @@ -105,25 +93,14 @@ class ManipulationModuleConfig(ModuleConfig): planner_name: str = "rrt_connect" # "rrt_connect" kinematics_name: str = "jacobian" # "jacobian" or "drake_optimization" - # GraspGen Docker settings (optional) - graspgen_docker_image: str = "dimos-graspgen:latest" - graspgen_gripper_type: str = "robotiq_2f_140" - graspgen_num_grasps: int = 400 - graspgen_topk_num_grasps: int = 100 - graspgen_grasp_threshold: float = -1.0 - graspgen_filter_collisions: bool = False - graspgen_save_visualization_data: bool = False - graspgen_visualization_output_path: Path = field( - default_factory=lambda: Path.home() / ".dimos" / "graspgen" / "visualization.json" - ) - class ManipulationModule(Module): - """Motion planning module with ControlCoordinator execution. + """Base motion planning module with ControlCoordinator execution. + + - @rpc: Low-level building blocks (plan, execute, gripper) + - @skill (short-horizon): Single-step actions (move_to_pose, open_gripper, go_home) - - @rpc: Low-level building blocks (plan, execute, obstacles) - - @skill (short-horizon): Single-step actions (move_to_pose, open_gripper, scan_objects) - - @skill (long-horizon): Multi-step behaviors (pick, place, pick_and_place) + Subclass PickAndPlaceModule adds perception integration and long-horizon skills. """ default_config = ManipulationModuleConfig @@ -134,9 +111,6 @@ class ManipulationModule(Module): # Input: Joint state from coordinator (for world sync) joint_state: In[JointState] - # Input: Objects from perception (for obstacle integration) - objects: In[list[DetObject]] - def __init__(self, *args: object, **kwargs: object) -> None: super().__init__(*args, **kwargs) @@ -160,19 +134,9 @@ def __init__(self, *args: object, **kwargs: object) -> None: # Coordinator integration (lazy initialized) self._coordinator_client: RPCClient | None = None - # GraspGen Docker runner (lazy initialized on first generate_grasps call) - self._graspgen: DockerRunner | None = None # Init joints: captured from first joint state received, used by go_init self._init_joints: JointState | None = None - # Last pick position: stored during pick so place_back() can return the object - self._last_pick_position: Vector3 | None = None - - # Snapshotted detections from the last scan_objects/refresh call. - # The live detection cache is volatile (labels change every frame), - # so pick/place use this stable snapshot instead. - self._detection_snapshot: list[DetObject] = [] - # TF publishing thread self._tf_stop_event = threading.Event() self._tf_thread: threading.Thread | None = None @@ -192,11 +156,6 @@ def start(self) -> None: self.joint_state.subscribe(self._on_joint_state) logger.info("Subscribed to joint_state port") - # Subscribe to objects port for perception obstacle integration - if self.objects is not None: - self.objects.observable().subscribe(self._on_objects) # type: ignore[no-untyped-call] - logger.info("Subscribed to objects port (async)") - logger.info("ManipulationModule started") def _initialize_planning(self) -> None: @@ -221,9 +180,6 @@ def _initialize_planning(self) -> None: for _, (robot_id, _, _) in self._robots.items(): self._world_monitor.start_state_monitor(robot_id) - # Start obstacle monitor for perception integration - self._world_monitor.start_obstacle_monitor() - if self.config.enable_viz: self._world_monitor.start_visualization_thread(rate_hz=10.0) if url := self._world_monitor.get_visualization_url(): @@ -294,14 +250,6 @@ def _on_joint_state(self, msg: JointState) -> None: logger.error(traceback.format_exc()) - def _on_objects(self, objects: list[DetObject]) -> None: - """Callback when objects received from perception (runs on RxPY thread pool).""" - try: - if self._world_monitor is not None: - self._world_monitor.on_objects(objects) - except Exception as e: - logger.error(f"Exception in _on_objects: {e}") - def _tf_publish_loop(self) -> None: """Publish TF transforms at 10Hz for EE and extra links.""" from dimos.msgs.geometry_msgs import Transform @@ -362,14 +310,18 @@ def cancel(self) -> bool: logger.info("Motion cancelled") return True - @rpc - def reset(self) -> bool: - """Reset to IDLE state (fails if EXECUTING).""" + @skill + def reset(self) -> str: + """Reset the robot module to IDLE state, clearing any fault. + + Use this after an error or fault to allow new commands. + Cannot reset while a motion is executing — cancel first. + """ if self._state == ManipulationState.EXECUTING: - return False + return "Error: Cannot reset while executing — cancel the motion first" self._state = ManipulationState.IDLE self._error_message = "" - return True + return "Reset to IDLE — ready for new commands" @rpc def get_current_joints(self, robot_name: RobotName | None = None) -> list[float] | None: @@ -779,70 +731,6 @@ def get_trajectory_status(self, robot_name: RobotName | None = None) -> dict[str except Exception: return None - def _get_graspgen(self) -> DockerRunner: - """Get or create GraspGen Docker module (lazy init, thread-safe).""" - # Fast path: already initialized (no lock needed for read) - if self._graspgen is not None: - return self._graspgen - - # Slow path: need to initialize (acquire lock to prevent race condition) - with self._lock: - # Double-check: another thread may have initialized while we waited for lock - if self._graspgen is not None: - return self._graspgen - - # Ensure GraspGen model checkpoints are pulled from LFS - get_data("models_graspgen") - - docker_file = ( - DIMOS_PROJECT_ROOT - / "dimos" - / "manipulation" - / "grasping" - / "docker_context" - / "Dockerfile" - ) - - # Auto-mount host directory for visualization output when enabled. - docker_volumes: list[tuple[str, str, str]] = [] - if self.config.graspgen_save_visualization_data: - host_dir = self.config.graspgen_visualization_output_path.parent - host_dir.mkdir(parents=True, exist_ok=True) - docker_volumes.append((str(host_dir), _GRASPGEN_VIZ_CONTAINER_DIR, "rw")) - - graspgen = DockerRunner( - GraspGenModule, # type: ignore[arg-type] - docker_file=docker_file, - docker_build_context=DIMOS_PROJECT_ROOT, - docker_image=self.config.graspgen_docker_image, - docker_env={"CI": "1"}, # skip interactive system config prompt in container - docker_volumes=docker_volumes, - gripper_type=self.config.graspgen_gripper_type, - num_grasps=self.config.graspgen_num_grasps, - topk_num_grasps=self.config.graspgen_topk_num_grasps, - grasp_threshold=self.config.graspgen_grasp_threshold, - filter_collisions=self.config.graspgen_filter_collisions, - save_visualization_data=self.config.graspgen_save_visualization_data, - visualization_output_path=_GRASPGEN_VIZ_CONTAINER_PATH, - ) - graspgen.start() - self._graspgen = graspgen # cache only after successful start - return self._graspgen - - @rpc - def generate_grasps( - self, - pointcloud: PointCloud2, - scene_pointcloud: PointCloud2 | None = None, - ) -> PoseArray | None: - """Generate grasp poses for the given point cloud via GraspGen Docker module.""" - try: - graspgen = self._get_graspgen() - return graspgen.generate_grasps(pointcloud, scene_pointcloud) # type: ignore[no-any-return] - except Exception as e: - logger.error(f"Grasp generation failed: {e}") - return None - @property def world_monitor(self) -> WorldMonitor | None: """Access the world monitor for advanced obstacle/world operations.""" @@ -897,52 +785,6 @@ def remove_obstacle(self, obstacle_id: str) -> bool: return False return self._world_monitor.remove_obstacle(obstacle_id) - # ========================================================================= - # Perception RPC Methods - # ========================================================================= - - @rpc - def refresh_obstacles(self, min_duration: float = 0.0) -> list[dict[str, Any]]: - """Refresh perception obstacles. Returns the list of obstacles added. - - Also snapshots the current detections so pick/place can use stable labels. - """ - if self._world_monitor is None: - return [] - result = self._world_monitor.refresh_obstacles(min_duration) - # Snapshot detections at refresh time — the live cache is volatile - self._detection_snapshot = self._world_monitor.get_cached_objects() - logger.info(f"Detection snapshot: {[d.name for d in self._detection_snapshot]}") - return result - - @rpc - def clear_perception_obstacles(self) -> int: - """Remove all perception obstacles. Returns count removed.""" - if self._world_monitor is None: - return 0 - return self._world_monitor.clear_perception_obstacles() - - @rpc - def get_perception_status(self) -> dict[str, int]: - """Get perception obstacle status (cached/added counts).""" - if self._world_monitor is None: - return {"cached": 0, "added": 0} - return self._world_monitor.get_perception_status() - - @rpc - def list_cached_detections(self) -> list[dict[str, Any]]: - """List cached detections from perception.""" - if self._world_monitor is None: - return [] - return self._world_monitor.list_cached_detections() - - @rpc - def list_added_obstacles(self) -> list[dict[str, Any]]: - """List perception obstacles currently in the planning world.""" - if self._world_monitor is None: - return [] - return self._world_monitor.list_added_obstacles() - # ========================================================================= # Gripper Methods # ========================================================================= @@ -1107,76 +949,41 @@ def _preview_execute_wait( return None - def _compute_pre_grasp_pose(self, grasp_pose: Pose, offset: float = 0.10) -> Pose: - """Compute a pre-grasp pose offset along the approach direction (local -Z). - - Args: - grasp_pose: The final grasp pose - offset: Distance to retract along the approach direction (meters) - - Returns: - Pre-grasp pose offset from the grasp pose - """ - from dimos.utils.transform_utils import offset_distance - - return offset_distance(grasp_pose, offset) - - def _find_object_in_detections( - self, object_name: str, object_id: str | None = None - ) -> DetObject | None: - """Find an object in the detection snapshot by name or ID. + # ========================================================================= + # Short-Horizon Skills — Single-step actions + # ========================================================================= - Uses the snapshot taken during the last scan_objects/refresh call, - not the volatile live cache (which changes labels every frame). + @skill + def get_robot_state(self, robot_name: str | None = None) -> str: + """Get current robot state: joint positions, end-effector pose, and gripper. Args: - object_name: Name/label to search for - object_id: Optional specific object ID - - Returns: - Matching DetObject, or None + robot_name: Robot to query (only needed for multi-arm setups). """ - if not self._detection_snapshot: - logger.warning("No detection snapshot — call scan_objects() first") - return None - - for det in self._detection_snapshot: - if object_id and det.object_id == object_id: - return det - if object_name.lower() in det.name.lower() or det.name.lower() in object_name.lower(): - return det - - available = [det.name for det in self._detection_snapshot] - logger.warning(f"Object '{object_name}' not found in snapshot. Available: {available}") - return None - - def _generate_grasps_for_pick( - self, object_name: str, object_id: str | None = None - ) -> list[Pose] | None: - """Generate grasp poses for an object. + lines: list[str] = [] - Computes a top-down approach grasp from the object's detected position. + joints = self.get_current_joints(robot_name) + if joints is not None: + lines.append(f"Joints: [{', '.join(f'{j:.3f}' for j in joints)}]") + else: + lines.append("Joints: unavailable (no state received)") - Args: - object_name: Name of the object - object_id: Optional object ID + ee_pose = self.get_ee_pose(robot_name) + if ee_pose is not None: + p = ee_pose.position + lines.append(f"EE pose: ({p.x:.4f}, {p.y:.4f}, {p.z:.4f})") + else: + lines.append("EE pose: unavailable") - Returns: - List of grasp poses (best first), or None if object not found - """ - det = self._find_object_in_detections(object_name, object_id) - if det is None: - logger.warning(f"Object '{object_name}' not found in detections") - return None + gripper_pos = self.get_gripper(robot_name) + if gripper_pos is not None: + lines.append(f"Gripper: {gripper_pos:.3f}m") + else: + lines.append("Gripper: not configured") - c = det.center - grasp_pose = Pose(Vector3(c.x, c.y, c.z), Quaternion.from_euler(Vector3(0.0, math.pi, 0.0))) - logger.info(f"Heuristic grasp for '{object_name}' at ({c.x:.3f}, {c.y:.3f}, {c.z:.3f})") - return [grasp_pose] + lines.append(f"State: {self.get_state()}") - # ========================================================================= - # Short-Horizon Skills — Single-step actions - # ========================================================================= + return "\n".join(lines) @skill def move_to_pose( @@ -1184,26 +991,38 @@ def move_to_pose( x: float, y: float, z: float, - roll: float = 0.0, - pitch: float = 0.0, - yaw: float = 0.0, + roll: float | None = None, + pitch: float | None = None, + yaw: float | None = None, robot_name: str | None = None, ) -> str: """Move the robot end-effector to a target pose. Plans a collision-free trajectory and executes it. + If roll/pitch/yaw are omitted, the current EE orientation is preserved. Args: x: Target X position in meters. y: Target Y position in meters. z: Target Z position in meters. - roll: Target roll in radians (default 0). - pitch: Target pitch in radians (default 0). - yaw: Target yaw in radians (default 0). + roll: Target roll in radians (omit to keep current orientation). + pitch: Target pitch in radians (omit to keep current orientation). + yaw: Target yaw in radians (omit to keep current orientation). robot_name: Robot to move (only needed for multi-arm setups). """ logger.info(f"Planning motion to ({x:.3f}, {y:.3f}, {z:.3f})...") - pose = Pose(Vector3(x, y, z), Quaternion.from_euler(Vector3(roll, pitch, yaw))) + + # If no orientation specified, preserve the current EE orientation + if roll is None and pitch is None and yaw is None: + current_pose = self.get_ee_pose(robot_name) + if current_pose is not None: + orientation = current_pose.orientation + else: + orientation = Quaternion(0, 0, 0, 1) # identity fallback + else: + orientation = Quaternion.from_euler(Vector3(roll or 0.0, pitch or 0.0, yaw or 0.0)) + + pose = Pose(Vector3(x, y, z), orientation) if not self.plan_to_pose(pose, robot_name): return f"Error: Planning failed — pose ({x:.3f}, {y:.3f}, {z:.3f}) may be unreachable or in collision" @@ -1249,95 +1068,6 @@ def move_to_joints( return "Reached target joint configuration" - @skill - def get_scene_info(self, robot_name: str | None = None) -> str: - """Get current robot state, detected objects, and scene information. - - Returns a summary of the robot's joint positions, end-effector pose, - gripper state, detected objects, and obstacle count. - - Args: - robot_name: Robot to query (only needed for multi-arm setups). - """ - lines: list[str] = [] - - # Robot state - joints = self.get_current_joints(robot_name) - if joints is not None: - lines.append(f"Joints: [{', '.join(f'{j:.3f}' for j in joints)}]") - else: - lines.append("Joints: unavailable (no state received)") - - ee_pose = self.get_ee_pose(robot_name) - if ee_pose is not None: - p = ee_pose.position - lines.append(f"EE pose: ({p.x:.4f}, {p.y:.4f}, {p.z:.4f})") - else: - lines.append("EE pose: unavailable") - - # Gripper - gripper_pos = self.get_gripper(robot_name) - if gripper_pos is not None: - lines.append(f"Gripper: {gripper_pos:.3f}m") - else: - lines.append("Gripper: not configured") - - # Perception - perception = self.get_perception_status() - lines.append( - f"Perception: {perception.get('cached', 0)} cached, {perception.get('added', 0)} obstacles added" - ) - - detections = self._detection_snapshot - if detections: - lines.append(f"Detected objects ({len(detections)}):") - for det in detections: - c = det.center - lines.append(f" - {det.name}: ({c.x:.3f}, {c.y:.3f}, {c.z:.3f})") - else: - lines.append("Detected objects: none") - - # Visualization - url = self.get_visualization_url() - if url: - lines.append(f"Visualization: {url}") - - # State - lines.append(f"State: {self.get_state()}") - - return "\n".join(lines) - - @skill - def scan_objects(self, min_duration: float = 1.0, robot_name: str | None = None) -> str: - """Scan the scene and list detected objects with their 3D positions. - - Refreshes perception obstacles from the latest sensor data and returns - a formatted list of all detected objects. - - Args: - min_duration: Minimum time in seconds to wait for stable detections. - robot_name: Robot context (only needed for multi-arm setups). - """ - obstacles = self.refresh_obstacles(min_duration) - - detections = self._detection_snapshot - if not detections: - return "No objects detected in scene" - - lines = [f"Detected {len(detections)} object(s):"] - for det in detections: - c = det.center - lines.append(f" - {det.name}: ({c.x:.3f}, {c.y:.3f}, {c.z:.3f})") - - if obstacles: - lines.append(f"\n{len(obstacles)} obstacle(s) added to planning world") - - return "\n".join(lines) - - # ========================================================================= - # Long-Horizon Skills — Multi-step composed behaviors - # ========================================================================= - @skill def go_home(self, robot_name: str | None = None) -> str: """Move the robot to its home/observe joint configuration. @@ -1395,194 +1125,6 @@ def go_init(self, robot_name: str | None = None) -> str: return "Reached init position" - @skill - def pick( - self, - object_name: str, - object_id: str | None = None, - robot_name: str | None = None, - ) -> str: - """Pick up an object by name using grasp planning and motion execution. - - Generates grasp poses, plans collision-free approach/grasp/retract motions, - and executes them. - - Args: - object_name: Name of the object to pick (e.g. "cup", "bottle", "can"). - object_id: Optional unique object ID from perception for precise identification. - robot_name: Robot to use (only needed for multi-arm setups). - """ - robot = self._get_robot(robot_name) - if robot is None: - return "Error: Robot not found" - rname, _, config, _ = robot - pre_grasp_offset = config.pre_grasp_offset - - # 1. Generate grasps (uses already-cached detections — call scan_objects first) - logger.info(f"Generating grasp poses for '{object_name}'...") - grasp_poses = self._generate_grasps_for_pick(object_name, object_id) - if not grasp_poses: - return f"Error: No grasp poses found for '{object_name}'. Object may not be detected." - - # 2. Try each grasp candidate - max_attempts = min(len(grasp_poses), 5) - for i, grasp_pose in enumerate(grasp_poses[:max_attempts]): - pre_grasp_pose = self._compute_pre_grasp_pose(grasp_pose, pre_grasp_offset) - - logger.info(f"Planning approach to pre-grasp (attempt {i + 1}/{max_attempts})...") - if not self.plan_to_pose(pre_grasp_pose, rname): - logger.info(f"Grasp candidate {i + 1} approach planning failed, trying next") - continue # Try next candidate - - # Open gripper before approach - logger.info("Opening gripper...") - self._set_gripper_position(0.85, rname) - time.sleep(0.5) - - # 3. Preview + execute approach - err = self._preview_execute_wait(rname) - if err: - return err - - # 4. Move to grasp pose - logger.info("Moving to grasp position...") - if not self.plan_to_pose(grasp_pose, rname): - return "Error: Grasp pose planning failed" - err = self._preview_execute_wait(rname) - if err: - return err - - # 5. Close gripper - logger.info("Closing gripper...") - self._set_gripper_position(0.0, rname) - time.sleep(1.5) # Wait for gripper to close - - # 6. Retract to pre-grasp - logger.info("Retracting with object...") - if not self.plan_to_pose(pre_grasp_pose, rname): - return "Error: Retract planning failed" - err = self._preview_execute_wait(rname) - if err: - return err - - # Store pick position so place_back() can return the object - self._last_pick_position = grasp_pose.position - - return f"Pick complete — grasped '{object_name}' successfully" - - return f"Error: All {max_attempts} grasp attempts failed for '{object_name}'" - - @skill - def place( - self, - x: float, - y: float, - z: float, - robot_name: str | None = None, - ) -> str: - """Place a held object at the specified position. - - Plans and executes an approach, lowers to the target, releases the gripper, - and retracts. - - Args: - x: Target X position in meters. - y: Target Y position in meters. - z: Target Z position in meters. - robot_name: Robot to use (only needed for multi-arm setups). - """ - robot = self._get_robot(robot_name) - if robot is None: - return "Error: Robot not found" - rname, _, config, _ = robot - pre_place_offset = config.pre_grasp_offset - - # Compute place pose (top-down approach) - place_pose = Pose(Vector3(x, y, z), Quaternion.from_euler(Vector3(0.0, math.pi, 0.0))) - pre_place_pose = self._compute_pre_grasp_pose(place_pose, pre_place_offset) - - # 1. Move to pre-place - logger.info(f"Planning approach to place position ({x:.3f}, {y:.3f}, {z:.3f})...") - if not self.plan_to_pose(pre_place_pose, rname): - return "Error: Pre-place approach planning failed" - - err = self._preview_execute_wait(rname) - if err: - return err - - # 2. Lower to place position - logger.info("Lowering to place position...") - if not self.plan_to_pose(place_pose, rname): - return "Error: Place pose planning failed" - err = self._preview_execute_wait(rname) - if err: - return err - - # 3. Release - logger.info("Releasing object...") - self._set_gripper_position(0.85, rname) - time.sleep(1.0) - - # 4. Retract - logger.info("Retracting...") - if not self.plan_to_pose(pre_place_pose, rname): - return "Error: Retract planning failed" - err = self._preview_execute_wait(rname) - if err: - return err - - return f"Place complete — object released at ({x:.3f}, {y:.3f}, {z:.3f})" - - @skill - def place_back(self, robot_name: str | None = None) -> str: - """Place the held object back at its original pick position. - - Uses the position stored from the last successful pick operation. - - Args: - robot_name: Robot to use (only needed for multi-arm setups). - """ - if self._last_pick_position is None: - return "Error: No previous pick position stored — run pick() first" - - p = self._last_pick_position - logger.info(f"Placing back at original position ({p.x:.3f}, {p.y:.3f}, {p.z:.3f})...") - return self.place(p.x, p.y, p.z, robot_name) - - @skill - def pick_and_place( - self, - object_name: str, - place_x: float, - place_y: float, - place_z: float, - object_id: str | None = None, - robot_name: str | None = None, - ) -> str: - """Pick up an object and place it at a target location. - - Combines the pick and place skills into a single end-to-end operation. - - Args: - object_name: Name of the object to pick (e.g. "cup", "bottle"). - place_x: Target X position to place the object (meters). - place_y: Target Y position to place the object (meters). - place_z: Target Z position to place the object (meters). - object_id: Optional unique object ID from perception. - robot_name: Robot to use (only needed for multi-arm setups). - """ - logger.info( - f"Starting pick and place: pick '{object_name}' → place at ({place_x:.3f}, {place_y:.3f}, {place_z:.3f})" - ) - - # Pick phase - result = self.pick(object_name, object_id, robot_name) - if result.startswith("Error:"): - return result - - # Place phase - return self.place(place_x, place_y, place_z, robot_name) - # ========================================================================= # Lifecycle # ========================================================================= @@ -1592,12 +1134,6 @@ def stop(self) -> None: """Stop the manipulation module.""" logger.info("Stopping ManipulationModule") - # Stop GraspGen Docker container (thread-safe access to shared state) - with self._lock: - if self._graspgen is not None: - self._graspgen.stop() - self._graspgen = None - # Stop TF thread if self._tf_thread is not None: self._tf_stop_event.set() From 035709d513060ef54583da38e84eb0ab3af2d248 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Tue, 24 Feb 2026 19:31:32 -0800 Subject: [PATCH 3/6] pick and place module added --- dimos/manipulation/__init__.py | 8 + dimos/manipulation/pick_and_place_module.py | 625 ++++++++++++++++++++ 2 files changed, 633 insertions(+) create mode 100644 dimos/manipulation/pick_and_place_module.py diff --git a/dimos/manipulation/__init__.py b/dimos/manipulation/__init__.py index 3ed186309..d2a511d14 100644 --- a/dimos/manipulation/__init__.py +++ b/dimos/manipulation/__init__.py @@ -20,10 +20,18 @@ ManipulationState, manipulation_module, ) +from dimos.manipulation.pick_and_place_module import ( + PickAndPlaceModule, + PickAndPlaceModuleConfig, + pick_and_place_module, +) __all__ = [ "ManipulationModule", "ManipulationModuleConfig", "ManipulationState", + "PickAndPlaceModule", + "PickAndPlaceModuleConfig", "manipulation_module", + "pick_and_place_module", ] diff --git a/dimos/manipulation/pick_and_place_module.py b/dimos/manipulation/pick_and_place_module.py new file mode 100644 index 000000000..7032b1722 --- /dev/null +++ b/dimos/manipulation/pick_and_place_module.py @@ -0,0 +1,625 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Pick-and-place manipulation module. + +Extends ManipulationModule with perception integration and long-horizon skills: +- Perception: objects port, obstacle monitor, scan_objects, get_scene_info +- @rpc: generate_grasps (GraspGen Docker), refresh_obstacles, perception status +- @skill: pick, place, place_back, pick_and_place, scan_objects, get_scene_info +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +import math +from pathlib import Path +import time +from typing import TYPE_CHECKING, Any + +from dimos.agents.annotation import skill +from dimos.constants import DIMOS_PROJECT_ROOT +from dimos.core import In, rpc +from dimos.core.docker_runner import DockerModule as DockerRunner +from dimos.manipulation.grasping.graspgen_module import GraspGenModule +from dimos.manipulation.manipulation_module import ( + ManipulationModule, + ManipulationModuleConfig, +) +from dimos.msgs.geometry_msgs import Pose, Quaternion, Vector3 +from dimos.perception.detection.type.detection3d.object import ( + Object as DetObject, # noqa: TC001 +) +from dimos.utils.data import get_data +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.msgs.geometry_msgs import PoseArray + from dimos.msgs.sensor_msgs import PointCloud2 + +logger = setup_logger() + +# The host-side path (graspgen_visualization_output_path) is volume-mounted here. +_GRASPGEN_VIZ_CONTAINER_DIR = "/output/graspgen" +_GRASPGEN_VIZ_CONTAINER_PATH = f"{_GRASPGEN_VIZ_CONTAINER_DIR}/visualization.json" + + +@dataclass +class PickAndPlaceModuleConfig(ManipulationModuleConfig): + """Configuration for PickAndPlaceModule (adds GraspGen settings).""" + + # GraspGen Docker settings + graspgen_docker_image: str = "dimos-graspgen:latest" + graspgen_gripper_type: str = "robotiq_2f_140" + graspgen_num_grasps: int = 400 + graspgen_topk_num_grasps: int = 100 + graspgen_grasp_threshold: float = -1.0 + graspgen_filter_collisions: bool = False + graspgen_save_visualization_data: bool = False + graspgen_visualization_output_path: Path = field( + default_factory=lambda: Path.home() / ".dimos" / "graspgen" / "visualization.json" + ) + + +class PickAndPlaceModule(ManipulationModule): + """Manipulation module with perception integration and pick-and-place skills. + + Extends ManipulationModule with: + - Perception: objects port, obstacle monitor, scan_objects, get_scene_info + - @rpc: generate_grasps (GraspGen Docker), refresh_obstacles, perception status + - @skill: pick, place, place_back, pick_and_place, scan_objects, get_scene_info + """ + + default_config = PickAndPlaceModuleConfig + + # Type annotation for the config attribute (mypy uses this) + config: PickAndPlaceModuleConfig + + # Input: Objects from perception (for obstacle integration) + objects: In[list[DetObject]] + + def __init__(self, *args: object, **kwargs: object) -> None: + super().__init__(*args, **kwargs) + + # GraspGen Docker runner (lazy initialized on first generate_grasps call) + self._graspgen: DockerRunner | None = None + + # Last pick position: stored during pick so place_back() can return the object + self._last_pick_position: Vector3 | None = None + + # Snapshotted detections from the last scan_objects/refresh call. + # The live detection cache is volatile (labels change every frame), + # so pick/place use this stable snapshot instead. + self._detection_snapshot: list[DetObject] = [] + + # ========================================================================= + # Lifecycle (perception integration) + # ========================================================================= + + @rpc + def start(self) -> None: + """Start the pick-and-place module (adds perception subscriptions).""" + super().start() + + # Subscribe to objects port for perception obstacle integration + if self.objects is not None: + self.objects.observable().subscribe(self._on_objects) # type: ignore[no-untyped-call] + logger.info("Subscribed to objects port (async)") + + # Start obstacle monitor for perception integration + if self._world_monitor is not None: + self._world_monitor.start_obstacle_monitor() + + logger.info("PickAndPlaceModule started") + + def _on_objects(self, objects: list[DetObject]) -> None: + """Callback when objects received from perception (runs on RxPY thread pool).""" + try: + if self._world_monitor is not None: + self._world_monitor.on_objects(objects) + except Exception as e: + logger.error(f"Exception in _on_objects: {e}") + + # ========================================================================= + # Perception RPC Methods + # ========================================================================= + + @rpc + def refresh_obstacles(self, min_duration: float = 0.0) -> list[dict[str, Any]]: + """Refresh perception obstacles. Returns the list of obstacles added. + + Also snapshots the current detections so pick/place can use stable labels. + """ + if self._world_monitor is None: + return [] + result = self._world_monitor.refresh_obstacles(min_duration) + # Snapshot detections at refresh time — the live cache is volatile + self._detection_snapshot = self._world_monitor.get_cached_objects() + logger.info(f"Detection snapshot: {[d.name for d in self._detection_snapshot]}") + return result + + @skill + def clear_perception_obstacles(self) -> str: + """Clear all perception obstacles from the planning world. + + Use this when the planner reports COLLISION_AT_START — detected objects + may overlap the robot's current position and block planning. + """ + if self._world_monitor is None: + return "No world monitor available" + count = self._world_monitor.clear_perception_obstacles() + self._detection_snapshot = [] + return f"Cleared {count} perception obstacle(s) from planning world" + + @rpc + def get_perception_status(self) -> dict[str, int]: + """Get perception obstacle status (cached/added counts).""" + if self._world_monitor is None: + return {"cached": 0, "added": 0} + return self._world_monitor.get_perception_status() + + @rpc + def list_cached_detections(self) -> list[dict[str, Any]]: + """List cached detections from perception.""" + if self._world_monitor is None: + return [] + return self._world_monitor.list_cached_detections() + + @rpc + def list_added_obstacles(self) -> list[dict[str, Any]]: + """List perception obstacles currently in the planning world.""" + if self._world_monitor is None: + return [] + return self._world_monitor.list_added_obstacles() + + # ========================================================================= + # GraspGen + # ========================================================================= + + def _get_graspgen(self) -> DockerRunner: + """Get or create GraspGen Docker module (lazy init, thread-safe).""" + # Fast path: already initialized (no lock needed for read) + if self._graspgen is not None: + return self._graspgen + + # Slow path: need to initialize (acquire lock to prevent race condition) + with self._lock: + # Double-check: another thread may have initialized while we waited for lock + if self._graspgen is not None: + return self._graspgen + + # Ensure GraspGen model checkpoints are pulled from LFS + get_data("models_graspgen") + + docker_file = ( + DIMOS_PROJECT_ROOT + / "dimos" + / "manipulation" + / "grasping" + / "docker_context" + / "Dockerfile" + ) + + # Auto-mount host directory for visualization output when enabled. + docker_volumes: list[tuple[str, str, str]] = [] + if self.config.graspgen_save_visualization_data: + host_dir = self.config.graspgen_visualization_output_path.parent + host_dir.mkdir(parents=True, exist_ok=True) + docker_volumes.append((str(host_dir), _GRASPGEN_VIZ_CONTAINER_DIR, "rw")) + + graspgen = DockerRunner( + GraspGenModule, # type: ignore[arg-type] + docker_file=docker_file, + docker_build_context=DIMOS_PROJECT_ROOT, + docker_image=self.config.graspgen_docker_image, + docker_env={"CI": "1"}, # skip interactive system config prompt in container + docker_volumes=docker_volumes, + gripper_type=self.config.graspgen_gripper_type, + num_grasps=self.config.graspgen_num_grasps, + topk_num_grasps=self.config.graspgen_topk_num_grasps, + grasp_threshold=self.config.graspgen_grasp_threshold, + filter_collisions=self.config.graspgen_filter_collisions, + save_visualization_data=self.config.graspgen_save_visualization_data, + visualization_output_path=_GRASPGEN_VIZ_CONTAINER_PATH, + ) + graspgen.start() + self._graspgen = graspgen # cache only after successful start + return self._graspgen + + @rpc + def generate_grasps( + self, + pointcloud: PointCloud2, + scene_pointcloud: PointCloud2 | None = None, + ) -> PoseArray | None: + """Generate grasp poses for the given point cloud via GraspGen Docker module.""" + try: + graspgen = self._get_graspgen() + return graspgen.generate_grasps(pointcloud, scene_pointcloud) # type: ignore[no-any-return] + except Exception as e: + logger.error(f"Grasp generation failed: {e}") + return None + + # ========================================================================= + # Pick/Place Helpers + # ========================================================================= + + def _compute_pre_grasp_pose(self, grasp_pose: Pose, offset: float = 0.10) -> Pose: + """Compute a pre-grasp pose offset along the approach direction (local -Z). + + Args: + grasp_pose: The final grasp pose + offset: Distance to retract along the approach direction (meters) + + Returns: + Pre-grasp pose offset from the grasp pose + """ + from dimos.utils.transform_utils import offset_distance + + return offset_distance(grasp_pose, offset) + + def _find_object_in_detections( + self, object_name: str, object_id: str | None = None + ) -> DetObject | None: + """Find an object in the detection snapshot by name or ID. + + Uses the snapshot taken during the last scan_objects/refresh call, + not the volatile live cache (which changes labels every frame). + + Args: + object_name: Name/label to search for + object_id: Optional specific object ID + + Returns: + Matching DetObject, or None + """ + if not self._detection_snapshot: + logger.warning("No detection snapshot — call scan_objects() first") + return None + + for det in self._detection_snapshot: + if object_id and det.object_id == object_id: + return det + if object_name.lower() in det.name.lower() or det.name.lower() in object_name.lower(): + return det + + available = [det.name for det in self._detection_snapshot] + logger.warning(f"Object '{object_name}' not found in snapshot. Available: {available}") + return None + + def _generate_grasps_for_pick( + self, object_name: str, object_id: str | None = None + ) -> list[Pose] | None: + """Generate grasp poses for an object. + + Computes a top-down approach grasp from the object's detected position. + + Args: + object_name: Name of the object + object_id: Optional object ID + + Returns: + List of grasp poses (best first), or None if object not found + """ + det = self._find_object_in_detections(object_name, object_id) + if det is None: + logger.warning(f"Object '{object_name}' not found in detections") + return None + + c = det.center + grasp_pose = Pose(Vector3(c.x, c.y, c.z), Quaternion.from_euler(Vector3(0.0, math.pi, 0.0))) + logger.info(f"Heuristic grasp for '{object_name}' at ({c.x:.3f}, {c.y:.3f}, {c.z:.3f})") + return [grasp_pose] + + # ========================================================================= + # Perception Skills + # ========================================================================= + + @skill + def get_scene_info(self, robot_name: str | None = None) -> str: + """Get current robot state, detected objects, and scene information. + + Returns a summary of the robot's joint positions, end-effector pose, + gripper state, detected objects, and obstacle count. + + Args: + robot_name: Robot to query (only needed for multi-arm setups). + """ + lines: list[str] = [] + + # Robot state + joints = self.get_current_joints(robot_name) + if joints is not None: + lines.append(f"Joints: [{', '.join(f'{j:.3f}' for j in joints)}]") + else: + lines.append("Joints: unavailable (no state received)") + + ee_pose = self.get_ee_pose(robot_name) + if ee_pose is not None: + p = ee_pose.position + lines.append(f"EE pose: ({p.x:.4f}, {p.y:.4f}, {p.z:.4f})") + else: + lines.append("EE pose: unavailable") + + # Gripper + gripper_pos = self.get_gripper(robot_name) + if gripper_pos is not None: + lines.append(f"Gripper: {gripper_pos:.3f}m") + else: + lines.append("Gripper: not configured") + + # Perception + perception = self.get_perception_status() + lines.append( + f"Perception: {perception.get('cached', 0)} cached, {perception.get('added', 0)} obstacles added" + ) + + detections = self._detection_snapshot + if detections: + lines.append(f"Detected objects ({len(detections)}):") + for det in detections: + c = det.center + lines.append(f" - {det.name}: ({c.x:.3f}, {c.y:.3f}, {c.z:.3f})") + else: + lines.append("Detected objects: none") + + # Visualization + url = self.get_visualization_url() + if url: + lines.append(f"Visualization: {url}") + + # State + lines.append(f"State: {self.get_state()}") + + return "\n".join(lines) + + @skill + def scan_objects(self, min_duration: float = 1.0, robot_name: str | None = None) -> str: + """Scan the scene and list detected objects with their 3D positions. + + Refreshes perception obstacles from the latest sensor data and returns + a formatted list of all detected objects. + + Args: + min_duration: Minimum time in seconds to wait for stable detections. + robot_name: Robot context (only needed for multi-arm setups). + """ + obstacles = self.refresh_obstacles(min_duration) + + detections = self._detection_snapshot + if not detections: + return "No objects detected in scene" + + lines = [f"Detected {len(detections)} object(s):"] + for det in detections: + c = det.center + lines.append(f" - {det.name}: ({c.x:.3f}, {c.y:.3f}, {c.z:.3f})") + + if obstacles: + lines.append(f"\n{len(obstacles)} obstacle(s) added to planning world") + + return "\n".join(lines) + + # ========================================================================= + # Long-Horizon Skills — Pick and Place + # ========================================================================= + + @skill + def pick( + self, + object_name: str, + object_id: str | None = None, + robot_name: str | None = None, + ) -> str: + """Pick up an object by name using grasp planning and motion execution. + + Generates grasp poses, plans collision-free approach/grasp/retract motions, + and executes them. + + Args: + object_name: Name of the object to pick (e.g. "cup", "bottle", "can"). + object_id: Optional unique object ID from perception for precise identification. + robot_name: Robot to use (only needed for multi-arm setups). + """ + robot = self._get_robot(robot_name) + if robot is None: + return "Error: Robot not found" + rname, _, config, _ = robot + pre_grasp_offset = config.pre_grasp_offset + + # 1. Generate grasps (uses already-cached detections — call scan_objects first) + logger.info(f"Generating grasp poses for '{object_name}'...") + grasp_poses = self._generate_grasps_for_pick(object_name, object_id) + if not grasp_poses: + return f"Error: No grasp poses found for '{object_name}'. Object may not be detected." + + # 2. Try each grasp candidate + max_attempts = min(len(grasp_poses), 5) + for i, grasp_pose in enumerate(grasp_poses[:max_attempts]): + pre_grasp_pose = self._compute_pre_grasp_pose(grasp_pose, pre_grasp_offset) + + logger.info(f"Planning approach to pre-grasp (attempt {i + 1}/{max_attempts})...") + if not self.plan_to_pose(pre_grasp_pose, rname): + logger.info(f"Grasp candidate {i + 1} approach planning failed, trying next") + continue # Try next candidate + + # Open gripper before approach + logger.info("Opening gripper...") + self._set_gripper_position(0.85, rname) + time.sleep(0.5) + + # 3. Preview + execute approach + err = self._preview_execute_wait(rname) + if err: + return err + + # 4. Move to grasp pose + logger.info("Moving to grasp position...") + if not self.plan_to_pose(grasp_pose, rname): + return "Error: Grasp pose planning failed" + err = self._preview_execute_wait(rname) + if err: + return err + + # 5. Close gripper + logger.info("Closing gripper...") + self._set_gripper_position(0.0, rname) + time.sleep(1.5) # Wait for gripper to close + + # 6. Retract to pre-grasp + logger.info("Retracting with object...") + if not self.plan_to_pose(pre_grasp_pose, rname): + return "Error: Retract planning failed" + err = self._preview_execute_wait(rname) + if err: + return err + + # Store pick position so place_back() can return the object + self._last_pick_position = grasp_pose.position + + return f"Pick complete — grasped '{object_name}' successfully" + + return f"Error: All {max_attempts} grasp attempts failed for '{object_name}'" + + @skill + def place( + self, + x: float, + y: float, + z: float, + robot_name: str | None = None, + ) -> str: + """Place a held object at the specified position. + + Plans and executes an approach, lowers to the target, releases the gripper, + and retracts. + + Args: + x: Target X position in meters. + y: Target Y position in meters. + z: Target Z position in meters. + robot_name: Robot to use (only needed for multi-arm setups). + """ + robot = self._get_robot(robot_name) + if robot is None: + return "Error: Robot not found" + rname, _, config, _ = robot + pre_place_offset = config.pre_grasp_offset + + # Compute place pose (top-down approach) + place_pose = Pose(Vector3(x, y, z), Quaternion.from_euler(Vector3(0.0, math.pi, 0.0))) + pre_place_pose = self._compute_pre_grasp_pose(place_pose, pre_place_offset) + + # 1. Move to pre-place + logger.info(f"Planning approach to place position ({x:.3f}, {y:.3f}, {z:.3f})...") + if not self.plan_to_pose(pre_place_pose, rname): + return "Error: Pre-place approach planning failed" + + err = self._preview_execute_wait(rname) + if err: + return err + + # 2. Lower to place position + logger.info("Lowering to place position...") + if not self.plan_to_pose(place_pose, rname): + return "Error: Place pose planning failed" + err = self._preview_execute_wait(rname) + if err: + return err + + # 3. Release + logger.info("Releasing object...") + self._set_gripper_position(0.85, rname) + time.sleep(1.0) + + # 4. Retract + logger.info("Retracting...") + if not self.plan_to_pose(pre_place_pose, rname): + return "Error: Retract planning failed" + err = self._preview_execute_wait(rname) + if err: + return err + + return f"Place complete — object released at ({x:.3f}, {y:.3f}, {z:.3f})" + + @skill + def place_back(self, robot_name: str | None = None) -> str: + """Place the held object back at its original pick position. + + Uses the position stored from the last successful pick operation. + + Args: + robot_name: Robot to use (only needed for multi-arm setups). + """ + if self._last_pick_position is None: + return "Error: No previous pick position stored — run pick() first" + + p = self._last_pick_position + logger.info(f"Placing back at original position ({p.x:.3f}, {p.y:.3f}, {p.z:.3f})...") + return self.place(p.x, p.y, p.z, robot_name) + + @skill + def pick_and_place( + self, + object_name: str, + place_x: float, + place_y: float, + place_z: float, + object_id: str | None = None, + robot_name: str | None = None, + ) -> str: + """Pick up an object and place it at a target location. + + Combines the pick and place skills into a single end-to-end operation. + + Args: + object_name: Name of the object to pick (e.g. "cup", "bottle"). + place_x: Target X position to place the object (meters). + place_y: Target Y position to place the object (meters). + place_z: Target Z position to place the object (meters). + object_id: Optional unique object ID from perception. + robot_name: Robot to use (only needed for multi-arm setups). + """ + logger.info( + f"Starting pick and place: pick '{object_name}' → place at ({place_x:.3f}, {place_y:.3f}, {place_z:.3f})" + ) + + # Pick phase + result = self.pick(object_name, object_id, robot_name) + if result.startswith("Error:"): + return result + + # Place phase + return self.place(place_x, place_y, place_z, robot_name) + + # ========================================================================= + # Lifecycle + # ========================================================================= + + @rpc + def stop(self) -> None: + """Stop the pick-and-place module (cleanup GraspGen + delegate to base).""" + logger.info("Stopping PickAndPlaceModule") + + # Stop GraspGen Docker container (thread-safe access to shared state) + with self._lock: + if self._graspgen is not None: + self._graspgen.stop() + self._graspgen = None + + super().stop() + + +# Expose blueprint for declarative composition +pick_and_place_module = PickAndPlaceModule.blueprint From 9e53cf765658e6c707373a51310936cba8a356a0 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Tue, 24 Feb 2026 19:33:11 -0800 Subject: [PATCH 4/6] updated blueprints --- dimos/manipulation/manipulation_blueprints.py | 57 +++++++++++++++++-- dimos/robot/all_blueprints.py | 2 + 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/dimos/manipulation/manipulation_blueprints.py b/dimos/manipulation/manipulation_blueprints.py index e95e41537..396f2046e 100644 --- a/dimos/manipulation/manipulation_blueprints.py +++ b/dimos/manipulation/manipulation_blueprints.py @@ -33,6 +33,7 @@ from dimos.core.transport import LCMTransport from dimos.hardware.sensors.camera.realsense import realsense_camera from dimos.manipulation.manipulation_module import manipulation_module +from dimos.manipulation.pick_and_place_module import pick_and_place_module from dimos.manipulation.planning.spec import RobotModelConfig from dimos.msgs.geometry_msgs import PoseStamped, Quaternion, Transform, Vector3 from dimos.msgs.sensor_msgs import JointState @@ -328,6 +329,43 @@ def _make_piper_config( ) +# XArm7 planner + LLM agent for testing base ManipulationModule skills +# No perception — uses the base module's planning + gripper skills only. +# Usage: dimos run coordinator-mock, then dimos run xarm7-planner-coordinator-agent +_BASE_MANIPULATION_AGENT_SYSTEM_PROMPT = """\ +You are a robotic manipulation assistant controlling an xArm7 robot arm. + +Available skills: +- get_robot_state: Get current joint positions, end-effector pose, and gripper state. +- move_to_pose: Move end-effector to ABSOLUTE x, y, z (meters) with optional roll, pitch, yaw (radians). +- move_to_joints: Move to a joint configuration (comma-separated radians). +- open_gripper / close_gripper / set_gripper: Control the gripper. +- go_home: Move to the home/observe position. +- go_init: Return to the startup position. +- reset: Clear a FAULT state and return to IDLE. Use this when a motion fails. + +COORDINATE SYSTEM (world frame, meters): +- X axis = forward (away from the robot base) +- Y axis = left +- Z axis = up +- Z=0 is the robot base level; typical working height is Z = 0.2-0.5 + +CRITICAL WORKFLOW for relative movement requests (e.g. "move 20cm forward"): +1. Call get_robot_state to get the current EE pose. +2. Add the requested offset to the CURRENT position. Example: if EE is at \ +(0.3, 0.0, 0.4) and user says "move 20cm forward", target is (0.5, 0.0, 0.4). +3. Call move_to_pose with the computed ABSOLUTE target. +NEVER pass only the offset as coordinates — that would send the robot to near-origin. + +ERROR RECOVERY: If a motion fails or the state becomes FAULT, call reset before retrying. +""" + +xarm7_planner_coordinator_agent = autoconnect( + xarm7_planner_coordinator, + Agent.blueprint(system_prompt=_BASE_MANIPULATION_AGENT_SYSTEM_PROMPT), +) + + # XArm7 with eye-in-hand RealSense camera for perception-based manipulation # TF chain: world → link7 (ManipulationModule) → camera_link (RealSense) # Usage: dimos run coordinator-mock, then dimos run xarm-perception @@ -338,7 +376,7 @@ def _make_piper_config( xarm_perception = ( autoconnect( - manipulation_module( + pick_and_place_module( robots=[ _make_xarm7_config( "arm", @@ -375,22 +413,30 @@ def _make_piper_config( _MANIPULATION_AGENT_SYSTEM_PROMPT = """\ You are a robotic manipulation assistant controlling an xArm7 robot arm. -Use ONLY these ManipulationModule skills for manipulation tasks: +Available skills: +- get_robot_state: Get current joint positions, end-effector pose, and gripper state. - scan_objects: Scan scene and list detected objects with 3D positions. Always call this first. - pick: Pick up an object by name. Requires scan_objects first. - place: Place a held object at x, y, z position. - place_back: Place a held object back at its original pick position. - pick_and_place: Pick an object and place it at a target location. -- move_to_pose: Move end-effector to x, y, z with optional roll, pitch, yaw. +- move_to_pose: Move end-effector to ABSOLUTE x, y, z (meters) with optional roll, pitch, yaw (radians). - move_to_joints: Move to a joint configuration (comma-separated radians). - open_gripper / close_gripper / set_gripper: Control the gripper. - go_home: Move to the home/observe position. - go_init: Return to the startup position. - get_scene_info: Get full robot state, detected objects, and scene info. +- reset: Clear a FAULT state and return to IDLE. +- clear_perception_obstacles: Clear detected obstacles from the planning world. \ +Use when planning fails with COLLISION_AT_START. + +COORDINATE SYSTEM (world frame, meters): X=forward, Y=left, Z=up. Z=0 is robot base. + +ERROR RECOVERY: If planning fails with COLLISION_AT_START, call clear_perception_obstacles \ +then reset, then retry. Detected objects may overlap the robot's current position. -Do NOT use the 'detect' or 'select' skills — use scan_objects instead. -For robot_name parameters, always omit or pass None (single-arm setup). After pick or place, return to init with go_init unless another action follows immediately. +Do NOT use the 'detect' or 'select' skills — use scan_objects instead. """ xarm_perception_agent = autoconnect( @@ -405,6 +451,7 @@ def _make_piper_config( "dual_xarm6_planner", "xarm6_planner_only", "xarm7_planner_coordinator", + "xarm7_planner_coordinator_agent", "xarm_perception", "xarm_perception_agent", ] diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index ed9098280..3119b0ead 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -84,6 +84,7 @@ "xarm-perception-agent": "dimos.manipulation.manipulation_blueprints:xarm_perception_agent", "xarm6-planner-only": "dimos.manipulation.manipulation_blueprints:xarm6_planner_only", "xarm7-planner-coordinator": "dimos.manipulation.manipulation_blueprints:xarm7_planner_coordinator", + "xarm7-planner-coordinator-agent": "dimos.manipulation.manipulation_blueprints:xarm7_planner_coordinator_agent", "xarm7-trajectory-sim": "dimos.simulation.sim_blueprints:xarm7_trajectory_sim", } @@ -122,6 +123,7 @@ "person-follow-skill": "dimos.agents.skills.person_follow", "person-tracker-module": "dimos.perception.detection.person_tracker", "phone-teleop-module": "dimos.teleop.phone.phone_teleop_module", + "pick-and-place-module": "dimos.manipulation.pick_and_place_module", "quest-teleop-module": "dimos.teleop.quest.quest_teleop_module", "realsense-camera": "dimos.hardware.sensors.camera.realsense.camera", "replanning-a-star-planner": "dimos.navigation.replanning_a_star.module", From 0a87c29002e5809af531d14de5ef6152ffc162c7 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Tue, 24 Feb 2026 19:33:23 -0800 Subject: [PATCH 5/6] added unit test --- dimos/manipulation/test_manipulation_unit.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dimos/manipulation/test_manipulation_unit.py b/dimos/manipulation/test_manipulation_unit.py index de551d99c..4aa232c74 100644 --- a/dimos/manipulation/test_manipulation_unit.py +++ b/dimos/manipulation/test_manipulation_unit.py @@ -100,7 +100,6 @@ def _make_module(): module._planner = None module._kinematics = None module._coordinator_client = None - module._graspgen = None return module @@ -129,12 +128,14 @@ def test_reset_not_during_execution(self): module._state = ManipulationState.FAULT module._error_message = "Error" - assert module.reset() is True + result = module.reset() + assert "IDLE" in result assert module._state == ManipulationState.IDLE assert module._error_message == "" module._state = ManipulationState.EXECUTING - assert module.reset() is False + result = module.reset() + assert "Error" in result def test_fail_sets_fault_state(self): """_fail helper sets FAULT state and message.""" From 93ecafda2e7aaab9afb9d3c9bbadcf611cbeb19d Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Wed, 25 Feb 2026 09:42:06 -0800 Subject: [PATCH 6/6] Partial orientation specification will preserve current orientation for missing angles --- dimos/manipulation/manipulation_module.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 93f25281b..f5036b2df 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -1012,7 +1012,8 @@ def move_to_pose( """ logger.info(f"Planning motion to ({x:.3f}, {y:.3f}, {z:.3f})...") - # If no orientation specified, preserve the current EE orientation + # If no orientation specified, preserve the current EE orientation. + # If partially specified, fill unspecified angles from current orientation. if roll is None and pitch is None and yaw is None: current_pose = self.get_ee_pose(robot_name) if current_pose is not None: @@ -1020,7 +1021,18 @@ def move_to_pose( else: orientation = Quaternion(0, 0, 0, 1) # identity fallback else: - orientation = Quaternion.from_euler(Vector3(roll or 0.0, pitch or 0.0, yaw or 0.0)) + current_pose = self.get_ee_pose(robot_name) + if current_pose is not None: + current_euler = current_pose.orientation.to_euler() + orientation = Quaternion.from_euler( + Vector3( + roll if roll is not None else current_euler.x, + pitch if pitch is not None else current_euler.y, + yaw if yaw is not None else current_euler.z, + ) + ) + else: + orientation = Quaternion.from_euler(Vector3(roll or 0.0, pitch or 0.0, yaw or 0.0)) pose = Pose(Vector3(x, y, z), orientation)