본 코드는 video 영상을 다운받아 detection을 실행하기 위한 코드로, topic의 publisher를 image_tools을 이용하지 않고, 영상을 이용하도록 한 코드입니다.

1. 영상 준비하기

  1. test용 영상을 구글 드라이브에 업로드합니다.

  2. 업로드가 완료되었다면 해당 파일을 우측 클릭하여, "공유" 항목을 클릭합니다.

  3. 공유 항목 창이 뜨면, 일반 엑세스 항목 아래 "제한됨"으로 되어있는 항목을 "링크가 있는 모든 사용자"로 변경해줍니다.

  4. 그 후 좌측 하단에 링크 복사 버튼을 클릭 및 우측 하단에 완료 버튼을 눌러줍니다.

  5. 링크를 성공적으로 복사해오셨으면 메모장에 붙여넣습니다. 아마 링크가 다음과 같이 생성될 것입니다.

    <https://drive.google.com/file/d/**{고유id}**/view?usp=share_lin>
    
  6. gdown 명령어를 이용하여 파일을 다운받습니다.

    $ cd {피일 저장할 경로}
    $ gdown <https://drive.google.com/uc?id=**{고유id}**&export=download>
    

2. 비디오 publish를 위한 코드 작성

경로 이동하기

$ cd ~/ros2_ws/src/ssd_detection/ssd_detection/
$ gedit video_publisher.py

video_publisher.py 작성하기

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile # QoS 설정을 위해 
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2

bridge = CvBridge()

class ImagePublisher(Node): # Node 클래스를 상속
    def __init__(self):
        super().__init__('image_publisher') # 부모 클래스(Node)의 생성자를 호출하고 이름을 helloworld_publisher로 지정
        qos_profile = QoSProfile(depth=10) # 통신상태가 원활하지 못할 경우 퍼블리시 할 데이터를 버퍼에 10개까지 저장하라는 설정
        self.publisher = self.create_publisher(# create_publisher함수를 이용해 helloworld_publisher 설정
            Image, # 토픽 메시지 타입: Imqge,
            'image', #  토픽 이름: image
            qos_profile)#, QoS설정
        self.timer = self.create_timer(0.1, self.time_callback) # 콜백 함수를 실행. 0.1초마다 time_callback 함수 실행
        self.video_path = input("Video Absolute Path:")
        self.cap = cv2.VideoCapture(self.video_path)

    def time_callback(self) :
        ret, frame = self.cap.read()
        cv2.imwrite('test.jpg', frame)
        if ret == True :
            fra = bridge.cv2_to_imgmsg(frame,"bgr8")
            self.publisher.publish(fra)
            # cv2.imshow('droidcamframe', frame)
            # cv2.waitKey(2)
        self.get_logger().info('Publishing Droidcam Image')
    

def main(args=None):
    rclpy.init(args=args) # 초기화
    node = ImagePublisher() # ImagePublisher를 node라는 이름으로 생성
    try:
        rclpy.spin(node) # rclpy에게 이 Node를 반복해서 실행 (=spin) 하라고 전달
    except KeyboardInterrupt: # `Ctrl + c`가 동작했을 때
        node.get_logger().info('Keyboard Interrupt (SIGINT)')
    finally:
        node.destroy_node()  # 노드 소멸
        rclpy.shutdown() # rclpy.shutdown 함수로 노드 종료

if __name__ == '__main__':
    main()

Add an entry point


entry_points={
        'console_scripts': [
        'detection_example = ssd_detection.ssd_detection:main',
        'video_publisher = ssd_detection.video_publisher:main',
        ],

Build and run