Merge pull request #27 from RonaldsonBellande/main

seperate
This commit is contained in:
Ronaldson Bellande 2024-07-26 17:18:18 -04:00 committed by GitHub
commit 7cf234124c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 594 additions and 712 deletions

View File

@ -17,7 +17,7 @@
import math
def calculate_distance_to_object(camera_height, tilt, hip_pitch_offset, object_size):
def calculate_distance_to(camera_height, tilt, hip_pitch_offset, object_size):
distance = camera_height * math.tan(math.pi * 0.5 + tilt - hip_pitch_offset - object_size)
return abs(distance)

View File

@ -58,6 +58,92 @@ class FollowerConfig:
self.curr_period_time = get_param('curr_period_time', self.curr_period_time)
self.DEBUG_PRINT = get_param('debug_print', self.DEBUG_PRINT)
def reset(self):
"""Reset all values to their initial state."""
self.__init__()
def set_fov(self, width, height):
"""Set the field of view."""
self.FOV_WIDTH = width
self.FOV_HEIGHT = height
def set_camera_height(self, height):
"""Set the camera height."""
self.CAMERA_HEIGHT = height
def set_thresholds(self, not_found_threshold):
"""Set the thresholds."""
self.NOT_FOUND_THRESHOLD = not_found_threshold
def set_step_parameters(self, max_fb, max_rl, in_place_fb, min_fb, min_rl, unit_fb, unit_rl):
"""Set the step parameters."""
self.MAX_FB_STEP = max_fb
self.MAX_RL_TURN = max_rl
self.IN_PLACE_FB_STEP = in_place_fb
self.MIN_FB_STEP = min_fb
self.MIN_RL_TURN = min_rl
self.UNIT_FB_STEP = unit_fb
self.UNIT_RL_TURN = unit_rl
def set_spot_offsets(self, fb_offset, rl_offset, angle_offset):
"""Set the spot offsets."""
self.SPOT_FB_OFFSET = fb_offset
self.SPOT_RL_OFFSET = rl_offset
self.SPOT_ANGLE_OFFSET = angle_offset
def set_hip_pitch_offset(self, offset):
"""Set the hip pitch offset."""
self.hip_pitch_offset = offset
def set_period_time(self, period_time):
"""Set the current period time."""
self.curr_period_time = period_time
def update_accum_period_time(self, delta_time):
"""Update the accumulated period time."""
self.accum_period_time += delta_time
if self.accum_period_time > (self.curr_period_time / 4):
self.accum_period_time = 0.0
return self.accum_period_time
def reset_accum_period_time(self):
"""Reset the accumulated period time."""
self.accum_period_time = 0.0
def set_debug_print(self, debug_print):
"""Set the debug print flag."""
self.DEBUG_PRINT = debug_print
def get_config_dict(self):
"""Return a dictionary of all configuration parameters."""
return {
'FOV_WIDTH': self.FOV_WIDTH,
'FOV_HEIGHT': self.FOV_HEIGHT,
'CAMERA_HEIGHT': self.CAMERA_HEIGHT,
'NOT_FOUND_THRESHOLD': self.NOT_FOUND_THRESHOLD,
'MAX_FB_STEP': self.MAX_FB_STEP,
'MAX_RL_TURN': self.MAX_RL_TURN,
'IN_PLACE_FB_STEP': self.IN_PLACE_FB_STEP,
'MIN_FB_STEP': self.MIN_FB_STEP,
'MIN_RL_TURN': self.MIN_RL_TURN,
'UNIT_FB_STEP': self.UNIT_FB_STEP,
'UNIT_RL_TURN': self.UNIT_RL_TURN,
'SPOT_FB_OFFSET': self.SPOT_FB_OFFSET,
'SPOT_RL_OFFSET': self.SPOT_RL_OFFSET,
'SPOT_ANGLE_OFFSET': self.SPOT_ANGLE_OFFSET,
'hip_pitch_offset': self.hip_pitch_offset,
'curr_period_time': self.curr_period_time,
'accum_period_time': self.accum_period_time,
'DEBUG_PRINT': self.DEBUG_PRINT
}
def load_config_dict(self, config_dict):
"""Load configuration from a dictionary."""
for key, value in config_dict.items():
if hasattr(self, key):
setattr(self, key, value)
class FollowerInitializeConfig:
def __init__(self):

View File

@ -40,6 +40,104 @@ class TrackerConfig:
self.i_gain = get_param('i_gain', self.i_gain)
self.d_gain = get_param('d_gain', self.d_gain)
def reset(self):
"""Reset all values to their initial state."""
self.__init__()
def set_fov(self, width, height):
"""Set the field of view."""
self.FOV_WIDTH = width
self.FOV_HEIGHT = height
def set_thresholds(self, not_found_threshold, waiting_threshold):
"""Set the thresholds."""
self.NOT_FOUND_THRESHOLD = not_found_threshold
self.WAITING_THRESHOLD = waiting_threshold
def set_use_head_scan(self, use_head_scan):
"""Set whether to use head scan."""
self.use_head_scan = use_head_scan
def set_debug_print(self, debug_print):
"""Set the debug print flag."""
self.DEBUG_PRINT = debug_print
def set_pid_gains(self, p_gain, i_gain, d_gain):
"""Set the PID gains."""
self.p_gain = p_gain
self.i_gain = i_gain
self.d_gain = d_gain
def get_pid_gains(self):
"""Get the PID gains."""
return self.p_gain, self.i_gain, self.d_gain
def adjust_pid_gains(self, p_adjust=0, i_adjust=0, d_adjust=0):
"""Adjust the PID gains by the given amounts."""
self.p_gain += p_adjust
self.i_gain += i_adjust
self.d_gain += d_adjust
def get_config_dict(self):
"""Return a dictionary of all configuration parameters."""
return {
'FOV_WIDTH': self.FOV_WIDTH,
'FOV_HEIGHT': self.FOV_HEIGHT,
'NOT_FOUND_THRESHOLD': self.NOT_FOUND_THRESHOLD,
'WAITING_THRESHOLD': self.WAITING_THRESHOLD,
'use_head_scan': self.use_head_scan,
'DEBUG_PRINT': self.DEBUG_PRINT,
'p_gain': self.p_gain,
'i_gain': self.i_gain,
'd_gain': self.d_gain
}
def load_config_dict(self, config_dict):
"""Load configuration from a dictionary."""
for key, value in config_dict.items():
if hasattr(self, key):
setattr(self, key, value)
def validate_config(self):
"""Validate the configuration parameters."""
assert 0 < self.FOV_WIDTH < math.pi, "FOV_WIDTH must be between 0 and pi"
assert 0 < self.FOV_HEIGHT < math.pi, "FOV_HEIGHT must be between 0 and pi"
assert self.NOT_FOUND_THRESHOLD > 0, "NOT_FOUND_THRESHOLD must be positive"
assert self.WAITING_THRESHOLD > 0, "WAITING_THRESHOLD must be positive"
assert isinstance(self.use_head_scan, bool), "use_head_scan must be a boolean"
assert isinstance(self.DEBUG_PRINT, bool), "DEBUG_PRINT must be a boolean"
assert self.p_gain >= 0, "p_gain must be non-negative"
assert self.i_gain >= 0, "i_gain must be non-negative"
assert self.d_gain >= 0, "d_gain must be non-negative"
def print_config(self):
"""Print the current configuration."""
for key, value in self.get_config_dict().items():
print(f"{key}: {value}")
def to_ros_param(self):
"""Convert the configuration to a format suitable for ROS parameters."""
return {
'tracker': {
'fov': {
'width': self.FOV_WIDTH,
'height': self.FOV_HEIGHT
},
'thresholds': {
'not_found': self.NOT_FOUND_THRESHOLD,
'waiting': self.WAITING_THRESHOLD
},
'use_head_scan': self.use_head_scan,
'debug_print': self.DEBUG_PRINT,
'pid': {
'p': self.p_gain,
'i': self.i_gain,
'd': self.d_gain
}
}
}
class TrackerInitializeConfig:
def __init__(self):

View File

@ -13,24 +13,16 @@
# the License.
cmake_minimum_required(VERSION 3.0.2)
project(humanoid_robot_intelligence_control_system_object_detector)
project(humanoid_robot_intelligence_control_system_movement)
if($ENV{ROS_VERSION} EQUAL 1)
find_package(catkin REQUIRED COMPONENTS
rospy
std_msgs
sensor_msgs
geometry_msgs
cv_bridge
)
catkin_package(
CATKIN_DEPENDS
rospy
std_msgs
sensor_msgs
geometry_msgs
cv_bridge
)
else()
@ -38,16 +30,14 @@ else()
find_package(ament_cmake_python REQUIRED)
find_package(rclpy REQUIRED)
find_package(std_msgs REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(cv_bridge REQUIRED)
endif()
if($ENV{ROS_VERSION} EQUAL 1)
catkin_python_setup()
catkin_install_python(PROGRAMS
src/object_detection_processor.py
src/MovementController.py
src/HumanoidRobotController.py
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)
@ -59,7 +49,8 @@ else()
ament_python_install_package(${PROJECT_NAME})
install(PROGRAMS
src/object_detection_processor.py
src/MovementController.py
src/HumanoidRobotController.py
DESTINATION lib/${PROJECT_NAME}
)

View File

@ -16,10 +16,10 @@ the License.
-->
<package format="3">
<name>humanoid_robot_intelligence_control_system_speech_detector</name>
<name>humanoid_robot_intelligence_control_system_movement</name>
<version>0.0.1</version>
<description>
This Package is for Object Detection, detecting speechs like tools, or utilities
This Package is for Detection Math
</description>
<license>Apache 2.0</license>
<maintainer email="ronaldsonbellande@gmail.com">Ronaldson Bellande</maintainer>
@ -30,24 +30,9 @@ the License.
<!-- ROS 1 dependencies -->
<depend condition="$ROS_VERSION == 1">rospy</depend>
<depend condition="$ROS_VERSION == 1">std_msgs</depend>
<depend condition="$ROS_VERSION == 1">sensor_msgs</depend>
<depend condition="$ROS_VERSION == 1">geometry_msgs</depend>
<depend condition="$ROS_VERSION == 1">cv_bridge</depend>
<depend condition="$ROS_VERSION == 1">image_transport</depend>
<depend condition="$ROS_VERSION == 1">dynamic_reconfigure</depend>
<depend condition="$ROS_VERSION == 1">message_generation</depend>
<depend condition="$ROS_VERSION == 1">message_runtime</depend>
<!-- ROS 2 dependencies -->
<depend condition="$ROS_VERSION == 2">rclpy</depend>
<depend condition="$ROS_VERSION == 2">std_msgs</depend>
<depend condition="$ROS_VERSION == 2">sensor_msgs</depend>
<depend condition="$ROS_VERSION == 2">geometry_msgs</depend>
<depend condition="$ROS_VERSION == 2">cv_bridge</depend>
<depend condition="$ROS_VERSION == 2">image_transport</depend>
<depend condition="$ROS_VERSION == 2">rosidl_default_generators</depend>
<depend condition="$ROS_VERSION == 2">rosidl_default_runtime</depend>
<!-- Common dependencies -->
<depend>python3-opencv</depend>

View File

@ -18,8 +18,8 @@ from catkin_pkg.python_setup import generate_distutils_setup
# fetch values from package.xml
setup_args = generate_distutils_setup(
scripts=['src/object_detection_processor.py'],
packages=['humanoid_robot_intelligence_control_system_object_detector'],
scripts=["src/MovementController.py", "src/HumanoidRobotController.py"],
packages=['humanoid_robot_intelligence_control_system_movement'],
package_dir={'': 'src'},
)

View File

@ -0,0 +1,327 @@
#!/usr/bin/env python3
# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import time
import math
from typing import Dict, List, Tuple, Any
from humanoid_robot_intelligence_control_system_controller.PIDController import PIDController
from humanoid_robot_intelligence_control_system_detection.tracker_config import TrackerConfig
from humanoid_robot_intelligence_control_system_movement.MovementController import MovementController
class HumanoidRobotController:
def __init__(self, config: TrackerConfig, controller_type: str = "PID"):
self.config = config
self.controller_type = controller_type
self.controllers: Dict[str, Any] = {}
self.joint_positions: Dict[str, float] = {}
self.target_positions: Dict[str, float] = {}
self.movement_controller = MovementController(config, controller_type)
self.current_balance = 0.0
self.current_speed = 0.0
self.current_angle = 0.0
self.current_position = (0.0, 0.0)
self.last_joint_positions = {}
self.control_loop_time = 0.01 # 100 Hz
self.initialize_controllers()
self.initialize_sensors()
def initialize_controllers(self):
joints = [
"head_pan", "head_tilt",
"left_shoulder_pitch", "left_shoulder_roll", "left_elbow",
"right_shoulder_pitch", "right_shoulder_roll", "right_elbow",
"left_hip_yaw", "left_hip_roll", "left_hip_pitch", "left_knee", "left_ankle_pitch", "left_ankle_roll",
"right_hip_yaw", "right_hip_roll", "right_hip_pitch", "right_knee", "right_ankle_pitch", "right_ankle_roll"
]
for joint in joints:
self.controllers[joint] = self._create_controller(joint)
self.joint_positions[joint] = 0.0
self.target_positions[joint] = 0.0
self.controllers["balance"] = self._create_controller("balance")
self.controllers["walk_forward"] = self._create_controller("walk_forward")
self.controllers["turn"] = self._create_controller("turn")
self.roll_controller = self._create_controller("roll")
self.pitch_controller = self._create_controller("pitch")
def _create_controller(self, controller_name: str) -> Any:
if self.controller_type == "PID":
return PIDController(
getattr(self.config, f"get_{controller_name}_pid_gains")(),
f"{controller_name}_controller",
output_limits=self.config.get_joint_limits(controller_name)
)
elif self.controller_type == "BellandeController":
return BellandeController(
getattr(self.config, f"get_{controller_name}_bellande_params")(),
f"{controller_name}_controller"
)
else:
raise ValueError(f"Unsupported controller type: {self.controller_type}")
def initialize_sensors(self):
self.imu = None
self.power_sensor = None
self.joint_sensors = None
def update_config(self, new_config: TrackerConfig):
self.config = new_config
for joint, controller in self.controllers.items():
self._update_controller(controller, joint)
self.movement_controller.update_config(new_config)
def _update_controller(self, controller: Any, controller_name: str):
if self.controller_type == "PID":
controller.update_config(getattr(self.config, f"get_{controller_name}_pid_gains")())
elif self.controller_type == "BellandeController":
controller.update_config(getattr(self.config, f"get_{controller_name}_bellande_params")())
def set_target_position(self, joint: str, position: float):
if joint in self.target_positions:
self.target_positions[joint] = position
else:
raise ValueError(f"Unknown joint: {joint}")
def update_joint_position(self, joint: str, position: float):
if joint in self.joint_positions:
self.joint_positions[joint] = position
else:
raise ValueError(f"Unknown joint: {joint}")
def compute_joint_control(self, joint: str) -> float:
if joint not in self.controllers:
raise ValueError(f"Unknown joint: {joint}")
return self.controllers[joint].compute(
self.target_positions[joint],
self.joint_positions[joint]
)
def compute_all_joint_controls(self) -> Dict[str, float]:
return {joint: self.compute_joint_control(joint) for joint in self.joint_positions.keys()}
def compute_balance_control(self, current_balance: float) -> float:
return self.controllers["balance"].compute(0, current_balance)
def compute_walk_forward_control(self, current_speed: float, target_speed: float) -> float:
return self.controllers["walk_forward"].compute(target_speed, current_speed)
def compute_turn_control(self, current_angle: float, target_angle: float) -> float:
return self.controllers["turn"].compute(target_angle, current_angle)
def reset_all_controllers(self):
for controller in self.controllers.values():
controller.reset()
def update_robot_state(self, joint_positions: Dict[str, float], balance: float, speed: float, angle: float, position: Tuple[float, float]):
for joint, position in joint_positions.items():
self.update_joint_position(joint, position)
self.current_balance = balance
self.current_speed = speed
self.current_angle = angle
self.current_position = position
def compute_full_robot_control(self, target_speed: float, target_angle: float, target_position: Tuple[float, float]) -> Dict[str, float]:
joint_controls = self.compute_all_joint_controls()
balance_control = self.compute_balance_control(self.current_balance)
walk_control = self.compute_walk_forward_control(self.current_speed, target_speed)
turn_control = self.compute_turn_control(self.current_angle, target_angle)
translation, rotation = self.movement_controller.compute_path_control(self.current_position, target_position)
return {
"joints": joint_controls,
"balance": balance_control,
"walk": walk_control,
"turn": turn_control,
"translation": translation,
"rotation": rotation
}
def execute_control(self, control: Dict[str, float]):
try:
# Send joint control commands
for joint, value in control['joints'].items():
self.robot_interface.set_joint_position(joint, value)
# Send balance control command
self.robot_interface.set_balance_adjustment(control['balance'])
# Send walk and turn commands
self.robot_interface.set_walk_speed(control['walk'])
self.robot_interface.set_turn_rate(control['turn'])
# Send translation and rotation commands
self.robot_interface.set_body_translation(control['translation'])
self.robot_interface.set_body_rotation(control['rotation'])
# Verify that commands were executed
if not self.robot_interface.verify_last_command():
raise Exception("Failed to execute control commands")
except Exception as e:
print(f"Error executing control: {e}")
self.emergency_stop()
def get_robot_state(self) -> Dict[str, Any]:
joint_positions = self.joint_sensors.get_positions()
imu_data = self.imu.get_data()
return {
"joint_positions": joint_positions,
"balance": imu_data.acceleration.z,
"speed": math.sqrt(imu_data.velocity.x**2 + imu_data.velocity.y**2),
"angle": math.atan2(imu_data.orientation.y, imu_data.orientation.x),
"position": (imu_data.position.x, imu_data.position.y)
}
def balance_adjustment(self):
roll, pitch, yaw = self.imu.get_orientation()
roll_rate, pitch_rate, yaw_rate = self.imu.get_angular_velocity()
roll_correction = self.roll_controller.compute(0, roll)
pitch_correction = self.pitch_controller.compute(0, pitch)
self.set_target_position("left_hip_roll", self.joint_positions["left_hip_roll"] - roll_correction)
self.set_target_position("right_hip_roll", self.joint_positions["right_hip_roll"] + roll_correction)
self.set_target_position("left_ankle_roll", self.joint_positions["left_ankle_roll"] - roll_correction)
self.set_target_position("right_ankle_roll", self.joint_positions["right_ankle_roll"] + roll_correction)
self.set_target_position("left_ankle_pitch", self.joint_positions["left_ankle_pitch"] - pitch_correction)
self.set_target_position("right_ankle_pitch", self.joint_positions["right_ankle_pitch"] - pitch_correction)
if abs(pitch) > self.config.get_max_stable_pitch():
hip_pitch_correction = self.pitch_controller.compute(0, pitch) * 0.5
self.set_target_position("left_hip_pitch", self.joint_positions["left_hip_pitch"] + hip_pitch_correction)
self.set_target_position("right_hip_pitch", self.joint_positions["right_hip_pitch"] + hip_pitch_correction)
damping_factor = self.config.get_angular_velocity_damping_factor()
self.set_target_position("left_hip_roll", self.joint_positions["left_hip_roll"] - roll_rate * damping_factor)
self.set_target_position("right_hip_roll", self.joint_positions["right_hip_roll"] + roll_rate * damping_factor)
self.set_target_position("left_ankle_roll", self.joint_positions["left_ankle_roll"] - roll_rate * damping_factor)
self.set_target_position("right_ankle_roll", self.joint_positions["right_ankle_roll"] + roll_rate * damping_factor)
self.set_target_position("left_ankle_pitch", self.joint_positions["left_ankle_pitch"] - pitch_rate * damping_factor)
self.set_target_position("right_ankle_pitch", self.joint_positions["right_ankle_pitch"] - pitch_rate * damping_factor)
def check_joint_status(self) -> bool:
for joint, position in self.joint_positions.items():
limits = self.config.get_joint_limits(joint)
if position < limits[0] or position > limits[1]:
print(f"Joint {joint} out of limits: {position}")
return False
if joint in self.last_joint_positions:
velocity = (position - self.last_joint_positions[joint]) / self.control_loop_time
if abs(velocity) > self.config.get_max_joint_velocity(joint):
print(f"Joint {joint} velocity too high: {velocity}")
return False
self.last_joint_positions = self.joint_positions.copy()
return True
def check_power_status(self) -> bool:
voltage = self.power_sensor.get_voltage()
current = self.power_sensor.get_current()
if voltage < self.config.get_min_voltage():
print(f"Voltage too low: {voltage}V")
return False
if current > self.config.get_max_current():
print(f"Current too high: {current}A")
return False
return True
def check_sensor_status(self) -> bool:
return self.imu.is_operational() and self.power_sensor.is_operational() and self.joint_sensors.is_operational()
def self_diagnosis(self) -> Dict[str, bool]:
return {
"joints": self.check_joint_status(),
"sensors": self.check_sensor_status(),
"power": self.check_power_status(),
}
def emergency_stop(self):
self.stop()
self.reset_all_controllers()
print("Emergency stop activated!")
def stop(self):
zero_control = {key: 0.0 for key in self.controllers.keys()}
zero_control.update({"translation": 0.0, "rotation": 0.0})
self.execute_control(zero_control)
def run_control_loop(self):
while True:
try:
loop_start_time = time.time()
state = self.get_robot_state()
self.update_robot_state(**state)
self.balance_adjustment()
diagnosis = self.self_diagnosis()
if not all(diagnosis.values()):
print("System check failed. Initiating emergency stop.")
self.emergency_stop()
break
control = self.compute_full_robot_control(self.current_speed, self.current_angle, self.current_position)
self.execute_control(control)
loop_end_time = time.time()
loop_duration = loop_end_time - loop_start_time
if loop_duration < self.control_loop_time:
time.sleep(self.control_loop_time - loop_duration)
except Exception as e:
print(f"Unexpected error: {e}")
self.emergency_stop()
break
print("HumanoidRobotController shutting down.")

View File

@ -0,0 +1,73 @@
#!/usr/bin/env python3
# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import math
from typing import Tuple
from humanoid_robot_intelligence_control_system_detection.PIDController import PIDController
from humanoid_robot_intelligence_control_system_detection.tracker_config import TrackerConfig
class MovementController:
def __init__(self, config: TrackerConfig, controller_type: str = "PID"):
self.config = config
self.controller_type = controller_type
self.translation_controller = self._create_controller("translation")
self.rotation_controller = self._create_controller("rotation")
self.path_controller = self._create_controller("path")
def _create_controller(self, controller_name: str) -> Any:
if self.controller_type == "PID":
return PIDController(
getattr(self.config, f"get_{controller_name}_pid_gains")(),
f"{controller_name}_controller",
output_limits=(-1, 1)
)
elif self.controller_type == "BellandeController":
return BellandeController(
getattr(self.config, f"get_{controller_name}_bellande_params")(),
f"{controller_name}_controller"
)
else:
raise ValueError(f"Unsupported controller type: {self.controller_type}")
def update_config(self, new_config: TrackerConfig):
self.config = new_config
self._update_controller(self.translation_controller, "translation")
self._update_controller(self.rotation_controller, "rotation")
self._update_controller(self.path_controller, "path")
def _update_controller(self, controller: Any, controller_name: str):
if self.controller_type == "PID":
controller.update_config(getattr(self.config, f"get_{controller_name}_pid_gains")())
elif self.controller_type == "BellandeController":
controller.update_config(getattr(self.config, f"get_{controller_name}_bellande_params")())
def compute_translation_control(self, current_position: float, target_position: float) -> float:
return self.translation_controller.compute(target_position, current_position)
def compute_rotation_control(self, current_angle: float, target_angle: float) -> float:
return self.rotation_controller.compute(target_angle, current_angle)
def compute_path_control(self, current_position: Tuple[float, float], target_position: Tuple[float, float]) -> Tuple[float, float]:
dx = target_position[0] - current_position[0]
dy = target_position[1] - current_position[1]
distance = (dx**2 + dy**2)**0.5
angle = math.atan2(dy, dx)
translation = self.translation_controller.compute(0, -distance)
rotation = self.rotation_controller.compute(angle, 0)
return translation, rotation

View File

@ -1,108 +0,0 @@
# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import sys
import subprocess
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
def ros1_launch_description():
# Get command-line arguments
args = sys.argv[1:]
# Construct the ROS 1 launch command
roslaunch_command = ["roslaunch", "humanoid_robot_intelligence_control_system_object_detector", "object_detector_processor.launch"] + args
roslaunch_command.extend([
"usb_cam", "usb_cam_node", "name:=camera",
"video_device:=/dev/video0",
"image_width:=640",
"image_height:=480",
"pixel_format:=yuyv",
"camera_frame_id:=usb_cam",
"io_method:=mmap"
])
roslaunch_command.extend([
"ros_web_api_bellande_2d_computer_vision", "bellande_2d_computer_vision_object_detection.py", "name:=object_detection_node"
])
roslaunch_command.extend([
"humanoid_robot_intelligence_control_system_ball_detector", "object_detection_processor.py", "name:=object_detection_processor_node"
])
roslaunch_command.extend([
"rviz", "rviz", "name:=rviz",
"args:=-d $(find ros_web_api_bellande_2d_computer_vision)/rviz/visualization.rviz"
])
# Execute the launch command
subprocess.call(roslaunch_command)
def ros2_launch_description():
nodes_to_launch = []
nodes_to_launch.append(Node(
package='usb_cam',
executable='usb_cam_node',
name='camera',
output='screen',
parameters=[{
'video_device': '/dev/video0',
'image_width': 640,
'image_height': 480,
'pixel_format': 'yuyv',
'camera_frame_id': 'usb_cam',
'io_method': 'mmap'
}]
))
nodes_to_launch.append(Node(
package='ros_web_api_bellande_2d_computer_vision',
executable='bellande_2d_computer_vision_object_detection.py',
name='object_detection_node',
output='screen',
remappings=[('camera/image_raw', '/usb_cam/image_raw')]
))
nodes_to_launch.append(Node(
package='humanoid_robot_intelligence_control_system_object_detector',
executable='object_detection_processor.py',
name='object_detection_processor_node',
output='screen',
parameters=[{'yaml_path': '$(find ros_web_api_bellande_2d_computer_vision)/yaml/object_detection_params.yaml'}]
))
nodes_to_launch.append(Node(
package='rviz2',
executable='rviz2',
name='rviz',
arguments=['-d', '$(find ros_web_api_bellande_2d_computer_vision)/rviz/visualization.rviz']
))
return LaunchDescription(nodes_to_launch)
if __name__ == "__main__":
ros_version = os.getenv("ROS_VERSION")
if ros_version == "1":
ros1_launch_description()
elif ros_version == "2":
ros2_launch_description()
else:
print("Unsupported ROS version. Please set the ROS_VERSION environment variable to '1' for ROS 1 or '2' for ROS 2.")
sys.exit(1)

View File

@ -1,41 +0,0 @@
<?xml version="1.0"?>
<!--
Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
-->
<launch>
<!-- Launch USB camera -->
<node name="usb_cam" pkg="usb_cam" type="usb_cam_node" output="screen">
<param name="video_device" value="/dev/video0" />
<param name="image_width" value="640" />
<param name="image_height" value="480" />
<param name="pixel_format" value="yuyv" />
<param name="camera_frame_id" value="usb_cam" />
<param name="io_method" value="mmap"/>
</node>
<!-- Launch object detection node -->
<node name="object_detection_node" pkg="ros_web_api_bellande_2d_computer_vision" type="bellande_2d_computer_vision_object_detection.py" output="screen">
<remap from="camera/image_raw" to="/usb_cam/image_raw"/>
</node>
<!-- Launch object detection processor node -->
<node name="object_detection_processor_node" pkg="humanoid_robot_intelligence_control_system_ball_detector" type="object_detection_processor.py" output="screen">
<param name="yaml_path" value="$(find ros_web_api_bellande_2d_computer_vision)/yaml/object_detection_params.yaml"/>
</node>
<!-- Launch RViz -->
<node name="rviz" pkg="rviz" type="rviz" args="-d $(find ros_web_api_bellande_2d_computer_vision)/rviz/visualization.rviz" />
</launch>

View File

@ -1,61 +0,0 @@
<?xml version="1.0"?>
<!--
Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
-->
<package format="3">
<name>humanoid_robot_intelligence_control_system_object_detector</name>
<version>0.0.1</version>
<description>
This Package is for Object Detection, detecting objects like tools, or utilities
</description>
<license>Apache 2.0</license>
<maintainer email="ronaldsonbellande@gmail.com">Ronaldson Bellande</maintainer>
<buildtool_depend condition="$ROS_VERSION == 1">catkin</buildtool_depend>
<buildtool_depend condition="$ROS_VERSION == 2">ament_cmake</buildtool_depend>
<buildtool_depend condition="$ROS_VERSION == 2">ament_cmake_python</buildtool_depend>
<!-- ROS 1 dependencies -->
<depend condition="$ROS_VERSION == 1">rospy</depend>
<depend condition="$ROS_VERSION == 1">std_msgs</depend>
<depend condition="$ROS_VERSION == 1">sensor_msgs</depend>
<depend condition="$ROS_VERSION == 1">geometry_msgs</depend>
<depend condition="$ROS_VERSION == 1">cv_bridge</depend>
<depend condition="$ROS_VERSION == 1">image_transport</depend>
<depend condition="$ROS_VERSION == 1">dynamic_reconfigure</depend>
<depend condition="$ROS_VERSION == 1">message_generation</depend>
<depend condition="$ROS_VERSION == 1">message_runtime</depend>
<!-- ROS 2 dependencies -->
<depend condition="$ROS_VERSION == 2">rclpy</depend>
<depend condition="$ROS_VERSION == 2">std_msgs</depend>
<depend condition="$ROS_VERSION == 2">sensor_msgs</depend>
<depend condition="$ROS_VERSION == 2">geometry_msgs</depend>
<depend condition="$ROS_VERSION == 2">cv_bridge</depend>
<depend condition="$ROS_VERSION == 2">image_transport</depend>
<depend condition="$ROS_VERSION == 2">rosidl_default_generators</depend>
<depend condition="$ROS_VERSION == 2">rosidl_default_runtime</depend>
<!-- Common dependencies -->
<depend>python3-opencv</depend>
<depend>python3-yaml</depend>
<depend>usb_cam</depend>
<export>
<build_type condition="$ROS_VERSION == 1">catkin</build_type>
<build_type condition="$ROS_VERSION == 2">ament_cmake</build_type>
</export>
</package>

View File

@ -1,111 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import rospy
import cv2
import numpy as np
from sensor_msgs.msg import Image, CameraInfo
from std_msgs.msg import Bool, String
from cv_bridge import CvBridge
import yaml
class ObjectDetectionProcessor:
def __init__(self):
rospy.init_node('object_detection_processor')
self.bridge = CvBridge()
self.enable = True
self.new_image_flag = False
self.load_params()
self.setup_ros()
def load_params(self):
param_path = rospy.get_param('~yaml_path', '')
if param_path:
with open(param_path, 'r') as file:
self.params = yaml.safe_load(file)
else:
self.set_default_params()
def set_default_params(self):
self.params = {
'debug': False,
'ellipse_size': 2,
# Add other default parameters as needed
}
def setup_ros(self):
self.image_pub = rospy.Publisher('image_out', Image, queue_size=10)
self.camera_info_pub = rospy.Publisher('camera_info', CameraInfo, queue_size=10)
rospy.Subscriber('enable', Bool, self.enable_callback)
rospy.Subscriber('image_in', Image, self.image_callback)
rospy.Subscriber('cameraInfo_in', CameraInfo, self.camera_info_callback)
rospy.Subscriber('object_detection_result', String, self.object_detection_callback)
def enable_callback(self, msg):
self.enable = msg.data
def image_callback(self, msg):
if not self.enable:
return
self.cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
self.new_image_flag = True
self.image_header = msg.header
def camera_info_callback(self, msg):
if not self.enable:
return
self.camera_info_msg = msg
def object_detection_callback(self, msg):
if not self.enable or not hasattr(self, 'cv_image'):
return
objects = eval(msg.data) # Assuming the data is a string representation of a list
self.process_detected_objects(objects)
def process_detected_objects(self, objects):
output_image = self.cv_image.copy()
for obj in objects:
x, y, w, h = obj['bbox']
cv2.rectangle(output_image, (int(x), int(y)), (int(x+w), int(y+h)), (0, 255, 0), 2)
cv2.putText(output_image, f"{obj['label']}: {obj['confidence']:.2f}",
(int(x), int(y-10)), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
self.publish_image(output_image)
def publish_image(self, image):
img_msg = self.bridge.cv2_to_imgmsg(image, encoding="bgr8")
img_msg.header = self.image_header
self.image_pub.publish(img_msg)
if hasattr(self, 'camera_info_msg'):
self.camera_info_pub.publish(self.camera_info_msg)
def run(self):
rate = rospy.Rate(30) # 30 Hz
while not rospy.is_shutdown():
if self.new_image_flag:
# The processing is done in object_detection_callback
self.new_image_flag = False
rate.sleep()
if __name__ == '__main__':
try:
processor = ObjectDetectionProcessor()
processor.run()
except rospy.ROSInterruptException:
pass

View File

@ -1,71 +0,0 @@
# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
cmake_minimum_required(VERSION 3.0.2)
project(humanoid_robot_intelligence_control_system_speech_detector)
if($ENV{ROS_VERSION} EQUAL 1)
find_package(catkin REQUIRED COMPONENTS
rospy
std_msgs
sensor_msgs
geometry_msgs
cv_bridge
)
catkin_package(
CATKIN_DEPENDS
rospy
std_msgs
sensor_msgs
geometry_msgs
cv_bridge
)
else()
find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)
find_package(rclpy REQUIRED)
find_package(std_msgs REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(cv_bridge REQUIRED)
endif()
if($ENV{ROS_VERSION} EQUAL 1)
catkin_python_setup()
catkin_install_python(PROGRAMS
src/speech_detection_processor.py
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)
install(DIRECTORY config launch rviz
DESTINATION ${CATKIN_PACKAGE_SHARE_DESTINATION}
)
else()
ament_python_install_package(${PROJECT_NAME})
install(PROGRAMS
src/speech_detection_processor.py
DESTINATION lib/${PROJECT_NAME}
)
install(DIRECTORY config launch rviz
DESTINATION share/${PROJECT_NAME}
)
ament_package()
endif()

View File

@ -1,41 +0,0 @@
<?xml version="1.0"?>
<!--
Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
-->
<launch>
<!-- Launch USB camera -->
<node name="usb_cam" pkg="usb_cam" type="usb_cam_node" output="screen">
<param name="video_device" value="/dev/video0" />
<param name="image_width" value="640" />
<param name="image_height" value="480" />
<param name="pixel_format" value="yuyv" />
<param name="camera_frame_id" value="usb_cam" />
<param name="io_method" value="mmap"/>
</node>
<!-- Launch speech detection node -->
<node name="speech_detection_node" pkg="ros_web_api_bellande_2d_computer_vision" type="bellande_2d_computer_vision_speech_detection.py" output="screen">
<remap from="camera/image_raw" to="/usb_cam/image_raw"/>
</node>
<!-- Launch speech detection processor node -->
<node name="speech_detection_processor_node" pkg="humanoid_robot_intelligence_control_system_ball_detector" type="speech_detection_processor.py" output="screen">
<param name="yaml_path" value="$(find ros_web_api_bellande_2d_computer_vision)/yaml/speech_detection_params.yaml"/>
</node>
<!-- Launch RViz -->
<node name="rviz" pkg="rviz" type="rviz" args="-d $(find ros_web_api_bellande_2d_computer_vision)/rviz/visualization.rviz" />
</launch>

View File

@ -1,108 +0,0 @@
# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import sys
import subprocess
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
def ros1_launch_description():
# Get command-line arguments
args = sys.argv[1:]
# Construct the ROS 1 launch command
roslaunch_command = ["roslaunch", "humanoid_robot_intelligence_control_system_speech_detector", "speech_detector_processor.launch"] + args
roslaunch_command.extend([
"usb_cam", "usb_cam_node", "name:=camera",
"video_device:=/dev/video0",
"image_width:=640",
"image_height:=480",
"pixel_format:=yuyv",
"camera_frame_id:=usb_cam",
"io_method:=mmap"
])
roslaunch_command.extend([
"ros_web_api_bellande_2d_computer_vision", "bellande_2d_computer_vision_speech_detection.py", "name:=speech_detection_node"
])
roslaunch_command.extend([
"humanoid_robot_intelligence_control_system_ball_detector", "speech_detection_processor.py", "name:=speech_detection_processor_node"
])
roslaunch_command.extend([
"rviz", "rviz", "name:=rviz",
"args:=-d $(find ros_web_api_bellande_2d_computer_vision)/rviz/visualization.rviz"
])
# Execute the launch command
subprocess.call(roslaunch_command)
def ros2_launch_description():
nodes_to_launch = []
nodes_to_launch.append(Node(
package='usb_cam',
executable='usb_cam_node',
name='camera',
output='screen',
parameters=[{
'video_device': '/dev/video0',
'image_width': 640,
'image_height': 480,
'pixel_format': 'yuyv',
'camera_frame_id': 'usb_cam',
'io_method': 'mmap'
}]
))
nodes_to_launch.append(Node(
package='ros_web_api_bellande_2d_computer_vision',
executable='bellande_2d_computer_vision_speech_detection.py',
name='speech_detection_node',
output='screen',
remappings=[('camera/image_raw', '/usb_cam/image_raw')]
))
nodes_to_launch.append(Node(
package='humanoid_robot_intelligence_control_system_speech_detector',
executable='speech_detection_processor.py',
name='speech_detection_processor_node',
output='screen',
parameters=[{'yaml_path': '$(find ros_web_api_bellande_2d_computer_vision)/yaml/speech_detection_params.yaml'}]
))
nodes_to_launch.append(Node(
package='rviz2',
executable='rviz2',
name='rviz',
arguments=['-d', '$(find ros_web_api_bellande_2d_computer_vision)/rviz/visualization.rviz']
))
return LaunchDescription(nodes_to_launch)
if __name__ == "__main__":
ros_version = os.getenv("ROS_VERSION")
if ros_version == "1":
ros1_launch_description()
elif ros_version == "2":
ros2_launch_description()
else:
print("Unsupported ROS version. Please set the ROS_VERSION environment variable to '1' for ROS 1 or '2' for ROS 2.")
sys.exit(1)

View File

@ -1,26 +0,0 @@
# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from distutils.core import setup
from catkin_pkg.python_setup import generate_distutils_setup
# fetch values from package.xml
setup_args = generate_distutils_setup(
scripts=['src/speech_detection_processor.py'],
packages=['humanoid_robot_intelligence_control_system_speech_detector'],
package_dir={'': 'src'},
)
setup(**setup_args)

View File

@ -1,111 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2024 Bellande Robotics Sensors Research Innovation Center, Ronaldson Bellande
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import rospy
import cv2
import numpy as np
from sensor_msgs.msg import Image, CameraInfo
from std_msgs.msg import Bool, String
from cv_bridge import CvBridge
import yaml
class ObjectDetectionProcessor:
def __init__(self):
rospy.init_node('speech_detection_processor')
self.bridge = CvBridge()
self.enable = True
self.new_image_flag = False
self.load_params()
self.setup_ros()
def load_params(self):
param_path = rospy.get_param('~yaml_path', '')
if param_path:
with open(param_path, 'r') as file:
self.params = yaml.safe_load(file)
else:
self.set_default_params()
def set_default_params(self):
self.params = {
'debug': False,
'ellipse_size': 2,
# Add other default parameters as needed
}
def setup_ros(self):
self.image_pub = rospy.Publisher('image_out', Image, queue_size=10)
self.camera_info_pub = rospy.Publisher('camera_info', CameraInfo, queue_size=10)
rospy.Subscriber('enable', Bool, self.enable_callback)
rospy.Subscriber('image_in', Image, self.image_callback)
rospy.Subscriber('cameraInfo_in', CameraInfo, self.camera_info_callback)
rospy.Subscriber('speech_detection_result', String, self.speech_detection_callback)
def enable_callback(self, msg):
self.enable = msg.data
def image_callback(self, msg):
if not self.enable:
return
self.cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
self.new_image_flag = True
self.image_header = msg.header
def camera_info_callback(self, msg):
if not self.enable:
return
self.camera_info_msg = msg
def speech_detection_callback(self, msg):
if not self.enable or not hasattr(self, 'cv_image'):
return
speechs = eval(msg.data) # Assuming the data is a string representation of a list
self.process_detected_speechs(speechs)
def process_detected_speechs(self, speechs):
output_image = self.cv_image.copy()
for obj in speechs:
x, y, w, h = obj['bbox']
cv2.rectangle(output_image, (int(x), int(y)), (int(x+w), int(y+h)), (0, 255, 0), 2)
cv2.putText(output_image, f"{obj['label']}: {obj['confidence']:.2f}",
(int(x), int(y-10)), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
self.publish_image(output_image)
def publish_image(self, image):
img_msg = self.bridge.cv2_to_imgmsg(image, encoding="bgr8")
img_msg.header = self.image_header
self.image_pub.publish(img_msg)
if hasattr(self, 'camera_info_msg'):
self.camera_info_pub.publish(self.camera_info_msg)
def run(self):
rate = rospy.Rate(30) # 30 Hz
while not rospy.is_shutdown():
if self.new_image_flag:
# The processing is done in speech_detection_callback
self.new_image_flag = False
rate.sleep()
if __name__ == '__main__':
try:
processor = ObjectDetectionProcessor()
processor.run()
except rospy.ROSInterruptException:
pass