Skip to content
Commits on Source (5)
from ultralytics import YOLO
MODEL_NAME = "best"
# Загрузите модель YOLO
model = YOLO(f"{MODEL_NAME}.pt")
# Экспортируйте модель в формат RKNN
# "имя" может быть одним из rk3588, rk3576, rk3566, rk3568, rk3562, rv1103, rv1106, rv1103b, rv1106b, rk2118
model.export(format="rknn", name="rk3588") # создает папку '/yolo_rknn_model'
from ultralytics import YOLO
def train_yolo_model():
model = YOLO("yolov8n.pt")
model.train(
data="data.yaml", # путь к конфигурации вашего набора данных
epochs=75, # количество периодов обучения
imgsz=640, # размер изображения
)
return model
if __name__ == "__main__":
# Требуется для многопроцессорной обработки Windows :cite[4]
train_yolo_model()
\ No newline at end of file
......@@ -32,6 +32,7 @@ def launch_setup(context, *args, **kwargs):
package_name = "pspb_2025"
is_with_movement = to_bool(Lc('is_with_movement').perform(context))
is_with_yolo = to_bool(Lc('is_with_yolo').perform(context))
is_with_cv = to_bool(Lc('is_with_cv').perform(context))
is_with_state_machine = to_bool(Lc('is_with_state_machine').perform(context))
is_with_low_level = to_bool(Lc('is_with_low_level').perform(context))
......@@ -129,6 +130,16 @@ def launch_setup(context, *args, **kwargs):
),
]
if is_with_yolo:
res_launch += [
IncludeLaunchDescription(
PythonLaunchDescriptionSource([os.path.join(
get_package_share_directory(package_name), 'launch'),
'/sublaunch/yolo.launch.py']),
launch_arguments=launch_args
),
]
return res_launch
......@@ -137,6 +148,7 @@ def generate_launch_description():
DeclareLaunchArgument('namespace', default_value='ws'),
DeclareLaunchArgument('is_sim', default_value="false"),
DeclareLaunchArgument('is_with_yolo', default_value="false"),
DeclareLaunchArgument('is_with_low_level', default_value="true"),
DeclareLaunchArgument('is_with_state_machine', default_value="true"),
DeclareLaunchArgument('is_with_cv', default_value="true"),
......@@ -147,6 +159,7 @@ def generate_launch_description():
DeclareLaunchArgument('is_with_tools', default_value="false"),
# red, yellow, green
DeclareLaunchArgument('mask_name', default_value="red"),
DeclareLaunchArgument('model_name', default_value="best"),
OpaqueFunction(function=launch_setup),
])
......@@ -57,11 +57,12 @@ def launch_setup(context, *args, **kwargs):
Node(
package="pspb_2025",
executable='multi_photocamera',
output='screen',
prefix='xterm -e',
parameters=[
config('pspb_2025', 'photocamera.yaml'),
],
),
]
return [GroupAction(res_action)]
......
import os
from ament_index_python import get_package_share_directory
from launch import LaunchDescription
from launch.actions import GroupAction
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration as Lc
from launch.actions import DeclareLaunchArgument
from launch.actions import OpaqueFunction
from launch_ros.actions import PushRosNamespace
def to_bool(value: str):
if isinstance(value, bool):
return value
if not isinstance(value, str):
raise ValueError('String to bool, invalid type ' + str(value))
valid = {'true': True, '1': True,
'false': False, '0': False}
value = value.lower()
if value in valid:
return valid[value]
raise ValueError('String to bool, invalid value: %s' % value)
def config(pkg,filename):
path = os.path.join(get_package_share_directory(pkg), 'config', filename).split('/')
for i in range(path.index("share"), path.index("install") - 1, -1):
path.pop(i)
path.insert(i, "src")
return "/".join(path)
def launch_setup(context, *args, **kwargs):
ns = Lc('namespace').perform(context)
model_name = Lc("model_name").perform(context)
is_use_namespace = Lc('is_use_namespace').perform(context)
is_rockchip = os.getlogin() in ["orengepi"]
if to_bool(is_use_namespace):
res_action = [PushRosNamespace(ns)]
else:
res_action = []
# Выбор файла модели в подходящем формате
if is_rockchip:
yolo_filename = f"{model_name}_rknn_model"
else:
yolo_filename = f"{model_name}.pt"
yolo_path = config('pspb_2025', yolo_filename)
res_action += [
# Нода для определения объектов при помощи YOLO
Node(
package="pspb_2025",
executable='yolo_detection',
parameters=[
{"dir": yolo_path},
config('pspb_2025', 'topics.yaml'),
],
),
]
return [GroupAction(res_action)]
def generate_launch_description():
return LaunchDescription([
DeclareLaunchArgument('namespace', default_value='ws'),
DeclareLaunchArgument('model_name', default_value='best'),
DeclareLaunchArgument('is_use_namespace', default_value='true'),
OpaqueFunction(function=launch_setup)
])
from sensor_msgs.msg import CompressedImage
from pspb_2025_interfaces.msg import ObjectArray, ObjectData
from std_msgs.msg import Header
from cv_bridge import CvBridge
from rclpy.node import Node
import rclpy
try:
from ultralytics import YOLO
except KeyboardInterrupt:
pass
from typing import Dict, Tuple, Any, List
from enum import IntEnum
from numpy import ndarray
import random
import cv2
import os
class BBox(IntEnum):
LEFT = 0 # x1
TOP = 1 # y1
RIGHT = 2 # x2
BOTTOM = 3 # y2
class YoloController(Node):
def __init__(self):
super().__init__('YOLO_controller')
path = os.path.expanduser(self.declare_parameter("dir", f'/home/orangepi/pspb_rknn_model').value)
self.__br = CvBridge()
self.__model = YOLO(path)
self.__MIN_CONFIDENCE = self.declare_parameter("min_confidence", 0.3).value
self.__WIDTH, self.__HEIGHT = None, None
self.__FOV = self.declare_parameter("fov", 120).value
self.__CLASS_NAMES = self.__model.names
self.__debug_image_publisher = self.create_publisher(CompressedImage, self.declare_parameter("yolo_debug_topic", 'yolo_debug/compressed').value, 1)
self.__objects_publisher = self.create_publisher(ObjectArray, self.declare_parameter("object_position_topic", "objects").value, 1)
_ = self.create_subscription(CompressedImage, self.declare_parameter("yolo_camera_topic", 'camera_front/compressed').value, self.__camera_callback, 1)
self.info(f"YOLO started from {path}\nmin_confidence: {self.__MIN_CONFIDENCE} fov: {self.__FOV}")
def __camera_callback(self, msg: CompressedImage) -> None:
frame = self.__br.compressed_imgmsg_to_cv2(msg)
if self.__WIDTH is None:
self.__HEIGHT, self.__WIDTH = frame.shape[0:2]
self.__image_processing(frame)
def __image_processing(self, frame: ndarray) -> None:
boxes = self.__model.predict(frame, verbose=False)[0].boxes
objects = {}
for i in range(len(boxes)):
cls, conf, xyxy = self.__get_object_data(boxes, i)
if conf > self.__MIN_CONFIDENCE:
objects[cls] = self.__calc_center_x(xyxy)
self.__publish_course_to_objects(objects)
# Создаём изображение с метками обнаруженных объектов
debug_frame = self.__draw_bboxes(frame, boxes)
self.__debug_image_publisher.publish(self.__br.cv2_to_compressed_imgmsg(debug_frame))
def __get_object_data(self, boxes: Any, ind: int) -> Tuple:
'''
Метод возвращает название класса, уверенность в распозновании объекта, координаты объекта в формате bbox
'''
return self.__CLASS_NAMES[boxes.cls[ind].item()], boxes.conf[ind].item(), boxes[ind].xyxy[0].tolist()
def __calc_center_x(self, box: List[float]) -> float:
return (box[BBox.LEFT] + box[BBox.RIGHT]) / 2
def __publish_course_to_objects(self, objects: Dict[str, float]) -> None:
'''
Метод публикует курс на распознанные объекты
'''
msg = ObjectArray()
msg.header = Header()
for key in objects.keys():
new_object = ObjectData()
new_object.name = key
new_object.course = float(self.__calc_course(objects[key]))
msg.objects.append(new_object)
self.__objects_publisher.publish(msg)
def __calc_course(self, center_x: int) -> float:
HALF_WIDTH = self.__WIDTH / 2
return ((center_x - HALF_WIDTH) / HALF_WIDTH) * self.__FOV / 2
def __draw_bboxes(self, frame: ndarray, results: Any) -> ndarray:
'''
Метод отрисовывает рамки для распознанных объектов
'''
frame = frame.copy()
boxes = results.xyxy.cpu().numpy().astype(int)
classes = results.cls.cpu().numpy().astype(int)
for box, cls_id in zip(boxes, classes):
# Разные цвета для объектов разных классов
random.seed(int(cls_id) + 8)
color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
center_x = self.__calc_center_x(box)
cv2.rectangle(frame, (box[BBox.LEFT], box[BBox.TOP]), (box[BBox.RIGHT], box[BBox.BOTTOM]), color, 2)
cv2.putText(
frame,
f"{self.__CLASS_NAMES[cls_id]}",
(box[BBox.LEFT], box[BBox.TOP] - 20), # позиция текста
cv2.FONT_HERSHEY_SIMPLEX, # более красивый шрифт
0.4, # немного уменьшенный размер
(255, 255, 255), # белый цвет текста
1, # толщина текста
cv2.LINE_AA # сглаживание краёв
)
# Надпись с углом курса
cv2.putText(
frame,
f"{round(self.__calc_course(center_x), 1)} deg",
(box[BBox.LEFT], box[BBox.TOP] - 5), # позиция текста
cv2.FONT_HERSHEY_SIMPLEX, # тот же шрифт для единообразия
0.4, # тот же размер
(255, 255, 255), # белый цвет текста
1, # толщина текста
cv2.LINE_AA # сглаживание
)
return frame
def ros_time(self) -> float:
sec, nanosec = self.get_clock().now().seconds_nanoseconds()
return sec + nanosec / 1e9
def info(self, msg: str) -> None:
self.get_logger().info(str(msg))
def main() -> None:
rclpy.init()
try:
node = YoloController()
rclpy.spin(node)
except KeyboardInterrupt: # предотвращает появление ошибок при завершении ROS 2 нод сочетанием Ctrl+C
pass
finally:
node.destroy_node()
try:
rclpy.shutdown()
except Exception: # предотвращает ошибки, если ROS 2 ещё не успел запуститься
pass
if __name__ == '__main__':
main()
......@@ -110,12 +110,11 @@ def main(args=None):
except KeyboardInterrupt: # предотвращает появление ошибок при завершении ROS 2 нод сочетанием Ctrl+C
pass
finally:
node.destroy_node()
try:
node.destroy_node()
rclpy.shutdown()
except Exception: # предотвращает ошибки, если ROS 2 ещё не успел запуститься
pass
if __name__ == '__main__':
main()
......@@ -8,6 +8,15 @@ import os
from datetime import datetime
from cv_bridge import CvBridge
import sys
from select import select
if sys.platform == 'win32':
import msvcrt
else:
import termios
import tty
class Photocamera(Node):
def __init__(self) -> None:
......@@ -24,6 +33,8 @@ class Photocamera(Node):
self.create_subscription(Bool, photo_button_topic, lambda _, id=id: self.__make_photo(id), 1)
self.info(f"bind photo of topic: '{camera_topic}' on topic: '{photo_button_topic}' {id}")
self.settings = self.saveTerminalSettings()
self.br = CvBridge()
self.__frames = [None for _ in range(len(topics))]
......@@ -32,6 +43,7 @@ class Photocamera(Node):
if not os.path.exists(self.__photo_dir):
os.makedirs(self.__photo_dir)
self.create_timer(1/15, self.__update)
self.info(f"multiphotocamera started {self.__photo_dir}")
def __camera_callback(self, msg: CompressedImage, id: int) -> None:
......@@ -47,6 +59,33 @@ class Photocamera(Node):
else:
self.info(f"have no image on {id}")
def getKey(self, settings, timeout):
if sys.platform == 'win32':
# getwch() returns a string on Windows
key = msvcrt.getwch()
else:
tty.setraw(sys.stdin.fileno())
# sys.stdin.read() returns a string on Linux
rlist, _, _ = select([sys.stdin], [], [], timeout)
if rlist:
key = sys.stdin.read(1)
else:
key = ''
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
return key
def saveTerminalSettings(self):
if sys.platform == 'win32':
return None
return termios.tcgetattr(sys.stdin)
def __update(self) -> None:
key = self.getKey(self.settings, 1/15)
if key.isdigit():
index_of_camera = int(key)
if 0 <= index_of_camera < len(self.__frames):
self.__make_photo(index_of_camera)
def ros_time(self) -> float:
sec, nanosec = self.get_clock().now().seconds_nanoseconds()
return sec + nanosec / 1e9
......@@ -63,12 +102,12 @@ def main(args=None):
except KeyboardInterrupt: # предотвращает появление ошибок при завершении ROS 2 нод сочетанием Ctrl+C
pass
finally:
node.destroy_node()
try:
node.destroy_node()
rclpy.shutdown()
except Exception: # предотвращает ошибки, если ROS 2 ещё не успел запуститься
pass
if __name__ == '__main__':
main()
main()
\ No newline at end of file