diff --git a/src/heartbeat/heartbeat/heartbeat.py b/src/heartbeat/heartbeat/heartbeat.py index 91aefd3d..76c6afdc 100644 --- a/src/heartbeat/heartbeat/heartbeat.py +++ b/src/heartbeat/heartbeat/heartbeat.py @@ -1,19 +1,12 @@ -#!/usr/bin/env python3 +import time import rclpy from rclpy.node import Node from std_msgs.msg import Bool -import time - -import sys - -sys.path.append("/home/trickfire/urc-2023/src") +from lib import configs from lib.color_codes import ColorCodes, colorStr - from lib.interface.robot_interface import RobotInterface -from lib.interface.robot_info import RobotInfo -from lib import configs # Credit: Most of this code is credit to Anna. I (Hong) just # add some finishing code and clean up the class. @@ -28,7 +21,7 @@ class Heartbeat(Node): Node (Node): ROS Node """ - def __init__(self): + def __init__(self) -> None: """ Initialize the heartbeat node and provides it with a timer to keep track of the connection status of the rover. @@ -59,13 +52,19 @@ def __init__(self): 10, # Quality of Service (QoS) profile ) + self._is_alive_publisher = self.create_publisher( + Bool, # message type (might change later) + "/hbr", # topic name + 10, # Quality of Service (QoS) profile + ) + # store the most recent hearbeat time self._last_heartbeat_time = time.time() # check connection every 1 second self._timer = self.create_timer(1.0, self.check_connection) - def heartbeat_callback(self, msg): + def heartbeat_callback(self, msg: Bool) -> None: """ A call back method for the heartbeat everytime the mission control publish a message indicating that the @@ -85,11 +84,16 @@ def heartbeat_callback(self, msg): # Log connection active as before # doesn't matter the data, pub always pub True # just check to make sure nothing is wrong with pub - if msg.data == True: + if msg.data: self.get_logger().info(colorStr("Connection active", ColorCodes.GREEN_OK)) self._connection_lost = False - def check_connection(self): + # Tell the client that the heartbeat was received. + msg = Bool() + msg.data = True + self._is_alive_publisher.publish(msg) + + def check_connection(self) -> None: """ A method for checking the connection status. If the heartbeat does not receive a message within 1s, meaning the connection was @@ -111,7 +115,7 @@ def check_connection(self): # *************** # Private helper method # *************** - def _stop_all_motors(self): + def _stop_all_motors(self) -> None: """ Get all motors from the motor configs file and stop all of them. """ @@ -132,7 +136,7 @@ def _stop_all_motors(self): self.get_logger().info(colorStr("Stop all motors!", ColorCodes.FAIL_RED)) -def main(args=None): +def main(args: list[str] | None = None) -> None: rclpy.init(args=args) heartbeat_node = Heartbeat() rclpy.spin(heartbeat_node) @@ -142,4 +146,4 @@ def main(args=None): if __name__ == "__main__": - Heartbeat.main() + main()