Техническое зрение

ros2_ws/src/lesson_08/lesson_08/SimpleComputerVision.py

class SimpleComputerVision(Node):
    def __init__(self):
        super().__init__('simple_computer_vision')

        # HSV: H (hue) - цвет, S (saturation) - насыщенность, V (value) - яркость
        self.__COLOR_SCHEME = cv2.COLOR_RGB2HSV
        self.__MASK = ((90, 0, 0), (180, 255, 255))
        self.__MIN_SIZE = 500
        self.__POOL_DEPTH = 6  # 6m for simulator
        self.__CAMERA_AZIMUTH = 0  # [0; 2pi]

        vfov = 55
        hfov = 65
        self.__SIZE = (self.__WIDTH, self.__HEIGHT) = (640, 480)
        # it's work 0.0
        self.__METER_PER_PIXEL = [
            (math.tan(math.radians(vfov/2)) * self.__POOL_DEPTH) / (self.__HEIGHT / 2),
            (math.tan(math.radians(hfov/2)) * self.__POOL_DEPTH) / (self.__WIDTH / 2)
        ]

        self.__position = [0, 0]
        self.__heading = 0

        self.create_subscription(Image, "camera_down_cam/image_raw", self.__image_callback, 1)
        self.create_subscription(PointStamped, 'position', self.__position_callback, 1)

        self.__pub_debug = self.create_publisher(CompressedImage, f"cv/modified_image/compressed" , 10)
        self.__pub_target_pos = self.create_publisher(PointStamped, 'target_pos', 1)

        self.__br = CvBridge()

        self.log_info('CV started')

    def __image_callback(self, data: Image) -> None:
        frame = self.__br.imgmsg_to_cv2(data)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        debug_frame = frame.copy()

        # get mask
        binary = self.__get_binary(frame)
        # find counturs on mask
        counturs = self.__get_counturs(binary)
        # find objects at counturs
        local_position = self.__get_object_bottom(counturs, debug_frame)
        if local_position is not None:  # pub position if find
            global_position = self.__to_global_position(local_position)
            self.__send_target(global_position)

        self.__draw_mask(debug_frame, binary)

        self.__pub_debug.publish(self.__br.cv2_to_compressed_imgmsg(debug_frame))

    def __get_binary(self, frame):
        # Переходим в другое цветовое пространство и выделяем маску
        frame = cv2.cvtColor(frame, self.__COLOR_SCHEME)
        binary = cv2.inRange(frame, *self.__MASK)

        return binary

    def __get_counturs(self, binary):
        # Получение контуров из маски
        contours = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)[0]

        count_counturs = 1
        is_max_size = True
        # Трешхолд по размеру контура
        couturs_size_checker = lambda contour: cv2.moments(contour)["m00"] >= self.__MIN_SIZE
        contours = list(filter(couturs_size_checker, sorted(contours, key=cv2.contourArea, reverse=is_max_size)[:count_counturs]))

        return contours

    def __get_object_bottom(self, contours, frame=None):
        point = None

        for contour in contours:
            moments = cv2.moments(contour)
            # Координаты центра контура
            if moments["m00"]:
                cx = int(moments["m10"] / moments["m00"])
                cy = int(moments["m01"] / moments["m00"])
            else:
                continue

            # Вычисление положения
            x = -(cy - self.__HEIGHT / 2) * self.__METER_PER_PIXEL[0]
            y = -(cx - self.__WIDTH / 2) * self.__METER_PER_PIXEL[1]
            point = [x, y]

            if frame is not None:  # Вывод информации о контуре
                # Выделение контура
                #                                     B G R
                cv2.drawContours(frame, contour, -1, (0, 255, 0), 3)

        return point

    def __to_global_position(self, local_position):
        global_position = self.__position[:]

        dist, _azimuth = get_dist_course(local_position)
        azimuth = (self.__heading + _azimuth + self.__CAMERA_AZIMUTH) % (2* math.pi)

        global_position[0] += dist * math.cos(azimuth)
        global_position[1] += dist * -math.sin(azimuth)

        return global_position

    def __send_target(self, pos: List[float], object_name: str="red_point_bottom") -> None:
        target = PointStamped()
        target.header.frame_id = object_name
        target.point.x = float(round(pos[0], 1))
        target.point.y = float(round(pos[1], 1))
        self.__pub_target_pos.publish(target)

    def __draw_mask(self, debug_frame, binary):
        self.mask_id = 0
        self.__COUNT_MASKS = 3

        miniBin = cv2.resize(binary, (int(binary.shape[1] / (self.__COUNT_MASKS + 1)), int(binary.shape[0] / (self.__COUNT_MASKS + 1))),
                                interpolation=cv2.INTER_AREA)
        miniBin = cv2.cvtColor(miniBin, cv2.COLOR_GRAY2RGB)
        debug_frame[-2 - miniBin.shape[0]:-2, 2 + miniBin.shape[1] * self.mask_id:2 + miniBin.shape[1] * (self.mask_id + 1)] = miniBin
        self.mask_id += 1

    def __position_callback(self, msg: PointStamped) -> None:
        pos = msg.point
        self.__position = [pos.x, pos.y]
        self.__heading = pos.z

    def log_info(self, msg: str) -> None:
        self.get_logger().info(msg)