diff --git a/humanoid_robot_intelligence_control_system_detection/src/follower.py b/humanoid_robot_intelligence_control_system_detection/src/follower.py index 8d32487..b19ab4c 100644 --- a/humanoid_robot_intelligence_control_system_detection/src/follower.py +++ b/humanoid_robot_intelligence_control_system_detection/src/follower.py @@ -17,7 +17,7 @@ import math -def calculate_distance_to_object(camera_height, tilt, hip_pitch_offset, object_size): +def calculate_distance_to(camera_height, tilt, hip_pitch_offset, object_size): distance = camera_height * math.tan(math.pi * 0.5 + tilt - hip_pitch_offset - object_size) return abs(distance) diff --git a/humanoid_robot_intelligence_control_system_detection/src/follower_config.py b/humanoid_robot_intelligence_control_system_detection/src/follower_config.py index 8bd53d1..ae09c01 100644 --- a/humanoid_robot_intelligence_control_system_detection/src/follower_config.py +++ b/humanoid_robot_intelligence_control_system_detection/src/follower_config.py @@ -58,6 +58,92 @@ class FollowerConfig: self.curr_period_time = get_param('curr_period_time', self.curr_period_time) self.DEBUG_PRINT = get_param('debug_print', self.DEBUG_PRINT) + def reset(self): + """Reset all values to their initial state.""" + self.__init__() + + def set_fov(self, width, height): + """Set the field of view.""" + self.FOV_WIDTH = width + self.FOV_HEIGHT = height + + def set_camera_height(self, height): + """Set the camera height.""" + self.CAMERA_HEIGHT = height + + def set_thresholds(self, not_found_threshold): + """Set the thresholds.""" + self.NOT_FOUND_THRESHOLD = not_found_threshold + + def set_step_parameters(self, max_fb, max_rl, in_place_fb, min_fb, min_rl, unit_fb, unit_rl): + """Set the step parameters.""" + self.MAX_FB_STEP = max_fb + self.MAX_RL_TURN = max_rl + self.IN_PLACE_FB_STEP = in_place_fb + self.MIN_FB_STEP = min_fb + self.MIN_RL_TURN = min_rl + self.UNIT_FB_STEP = unit_fb + self.UNIT_RL_TURN = unit_rl + + def set_spot_offsets(self, fb_offset, rl_offset, angle_offset): + """Set the spot offsets.""" + self.SPOT_FB_OFFSET = fb_offset + self.SPOT_RL_OFFSET = rl_offset + self.SPOT_ANGLE_OFFSET = angle_offset + + def set_hip_pitch_offset(self, offset): + """Set the hip pitch offset.""" + self.hip_pitch_offset = offset + + def set_period_time(self, period_time): + """Set the current period time.""" + self.curr_period_time = period_time + + def update_accum_period_time(self, delta_time): + """Update the accumulated period time.""" + self.accum_period_time += delta_time + if self.accum_period_time > (self.curr_period_time / 4): + self.accum_period_time = 0.0 + return self.accum_period_time + + def reset_accum_period_time(self): + """Reset the accumulated period time.""" + self.accum_period_time = 0.0 + + def set_debug_print(self, debug_print): + """Set the debug print flag.""" + self.DEBUG_PRINT = debug_print + + def get_config_dict(self): + """Return a dictionary of all configuration parameters.""" + return { + 'FOV_WIDTH': self.FOV_WIDTH, + 'FOV_HEIGHT': self.FOV_HEIGHT, + 'CAMERA_HEIGHT': self.CAMERA_HEIGHT, + 'NOT_FOUND_THRESHOLD': self.NOT_FOUND_THRESHOLD, + 'MAX_FB_STEP': self.MAX_FB_STEP, + 'MAX_RL_TURN': self.MAX_RL_TURN, + 'IN_PLACE_FB_STEP': self.IN_PLACE_FB_STEP, + 'MIN_FB_STEP': self.MIN_FB_STEP, + 'MIN_RL_TURN': self.MIN_RL_TURN, + 'UNIT_FB_STEP': self.UNIT_FB_STEP, + 'UNIT_RL_TURN': self.UNIT_RL_TURN, + 'SPOT_FB_OFFSET': self.SPOT_FB_OFFSET, + 'SPOT_RL_OFFSET': self.SPOT_RL_OFFSET, + 'SPOT_ANGLE_OFFSET': self.SPOT_ANGLE_OFFSET, + 'hip_pitch_offset': self.hip_pitch_offset, + 'curr_period_time': self.curr_period_time, + 'accum_period_time': self.accum_period_time, + 'DEBUG_PRINT': self.DEBUG_PRINT + } + + def load_config_dict(self, config_dict): + """Load configuration from a dictionary.""" + for key, value in config_dict.items(): + if hasattr(self, key): + setattr(self, key, value) + + class FollowerInitializeConfig: def __init__(self): diff --git a/humanoid_robot_intelligence_control_system_detection/src/tracker_config.py b/humanoid_robot_intelligence_control_system_detection/src/tracker_config.py index 5310436..43bd55a 100644 --- a/humanoid_robot_intelligence_control_system_detection/src/tracker_config.py +++ b/humanoid_robot_intelligence_control_system_detection/src/tracker_config.py @@ -40,6 +40,104 @@ class TrackerConfig: self.i_gain = get_param('i_gain', self.i_gain) self.d_gain = get_param('d_gain', self.d_gain) + def reset(self): + """Reset all values to their initial state.""" + self.__init__() + + def set_fov(self, width, height): + """Set the field of view.""" + self.FOV_WIDTH = width + self.FOV_HEIGHT = height + + def set_thresholds(self, not_found_threshold, waiting_threshold): + """Set the thresholds.""" + self.NOT_FOUND_THRESHOLD = not_found_threshold + self.WAITING_THRESHOLD = waiting_threshold + + def set_use_head_scan(self, use_head_scan): + """Set whether to use head scan.""" + self.use_head_scan = use_head_scan + + def set_debug_print(self, debug_print): + """Set the debug print flag.""" + self.DEBUG_PRINT = debug_print + + def set_pid_gains(self, p_gain, i_gain, d_gain): + """Set the PID gains.""" + self.p_gain = p_gain + self.i_gain = i_gain + self.d_gain = d_gain + + def get_pid_gains(self): + """Get the PID gains.""" + return self.p_gain, self.i_gain, self.d_gain + + def adjust_pid_gains(self, p_adjust=0, i_adjust=0, d_adjust=0): + """Adjust the PID gains by the given amounts.""" + self.p_gain += p_adjust + self.i_gain += i_adjust + self.d_gain += d_adjust + + def get_config_dict(self): + """Return a dictionary of all configuration parameters.""" + return { + 'FOV_WIDTH': self.FOV_WIDTH, + 'FOV_HEIGHT': self.FOV_HEIGHT, + 'NOT_FOUND_THRESHOLD': self.NOT_FOUND_THRESHOLD, + 'WAITING_THRESHOLD': self.WAITING_THRESHOLD, + 'use_head_scan': self.use_head_scan, + 'DEBUG_PRINT': self.DEBUG_PRINT, + 'p_gain': self.p_gain, + 'i_gain': self.i_gain, + 'd_gain': self.d_gain + } + + def load_config_dict(self, config_dict): + """Load configuration from a dictionary.""" + for key, value in config_dict.items(): + if hasattr(self, key): + setattr(self, key, value) + + def validate_config(self): + """Validate the configuration parameters.""" + assert 0 < self.FOV_WIDTH < math.pi, "FOV_WIDTH must be between 0 and pi" + assert 0 < self.FOV_HEIGHT < math.pi, "FOV_HEIGHT must be between 0 and pi" + assert self.NOT_FOUND_THRESHOLD > 0, "NOT_FOUND_THRESHOLD must be positive" + assert self.WAITING_THRESHOLD > 0, "WAITING_THRESHOLD must be positive" + assert isinstance(self.use_head_scan, bool), "use_head_scan must be a boolean" + assert isinstance(self.DEBUG_PRINT, bool), "DEBUG_PRINT must be a boolean" + assert self.p_gain >= 0, "p_gain must be non-negative" + assert self.i_gain >= 0, "i_gain must be non-negative" + assert self.d_gain >= 0, "d_gain must be non-negative" + + def print_config(self): + """Print the current configuration.""" + for key, value in self.get_config_dict().items(): + print(f"{key}: {value}") + + def to_ros_param(self): + """Convert the configuration to a format suitable for ROS parameters.""" + return { + 'tracker': { + 'fov': { + 'width': self.FOV_WIDTH, + 'height': self.FOV_HEIGHT + }, + 'thresholds': { + 'not_found': self.NOT_FOUND_THRESHOLD, + 'waiting': self.WAITING_THRESHOLD + }, + 'use_head_scan': self.use_head_scan, + 'debug_print': self.DEBUG_PRINT, + 'pid': { + 'p': self.p_gain, + 'i': self.i_gain, + 'd': self.d_gain + } + } + } + + class TrackerInitializeConfig: def __init__(self): diff --git a/humanoid_robot_intelligence_control_system_object_detector/CMakeLists.txt b/humanoid_robot_intelligence_control_system_movement/CMakeLists.txt similarity index 78% rename from humanoid_robot_intelligence_control_system_object_detector/CMakeLists.txt rename to humanoid_robot_intelligence_control_system_movement/CMakeLists.txt index 55bba24..ad4b2c6 100644 --- a/humanoid_robot_intelligence_control_system_object_detector/CMakeLists.txt +++ b/humanoid_robot_intelligence_control_system_movement/CMakeLists.txt @@ -13,24 +13,16 @@ # the License. cmake_minimum_required(VERSION 3.0.2) -project(humanoid_robot_intelligence_control_system_object_detector) +project(humanoid_robot_intelligence_control_system_movement) if($ENV{ROS_VERSION} EQUAL 1) find_package(catkin REQUIRED COMPONENTS rospy - std_msgs - sensor_msgs - geometry_msgs - cv_bridge ) catkin_package( CATKIN_DEPENDS rospy - std_msgs - sensor_msgs - geometry_msgs - cv_bridge ) else() @@ -38,16 +30,14 @@ else() find_package(ament_cmake_python REQUIRED) find_package(rclpy REQUIRED) find_package(std_msgs REQUIRED) - find_package(sensor_msgs REQUIRED) - find_package(geometry_msgs REQUIRED) - find_package(cv_bridge REQUIRED) endif() if($ENV{ROS_VERSION} EQUAL 1) catkin_python_setup() catkin_install_python(PROGRAMS - src/object_detection_processor.py + src/MovementController.py + src/HumanoidRobotController.py DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} ) @@ -59,7 +49,8 @@ else() ament_python_install_package(${PROJECT_NAME}) install(PROGRAMS - src/object_detection_processor.py + src/MovementController.py + src/HumanoidRobotController.py DESTINATION lib/${PROJECT_NAME} ) diff --git a/humanoid_robot_intelligence_control_system_speech_detector/package.xml b/humanoid_robot_intelligence_control_system_movement/package.xml similarity index 58% rename from humanoid_robot_intelligence_control_system_speech_detector/package.xml rename to humanoid_robot_intelligence_control_system_movement/package.xml index 416194b..7cc7687 100644 --- a/humanoid_robot_intelligence_control_system_speech_detector/package.xml +++ b/humanoid_robot_intelligence_control_system_movement/package.xml @@ -16,10 +16,10 @@ the License. --> - humanoid_robot_intelligence_control_system_speech_detector + humanoid_robot_intelligence_control_system_movement 0.0.1 - This Package is for Object Detection, detecting speechs like tools, or utilities + This Package is for Detection Math Apache 2.0 Ronaldson Bellande @@ -30,24 +30,9 @@ the License. rospy - std_msgs - sensor_msgs - geometry_msgs - cv_bridge - image_transport - dynamic_reconfigure - message_generation - message_runtime rclpy - std_msgs - sensor_msgs - geometry_msgs - cv_bridge - image_transport - rosidl_default_generators - rosidl_default_runtime python3-opencv diff --git a/humanoid_robot_intelligence_control_system_object_detector/setup.py b/humanoid_robot_intelligence_control_system_movement/setup.py similarity index 86% rename from humanoid_robot_intelligence_control_system_object_detector/setup.py rename to humanoid_robot_intelligence_control_system_movement/setup.py index a2eac39..1450a13 100644 --- a/humanoid_robot_intelligence_control_system_object_detector/setup.py +++ b/humanoid_robot_intelligence_control_system_movement/setup.py @@ -18,8 +18,8 @@ from catkin_pkg.python_setup import generate_distutils_setup # fetch values from package.xml setup_args = generate_distutils_setup( - scripts=['src/object_detection_processor.py'], - packages=['humanoid_robot_intelligence_control_system_object_detector'], + scripts=["src/MovementController.py", "src/HumanoidRobotController.py"], + packages=['humanoid_robot_intelligence_control_system_movement'], package_dir={'': 'src'}, ) diff --git a/humanoid_robot_intelligence_control_system_movement/src/HumanoidRobotController.py b/humanoid_robot_intelligence_control_system_movement/src/HumanoidRobotController.py new file mode 100644 index 0000000..13adf37 --- /dev/null +++ b/humanoid_robot_intelligence_control_system_movement/src/HumanoidRobotController.py @@ -0,0 +1,327 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import time +import math +from typing import Dict, List, Tuple, Any + +from humanoid_robot_intelligence_control_system_controller.PIDController import PIDController +from humanoid_robot_intelligence_control_system_detection.tracker_config import TrackerConfig +from humanoid_robot_intelligence_control_system_movement.MovementController import MovementController + + +class HumanoidRobotController: + def __init__(self, config: TrackerConfig, controller_type: str = "PID"): + self.config = config + self.controller_type = controller_type + self.controllers: Dict[str, Any] = {} + self.joint_positions: Dict[str, float] = {} + self.target_positions: Dict[str, float] = {} + self.movement_controller = MovementController(config, controller_type) + self.current_balance = 0.0 + self.current_speed = 0.0 + self.current_angle = 0.0 + self.current_position = (0.0, 0.0) + self.last_joint_positions = {} + self.control_loop_time = 0.01 # 100 Hz + + self.initialize_controllers() + self.initialize_sensors() + + + def initialize_controllers(self): + joints = [ + "head_pan", "head_tilt", + "left_shoulder_pitch", "left_shoulder_roll", "left_elbow", + "right_shoulder_pitch", "right_shoulder_roll", "right_elbow", + "left_hip_yaw", "left_hip_roll", "left_hip_pitch", "left_knee", "left_ankle_pitch", "left_ankle_roll", + "right_hip_yaw", "right_hip_roll", "right_hip_pitch", "right_knee", "right_ankle_pitch", "right_ankle_roll" + ] + + for joint in joints: + self.controllers[joint] = self._create_controller(joint) + self.joint_positions[joint] = 0.0 + self.target_positions[joint] = 0.0 + + self.controllers["balance"] = self._create_controller("balance") + self.controllers["walk_forward"] = self._create_controller("walk_forward") + self.controllers["turn"] = self._create_controller("turn") + + self.roll_controller = self._create_controller("roll") + self.pitch_controller = self._create_controller("pitch") + + + def _create_controller(self, controller_name: str) -> Any: + if self.controller_type == "PID": + return PIDController( + getattr(self.config, f"get_{controller_name}_pid_gains")(), + f"{controller_name}_controller", + output_limits=self.config.get_joint_limits(controller_name) + ) + elif self.controller_type == "BellandeController": + return BellandeController( + getattr(self.config, f"get_{controller_name}_bellande_params")(), + f"{controller_name}_controller" + ) + else: + raise ValueError(f"Unsupported controller type: {self.controller_type}") + + + def initialize_sensors(self): + self.imu = None + self.power_sensor = None + self.joint_sensors = None + + + def update_config(self, new_config: TrackerConfig): + self.config = new_config + for joint, controller in self.controllers.items(): + self._update_controller(controller, joint) + self.movement_controller.update_config(new_config) + + + def _update_controller(self, controller: Any, controller_name: str): + if self.controller_type == "PID": + controller.update_config(getattr(self.config, f"get_{controller_name}_pid_gains")()) + elif self.controller_type == "BellandeController": + controller.update_config(getattr(self.config, f"get_{controller_name}_bellande_params")()) + + + def set_target_position(self, joint: str, position: float): + if joint in self.target_positions: + self.target_positions[joint] = position + else: + raise ValueError(f"Unknown joint: {joint}") + + + def update_joint_position(self, joint: str, position: float): + if joint in self.joint_positions: + self.joint_positions[joint] = position + else: + raise ValueError(f"Unknown joint: {joint}") + + + def compute_joint_control(self, joint: str) -> float: + if joint not in self.controllers: + raise ValueError(f"Unknown joint: {joint}") + + return self.controllers[joint].compute( + self.target_positions[joint], + self.joint_positions[joint] + ) + + + def compute_all_joint_controls(self) -> Dict[str, float]: + return {joint: self.compute_joint_control(joint) for joint in self.joint_positions.keys()} + + + def compute_balance_control(self, current_balance: float) -> float: + return self.controllers["balance"].compute(0, current_balance) + + + def compute_walk_forward_control(self, current_speed: float, target_speed: float) -> float: + return self.controllers["walk_forward"].compute(target_speed, current_speed) + + + def compute_turn_control(self, current_angle: float, target_angle: float) -> float: + return self.controllers["turn"].compute(target_angle, current_angle) + + + def reset_all_controllers(self): + for controller in self.controllers.values(): + controller.reset() + + + def update_robot_state(self, joint_positions: Dict[str, float], balance: float, speed: float, angle: float, position: Tuple[float, float]): + for joint, position in joint_positions.items(): + self.update_joint_position(joint, position) + + self.current_balance = balance + self.current_speed = speed + self.current_angle = angle + self.current_position = position + + + def compute_full_robot_control(self, target_speed: float, target_angle: float, target_position: Tuple[float, float]) -> Dict[str, float]: + joint_controls = self.compute_all_joint_controls() + balance_control = self.compute_balance_control(self.current_balance) + walk_control = self.compute_walk_forward_control(self.current_speed, target_speed) + turn_control = self.compute_turn_control(self.current_angle, target_angle) + + translation, rotation = self.movement_controller.compute_path_control(self.current_position, target_position) + + return { + "joints": joint_controls, + "balance": balance_control, + "walk": walk_control, + "turn": turn_control, + "translation": translation, + "rotation": rotation + } + + + def execute_control(self, control: Dict[str, float]): + try: + # Send joint control commands + for joint, value in control['joints'].items(): + self.robot_interface.set_joint_position(joint, value) + + # Send balance control command + self.robot_interface.set_balance_adjustment(control['balance']) + + # Send walk and turn commands + self.robot_interface.set_walk_speed(control['walk']) + self.robot_interface.set_turn_rate(control['turn']) + + # Send translation and rotation commands + self.robot_interface.set_body_translation(control['translation']) + self.robot_interface.set_body_rotation(control['rotation']) + + # Verify that commands were executed + if not self.robot_interface.verify_last_command(): + raise Exception("Failed to execute control commands") + + except Exception as e: + print(f"Error executing control: {e}") + self.emergency_stop() + + + def get_robot_state(self) -> Dict[str, Any]: + joint_positions = self.joint_sensors.get_positions() + imu_data = self.imu.get_data() + return { + "joint_positions": joint_positions, + "balance": imu_data.acceleration.z, + "speed": math.sqrt(imu_data.velocity.x**2 + imu_data.velocity.y**2), + "angle": math.atan2(imu_data.orientation.y, imu_data.orientation.x), + "position": (imu_data.position.x, imu_data.position.y) + } + + + def balance_adjustment(self): + roll, pitch, yaw = self.imu.get_orientation() + roll_rate, pitch_rate, yaw_rate = self.imu.get_angular_velocity() + + roll_correction = self.roll_controller.compute(0, roll) + pitch_correction = self.pitch_controller.compute(0, pitch) + + self.set_target_position("left_hip_roll", self.joint_positions["left_hip_roll"] - roll_correction) + self.set_target_position("right_hip_roll", self.joint_positions["right_hip_roll"] + roll_correction) + self.set_target_position("left_ankle_roll", self.joint_positions["left_ankle_roll"] - roll_correction) + self.set_target_position("right_ankle_roll", self.joint_positions["right_ankle_roll"] + roll_correction) + self.set_target_position("left_ankle_pitch", self.joint_positions["left_ankle_pitch"] - pitch_correction) + self.set_target_position("right_ankle_pitch", self.joint_positions["right_ankle_pitch"] - pitch_correction) + + if abs(pitch) > self.config.get_max_stable_pitch(): + hip_pitch_correction = self.pitch_controller.compute(0, pitch) * 0.5 + self.set_target_position("left_hip_pitch", self.joint_positions["left_hip_pitch"] + hip_pitch_correction) + self.set_target_position("right_hip_pitch", self.joint_positions["right_hip_pitch"] + hip_pitch_correction) + + damping_factor = self.config.get_angular_velocity_damping_factor() + self.set_target_position("left_hip_roll", self.joint_positions["left_hip_roll"] - roll_rate * damping_factor) + self.set_target_position("right_hip_roll", self.joint_positions["right_hip_roll"] + roll_rate * damping_factor) + self.set_target_position("left_ankle_roll", self.joint_positions["left_ankle_roll"] - roll_rate * damping_factor) + self.set_target_position("right_ankle_roll", self.joint_positions["right_ankle_roll"] + roll_rate * damping_factor) + self.set_target_position("left_ankle_pitch", self.joint_positions["left_ankle_pitch"] - pitch_rate * damping_factor) + self.set_target_position("right_ankle_pitch", self.joint_positions["right_ankle_pitch"] - pitch_rate * damping_factor) + + + def check_joint_status(self) -> bool: + for joint, position in self.joint_positions.items(): + limits = self.config.get_joint_limits(joint) + if position < limits[0] or position > limits[1]: + print(f"Joint {joint} out of limits: {position}") + return False + + if joint in self.last_joint_positions: + velocity = (position - self.last_joint_positions[joint]) / self.control_loop_time + if abs(velocity) > self.config.get_max_joint_velocity(joint): + print(f"Joint {joint} velocity too high: {velocity}") + return False + + self.last_joint_positions = self.joint_positions.copy() + return True + + + def check_power_status(self) -> bool: + voltage = self.power_sensor.get_voltage() + current = self.power_sensor.get_current() + + if voltage < self.config.get_min_voltage(): + print(f"Voltage too low: {voltage}V") + return False + if current > self.config.get_max_current(): + print(f"Current too high: {current}A") + return False + + return True + + + def check_sensor_status(self) -> bool: + return self.imu.is_operational() and self.power_sensor.is_operational() and self.joint_sensors.is_operational() + + + def self_diagnosis(self) -> Dict[str, bool]: + return { + "joints": self.check_joint_status(), + "sensors": self.check_sensor_status(), + "power": self.check_power_status(), + } + + + def emergency_stop(self): + self.stop() + self.reset_all_controllers() + + print("Emergency stop activated!") + + + def stop(self): + zero_control = {key: 0.0 for key in self.controllers.keys()} + zero_control.update({"translation": 0.0, "rotation": 0.0}) + self.execute_control(zero_control) + + + def run_control_loop(self): + while True: + try: + loop_start_time = time.time() + + state = self.get_robot_state() + self.update_robot_state(**state) + self.balance_adjustment() + + diagnosis = self.self_diagnosis() + if not all(diagnosis.values()): + print("System check failed. Initiating emergency stop.") + self.emergency_stop() + break + + control = self.compute_full_robot_control(self.current_speed, self.current_angle, self.current_position) + self.execute_control(control) + + loop_end_time = time.time() + loop_duration = loop_end_time - loop_start_time + if loop_duration < self.control_loop_time: + time.sleep(self.control_loop_time - loop_duration) + + except Exception as e: + print(f"Unexpected error: {e}") + self.emergency_stop() + break + + print("HumanoidRobotController shutting down.") diff --git a/humanoid_robot_intelligence_control_system_movement/src/MovementController.py b/humanoid_robot_intelligence_control_system_movement/src/MovementController.py new file mode 100644 index 0000000..d64d2c6 --- /dev/null +++ b/humanoid_robot_intelligence_control_system_movement/src/MovementController.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import math +from typing import Tuple + +from humanoid_robot_intelligence_control_system_detection.PIDController import PIDController +from humanoid_robot_intelligence_control_system_detection.tracker_config import TrackerConfig + + +class MovementController: + def __init__(self, config: TrackerConfig, controller_type: str = "PID"): + self.config = config + self.controller_type = controller_type + self.translation_controller = self._create_controller("translation") + self.rotation_controller = self._create_controller("rotation") + self.path_controller = self._create_controller("path") + + def _create_controller(self, controller_name: str) -> Any: + if self.controller_type == "PID": + return PIDController( + getattr(self.config, f"get_{controller_name}_pid_gains")(), + f"{controller_name}_controller", + output_limits=(-1, 1) + ) + elif self.controller_type == "BellandeController": + return BellandeController( + getattr(self.config, f"get_{controller_name}_bellande_params")(), + f"{controller_name}_controller" + ) + else: + raise ValueError(f"Unsupported controller type: {self.controller_type}") + + def update_config(self, new_config: TrackerConfig): + self.config = new_config + self._update_controller(self.translation_controller, "translation") + self._update_controller(self.rotation_controller, "rotation") + self._update_controller(self.path_controller, "path") + + def _update_controller(self, controller: Any, controller_name: str): + if self.controller_type == "PID": + controller.update_config(getattr(self.config, f"get_{controller_name}_pid_gains")()) + elif self.controller_type == "BellandeController": + controller.update_config(getattr(self.config, f"get_{controller_name}_bellande_params")()) + + def compute_translation_control(self, current_position: float, target_position: float) -> float: + return self.translation_controller.compute(target_position, current_position) + + def compute_rotation_control(self, current_angle: float, target_angle: float) -> float: + return self.rotation_controller.compute(target_angle, current_angle) + + def compute_path_control(self, current_position: Tuple[float, float], target_position: Tuple[float, float]) -> Tuple[float, float]: + dx = target_position[0] - current_position[0] + dy = target_position[1] - current_position[1] + distance = (dx**2 + dy**2)**0.5 + angle = math.atan2(dy, dx) + translation = self.translation_controller.compute(0, -distance) + rotation = self.rotation_controller.compute(angle, 0) + return translation, rotation diff --git a/humanoid_robot_intelligence_control_system_object_detector/launch/object_detector_processor.launch.py b/humanoid_robot_intelligence_control_system_object_detector/launch/object_detector_processor.launch.py deleted file mode 100644 index e61d87a..0000000 --- a/humanoid_robot_intelligence_control_system_object_detector/launch/object_detector_processor.launch.py +++ /dev/null @@ -1,108 +0,0 @@ -# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import os -import sys -import subprocess -from launch import LaunchDescription -from launch_ros.actions import Node -from launch.actions import DeclareLaunchArgument -from launch.substitutions import LaunchConfiguration - -def ros1_launch_description(): - # Get command-line arguments - args = sys.argv[1:] - - # Construct the ROS 1 launch command - roslaunch_command = ["roslaunch", "humanoid_robot_intelligence_control_system_object_detector", "object_detector_processor.launch"] + args - - roslaunch_command.extend([ - "usb_cam", "usb_cam_node", "name:=camera", - "video_device:=/dev/video0", - "image_width:=640", - "image_height:=480", - "pixel_format:=yuyv", - "camera_frame_id:=usb_cam", - "io_method:=mmap" - ]) - - roslaunch_command.extend([ - "ros_web_api_bellande_2d_computer_vision", "bellande_2d_computer_vision_object_detection.py", "name:=object_detection_node" - ]) - - roslaunch_command.extend([ - "humanoid_robot_intelligence_control_system_ball_detector", "object_detection_processor.py", "name:=object_detection_processor_node" - ]) - - roslaunch_command.extend([ - "rviz", "rviz", "name:=rviz", - "args:=-d $(find ros_web_api_bellande_2d_computer_vision)/rviz/visualization.rviz" - ]) - - # Execute the launch command - subprocess.call(roslaunch_command) - -def ros2_launch_description(): - nodes_to_launch = [] - - nodes_to_launch.append(Node( - package='usb_cam', - executable='usb_cam_node', - name='camera', - output='screen', - parameters=[{ - 'video_device': '/dev/video0', - 'image_width': 640, - 'image_height': 480, - 'pixel_format': 'yuyv', - 'camera_frame_id': 'usb_cam', - 'io_method': 'mmap' - }] - )) - - nodes_to_launch.append(Node( - package='ros_web_api_bellande_2d_computer_vision', - executable='bellande_2d_computer_vision_object_detection.py', - name='object_detection_node', - output='screen', - remappings=[('camera/image_raw', '/usb_cam/image_raw')] - )) - - nodes_to_launch.append(Node( - package='humanoid_robot_intelligence_control_system_object_detector', - executable='object_detection_processor.py', - name='object_detection_processor_node', - output='screen', - parameters=[{'yaml_path': '$(find ros_web_api_bellande_2d_computer_vision)/yaml/object_detection_params.yaml'}] - )) - - nodes_to_launch.append(Node( - package='rviz2', - executable='rviz2', - name='rviz', - arguments=['-d', '$(find ros_web_api_bellande_2d_computer_vision)/rviz/visualization.rviz'] - )) - - return LaunchDescription(nodes_to_launch) - -if __name__ == "__main__": - ros_version = os.getenv("ROS_VERSION") - if ros_version == "1": - ros1_launch_description() - elif ros_version == "2": - ros2_launch_description() - else: - print("Unsupported ROS version. Please set the ROS_VERSION environment variable to '1' for ROS 1 or '2' for ROS 2.") - sys.exit(1) diff --git a/humanoid_robot_intelligence_control_system_object_detector/launch/ros1/object_detector_processor.launch b/humanoid_robot_intelligence_control_system_object_detector/launch/ros1/object_detector_processor.launch deleted file mode 100644 index 6799e95..0000000 --- a/humanoid_robot_intelligence_control_system_object_detector/launch/ros1/object_detector_processor.launch +++ /dev/null @@ -1,41 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/humanoid_robot_intelligence_control_system_object_detector/package.xml b/humanoid_robot_intelligence_control_system_object_detector/package.xml deleted file mode 100644 index 706fe9e..0000000 --- a/humanoid_robot_intelligence_control_system_object_detector/package.xml +++ /dev/null @@ -1,61 +0,0 @@ - - - - - humanoid_robot_intelligence_control_system_object_detector - 0.0.1 - - This Package is for Object Detection, detecting objects like tools, or utilities - - Apache 2.0 - Ronaldson Bellande - - catkin - ament_cmake - ament_cmake_python - - - rospy - std_msgs - sensor_msgs - geometry_msgs - cv_bridge - image_transport - dynamic_reconfigure - message_generation - message_runtime - - - rclpy - std_msgs - sensor_msgs - geometry_msgs - cv_bridge - image_transport - rosidl_default_generators - rosidl_default_runtime - - - python3-opencv - python3-yaml - usb_cam - - - catkin - ament_cmake - - diff --git a/humanoid_robot_intelligence_control_system_object_detector/src/object_detection_processor.py b/humanoid_robot_intelligence_control_system_object_detector/src/object_detection_processor.py deleted file mode 100644 index 6f44d68..0000000 --- a/humanoid_robot_intelligence_control_system_object_detector/src/object_detection_processor.py +++ /dev/null @@ -1,111 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import rospy -import cv2 -import numpy as np -from sensor_msgs.msg import Image, CameraInfo -from std_msgs.msg import Bool, String -from cv_bridge import CvBridge -import yaml - -class ObjectDetectionProcessor: - def __init__(self): - rospy.init_node('object_detection_processor') - self.bridge = CvBridge() - self.enable = True - self.new_image_flag = False - self.load_params() - self.setup_ros() - - def load_params(self): - param_path = rospy.get_param('~yaml_path', '') - if param_path: - with open(param_path, 'r') as file: - self.params = yaml.safe_load(file) - else: - self.set_default_params() - - def set_default_params(self): - self.params = { - 'debug': False, - 'ellipse_size': 2, - # Add other default parameters as needed - } - - def setup_ros(self): - self.image_pub = rospy.Publisher('image_out', Image, queue_size=10) - self.camera_info_pub = rospy.Publisher('camera_info', CameraInfo, queue_size=10) - - rospy.Subscriber('enable', Bool, self.enable_callback) - rospy.Subscriber('image_in', Image, self.image_callback) - rospy.Subscriber('cameraInfo_in', CameraInfo, self.camera_info_callback) - rospy.Subscriber('object_detection_result', String, self.object_detection_callback) - - def enable_callback(self, msg): - self.enable = msg.data - - def image_callback(self, msg): - if not self.enable: - return - self.cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8") - self.new_image_flag = True - self.image_header = msg.header - - def camera_info_callback(self, msg): - if not self.enable: - return - self.camera_info_msg = msg - - def object_detection_callback(self, msg): - if not self.enable or not hasattr(self, 'cv_image'): - return - - objects = eval(msg.data) # Assuming the data is a string representation of a list - self.process_detected_objects(objects) - - def process_detected_objects(self, objects): - output_image = self.cv_image.copy() - for obj in objects: - x, y, w, h = obj['bbox'] - cv2.rectangle(output_image, (int(x), int(y)), (int(x+w), int(y+h)), (0, 255, 0), 2) - cv2.putText(output_image, f"{obj['label']}: {obj['confidence']:.2f}", - (int(x), int(y-10)), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) - - self.publish_image(output_image) - - def publish_image(self, image): - img_msg = self.bridge.cv2_to_imgmsg(image, encoding="bgr8") - img_msg.header = self.image_header - self.image_pub.publish(img_msg) - if hasattr(self, 'camera_info_msg'): - self.camera_info_pub.publish(self.camera_info_msg) - - def run(self): - rate = rospy.Rate(30) # 30 Hz - while not rospy.is_shutdown(): - if self.new_image_flag: - # The processing is done in object_detection_callback - self.new_image_flag = False - rate.sleep() - -if __name__ == '__main__': - try: - processor = ObjectDetectionProcessor() - processor.run() - except rospy.ROSInterruptException: - pass diff --git a/humanoid_robot_intelligence_control_system_speech_detector/CMakeLists.txt b/humanoid_robot_intelligence_control_system_speech_detector/CMakeLists.txt deleted file mode 100644 index 78abc20..0000000 --- a/humanoid_robot_intelligence_control_system_speech_detector/CMakeLists.txt +++ /dev/null @@ -1,71 +0,0 @@ -# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy of -# the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations under -# the License. - -cmake_minimum_required(VERSION 3.0.2) -project(humanoid_robot_intelligence_control_system_speech_detector) - -if($ENV{ROS_VERSION} EQUAL 1) - find_package(catkin REQUIRED COMPONENTS - rospy - std_msgs - sensor_msgs - geometry_msgs - cv_bridge - ) - - catkin_package( - CATKIN_DEPENDS - rospy - std_msgs - sensor_msgs - geometry_msgs - cv_bridge - ) - -else() - find_package(ament_cmake REQUIRED) - find_package(ament_cmake_python REQUIRED) - find_package(rclpy REQUIRED) - find_package(std_msgs REQUIRED) - find_package(sensor_msgs REQUIRED) - find_package(geometry_msgs REQUIRED) - find_package(cv_bridge REQUIRED) -endif() - -if($ENV{ROS_VERSION} EQUAL 1) - catkin_python_setup() - - catkin_install_python(PROGRAMS - src/speech_detection_processor.py - DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} - ) - - install(DIRECTORY config launch rviz - DESTINATION ${CATKIN_PACKAGE_SHARE_DESTINATION} - ) - -else() - ament_python_install_package(${PROJECT_NAME}) - - install(PROGRAMS - src/speech_detection_processor.py - DESTINATION lib/${PROJECT_NAME} - ) - - install(DIRECTORY config launch rviz - DESTINATION share/${PROJECT_NAME} - ) - - ament_package() -endif() diff --git a/humanoid_robot_intelligence_control_system_speech_detector/launch/ros1/speech_detector_processor.launch b/humanoid_robot_intelligence_control_system_speech_detector/launch/ros1/speech_detector_processor.launch deleted file mode 100644 index 014521f..0000000 --- a/humanoid_robot_intelligence_control_system_speech_detector/launch/ros1/speech_detector_processor.launch +++ /dev/null @@ -1,41 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/humanoid_robot_intelligence_control_system_speech_detector/launch/speech_detector_processor.launch.py b/humanoid_robot_intelligence_control_system_speech_detector/launch/speech_detector_processor.launch.py deleted file mode 100644 index c11ca70..0000000 --- a/humanoid_robot_intelligence_control_system_speech_detector/launch/speech_detector_processor.launch.py +++ /dev/null @@ -1,108 +0,0 @@ -# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import os -import sys -import subprocess -from launch import LaunchDescription -from launch_ros.actions import Node -from launch.actions import DeclareLaunchArgument -from launch.substitutions import LaunchConfiguration - -def ros1_launch_description(): - # Get command-line arguments - args = sys.argv[1:] - - # Construct the ROS 1 launch command - roslaunch_command = ["roslaunch", "humanoid_robot_intelligence_control_system_speech_detector", "speech_detector_processor.launch"] + args - - roslaunch_command.extend([ - "usb_cam", "usb_cam_node", "name:=camera", - "video_device:=/dev/video0", - "image_width:=640", - "image_height:=480", - "pixel_format:=yuyv", - "camera_frame_id:=usb_cam", - "io_method:=mmap" - ]) - - roslaunch_command.extend([ - "ros_web_api_bellande_2d_computer_vision", "bellande_2d_computer_vision_speech_detection.py", "name:=speech_detection_node" - ]) - - roslaunch_command.extend([ - "humanoid_robot_intelligence_control_system_ball_detector", "speech_detection_processor.py", "name:=speech_detection_processor_node" - ]) - - roslaunch_command.extend([ - "rviz", "rviz", "name:=rviz", - "args:=-d $(find ros_web_api_bellande_2d_computer_vision)/rviz/visualization.rviz" - ]) - - # Execute the launch command - subprocess.call(roslaunch_command) - -def ros2_launch_description(): - nodes_to_launch = [] - - nodes_to_launch.append(Node( - package='usb_cam', - executable='usb_cam_node', - name='camera', - output='screen', - parameters=[{ - 'video_device': '/dev/video0', - 'image_width': 640, - 'image_height': 480, - 'pixel_format': 'yuyv', - 'camera_frame_id': 'usb_cam', - 'io_method': 'mmap' - }] - )) - - nodes_to_launch.append(Node( - package='ros_web_api_bellande_2d_computer_vision', - executable='bellande_2d_computer_vision_speech_detection.py', - name='speech_detection_node', - output='screen', - remappings=[('camera/image_raw', '/usb_cam/image_raw')] - )) - - nodes_to_launch.append(Node( - package='humanoid_robot_intelligence_control_system_speech_detector', - executable='speech_detection_processor.py', - name='speech_detection_processor_node', - output='screen', - parameters=[{'yaml_path': '$(find ros_web_api_bellande_2d_computer_vision)/yaml/speech_detection_params.yaml'}] - )) - - nodes_to_launch.append(Node( - package='rviz2', - executable='rviz2', - name='rviz', - arguments=['-d', '$(find ros_web_api_bellande_2d_computer_vision)/rviz/visualization.rviz'] - )) - - return LaunchDescription(nodes_to_launch) - -if __name__ == "__main__": - ros_version = os.getenv("ROS_VERSION") - if ros_version == "1": - ros1_launch_description() - elif ros_version == "2": - ros2_launch_description() - else: - print("Unsupported ROS version. Please set the ROS_VERSION environment variable to '1' for ROS 1 or '2' for ROS 2.") - sys.exit(1) diff --git a/humanoid_robot_intelligence_control_system_speech_detector/setup.py b/humanoid_robot_intelligence_control_system_speech_detector/setup.py deleted file mode 100644 index f4a55e3..0000000 --- a/humanoid_robot_intelligence_control_system_speech_detector/setup.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -from distutils.core import setup -from catkin_pkg.python_setup import generate_distutils_setup - -# fetch values from package.xml -setup_args = generate_distutils_setup( - scripts=['src/speech_detection_processor.py'], - packages=['humanoid_robot_intelligence_control_system_speech_detector'], - package_dir={'': 'src'}, -) - -setup(**setup_args) diff --git a/humanoid_robot_intelligence_control_system_speech_detector/src/speech_detection_processor.py b/humanoid_robot_intelligence_control_system_speech_detector/src/speech_detection_processor.py deleted file mode 100644 index fcc60d2..0000000 --- a/humanoid_robot_intelligence_control_system_speech_detector/src/speech_detection_processor.py +++ /dev/null @@ -1,111 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import rospy -import cv2 -import numpy as np -from sensor_msgs.msg import Image, CameraInfo -from std_msgs.msg import Bool, String -from cv_bridge import CvBridge -import yaml - -class ObjectDetectionProcessor: - def __init__(self): - rospy.init_node('speech_detection_processor') - self.bridge = CvBridge() - self.enable = True - self.new_image_flag = False - self.load_params() - self.setup_ros() - - def load_params(self): - param_path = rospy.get_param('~yaml_path', '') - if param_path: - with open(param_path, 'r') as file: - self.params = yaml.safe_load(file) - else: - self.set_default_params() - - def set_default_params(self): - self.params = { - 'debug': False, - 'ellipse_size': 2, - # Add other default parameters as needed - } - - def setup_ros(self): - self.image_pub = rospy.Publisher('image_out', Image, queue_size=10) - self.camera_info_pub = rospy.Publisher('camera_info', CameraInfo, queue_size=10) - - rospy.Subscriber('enable', Bool, self.enable_callback) - rospy.Subscriber('image_in', Image, self.image_callback) - rospy.Subscriber('cameraInfo_in', CameraInfo, self.camera_info_callback) - rospy.Subscriber('speech_detection_result', String, self.speech_detection_callback) - - def enable_callback(self, msg): - self.enable = msg.data - - def image_callback(self, msg): - if not self.enable: - return - self.cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8") - self.new_image_flag = True - self.image_header = msg.header - - def camera_info_callback(self, msg): - if not self.enable: - return - self.camera_info_msg = msg - - def speech_detection_callback(self, msg): - if not self.enable or not hasattr(self, 'cv_image'): - return - - speechs = eval(msg.data) # Assuming the data is a string representation of a list - self.process_detected_speechs(speechs) - - def process_detected_speechs(self, speechs): - output_image = self.cv_image.copy() - for obj in speechs: - x, y, w, h = obj['bbox'] - cv2.rectangle(output_image, (int(x), int(y)), (int(x+w), int(y+h)), (0, 255, 0), 2) - cv2.putText(output_image, f"{obj['label']}: {obj['confidence']:.2f}", - (int(x), int(y-10)), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2) - - self.publish_image(output_image) - - def publish_image(self, image): - img_msg = self.bridge.cv2_to_imgmsg(image, encoding="bgr8") - img_msg.header = self.image_header - self.image_pub.publish(img_msg) - if hasattr(self, 'camera_info_msg'): - self.camera_info_pub.publish(self.camera_info_msg) - - def run(self): - rate = rospy.Rate(30) # 30 Hz - while not rospy.is_shutdown(): - if self.new_image_flag: - # The processing is done in speech_detection_callback - self.new_image_flag = False - rate.sleep() - -if __name__ == '__main__': - try: - processor = ObjectDetectionProcessor() - processor.run() - except rospy.ROSInterruptException: - pass