openCV beidge with ROS2
ROS2 does not provide native support to image type topic. So, we need to use a bridge.
what is a bridge?
A bridge in real world connect 2 differnt places together. Similarly, a ROS2 bridge is a software component that connects two different systems or parts of the ROS ecosystem so they can exchange messages and work together.
example code
camera pubisher
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
class CameraPublisher(Node):
def __init__(self):
super().__init__('camera_publisher')
self.publisher_ = self.create_publisher(Image, '/camera/image_raw', 10)
self.timer = self.create_timer(0.05, self.timer_callback) # ~20 FPS
self.br = CvBridge()
self.cap = cv2.VideoCapture(0)
# sometimes the default camera (0) might not be available,
# then we change to camera 1
if not self.cap.isOpened():
print("Camera 0 not opened. Trying camera 1.")
self.cap = cv2.VideoCapture(1)
if not self.cap.isOpened():
print("Camera 1 not opened.")
else:
print("Camera 1 opened!")
# Set camera resolution to 640x480
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
# Check if the camera opened successfully
if not self.cap.isOpened():
self.get_logger().error("Could not open camera.")
raise RuntimeError("Could not open camera.")
self.get_logger().info("Camera publisher node has started.")
def timer_callback(self):
ret, frame = self.cap.read()
if ret:
# Display the frame for deubugging
cv2.imshow("Camera Feed", frame)
cv2.waitKey(1) # Needed to update the OpenCV window
# Publish to ROS2
img_msg = self.br.cv2_to_imgmsg(frame, encoding='bgr8')
self.publisher_.publish(img_msg)
else:
self.get_logger().warning("Failed to capture image from camera.")
def destroy_node(self):
self.cap.release()
cv2.destroyAllWindows()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = CameraPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
greyscale camera subscriber
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
class GreyscaleImageSubscriber(Node):
def __init__(self):
super().__init__('greyscale_camera_subscriber')
self.subscription = self.create_subscription(
Image,
'/camera/image_raw',
self.listener_callback,
10)
self.subscription # prevent unused variable warning
self.br = CvBridge()
self.get_logger().info("Greyscale image subscriber node has started.")
def listener_callback(self, msg):
try:
# Convert ROS Image message to OpenCV image
cv_image = self.br.imgmsg_to_cv2(msg, desired_encoding='bgr8')
# Convert to greyscale
grey_image = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
# Display for debugging
cv2.imshow("Greyscale Camera Feed", grey_image)
cv2.waitKey(1)
except Exception as e:
self.get_logger().error('Could not convert image: %r' % (e,))
def destroy_node(self):
cv2.destroyAllWindows()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = GreyscaleImageSubscriber()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
to run the code, make sure you have modify the entry points part in setup.py.
entry_points={
'console_scripts': [
'camera_publisher = camera_feed.camera_publisher:main',
'greyscale_image_subscriber = camera_feed.greyscale_camera_subscriber:main',
],
},
Last modified: 12 June 2025