본 코드는 video 영상을 다운받아 detection을 실행하기 위한 코드로, topic의 publisher를 image_tools을 이용하지 않고, 영상을 이용하도록 한 코드입니다.
test용 영상을 구글 드라이브에 업로드합니다.
업로드가 완료되었다면 해당 파일을 우측 클릭하여, "공유" 항목을 클릭합니다.
공유 항목 창이 뜨면, 일반 엑세스 항목 아래 "제한됨"으로 되어있는 항목을 "링크가 있는 모든 사용자"로 변경해줍니다.
그 후 좌측 하단에 링크 복사 버튼을 클릭 및 우측 하단에 완료 버튼을 눌러줍니다.
링크를 성공적으로 복사해오셨으면 메모장에 붙여넣습니다. 아마 링크가 다음과 같이 생성될 것입니다.
<https://drive.google.com/file/d/**{고유id}**/view?usp=share_lin>
gdown 명령어를 이용하여 파일을 다운받습니다.
$ cd {피일 저장할 경로}
$ gdown <https://drive.google.com/uc?id=**{고유id}**&export=download>
$ cd ~/ros2_ws/src/ssd_detection/ssd_detection/
$ gedit video_publisher.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile # QoS 설정을 위해
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
bridge = CvBridge()
class ImagePublisher(Node): # Node 클래스를 상속
def __init__(self):
super().__init__('image_publisher') # 부모 클래스(Node)의 생성자를 호출하고 이름을 helloworld_publisher로 지정
qos_profile = QoSProfile(depth=10) # 통신상태가 원활하지 못할 경우 퍼블리시 할 데이터를 버퍼에 10개까지 저장하라는 설정
self.publisher = self.create_publisher(# create_publisher함수를 이용해 helloworld_publisher 설정
Image, # 토픽 메시지 타입: Imqge,
'image', # 토픽 이름: image
qos_profile)#, QoS설정
self.timer = self.create_timer(0.1, self.time_callback) # 콜백 함수를 실행. 0.1초마다 time_callback 함수 실행
self.video_path = input("Video Absolute Path:")
self.cap = cv2.VideoCapture(self.video_path)
def time_callback(self) :
ret, frame = self.cap.read()
cv2.imwrite('test.jpg', frame)
if ret == True :
fra = bridge.cv2_to_imgmsg(frame,"bgr8")
self.publisher.publish(fra)
# cv2.imshow('droidcamframe', frame)
# cv2.waitKey(2)
self.get_logger().info('Publishing Droidcam Image')
def main(args=None):
rclpy.init(args=args) # 초기화
node = ImagePublisher() # ImagePublisher를 node라는 이름으로 생성
try:
rclpy.spin(node) # rclpy에게 이 Node를 반복해서 실행 (=spin) 하라고 전달
except KeyboardInterrupt: # `Ctrl + c`가 동작했을 때
node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
node.destroy_node() # 노드 소멸
rclpy.shutdown() # rclpy.shutdown 함수로 노드 종료
if __name__ == '__main__':
main()
ros2 run
커맨드를 통해 작성한 Service node 실행시키기 위해서는 setup.py
속의 entry_points
구역에 아래의 내용을 추가해야 합니다entry_points={
'console_scripts': [
'detection_example = ssd_detection.ssd_detection:main',
'video_publisher = ssd_detection.video_publisher:main',
],