diff --git a/humanoid_robot_intelligence_control_system_configure/src/face_follower.py b/humanoid_robot_intelligence_control_system_configure/src/face_follower.py index e6d4b4f..f12b5a4 100644 --- a/humanoid_robot_intelligence_control_system_configure/src/face_follower.py +++ b/humanoid_robot_intelligence_control_system_configure/src/face_follower.py @@ -24,9 +24,10 @@ from geometry_msgs.msg import Point from humanoid_robot_intelligence_control_system_walking_module_msgs.msg import WalkingParam from humanoid_robot_intelligence_control_system_walking_module_msgs.srv import GetWalkingParam -from humanoid_robot_intelligence_control_system_detection.follower import calculate_distance_to_face, calculate_footstep, calculate_delta_time +from humanoid_robot_intelligence_control_system_detection.follower import calculate_distance_to, calculate_footstep, calculate_delta_time from humanoid_robot_intelligence_control_system_detection.follower_config import FollowerConfig, FollowerInitializeConfig + class FaceFollower: def __init__(self, ros_version): self.ros_version = ros_version @@ -116,7 +117,7 @@ class FaceFollower: self.initialize_config.approach_face_position = "OutOfRange" - distance_to_face = calculate_distance_to_face( + distance_to_face = calculate_distance_to( self.config.CAMERA_HEIGHT, self.initialize_config.current_tilt, self.config.hip_pitch_offset, face_size) distance_to_approach = 0.22 diff --git a/humanoid_robot_intelligence_control_system_configure/src/face_tracker.py b/humanoid_robot_intelligence_control_system_configure/src/face_tracker.py index 138d645..a7776ab 100644 --- a/humanoid_robot_intelligence_control_system_configure/src/face_tracker.py +++ b/humanoid_robot_intelligence_control_system_configure/src/face_tracker.py @@ -15,31 +15,17 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os -import sys +import rospy import math from sensor_msgs.msg import JointState -from std_msgs.msg import String +from std_msgs.msg import String, Float64MultiArray +from humanoid_robot_intelligence_control_system_ball_detector.msg import CircleSetStamped from geometry_msgs.msg import Point class FaceTracker: - def __init__(self, ros_version): - self.ros_version = ros_version - if self.ros_version == '1': - rospy.init_node('face_tracker') - self.get_param = rospy.get_param - self.logger = rospy - else: - rclpy.init() - self.node = Node('face_tracker') - self.get_param = self.node.get_parameter - self.logger = self.node.get_logger() - - self.initialize_parameters() - self.setup_ros_communication() - - - def initialize_parameters(self): + def __init__(self): + rospy.init_node('face_tracker') + self.FOV_WIDTH = 35.2 * math.pi / 180 self.FOV_HEIGHT = 21.6 * math.pi / 180 self.NOT_FOUND_THRESHOLD = 50 @@ -51,38 +37,31 @@ class FaceTracker: self.current_face_tilt = 0 self.x_error_sum = 0 self.y_error_sum = 0 + self.current_face_bottom = 0 self.tracking_status = "NotFound" self.DEBUG_PRINT = False - - self.p_gain = self.get_param('p_gain', 0.4) - self.i_gain = self.get_param('i_gain', 0.0) - self.d_gain = self.get_param('d_gain', 0.0) - self.logger.info(f"Face tracking Gain : {self.p_gain}, {self.i_gain}, {self.d_gain}") + param_nh = rospy.get_param("~") + self.p_gain = param_nh.get("p_gain", 0.4) + self.i_gain = param_nh.get("i_gain", 0.0) + self.d_gain = param_nh.get("d_gain", 0.0) - def setup_ros_communication(self): - if self.ros_version == '1': - self.head_joint_pub = rospy.Publisher( - "/humanoid_robot_intelligence_control_system/head_control/set_joint_states", JointState, queue_size=1) - self.head_scan_pub = rospy.Publisher( - "/humanoid_robot_intelligence_control_system/head_control/scan_command", String, queue_size=1) - self.face_position_sub = rospy.Subscriber( - "/face_detector/face_position", Point, self.face_position_callback) - self.face_tracking_command_sub = rospy.Subscriber( - "/face_tracker/command", String, self.face_tracker_command_callback) - self.prev_time = rospy.Time.now() - else: - self.head_joint_pub = self.node.create_publisher( - JointState, "/humanoid_robot_intelligence_control_system/head_control/set_joint_states", 1) - self.head_scan_pub = self.node.create_publisher( - String, "/humanoid_robot_intelligence_control_system/head_control/scan_command", 1) - self.face_position_sub = self.node.create_subscription( - Point, "/face_detector/face_position", self.face_position_callback, 10) - self.face_tracking_command_sub = self.node.create_subscription( - String, "/face_tracker/command", self.face_tracker_command_callback, 10) - self.prev_time = self.node.get_clock().now() + rospy.loginfo(f"face tracking Gain : {self.p_gain}, {self.i_gain}, {self.d_gain}") + + self.head_joint_offset_pub = rospy.Publisher( + "/humanoid_robot_intelligence_control_system/head_control/set_joint_states_offset", JointState, queue_size=0) + self.head_joint_pub = rospy.Publisher( + "/humanoid_robot_intelligence_control_system/head_control/set_joint_states", JointState, queue_size=0) + self.head_scan_pub = rospy.Publisher( + "/humanoid_robot_intelligence_control_system/head_control/scan_command", String, queue_size=0) + + self.face_position_sub = rospy.Subscriber( + "/face_detector_node/face_position", Point, self.face_position_callback) + self.face_tracking_command_sub = rospy.Subscriber( + "/face_tracker/command", String, self.face_tracker_command_callback) self.face_position = Point() + self.prev_time = rospy.Time.now() def face_position_callback(self, msg): self.face_position = msg @@ -100,17 +79,20 @@ class FaceTracker: def start_tracking(self): self.on_tracking = True - self.logger.info("Start Face tracking") + rospy.loginfo("Start face tracking") if self.DEBUG_PRINT else None def stop_tracking(self): self.go_init() self.on_tracking = False - self.logger.info("Stop Face tracking") + rospy.loginfo("Stop face tracking") if self.DEBUG_PRINT else None self.current_face_pan = 0 self.current_face_tilt = 0 self.x_error_sum = 0 self.y_error_sum = 0 + def set_using_head_scan(self, use_scan): + self.use_head_scan = use_scan + def process_tracking(self): if not self.on_tracking: self.face_position.z = 0 @@ -120,7 +102,10 @@ class FaceTracker: if self.face_position.z <= 0: self.count_not_found += 1 if self.count_not_found < self.WAITING_THRESHOLD: - tracking_status = "Waiting" + if self.tracking_status in ["Found", "Waiting"]: + tracking_status = "Waiting" + else: + tracking_status = "NotFound" elif self.count_not_found > self.NOT_FOUND_THRESHOLD: self.scan_face() self.count_not_found = 0 @@ -133,17 +118,18 @@ class FaceTracker: if tracking_status != "Found": self.tracking_status = tracking_status + self.current_face_pan = 0 + self.current_face_tilt = 0 + self.x_error_sum = 0 + self.y_error_sum = 0 return tracking_status x_error = -math.atan(self.face_position.x * math.tan(self.FOV_WIDTH)) y_error = -math.atan(self.face_position.y * math.tan(self.FOV_HEIGHT)) + face_size = self.face_position.z - if self.ros_version == '1': - curr_time = rospy.Time.now() - delta_time = (curr_time - self.prev_time).to_sec() - else: - curr_time = self.node.get_clock().now() - delta_time = (curr_time - self.prev_time).nanoseconds / 1e9 + curr_time = rospy.Time.now() + delta_time = (curr_time - self.prev_time).to_sec() self.prev_time = curr_time x_error_diff = (x_error - self.current_face_pan) / delta_time @@ -154,15 +140,18 @@ class FaceTracker: y_error_target = y_error * self.p_gain + y_error_diff * self.d_gain + self.y_error_sum * self.i_gain if self.DEBUG_PRINT: - self.logger.info(f"Error: {x_error * 180 / math.pi} | {y_error * 180 / math.pi}") - self.logger.info(f"Error diff: {x_error_diff * 180 / math.pi} | {y_error_diff * 180 / math.pi} | {delta_time}") - self.logger.info(f"Error sum: {self.x_error_sum * 180 / math.pi} | {self.y_error_sum * 180 / math.pi}") - self.logger.info(f"Error target: {x_error_target * 180 / math.pi} | {y_error_target * 180 / math.pi}") + rospy.loginfo(f"Error: {x_error * 180 / math.pi} | {y_error * 180 / math.pi}") + rospy.loginfo(f"Error diff: {x_error_diff * 180 / math.pi} | {y_error_diff * 180 / math.pi} | {delta_time}") + rospy.loginfo(f"Error sum: {self.x_error_sum * 180 / math.pi} | {self.y_error_sum * 180 / math.pi}") + rospy.loginfo(f"Error target: {x_error_target * 180 / math.pi} | {y_error_target * 180 / math.pi}") self.publish_head_joint(x_error_target, y_error_target) self.current_face_pan = x_error self.current_face_tilt = y_error + self.current_face_bottom = face_size + + self.face_position.z = 0 self.tracking_status = tracking_status return tracking_status @@ -176,7 +165,7 @@ class FaceTracker: head_angle_msg.name = ["head_pan", "head_tilt"] head_angle_msg.position = [pan, tilt] - self.head_joint_pub.publish(head_angle_msg) + self.head_joint_offset_pub.publish(head_angle_msg) def go_init(self): head_angle_msg = JointState() @@ -194,44 +183,12 @@ class FaceTracker: self.head_scan_pub.publish(scan_msg) - def run(self): - if self.ros_version == '1': - rate = rospy.Rate(30) # 30Hz - while not rospy.is_shutdown(): - self.process_tracking() - rate.sleep() - else: - while rclpy.ok(): - self.process_tracking() - rclpy.spin_once(self.node, timeout_sec=0.03) # Approximately 30Hz - -def main(ros_version): - try: - tracker = FaceTracker(ros_version) - tracker.run() - except Exception as e: - if ros_version == '1': - rospy.logerr(f"An error occurred: {e}") - else: - rclpy.shutdown() - print(f"An error occurred: {e}") - if __name__ == '__main__': - ros_version = os.getenv("ROS_VERSION") - if ros_version == "1": - try: - import rospy - main(ros_version) - except rospy.ROSInterruptException: - print("Error in ROS1 main") - elif ros_version == "2": - try: - import rclpy - from rclpy.node import Node - main(ros_version) - except KeyboardInterrupt: - rclpy.shutdown() - print("Error in ROS2 main") - else: - print("Unsupported ROS version. Please set the ROS_VERSION environment variable to '1' for ROS 1 or '2' for ROS 2.") - sys.exit(1) + try: + tracker = FaceTracker() + rate = rospy.Rate(30) + while not rospy.is_shutdown(): + tracker.process_tracking() + rate.sleep() + except rospy.ROSInterruptException: + pass diff --git a/humanoid_robot_intelligence_control_system_configure/src/object_follower.py b/humanoid_robot_intelligence_control_system_configure/src/object_follower.py index 5377ce5..ebe736f 100644 --- a/humanoid_robot_intelligence_control_system_configure/src/object_follower.py +++ b/humanoid_robot_intelligence_control_system_configure/src/object_follower.py @@ -24,9 +24,10 @@ from geometry_msgs.msg import Point from humanoid_robot_intelligence_control_system_walking_module_msgs.msg import WalkingParam from humanoid_robot_intelligence_control_system_walking_module_msgs.srv import GetWalkingParam -from humanoid_robot_intelligence_control_system_detection.follower import calculate_distance_to_object, calculate_footstep, calculate_delta_time +from humanoid_robot_intelligence_control_system_detection.follower import calculate_distance_to, calculate_footstep, calculate_delta_time from humanoid_robot_intelligence_control_system_detection.follower_config import FollowerConfig, FollowerInitializeConfig + class ObjectFollower: def __init__(self, ros_version): self.ros_version = ros_version @@ -116,7 +117,7 @@ class ObjectFollower: self.initialize_config.approach_object_position = "OutOfRange" - distance_to_object = calculate_distance_to_object( + distance_to_object = calculate_distance_to( self.config.CAMERA_HEIGHT, self.initialize_config.current_tilt, self.config.hip_pitch_offset, object_size) distance_to_approach = 0.22 diff --git a/humanoid_robot_intelligence_control_system_configure/src/object_tracker.py b/humanoid_robot_intelligence_control_system_configure/src/object_tracker.py index 2d54fba..4f00592 100644 --- a/humanoid_robot_intelligence_control_system_configure/src/object_tracker.py +++ b/humanoid_robot_intelligence_control_system_configure/src/object_tracker.py @@ -15,56 +15,68 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import rospy +import os +import sys import math from sensor_msgs.msg import JointState -from std_msgs.msg import String, Float64MultiArray -from humanoid_robot_intelligence_control_system_ball_detector.msg import CircleSetStamped +from std_msgs.msg import String from geometry_msgs.msg import Point +from humanoid_robot_intelligence_control_system_detection.tracker import calculate_error, calculate_error_target, calculate_delta_time +from humanoid_robot_intelligence_control_system_detection.tracker_config import TrackerConfig, TrackerInitializeConfig + class ObjectTracker: - def __init__(self): - rospy.init_node('object_tracker') + def __init__(self, ros_version): + self.ros_version = ros_version + self.initialize_config = TrackerInitializeConfig() + self.config = TrackerConfig() + self.initialize_ros() + self.setup_ros_communication() + + def initialize_ros(self): + if self.ros_version == '1': + rospy.init_node('object_tracker') + self.get_param = rospy.get_param + self.logger = rospy + else: + rclpy.init() + self.node = rclpy.create_node('object_tracker') + self.get_param = self.node.get_parameter + self.logger = self.node.get_logger() - self.FOV_WIDTH = 35.2 * math.pi / 180 - self.FOV_HEIGHT = 21.6 * math.pi / 180 - self.NOT_FOUND_THRESHOLD = 50 - self.WAITING_THRESHOLD = 5 - self.use_head_scan = True - self.count_not_found = 0 - self.on_tracking = False - self.current_object_pan = 0 - self.current_object_tilt = 0 - self.x_error_sum = 0 - self.y_error_sum = 0 - self.current_object_bottom = 0 - self.tracking_status = "NotFound" - self.DEBUG_PRINT = False + self.config.update_from_params(self.get_param) + self.initialize_config.update_from_params(self.get_param) - param_nh = rospy.get_param("~") - self.p_gain = param_nh.get("p_gain", 0.4) - self.i_gain = param_nh.get("i_gain", 0.0) - self.d_gain = param_nh.get("d_gain", 0.0) - - rospy.loginfo(f"Object tracking Gain : {self.p_gain}, {self.i_gain}, {self.d_gain}") - - self.head_joint_offset_pub = rospy.Publisher( - "/humanoid_robot_intelligence_control_system/head_control/set_joint_states_offset", JointState, queue_size=0) - self.head_joint_pub = rospy.Publisher( - "/humanoid_robot_intelligence_control_system/head_control/set_joint_states", JointState, queue_size=0) - self.head_scan_pub = rospy.Publisher( - "/humanoid_robot_intelligence_control_system/head_control/scan_command", String, queue_size=0) - - self.object_position_sub = rospy.Subscriber( - "/object_detector_node/object_position", Point, self.object_position_callback) - self.object_tracking_command_sub = rospy.Subscriber( - "/object_tracker/command", String, self.object_tracker_command_callback) - - self.object_position = Point() - self.prev_time = rospy.Time.now() + def setup_ros_communication(self): + if self.ros_version == '1': + self.head_joint_offset_pub = rospy.Publisher( + "/humanoid_robot_intelligence_control_system/head_control/set_joint_states_offset", JointState, queue_size=0) + self.head_joint_pub = rospy.Publisher( + "/humanoid_robot_intelligence_control_system/head_control/set_joint_states", JointState, queue_size=0) + self.head_scan_pub = rospy.Publisher( + "/humanoid_robot_intelligence_control_system/head_control/scan_command", String, queue_size=0) + self.object_position_sub = rospy.Subscriber( + "/object_detector_node/object_position", Point, self.object_position_callback) + self.object_tracking_command_sub = rospy.Subscriber( + "/object_tracker/command", String, self.object_tracker_command_callback) + self.prev_time = rospy.Time.now() + else: + self.head_joint_offset_pub = self.node.create_publisher( + JointState, "/humanoid_robot_intelligence_control_system/head_control/set_joint_states_offset", 10) + self.head_joint_pub = self.node.create_publisher( + JointState, "/humanoid_robot_intelligence_control_system/head_control/set_joint_states", 10) + self.head_scan_pub = self.node.create_publisher( + String, "/humanoid_robot_intelligence_control_system/head_control/scan_command", 10) + self.object_position_sub = self.node.create_subscription( + Point, "/object_detector_node/object_position", self.object_position_callback, 10) + self.object_tracking_command_sub = self.node.create_subscription( + String, "/object_tracker/command", self.object_tracker_command_callback, 10) + self.prev_time = self.node.get_clock().now() def object_position_callback(self, msg): - self.object_position = msg + self.initialize_config.object_position = msg + if self.initialize_config.on_tracking: + self.process_tracking() def object_tracker_command_callback(self, msg): if msg.data == "start": @@ -72,88 +84,90 @@ class ObjectTracker: elif msg.data == "stop": self.stop_tracking() elif msg.data == "toggle_start": - if not self.on_tracking: + if not self.initialize_config.on_tracking: self.start_tracking() else: self.stop_tracking() def start_tracking(self): - self.on_tracking = True - rospy.loginfo("Start Object tracking") if self.DEBUG_PRINT else None + self.initialize_config.on_tracking = True + self.logger.info("Start Object tracking") def stop_tracking(self): self.go_init() - self.on_tracking = False - rospy.loginfo("Stop Object tracking") if self.DEBUG_PRINT else None - self.current_object_pan = 0 - self.current_object_tilt = 0 - self.x_error_sum = 0 - self.y_error_sum = 0 + self.initialize_config.on_tracking = False + self.logger.info("Stop Object tracking") + self.initialize_config.current_object_pan = 0 + self.initialize_config.current_object_tilt = 0 + self.initialize_config.x_error_sum = 0 + self.initialize_config.y_error_sum = 0 def set_using_head_scan(self, use_scan): - self.use_head_scan = use_scan + self.config.use_head_scan = use_scan def process_tracking(self): - if not self.on_tracking: - self.object_position.z = 0 - self.count_not_found = 0 + if not self.initialize_config.on_tracking: + self.initialize_config.object_position.z = 0 + self.initialize_config.count_not_found = 0 return "NotFound" - if self.object_position.z <= 0: - self.count_not_found += 1 - if self.count_not_found < self.WAITING_THRESHOLD: - if self.tracking_status in ["Found", "Waiting"]: + if self.initialize_config.object_position.z <= 0: + self.initialize_config.count_not_found += 1 + if self.initialize_config.count_not_found < self.config.WAITING_THRESHOLD: + if self.initialize_config.tracking_status in ["Found", "Waiting"]: tracking_status = "Waiting" else: tracking_status = "NotFound" - elif self.count_not_found > self.NOT_FOUND_THRESHOLD: + elif self.initialize_config.count_not_found > self.config.NOT_FOUND_THRESHOLD: self.scan_object() - self.count_not_found = 0 + self.initialize_config.count_not_found = 0 tracking_status = "NotFound" else: tracking_status = "NotFound" else: - self.count_not_found = 0 + self.initialize_config.count_not_found = 0 tracking_status = "Found" if tracking_status != "Found": - self.tracking_status = tracking_status - self.current_object_pan = 0 - self.current_object_tilt = 0 - self.x_error_sum = 0 - self.y_error_sum = 0 + self.initialize_config.tracking_status = tracking_status + self.initialize_config.current_object_pan = 0 + self.initialize_config.current_object_tilt = 0 + self.initialize_config.x_error_sum = 0 + self.initialize_config.y_error_sum = 0 return tracking_status - x_error = -math.atan(self.object_position.x * math.tan(self.FOV_WIDTH)) - y_error = -math.atan(self.object_position.y * math.tan(self.FOV_HEIGHT)) - object_size = self.object_position.z + x_error, y_error = calculate_error(self.initialize_config.object_position, self.config.FOV_WIDTH, self.config.FOV_HEIGHT) + object_size = self.initialize_config.object_position.z - curr_time = rospy.Time.now() - delta_time = (curr_time - self.prev_time).to_sec() + curr_time = rospy.Time.now() if self.ros_version == '1' else self.node.get_clock().now() + delta_time = calculate_delta_time(curr_time, self.prev_time) self.prev_time = curr_time - x_error_diff = (x_error - self.current_object_pan) / delta_time - y_error_diff = (y_error - self.current_object_tilt) / delta_time - self.x_error_sum += x_error - self.y_error_sum += y_error - x_error_target = x_error * self.p_gain + x_error_diff * self.d_gain + self.x_error_sum * self.i_gain - y_error_target = y_error * self.p_gain + y_error_diff * self.d_gain + self.y_error_sum * self.i_gain + x_error_diff = (x_error - self.initialize_config.current_object_pan) / delta_time + y_error_diff = (y_error - self.initialize_config.current_object_tilt) / delta_time + self.initialize_config.x_error_sum += x_error + self.initialize_config.y_error_sum += y_error + + x_error_target = calculate_error_target(x_error, x_error_diff, self.initialize_config.x_error_sum, + self.config.p_gain, self.config.i_gain, self.config.d_gain) + y_error_target = calculate_error_target(y_error, y_error_diff, self.initialize_config.y_error_sum, + self.config.p_gain, self.config.i_gain, self.config.d_gain) - if self.DEBUG_PRINT: - rospy.loginfo(f"Error: {x_error * 180 / math.pi} | {y_error * 180 / math.pi}") - rospy.loginfo(f"Error diff: {x_error_diff * 180 / math.pi} | {y_error_diff * 180 / math.pi} | {delta_time}") - rospy.loginfo(f"Error sum: {self.x_error_sum * 180 / math.pi} | {self.y_error_sum * 180 / math.pi}") - rospy.loginfo(f"Error target: {x_error_target * 180 / math.pi} | {y_error_target * 180 / math.pi}") + if self.config.DEBUG_PRINT: + self.logger.info(f"Error: {x_error * 180 / math.pi} | {y_error * 180 / math.pi}") + self.logger.info(f"Error diff: {x_error_diff * 180 / math.pi} | {y_error_diff * 180 / math.pi} | {delta_time}") + self.logger.info(f"Error sum: {self.initialize_config.x_error_sum * 180 / math.pi} | {self.initialize_config.y_error_sum * 180 / math.pi}") + self.logger.info(f"Error target: {x_error_target * 180 / math.pi} | {y_error_target * 180 / math.pi}") self.publish_head_joint(x_error_target, y_error_target) - self.current_object_pan = x_error - self.current_object_tilt = y_error - self.current_object_bottom = object_size + self.initialize_config.current_object_pan = x_error + self.initialize_config.current_object_tilt = y_error + self.initialize_config.current_object_bottom = object_size - self.object_position.z = 0 + self.initialize_config.object_position.z = 0 - self.tracking_status = tracking_status + self.initialize_config.tracking_status = tracking_status return tracking_status def publish_head_joint(self, pan, tilt): @@ -175,7 +189,7 @@ class ObjectTracker: self.head_joint_pub.publish(head_angle_msg) def scan_object(self): - if not self.use_head_scan: + if not self.config.use_head_scan: return scan_msg = String() @@ -183,12 +197,39 @@ class ObjectTracker: self.head_scan_pub.publish(scan_msg) -if __name__ == '__main__': + def run(self): + if self.ros_version == '1': + rospy.spin() + else: + rclpy.spin(self.node) + +def main(ros_version): try: - tracker = ObjectTracker() - rate = rospy.Rate(30) - while not rospy.is_shutdown(): - tracker.process_tracking() - rate.sleep() - except rospy.ROSInterruptException: - pass + tracker = ObjectTracker(ros_version) + tracker.start_tracking() + tracker.run() + except Exception as e: + if ros_version == '1': + rospy.logerr(f"An error occurred: {e}") + else: + rclpy.shutdown() + print(f"An error occurred: {e}") + +if __name__ == '__main__': + ros_version = os.getenv("ROS_VERSION") + if ros_version == "1": + import rospy + try: + main(ros_version) + except rospy.ROSInterruptException: + print("Error in ROS1 main") + elif ros_version == "2": + import rclpy + try: + main(ros_version) + except KeyboardInterrupt: + rclpy.shutdown() + print("Error in ROS2 main") + else: + print("Unsupported ROS version. Please set the ROS_VERSION environment variable to '1' for ROS 1 or '2' for ROS 2.") + sys.exit(1)