class SimpleComputerVision(Node):
def __init__(self):
super().__init__('simple_computer_vision')
# HSV: H (hue) - цвет, S (saturation) - насыщенность, V (value) - яркость
self.__COLOR_SCHEME = cv2.COLOR_RGB2HSV
self.__MASK = ((90, 0, 0), (180, 255, 255))
self.__MIN_SIZE = 500
self.__POOL_DEPTH = 6 # 6m for simulator
self.__CAMERA_AZIMUTH = 0 # [0; 2pi]
vfov = 55
hfov = 65
self.__SIZE = (self.__WIDTH, self.__HEIGHT) = (640, 480)
# it's work 0.0
self.__METER_PER_PIXEL = [
(math.tan(math.radians(vfov/2)) * self.__POOL_DEPTH) / (self.__HEIGHT / 2),
(math.tan(math.radians(hfov/2)) * self.__POOL_DEPTH) / (self.__WIDTH / 2)
]
self.__position = [0, 0]
self.__heading = 0
self.create_subscription(Image, "camera_down_cam/image_raw", self.__image_callback, 1)
self.create_subscription(PointStamped, 'position', self.__position_callback, 1)
self.__pub_debug = self.create_publisher(CompressedImage, f"cv/modified_image/compressed" , 10)
self.__pub_target_pos = self.create_publisher(PointStamped, 'target_pos', 1)
self.__br = CvBridge()
self.log_info('CV started')
def __image_callback(self, data: Image) -> None:
frame = self.__br.imgmsg_to_cv2(data)
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
debug_frame = frame.copy()
# get mask
binary = self.__get_binary(frame)
# find counturs on mask
counturs = self.__get_counturs(binary)
# find objects at counturs
local_position = self.__get_object_bottom(counturs, debug_frame)
if local_position is not None: # pub position if find
global_position = self.__to_global_position(local_position)
self.__send_target(global_position)
self.__draw_mask(debug_frame, binary)
self.__pub_debug.publish(self.__br.cv2_to_compressed_imgmsg(debug_frame))
def __get_binary(self, frame):
# Переходим в другое цветовое пространство и выделяем маску
frame = cv2.cvtColor(frame, self.__COLOR_SCHEME)
binary = cv2.inRange(frame, *self.__MASK)
return binary
def __get_counturs(self, binary):
# Получение контуров из маски
contours = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)[0]
count_counturs = 1
is_max_size = True
# Трешхолд по размеру контура
couturs_size_checker = lambda contour: cv2.moments(contour)["m00"] >= self.__MIN_SIZE
contours = list(filter(couturs_size_checker, sorted(contours, key=cv2.contourArea, reverse=is_max_size)[:count_counturs]))
return contours
def __get_object_bottom(self, contours, frame=None):
point = None
for contour in contours:
moments = cv2.moments(contour)
# Координаты центра контура
if moments["m00"]:
cx = int(moments["m10"] / moments["m00"])
cy = int(moments["m01"] / moments["m00"])
else:
continue
# Вычисление положения
x = -(cy - self.__HEIGHT / 2) * self.__METER_PER_PIXEL[0]
y = -(cx - self.__WIDTH / 2) * self.__METER_PER_PIXEL[1]
point = [x, y]
if frame is not None: # Вывод информации о контуре
# Выделение контура
# B G R
cv2.drawContours(frame, contour, -1, (0, 255, 0), 3)
return point
def __to_global_position(self, local_position):
global_position = self.__position[:]
dist, _azimuth = get_dist_course(local_position)
azimuth = (self.__heading + _azimuth + self.__CAMERA_AZIMUTH) % (2* math.pi)
global_position[0] += dist * math.cos(azimuth)
global_position[1] += dist * -math.sin(azimuth)
return global_position
def __send_target(self, pos: List[float], object_name: str="red_point_bottom") -> None:
target = PointStamped()
target.header.frame_id = object_name
target.point.x = float(round(pos[0], 1))
target.point.y = float(round(pos[1], 1))
self.__pub_target_pos.publish(target)
def __draw_mask(self, debug_frame, binary):
self.mask_id = 0
self.__COUNT_MASKS = 3
miniBin = cv2.resize(binary, (int(binary.shape[1] / (self.__COUNT_MASKS + 1)), int(binary.shape[0] / (self.__COUNT_MASKS + 1))),
interpolation=cv2.INTER_AREA)
miniBin = cv2.cvtColor(miniBin, cv2.COLOR_GRAY2RGB)
debug_frame[-2 - miniBin.shape[0]:-2, 2 + miniBin.shape[1] * self.mask_id:2 + miniBin.shape[1] * (self.mask_id + 1)] = miniBin
self.mask_id += 1
def __position_callback(self, msg: PointStamped) -> None:
pos = msg.point
self.__position = [pos.x, pos.y]
self.__heading = pos.z
def log_info(self, msg: str) -> None:
self.get_logger().info(msg)