diff --git a/src/autonomous_nav/autonomous_nav/navigation_node.py b/src/autonomous_nav/autonomous_nav/navigation_node.py index aa740c3..d656de0 100644 --- a/src/autonomous_nav/autonomous_nav/navigation_node.py +++ b/src/autonomous_nav/autonomous_nav/navigation_node.py @@ -247,6 +247,8 @@ def updateNavigation(self) -> None: elif self.global_costmap != None and len(self.path.poses) == 0: self.get_logger().warn("Running test") self.planPath(self.global_costmap) + self.get_logger().info(f"plotted to goal successfully") + self.path_pub.publish(self.path) self.get_logger().info(f"total indices queried: {self.index_count}") return self.publishStatus(f"En route to waypoint ({goal_x:.2f}, {goal_y:.2f})") @@ -276,6 +278,7 @@ def planPath( """ This algorithm looks at the global occupancy grid in order to plan a path through it for the rover using an A* style search algorithm. """ + publish_count = 0 lowest_cost_position = self.current_position previous_position = lowest_cost_position distance_to_goal = self.distance_2d( @@ -292,7 +295,8 @@ def planPath( # we have found no suitable indicies # pop the current position from the path self.get_logger().info(f"backtracking") - self.path.poses.pop() + if len(self.path.poses) != 0: + self.path.poses.pop() # make the new lowest position the item at the top of the stack lowest_cost_position = ( self.path.poses[-1].pose.position.x, @@ -303,22 +307,30 @@ def planPath( lowest_cost_node = self.find_lowest_cost_node(target_area, grid) lowest_cost_position = self.index_to_position(grid, lowest_cost_node) - self.append_path(lowest_cost_position) + if publish_count % 10 == 0: + self.get_logger().info( + f"current lowest cost position is {lowest_cost_position[0]},{lowest_cost_position[1]}" + ) + publish_count = publish_count + 1 + self.append_path(lowest_cost_position) distance_to_goal = self.distance_2d( lowest_cost_position[0], lowest_cost_position[1], self.end_goal_waypoint[0], self.end_goal_waypoint[1], ) - self.get_logger().info(f"plotted to goal successfully") - self.path_pub.publish(self.path) def find_lowest_cost_node(self, target_area: list[Tuple[int, int]], grid: OccupancyGrid) -> int: self.get_logger().info(f"target area is this large: {len(target_area)}") minimum_cost = sys.float_info.max for item in target_area: item_cost = self.distance_between_indicies(grid, item[1], self.end_goal_index) - if item_cost < minimum_cost and item_cost != 100 and item_cost != -1: + if ( + item_cost < minimum_cost + and item_cost != 100 + and item_cost != -1 + and item_cost != 50 + ): minimum_cost = item_cost minimum_index = item[1] diff --git a/src/autonomous_nav/autonomous_nav/stuck_detection_node.py b/src/autonomous_nav/autonomous_nav/stuck_detection_node.py new file mode 100644 index 0000000..6deb684 --- /dev/null +++ b/src/autonomous_nav/autonomous_nav/stuck_detection_node.py @@ -0,0 +1,226 @@ +import sys +import time +from dataclasses import dataclass +from typing import Callable, Dict, Tuple + +import rclpy +from rclpy.executors import ExternalShutdownException +from rclpy.node import Node +from rclpy.subscription import Subscription +from std_msgs.msg import Bool, Float32, String, UInt8 + +from lib.color_codes import ColorCodes, colorStr +from lib.configs import MoteusMotorConfig, MotorConfigs +from lib.moteus_motor_state import MoteusMotorState, MoteusRunSettings + +NODE_NAME = "stuck_detection_node" +DRIVE_MOTOR_CONFIGS = [ + MotorConfigs.FRONT_LEFT_DRIVE_MOTOR, + MotorConfigs.FRONT_RIGHT_DRIVE_MOTOR, + MotorConfigs.MID_LEFT_DRIVE_MOTOR, + MotorConfigs.MID_RIGHT_DRIVE_MOTOR, + MotorConfigs.REAR_LEFT_DRIVE_MOTOR, + MotorConfigs.REAR_RIGHT_DRIVE_MOTOR, +] +RMX8_25_RATED_TORQUE = 10 # N/m + + +@dataclass +class StuckReason: + ros_message: str + """ The message sent to the ROS topic. """ + stuck_duration: float + """ How many seconds tripped before marked as stuck. """ + mask: int + """ How to encode/decode this reason. """ + + def __hash__(self) -> int: + return self.mask + + +class StuckReasons: + """Reasons for why we are stuck.""" + + WHEELS_CANT_SPIN = StuckReason("wheels-cant-spin", 1.0, 1) + FREE_SPINNING = StuckReason("free-spinning", 1.0, 1 << 1) + + +class StuckDetectionNode(Node): + """ + Determines whether the rover is currently "stuck". + + Publications: + - /stuck_detection/is_stuck [Bool] + - /stuck_detection/reason [UInt8] + - Reasons are encoded by a bitmask (see StuckReasons for usage). + """ + + DEBUG = True + + # Initialization + + def __init__(self) -> None: + super().__init__(NODE_NAME) + + for config in DRIVE_MOTOR_CONFIGS: + self.subscribe_to_motor(config) + + # TODO: Add lifetimes to the values + self.target_states: Dict[int, MoteusRunSettings] = {} + self.current_states: Dict[int, MoteusMotorState] = {} + + self.stuck_start: Dict[StuckReason, float] = {} + + self.stuck_publisher = self.create_publisher(Bool, "/stuck_detection/is_stuck", 10) + self.stuck_reason_publisher = self.create_publisher(UInt8, "/stuck_detection/reason", 10) + + if self.DEBUG: + self.target_velocity_ratio_publisher = self.create_publisher( + Float32, "/stuck_detection/debug/target_velocity_ratio", 10 + ) + self.rated_torque_ratio_publisher = self.create_publisher( + Float32, "/stuck_detection/debug/rated_torque_ratio", 10 + ) + + self.timer = self.create_timer(0.1, self.update) + + def subscribe_to_motor( + self, motor_config: MoteusMotorConfig + ) -> Tuple[Subscription, Subscription]: + """ + Subscribes to the motor's target state and current state. + """ + + def get_target_callback() -> Callable[[String], None]: + return lambda msg: self.motor_target_state_callback( + motor_config.can_id, MoteusRunSettings.fromJsonMsg(msg) + ) + + def get_current_callback() -> Callable[[String], None]: + return lambda msg: self.motor_current_state_callback( + motor_config.can_id, MoteusMotorState.fromJsonMsg(msg) + ) + + return ( + self.create_subscription( + String, + motor_config.getInterfaceTopicName(), + get_target_callback(), + 10, + ), + self.create_subscription( + String, + motor_config.getCanTopicName(), + get_current_callback(), + 10, + ), + ) + + def motor_current_state_callback(self, can_id: int, state: MoteusMotorState) -> None: + self.current_states[can_id] = state + + def motor_target_state_callback(self, can_id: int, settings: MoteusRunSettings) -> None: + self.target_states[can_id] = settings + + # Detection + + def update_stuck_reason(self, reason: StuckReason, is_active: bool) -> bool: + now = time.time() + if is_active: + if reason in self.stuck_start: + return now - self.stuck_start[reason] > reason.stuck_duration + else: + self.stuck_start[reason] = now + else: + self.stuck_start.pop(reason, 0.0) + + return False + + def calculate_per_motor_effort(self) -> list[StuckReason]: + FREE_SPINNING_THRESH = (0.2, 0.9) # v + STUCK_THRESH = (0.5, 0.2) # >t, FREE_SPINNING_THRESH[1]: + free_spinning_count += 1 + + if t_norm > STUCK_THRESH[0] and v_norm < STUCK_THRESH[1]: + cant_spin_count += 1 + + stuck = [] + if self.update_stuck_reason( + StuckReasons.WHEELS_CANT_SPIN, cant_spin_count >= STUCK_WHEEL_MIN_COUNT + ): + stuck.append(StuckReasons.WHEELS_CANT_SPIN) + if self.update_stuck_reason( + StuckReasons.FREE_SPINNING, free_spinning_count >= STUCK_WHEEL_MIN_COUNT + ): + stuck.append(StuckReasons.FREE_SPINNING) + return stuck + + # Update + + def update(self) -> None: + stuck = self.calculate_per_motor_effort() + + if len(stuck) > 0: + self.stuck_publisher.publish(Bool(data=True)) + encoded = 0 + for reason in stuck: + encoded |= reason.mask + self.stuck_reason_publisher.publish(UInt8(data=encoded)) + else: + self.stuck_publisher.publish(Bool(data=False)) + + +def main(args: list[str] | None = None) -> None: + rclpy.init(args=args) + stuck_detection_node = None + try: + stuck_detection_node = StuckDetectionNode() + rclpy.spin(stuck_detection_node) + except KeyboardInterrupt: + pass + except ExternalShutdownException: + if stuck_detection_node is not None: + stuck_detection_node.get_logger().info( + colorStr(f"Shutting down {NODE_NAME}", ColorCodes.BLUE_OK) + ) + finally: + if stuck_detection_node is not None: + stuck_detection_node.destroy_node() + rclpy.shutdown() + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/src/autonomous_nav/autonomous_nav/tests/navigation_node_tests.py b/src/autonomous_nav/autonomous_nav/tests/navigation_node_tests.py index 3f7fe16..ebf1103 100644 --- a/src/autonomous_nav/autonomous_nav/tests/navigation_node_tests.py +++ b/src/autonomous_nav/autonomous_nav/tests/navigation_node_tests.py @@ -1,7 +1,7 @@ import math import sys from pathlib import Path as filePath -from typing import Tuple +from typing import Generator, Tuple import numpy as np import pytest @@ -10,7 +10,7 @@ @pytest.fixture(scope="session", autouse=True) -def ros2_init_shutdown(): +def ros2_init_shutdown() -> Generator: rclpy.init() yield rclpy.shutdown() diff --git a/src/autonomous_nav/autonomous_nav/tests/simuate_dwa_planner.py b/src/autonomous_nav/autonomous_nav/tests/simuate_dwa_planner.py index ab92198..715ea18 100644 --- a/src/autonomous_nav/autonomous_nav/tests/simuate_dwa_planner.py +++ b/src/autonomous_nav/autonomous_nav/tests/simuate_dwa_planner.py @@ -1,11 +1,11 @@ import math import sys +from pathlib import Path from typing import List, Tuple import numpy as np -# from pathlib import Path -from nav_msgs.msg import Path +# from nav_msgs.msg import Path as PathMsg # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) diff --git a/src/autonomous_nav/autonomous_nav/tests/test_stuck_detection_node.py b/src/autonomous_nav/autonomous_nav/tests/test_stuck_detection_node.py new file mode 100644 index 0000000..38aef2c --- /dev/null +++ b/src/autonomous_nav/autonomous_nav/tests/test_stuck_detection_node.py @@ -0,0 +1,159 @@ +import sys +import threading +import time +from pathlib import Path +from typing import Callable, Generator + +import pytest +import rclpy +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node +from std_msgs.msg import Bool, String, UInt8 + +# TODO: This is the only way I've gotten `pytest` to work. +sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent)) +sys.path.insert(0, str(Path(__file__).parent.parent)) +from stuck_detection_node import DRIVE_MOTOR_CONFIGS, StuckDetectionNode, StuckReasons + +from lib.configs import MotorConfigs +from lib.moteus_motor_state import MoteusMotorState, MoteusRunSettings + +# -- Fixtures -- # + + +@pytest.fixture(scope="session", autouse=True) +def ros2_init_shutdown() -> Generator: + rclpy.init() + yield + rclpy.shutdown() + + +@pytest.fixture +def spun_node() -> Generator[StuckDetectionNode, None, None]: + node = StuckDetectionNode() + executor = SingleThreadedExecutor() + executor.add_node(node) + + running = True + + def spin() -> None: + while running: + executor.spin_once(timeout_sec=0.1) + + thread = threading.Thread(target=spin, daemon=True) + thread.start() + + try: + yield node + finally: + locals()["running"] = False + running = False + thread.join(timeout=1) + executor.remove_node(node) + node.destroy_node() + + +# -- Constants -- # + +MOTOR_CAN_IDS = [config.can_id for config in DRIVE_MOTOR_CONFIGS] + +MAX_DETECTION_LATENCY = 2 +"""The maximum latency between data publishing and stuck triggering. Dependent on `StuckReason.stuck_duration`.""" + + +# -- Tests -- # + + +class TestStuckDetectionNode: + def test_stuck(self, spun_node: StuckDetectionNode) -> None: + self.is_stuck = False + self.reason = 0 + + spun_node.create_subscription(Bool, "/stuck_detection/is_stuck", self.is_stuck_callback, 10) + spun_node.create_subscription(UInt8, "/stuck_detection/reason", self.reason_callback, 10) + + self.publishers = { + config.can_id: [ + spun_node.create_publisher(String, config.getInterfaceTopicName(), 10), + spun_node.create_publisher(String, config.getCanTopicName(), 10), + ] + for config in DRIVE_MOTOR_CONFIGS + } + + print("") + + self._test( + "At Speed, Rated Torque, All Wheels = Not Stuck", + MoteusRunSettings(velocity=10), + lambda id: MoteusMotorState(id, velocity=10, torque=10), + should_be_stuck=False, + min_latency=1, + ) + + self._test( + "At Speed, Low Torque (10% rated), All Wheels = Stuck, Free Spinning", + MoteusRunSettings(velocity=10), + lambda id: MoteusMotorState(id, velocity=10, torque=1), + should_be_stuck=True, + reasons_mask=StuckReasons.FREE_SPINNING.mask, + ) + + self._test( + "At Speed, Low Torque (10% rated), 2 Wheels = Not Stuck", + MoteusRunSettings(velocity=10), + lambda id: MoteusMotorState(id, velocity=10, torque=1), + ids=[ + MotorConfigs.FRONT_LEFT_DRIVE_MOTOR.can_id, + MotorConfigs.FRONT_RIGHT_DRIVE_MOTOR.can_id, + ], + should_be_stuck=False, + min_latency=1, + ) + + self._test( + "Low Speed, High Torque (75% rated), All Wheels = Stuck, Can't Spin", + MoteusRunSettings(velocity=10), + lambda id: MoteusMotorState(id, velocity=1, torque=7.5), + should_be_stuck=True, + reasons_mask=StuckReasons.WHEELS_CANT_SPIN.mask, + ) + + def is_stuck_callback(self, is_stuck: Bool) -> None: + self.is_stuck = is_stuck.data + + def reason_callback(self, reason: UInt8) -> None: + self.reason = reason.data + + def _test( + self, + desc: str, + target: MoteusRunSettings, + state_fn: Callable[[int], MoteusMotorState], + ids: list[int] = MOTOR_CAN_IDS, + min_latency: float = 0, + max_latency: float = MAX_DETECTION_LATENCY, + should_be_stuck: bool = True, + reasons_mask: int = 0, + ) -> None: + for id in ids: + [tx, rx] = self.publishers[id] + tx.publish(target.toMsg()) + rx.publish(state_fn(id).toMsg()) + + passed = False + start = time.time() + now = 0.0 + while (not (passed or now > max_latency)) or now < min_latency: + passed = (self.is_stuck == should_be_stuck) and ( + self.reason & reasons_mask == reasons_mask + ) + now = time.time() - start + + assert passed, desc + f" | is_stuck={self.is_stuck}, reason={self.reason}" + print("PASSED: " + desc) + self.reset() + + def reset(self) -> None: + for [id, [tx, rx]] in self.publishers.items(): + tx.publish(MoteusRunSettings(velocity=0).toMsg()) + rx.publish(MoteusMotorState(id, velocity=0).toMsg()) diff --git a/src/autonomous_nav/setup.py b/src/autonomous_nav/setup.py index 3b4e96f..0c10709 100644 --- a/src/autonomous_nav/setup.py +++ b/src/autonomous_nav/setup.py @@ -27,6 +27,7 @@ "sensor_processing_node = autonomous_nav.sensor_processing_node:main", "gps_anchor_node = autonomous_nav.gps_anchor_node:main", "decision_making_node = autonomous_nav.decision_making_node:main", + "stuck_detection_node = autonomous_nav.stuck_detection_node:main", ], }, ) diff --git a/src/viator_launch/launch/robot.launch.py b/src/viator_launch/launch/robot.launch.py index 119cec0..4796489 100644 --- a/src/viator_launch/launch/robot.launch.py +++ b/src/viator_launch/launch/robot.launch.py @@ -51,6 +51,9 @@ sensor_processing_node = Node( package="autonomous_nav", executable="sensor_processing_node", name="sensor_processing_node" ) +stuck_detection_node = Node( + package="autonomous_nav", executable="stuck_detection_node", name="stuck_detection_node" +) costmap_config = os.path.join( get_package_share_directory("autonomous_nav"), "config", "params.yaml" ) @@ -172,6 +175,7 @@ def generate_launch_description() -> launch.LaunchDescription: # pylint: disabl gps_anchor_node, navigation_node, sensor_processing_node, + stuck_detection_node, launch_include, gps_node, zed_launch,