Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions src/heartbeat/heartbeat/heartbeat.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
#!/usr/bin/env python3
import time

import rclpy
from rclpy.node import Node
from std_msgs.msg import Bool
import time

import sys

sys.path.append("/home/trickfire/urc-2023/src")

from lib import configs
from lib.color_codes import ColorCodes, colorStr

from lib.interface.robot_interface import RobotInterface
from lib.interface.robot_info import RobotInfo
from lib import configs

# Credit: Most of this code is credit to Anna. I (Hong) just
# add some finishing code and clean up the class.
Expand All @@ -28,7 +21,7 @@ class Heartbeat(Node):
Node (Node): ROS Node
"""

def __init__(self):
def __init__(self) -> None:
"""
Initialize the heartbeat node and provides it with a timer
to keep track of the connection status of the rover.
Expand Down Expand Up @@ -59,13 +52,19 @@ def __init__(self):
10, # Quality of Service (QoS) profile
)

self._is_alive_publisher = self.create_publisher(
Bool, # message type (might change later)
"/hbr", # topic name
10, # Quality of Service (QoS) profile
)

# store the most recent hearbeat time
self._last_heartbeat_time = time.time()

# check connection every 1 second
self._timer = self.create_timer(1.0, self.check_connection)

def heartbeat_callback(self, msg):
def heartbeat_callback(self, msg: Bool) -> None:
"""
A call back method for the heartbeat everytime the
mission control publish a message indicating that the
Expand All @@ -85,11 +84,16 @@ def heartbeat_callback(self, msg):
# Log connection active as before
# doesn't matter the data, pub always pub True
# just check to make sure nothing is wrong with pub
if msg.data == True:
if msg.data:
self.get_logger().info(colorStr("Connection active", ColorCodes.GREEN_OK))
self._connection_lost = False

def check_connection(self):
# Tell the client that the heartbeat was received.
msg = Bool()
msg.data = True
self._is_alive_publisher.publish(msg)

def check_connection(self) -> None:
"""
A method for checking the connection status. If the heartbeat
does not receive a message within 1s, meaning the connection was
Expand All @@ -111,7 +115,7 @@ def check_connection(self):
# ***************
# Private helper method
# ***************
def _stop_all_motors(self):
def _stop_all_motors(self) -> None:
"""
Get all motors from the motor configs file and stop all of them.
"""
Expand All @@ -132,7 +136,7 @@ def _stop_all_motors(self):
self.get_logger().info(colorStr("Stop all motors!", ColorCodes.FAIL_RED))


def main(args=None):
def main(args: list[str] | None = None) -> None:
rclpy.init(args=args)
heartbeat_node = Heartbeat()
rclpy.spin(heartbeat_node)
Expand All @@ -142,4 +146,4 @@ def main(args=None):


if __name__ == "__main__":
Heartbeat.main()
main()