ros1-development

ROS1 Development Skill

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "ros1-development" with this command: npx skills add arpitg1304/robotics-agent-skills/arpitg1304-robotics-agent-skills-ros1-development

ROS1 Development Skill

When to Use This Skill

  • Building or maintaining ROS1 packages and nodes

  • Writing launch files, message types, or services

  • Debugging ROS1 communication (topics, services, actions)

  • Configuring catkin workspaces and build systems

  • Working with tf/tf2 transforms, URDF, or robot models

  • Using actionlib for long-running tasks

  • Optimizing nodelets for zero-copy transport

  • Planning ROS1 → ROS2 migration

Core Architecture Principles

  1. Node Design

Single Responsibility Nodes: Each node should do ONE thing well. Resist the temptation to build monolithic "do-everything" nodes.

BAD: Monolithic node

class RobotNode: def init(self): self.sub_camera = rospy.Subscriber('/camera/image', Image, self.camera_cb) self.sub_lidar = rospy.Subscriber('/lidar/points', PointCloud2, self.lidar_cb) self.pub_cmd = rospy.Publisher('/cmd_vel', Twist, queue_size=10) self.pub_map = rospy.Publisher('/map', OccupancyGrid, queue_size=1) # This node does perception, planning, AND control

GOOD: Decomposed nodes

class PerceptionNode: # Fuses sensor data → publishes /obstacles class PlannerNode: # Subscribes /obstacles → publishes /path class ControllerNode: # Subscribes /path → publishes /cmd_vel

Node Initialization Pattern:

#!/usr/bin/env python import rospy from std_msgs.msg import String

class MyNode: def init(self): rospy.init_node('my_node', anonymous=False)

    # 1. Load parameters FIRST
    self.rate = rospy.get_param('~rate', 10.0)
    self.frame_id = rospy.get_param('~frame_id', 'base_link')

    # 2. Set up publishers BEFORE subscribers
    #    (prevents callbacks firing before publisher is ready)
    self.pub = rospy.Publisher('~output', String, queue_size=10)

    # 3. Set up subscribers LAST
    self.sub = rospy.Subscriber('~input', String, self.callback)

    rospy.loginfo(f"[{rospy.get_name()}] Initialized with rate={self.rate}")

def callback(self, msg):
    # Process and republish
    result = String(data=msg.data.upper())
    self.pub.publish(result)

def run(self):
    rate = rospy.Rate(self.rate)
    while not rospy.is_shutdown():
        # Periodic work here
        rate.sleep()

if name == 'main': try: node = MyNode() node.run() except rospy.ROSInterruptException: pass

  1. Topic Design

Naming Conventions:

/robot_name/sensor_type/data_type

Examples:

/ur5/joint_states # Robot joint states /realsense/color/image_raw # Camera color image /realsense/depth/points # Depth point cloud /mobile_base/cmd_vel # Velocity commands /gripper/command # Gripper commands

Queue Sizes Matter:

For sensor data (high frequency, OK to drop old messages):

rospy.Subscriber('/camera/image', Image, self.cb, queue_size=1)

For commands (don't want to miss any):

rospy.Publisher('/cmd_vel', Twist, queue_size=10)

For large data (point clouds, images) - use small queues to prevent memory bloat:

rospy.Subscriber('/lidar/points', PointCloud2, self.cb, queue_size=1)

NEVER use queue_size=0 (infinite) for high-frequency topics

This WILL cause memory leaks under load

Latched Topics for data that changes infrequently:

Robot description, static maps, calibration data

pub = rospy.Publisher('/robot_description', String, queue_size=1, latch=True)

  1. Launch File Best Practices

<launch> <!-- ALWAYS use args for configurability --> <arg name="robot_name" default="ur5"/> <arg name="sim" default="false"/> <arg name="debug" default="false"/>

<!-- Group by subsystem with namespaces --> <group ns="$(arg robot_name)">

&#x3C;!-- Conditional loading based on sim vs real -->
&#x3C;group if="$(arg sim)">
  &#x3C;include file="$(find my_pkg)/launch/sim_drivers.launch"/>
&#x3C;/group>
&#x3C;group unless="$(arg sim)">
  &#x3C;include file="$(find my_pkg)/launch/real_drivers.launch"/>
&#x3C;/group>

&#x3C;!-- Node with proper remapping -->
&#x3C;node pkg="my_pkg" type="perception_node.py" name="perception"
      output="screen" respawn="true" respawn_delay="5">
  &#x3C;param name="rate" value="30.0"/>
  &#x3C;param name="frame_id" value="$(arg robot_name)_base_link"/>
  &#x3C;remap from="~input_image" to="/$(arg robot_name)/camera/image_raw"/>
  &#x3C;remap from="~output_detections" to="detections"/>
  &#x3C;!-- Load a YAML param file -->
  &#x3C;rosparam file="$(find my_pkg)/config/perception.yaml" command="load"/>
&#x3C;/node>

</group>

<!-- Debug tools (conditionally loaded) --> <group if="$(arg debug)"> <node pkg="rviz" type="rviz" name="rviz" args="-d $(find my_pkg)/rviz/debug.rviz"/> <node pkg="rqt_graph" type="rqt_graph" name="rqt_graph"/> </group> </launch>

  1. TF Transform Tree

Rules:

  • Every frame has EXACTLY one parent (tree, not graph)

  • Static transforms use static_transform_publisher

  • Dynamic transforms publish at consistent rates

  • ALWAYS set timestamps correctly

import tf2_ros

Publishing transforms

br = tf2_ros.TransformBroadcaster() t = TransformStamped() t.header.stamp = rospy.Time.now() # CRITICAL: Use current time t.header.frame_id = "odom" t.child_frame_id = "base_link" t.transform.translation.x = x t.transform.translation.y = y t.transform.rotation = quaternion_from_euler(0, 0, theta) br.sendTransform(t)

Listening for transforms (with timeout and exception handling)

tf_buffer = tf2_ros.Buffer() listener = tf2_ros.TransformListener(tf_buffer)

try: trans = tf_buffer.lookup_transform( 'map', 'base_link', rospy.Time(0), # Get latest available rospy.Duration(1.0) # Wait up to 1 second ) except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException) as e: rospy.logwarn(f"TF lookup failed: {e}")

  1. Actionlib for Long-Running Tasks

import actionlib from my_msgs.msg import PickPlaceAction, PickPlaceGoal, PickPlaceResult

Server

class PickPlaceServer: def init(self): self.server = actionlib.SimpleActionServer( 'pick_place', PickPlaceAction, execute_cb=self.execute, auto_start=False # ALWAYS set auto_start=False ) self.server.start()

def execute(self, goal):
    feedback = PickPlaceFeedback()

    # Check for preemption INSIDE your loop
    for step in self.plan_steps(goal):
        if self.server.is_preempt_requested():
            self.server.set_preempted()
            return
        self.execute_step(step)
        feedback.progress = step.progress
        self.server.publish_feedback(feedback)

    result = PickPlaceResult(success=True)
    self.server.set_succeeded(result)

Common Pitfalls & Failure Modes

Time Synchronization

BAD: Comparing timestamps from different clocks

if camera_msg.header.stamp == lidar_msg.header.stamp: # Almost never true

GOOD: Use message_filters for approximate time sync

import message_filters sub_cam = message_filters.Subscriber('/camera/image', Image) sub_lidar = message_filters.Subscriber('/lidar/points', PointCloud2) sync = message_filters.ApproximateTimeSynchronizer( [sub_cam, sub_lidar], queue_size=10, slop=0.05 # 50ms tolerance ) sync.registerCallback(self.synced_callback)

Callback Threading

ROS1 uses a single-threaded spinner by default.

Long-running callbacks BLOCK all other callbacks.

BAD:

def callback(self, msg): result = self.expensive_computation(msg) # Blocks for 2 seconds! self.pub.publish(result)

GOOD: Use a MultiThreadedSpinner or process in a separate thread

rospy.init_node('my_node')

... setup ...

spinner = rospy.MultiThreadedSpinner(num_threads=4) spinner.spin()

Or use a processing thread:

import threading, queue class MyNode: def init(self): self.work_queue = queue.Queue(maxsize=1) self.worker = threading.Thread(target=self._process_loop, daemon=True) self.worker.start()

def callback(self, msg):
    try:
        self.work_queue.put_nowait(msg)  # Non-blocking
    except queue.Full:
        pass  # Drop old data

def _process_loop(self):
    while not rospy.is_shutdown():
        msg = self.work_queue.get()
        result = self.expensive_computation(msg)
        self.pub.publish(result)

Parameter Server Anti-Patterns

BAD: Hardcoded values

self.threshold = 0.5

BAD: Global params without namespace

self.threshold = rospy.get_param('threshold', 0.5) # Collides across nodes

GOOD: Private params with defaults

self.threshold = rospy.get_param('~threshold', 0.5)

GOOD: Dynamic reconfigure for runtime tuning

from dynamic_reconfigure.server import Server from my_pkg.cfg import MyNodeConfig self.dyn_server = Server(MyNodeConfig, self.dyn_callback)

Nodelets for Zero-Copy Transport

When nodes exchange large data (images, point clouds) within the same process, nodelets eliminate serialization overhead:

// my_nodelet.h #include <nodelet/nodelet.h> #include <pluginlib/class_list_macros.h>

class MyNodelet : public nodelet::Nodelet { virtual void onInit() { ros::NodeHandle& nh = getNodeHandle(); ros::NodeHandle& pnh = getPrivateNodeHandle(); // Use shared_ptr for zero-copy: pass pointers, not copies pub_ = nh.advertise<sensor_msgs::Image>("output", 1); sub_ = nh.subscribe("input", 1, &MyNodelet::callback, this); } }; PLUGINLIB_EXPORT_CLASS(MyNodelet, nodelet::Nodelet)

Package Structure

my_robot_pkg/ ├── CMakeLists.txt ├── package.xml ├── setup.py # For Python packages ├── config/ │ ├── robot_params.yaml # Default parameters │ └── dynamic_reconfigure/ # .cfg files ├── launch/ │ ├── robot.launch # Top-level launcher │ ├── drivers.launch # Hardware drivers │ └── perception.launch # Perception pipeline ├── msg/ # Custom message definitions │ └── Detection.msg ├── srv/ # Service definitions │ └── GetPose.srv ├── action/ # Action definitions │ └── PickPlace.action ├── src/ # C++ source │ └── my_node.cpp ├── scripts/ # Python nodes (executable) │ └── perception_node.py ├── include/my_robot_pkg/ # C++ headers │ └── my_node.h ├── rviz/ # RViz configs │ └── debug.rviz ├── urdf/ # Robot model │ └── robot.urdf.xacro └── test/ # Unit and integration tests ├── test_perception.py └── test_perception.test # rostest launch file

Debugging Toolkit

Essential diagnostic commands

rostopic list # See all active topics rostopic hz /camera/image_raw # Check publish rate rostopic bw /lidar/points # Check bandwidth rostopic echo /joint_states -n 1 # Inspect one message

rosnode list # Active nodes rosnode info /perception # Connections and subscriptions

roswtf # Automated diagnostics

rqt_graph # Visual node/topic graph rqt_console # Log viewer with filtering

TF debugging

rosrun tf tf_monitor # Monitor TF tree health rosrun tf view_frames # Generate TF tree PDF rosrun tf tf_echo map base_link # Print transform continuously

Bag file operations

rosbag record -a # Record everything (careful with disk!) rosbag record /camera/image /tf # Record specific topics rosbag info recording.bag # Inspect bag contents rosbag play recording.bag --clock # Playback with simulated time

ROS1 → ROS2 Migration Checklist

When planning a migration, note these key differences:

  • rospy → rclpy , roscpp → rclcpp

  • catkin_make → colcon build

  • roslaunch XML → ROS2 Python launch files

  • Global parameter server → Per-node parameters

  • rospy.Rate → node.create_timer()

  • Single roscore → DDS discovery (no central master)

  • message_filters works in both, but API differs

  • Custom messages: same .msg format, different build system

  • Nodelets → ROS2 Components (intra-process communication)

  • dynamic_reconfigure → ROS2 parameters with callbacks

Start migration from leaf nodes (sensors, actuators) and work inward. Use the ros1_bridge package to run both stacks simultaneously during transition.

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

ros2-development

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

docker-ros2-development

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

robotics-design-patterns

No summary provided by upstream source.

Repository SourceNeeds Review
Automation

robotics-testing

No summary provided by upstream source.

Repository SourceNeeds Review