Skip to main content

ROS 2 Nodes and Python Agent Integration

Learning Objectives

By the end of this chapter, you will be able to:

  • Create and run ROS 2 nodes using the rclpy Python client library
  • Publish and subscribe to topics from Python AI agents
  • Use services for command-and-control workflows between AI agents and robot controllers
  • Bridge LLM/AI logic to ROS-based robot controllers

Creating ROS 2 Nodes Using rclpy

The Python client library for ROS 2, rclpy, provides the interface between Python programs and the ROS 2 middleware. Let's start by understanding how to create a basic ROS 2 node in Python.

Basic Node Structure

Here's the minimal structure for a ROS 2 Python node:

import rclpy
from rclpy.node import Node

class MinimalNode(Node):
def __init__(self):
super().__init__('minimal_node')
self.get_logger().info('Hello from minimal node!')

def main(args=None):
rclpy.init(args=args)
node = MinimalNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Node Lifecycle

A ROS 2 node follows a specific lifecycle:

  1. Initialization: Node is created with a unique name
  2. Configuration: Publishers, subscribers, services, etc. are created
  3. Activation: Node becomes active and can communicate
  4. Spinning: Node processes callbacks and communications
  5. Shutdown: Resources are cleaned up

Creating Publishers and Subscribers

Here's how to create a node with both publishing and subscribing capabilities:

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class TalkerNode(Node):
def __init__(self):
super().__init__('talker')
self.publisher = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0

def timer_callback(self):
msg = String()
msg.data = f'Hello World: {self.i}'
self.publisher.publish(msg)
self.get_logger().info(f'Publishing: "{msg.data}"')
self.i += 1

class ListenerNode(Node):
def __init__(self):
super().__init__('listener')
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
10)
self.subscription # prevent unused variable warning

def listener_callback(self, msg):
self.get_logger().info(f'I heard: "{msg.data}"')

def main(args=None):
rclpy.init(args=args)

talker = TalkerNode()
listener = ListenerNode()

try:
rclpy.spin(talker)
except KeyboardInterrupt:
pass
finally:
talker.destroy_node()
rclpy.shutdown()

Publishing and Subscribing to Topics from Python AI Agents

Publisher Pattern for AI Agents

When integrating AI agents with ROS 2, publishers are often used to send AI decisions or commands to robot controllers:

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist
import random

class AIAgentNode(Node):
def __init__(self):
super().__init__('ai_agent')

# Publisher for robot commands
self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)

# Publisher for AI decisions
self.decision_publisher = self.create_publisher(String, '/ai_decision', 10)

# Timer to simulate AI decision making
self.timer = self.create_timer(1.0, self.make_decision)

self.get_logger().info('AI Agent node started')

def make_decision(self):
# Simulate AI decision making
decision = self.simple_navigation_logic()

# Publish decision
decision_msg = String()
decision_msg.data = decision
self.decision_publisher.publish(decision_msg)

# Convert decision to robot command
cmd_msg = self.decision_to_command(decision)
self.cmd_vel_publisher.publish(cmd_msg)

self.get_logger().info(f'AI Decision: {decision}')

def simple_navigation_logic(self):
# Simple navigation logic - in real AI agents this would be more complex
actions = ['move_forward', 'turn_left', 'turn_right', 'stop']
return random.choice(actions)

def decision_to_command(self, decision):
cmd = Twist()
if decision == 'move_forward':
cmd.linear.x = 0.5 # Move forward at 0.5 m/s
elif decision == 'turn_left':
cmd.angular.z = 0.5 # Turn left at 0.5 rad/s
elif decision == 'turn_right':
cmd.angular.z = -0.5 # Turn right at 0.5 rad/s
else: # stop
cmd.linear.x = 0.0
cmd.angular.z = 0.0
return cmd

def main(args=None):
rclpy.init(args=args)
ai_agent = AIAgentNode()

try:
rclpy.spin(ai_agent)
except KeyboardInterrupt:
pass
finally:
ai_agent.destroy_node()
rclpy.shutdown()

Subscriber Pattern for AI Agents

AI agents often need to subscribe to sensor data to make informed decisions:

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
from std_msgs.msg import String

class AIPerceptionNode(Node):
def __init__(self):
super().__init__('ai_perception')

# Subscriber for laser scan data
self.scan_subscription = self.create_subscription(
LaserScan,
'/scan',
self.scan_callback,
10)

# Publisher for AI decisions
self.cmd_publisher = self.create_publisher(Twist, '/cmd_vel', 10)

# Publisher for AI state
self.state_publisher = self.create_publisher(String, '/ai_state', 10)

self.get_logger().info('AI Perception node started')

def scan_callback(self, msg):
# Process laser scan data to detect obstacles
min_distance = min(msg.ranges)

# Make AI decision based on sensor data
if min_distance < 0.5: # Obstacle within 0.5m
decision = "OBSTACLE_DETECTED"
cmd = self.avoid_obstacle()
else:
decision = "CLEAR_PATH"
cmd = self.move_forward()

# Publish AI decision
state_msg = String()
state_msg.data = decision
self.state_publisher.publish(state_msg)

# Publish robot command
self.cmd_publisher.publish(cmd)

self.get_logger().info(f'AI Decision: {decision}, Distance: {min_distance:.2f}m')

def avoid_obstacle(self):
cmd = Twist()
cmd.angular.z = 0.5 # Turn to avoid obstacle
return cmd

def move_forward(self):
cmd = Twist()
cmd.linear.x = 0.3 # Move forward at 0.3 m/s
return cmd

def main(args=None):
rclpy.init(args=args)
ai_perception = AIPerceptionNode()

try:
rclpy.spin(ai_perception)
except KeyboardInterrupt:
pass
finally:
ai_perception.destroy_node()
rclpy.shutdown()

Using Services for Command-and-Control Workflows

Services provide synchronous request-response communication, which is useful for command-and-control workflows:

Creating a Service Server

import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts

class MinimalService(Node):
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)

def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info(f'Incoming request\na: {request.a}, b: {request.b}\n')
return response

def main(args=None):
rclpy.init(args=args)
minimal_service = MinimalService()

try:
rclpy.spin(minimal_service)
except KeyboardInterrupt:
pass
finally:
minimal_service.destroy_node()
rclpy.shutdown()

Creating a Service Client

import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts

class MinimalClient(Node):
def __init__(self):
super().__init__('minimal_client')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting again...')
self.req = AddTwoInts.Request()

def send_request(self, a, b):
self.req.a = a
self.req.b = b
self.future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()

def main(args=None):
rclpy.init(args=args)
minimal_client = MinimalClient()
response = minimal_client.send_request(1, 2)
minimal_client.get_logger().info(f'Result of add_two_ints: {response.sum}')

minimal_client.destroy_node()
rclpy.shutdown()

AI Service Integration

Here's an example of how AI agents can use services for command-and-control:

import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
from example_interfaces.srv import Trigger
from std_msgs.msg import String

class AICommandNode(Node):
def __init__(self):
super().__init__('ai_command')

# Service clients for robot control
self.emergency_stop_client = self.create_client(
SetBool, '/emergency_stop')
self.navigation_start_client = self.create_client(
Trigger, '/start_navigation')
self.navigation_stop_client = self.create_client(
Trigger, '/stop_navigation')

# Publisher for AI state
self.state_publisher = self.create_publisher(String, '/ai_state', 10)

# Wait for services to be available
while not self.emergency_stop_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Emergency stop service not available, waiting...')

self.get_logger().info('AI Command node ready')

def request_emergency_stop(self):
request = SetBool.Request()
request.data = True # Activate emergency stop
future = self.emergency_stop_client.call_async(request)
return future

def start_navigation(self):
request = Trigger.Request()
future = self.navigation_start_client.call_async(request)
return future

def stop_navigation(self):
request = Trigger.Request()
future = self.navigation_stop_client.call_async(request)
return future

def main(args=None):
rclpy.init(args=args)
ai_command = AICommandNode()

# Example: Start navigation
future = ai_command.start_navigation()
rclpy.spin_until_future_complete(ai_command, future)

if future.result() is not None:
result = future.result()
ai_command.get_logger().info(f'Navigation start result: {result.success}')
else:
ai_command.get_logger().error('Navigation start service call failed')

ai_command.destroy_node()
rclpy.shutdown()

Bridging LLM/AI Logic to ROS-Based Robot Controllers

Simple LLM Integration Example

Here's an example of how to integrate Large Language Model (LLM) responses with ROS-based robot controllers:

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist
import requests # For API calls to LLM services

class LLMRobotController(Node):
def __init__(self):
super().__init__('llm_robot_controller')

# Publisher for robot commands
self.cmd_publisher = self.create_publisher(Twist, '/cmd_vel', 10)

# Publisher for AI decisions
self.decision_publisher = self.create_publisher(String, '/llm_decision', 10)

# Subscriber for commands or queries
self.command_subscription = self.create_subscription(
String,
'/user_command',
self.command_callback,
10)

self.get_logger().info('LLM Robot Controller node started')

def command_callback(self, msg):
# Process user command through LLM
llm_response = self.query_llm(msg.data)

# Convert LLM response to robot action
robot_action = self.llm_response_to_robot_action(llm_response)

# Execute robot action
self.execute_robot_action(robot_action)

# Publish decision
decision_msg = String()
decision_msg.data = f'Action: {robot_action}, Command: {msg.data}'
self.decision_publisher.publish(decision_msg)

def query_llm(self, user_command):
# This is a simplified example - in practice you'd use an actual LLM API
# like OpenAI, Hugging Face, or a local LLM service
mock_responses = {
'move forward': 'FORWARD',
'turn left': 'LEFT',
'turn right': 'RIGHT',
'stop': 'STOP',
'dance': 'DANCE'
}

command_lower = user_command.lower()
for key, value in mock_responses.items():
if key in command_lower:
return value

return 'STOP' # Default action

def llm_response_to_robot_action(self, llm_response):
action_map = {
'FORWARD': 'move_forward',
'LEFT': 'turn_left',
'RIGHT': 'turn_right',
'STOP': 'stop',
'DANCE': 'dance'
}
return action_map.get(llm_response, 'stop')

def execute_robot_action(self, action):
cmd = Twist()

if action == 'move_forward':
cmd.linear.x = 0.5
elif action == 'turn_left':
cmd.angular.z = 0.5
elif action == 'turn_right':
cmd.angular.z = -0.5
elif action == 'dance':
cmd.linear.x = 0.3
cmd.angular.z = 1.0
else: # stop
cmd.linear.x = 0.0
cmd.angular.z = 0.0

self.cmd_publisher.publish(cmd)
self.get_logger().info(f'Executing action: {action}')

def main(args=None):
rclpy.init(args=args)
llm_controller = LLMRobotController()

try:
rclpy.spin(llm_controller)
except KeyboardInterrupt:
pass
finally:
llm_controller.destroy_node()
rclpy.shutdown()

Advanced AI Integration Pattern

For more complex AI integration, consider this pattern:

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32
import numpy as np
import time

class AdvancedAIController(Node):
def __init__(self):
super().__init__('advanced_ai_controller')

# Publishers
self.cmd_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
self.state_publisher = self.create_publisher(String, '/ai_state', 10)
self.confidence_publisher = self.create_publisher(Float32, '/ai_confidence', 10)

# Subscribers
self.scan_subscription = self.create_subscription(
LaserScan, '/scan', self.scan_callback, 10)
self.goal_subscription = self.create_subscription(
String, '/navigation_goal', self.goal_callback, 10)

# Internal state
self.scan_data = None
self.current_goal = None
self.ai_state = "IDLE"

# AI processing timer
self.ai_timer = self.create_timer(0.1, self.ai_processing_callback)

self.get_logger().info('Advanced AI Controller node started')

def scan_callback(self, msg):
self.scan_data = msg.ranges

def goal_callback(self, msg):
self.current_goal = msg.data
self.ai_state = "NAVIGATING"

def ai_processing_callback(self):
if self.scan_data is not None and self.current_goal is not None:
# Simulate AI processing
ai_decision = self.process_ai_logic()

# Publish command based on AI decision
cmd = self.decision_to_command(ai_decision)
self.cmd_publisher.publish(cmd)

# Publish AI state and confidence
state_msg = String()
state_msg.data = f'State: {self.ai_state}, Goal: {self.current_goal}'
self.state_publisher.publish(state_msg)

confidence_msg = Float32()
confidence_msg.data = 0.95 # Simulated high confidence
self.confidence_publisher.publish(confidence_msg)

def process_ai_logic(self):
# This is where complex AI logic would go
# For this example, we'll implement simple obstacle avoidance
if self.scan_data:
min_distance = min(self.scan_data) if self.scan_data else float('inf')

if min_distance < 0.5: # Obstacle detected
return "AVOID_OBSTACLE"
elif self.current_goal == "explore":
return "EXPLORE_FORWARD"
else:
return "MOVE_TO_GOAL"
return "IDLE"

def decision_to_command(self, decision):
cmd = Twist()

if decision == "AVOID_OBSTACLE":
cmd.angular.z = 0.5 # Turn to avoid
elif decision == "EXPLORE_FORWARD":
cmd.linear.x = 0.3 # Move forward
elif decision == "MOVE_TO_GOAL":
cmd.linear.x = 0.5 # Move toward goal
else: # IDLE
cmd.linear.x = 0.0
cmd.angular.z = 0.0

return cmd

def main(args=None):
rclpy.init(args=args)
ai_controller = AdvancedAIController()

try:
rclpy.spin(ai_controller)
except KeyboardInterrupt:
pass
finally:
ai_controller.destroy_node()
rclpy.shutdown()

Best Practices for AI-ROS Integration

Error Handling

  • Always check if services are available before calling them
  • Implement timeouts for service calls
  • Handle message serialization/deserialization errors
  • Gracefully degrade when AI components fail

Performance Considerations

  • Use appropriate QoS settings for different message types
  • Consider message frequency and bandwidth requirements
  • Implement caching for expensive AI computations
  • Use threading for computationally intensive AI tasks

Safety Considerations

  • Implement safety checks before executing AI decisions
  • Use emergency stop services when needed
  • Validate AI outputs before sending to robot controllers
  • Implement monitoring and logging for AI decisions

Summary

This chapter has covered the essential techniques for integrating Python-based AI agents with ROS-based robot controllers. You've learned how to create ROS 2 nodes using rclpy, implement publisher-subscriber patterns for sensor integration and command output, use services for synchronous command-and-control workflows, and bridge LLM/AI logic to robot controllers.

In the next chapter, we'll explore how to model humanoid robots using URDF (Unified Robot Description Format).