ROS 2 Nodes and Python Agent Integration
Learning Objectives
By the end of this chapter, you will be able to:
- Create and run ROS 2 nodes using the rclpy Python client library
- Publish and subscribe to topics from Python AI agents
- Use services for command-and-control workflows between AI agents and robot controllers
- Bridge LLM/AI logic to ROS-based robot controllers
Creating ROS 2 Nodes Using rclpy
The Python client library for ROS 2, rclpy, provides the interface between Python programs and the ROS 2 middleware. Let's start by understanding how to create a basic ROS 2 node in Python.
Basic Node Structure
Here's the minimal structure for a ROS 2 Python node:
import rclpy
from rclpy.node import Node
class MinimalNode(Node):
def __init__(self):
super().__init__('minimal_node')
self.get_logger().info('Hello from minimal node!')
def main(args=None):
rclpy.init(args=args)
node = MinimalNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Node Lifecycle
A ROS 2 node follows a specific lifecycle:
- Initialization: Node is created with a unique name
- Configuration: Publishers, subscribers, services, etc. are created
- Activation: Node becomes active and can communicate
- Spinning: Node processes callbacks and communications
- Shutdown: Resources are cleaned up
Creating Publishers and Subscribers
Here's how to create a node with both publishing and subscribing capabilities:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class TalkerNode(Node):
def __init__(self):
super().__init__('talker')
self.publisher = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String()
msg.data = f'Hello World: {self.i}'
self.publisher.publish(msg)
self.get_logger().info(f'Publishing: "{msg.data}"')
self.i += 1
class ListenerNode(Node):
def __init__(self):
super().__init__('listener')
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
10)
self.subscription # prevent unused variable warning
def listener_callback(self, msg):
self.get_logger().info(f'I heard: "{msg.data}"')
def main(args=None):
rclpy.init(args=args)
talker = TalkerNode()
listener = ListenerNode()
try:
rclpy.spin(talker)
except KeyboardInterrupt:
pass
finally:
talker.destroy_node()
rclpy.shutdown()
Publishing and Subscribing to Topics from Python AI Agents
Publisher Pattern for AI Agents
When integrating AI agents with ROS 2, publishers are often used to send AI decisions or commands to robot controllers:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist
import random
class AIAgentNode(Node):
def __init__(self):
super().__init__('ai_agent')
# Publisher for robot commands
self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
# Publisher for AI decisions
self.decision_publisher = self.create_publisher(String, '/ai_decision', 10)
# Timer to simulate AI decision making
self.timer = self.create_timer(1.0, self.make_decision)
self.get_logger().info('AI Agent node started')
def make_decision(self):
# Simulate AI decision making
decision = self.simple_navigation_logic()
# Publish decision
decision_msg = String()
decision_msg.data = decision
self.decision_publisher.publish(decision_msg)
# Convert decision to robot command
cmd_msg = self.decision_to_command(decision)
self.cmd_vel_publisher.publish(cmd_msg)
self.get_logger().info(f'AI Decision: {decision}')
def simple_navigation_logic(self):
# Simple navigation logic - in real AI agents this would be more complex
actions = ['move_forward', 'turn_left', 'turn_right', 'stop']
return random.choice(actions)
def decision_to_command(self, decision):
cmd = Twist()
if decision == 'move_forward':
cmd.linear.x = 0.5 # Move forward at 0.5 m/s
elif decision == 'turn_left':
cmd.angular.z = 0.5 # Turn left at 0.5 rad/s
elif decision == 'turn_right':
cmd.angular.z = -0.5 # Turn right at 0.5 rad/s
else: # stop
cmd.linear.x = 0.0
cmd.angular.z = 0.0
return cmd
def main(args=None):
rclpy.init(args=args)
ai_agent = AIAgentNode()
try:
rclpy.spin(ai_agent)
except KeyboardInterrupt:
pass
finally:
ai_agent.destroy_node()
rclpy.shutdown()
Subscriber Pattern for AI Agents
AI agents often need to subscribe to sensor data to make informed decisions:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
from std_msgs.msg import String
class AIPerceptionNode(Node):
def __init__(self):
super().__init__('ai_perception')
# Subscriber for laser scan data
self.scan_subscription = self.create_subscription(
LaserScan,
'/scan',
self.scan_callback,
10)
# Publisher for AI decisions
self.cmd_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
# Publisher for AI state
self.state_publisher = self.create_publisher(String, '/ai_state', 10)
self.get_logger().info('AI Perception node started')
def scan_callback(self, msg):
# Process laser scan data to detect obstacles
min_distance = min(msg.ranges)
# Make AI decision based on sensor data
if min_distance < 0.5: # Obstacle within 0.5m
decision = "OBSTACLE_DETECTED"
cmd = self.avoid_obstacle()
else:
decision = "CLEAR_PATH"
cmd = self.move_forward()
# Publish AI decision
state_msg = String()
state_msg.data = decision
self.state_publisher.publish(state_msg)
# Publish robot command
self.cmd_publisher.publish(cmd)
self.get_logger().info(f'AI Decision: {decision}, Distance: {min_distance:.2f}m')
def avoid_obstacle(self):
cmd = Twist()
cmd.angular.z = 0.5 # Turn to avoid obstacle
return cmd
def move_forward(self):
cmd = Twist()
cmd.linear.x = 0.3 # Move forward at 0.3 m/s
return cmd
def main(args=None):
rclpy.init(args=args)
ai_perception = AIPerceptionNode()
try:
rclpy.spin(ai_perception)
except KeyboardInterrupt:
pass
finally:
ai_perception.destroy_node()
rclpy.shutdown()
Using Services for Command-and-Control Workflows
Services provide synchronous request-response communication, which is useful for command-and-control workflows:
Creating a Service Server
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class MinimalService(Node):
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info(f'Incoming request\na: {request.a}, b: {request.b}\n')
return response
def main(args=None):
rclpy.init(args=args)
minimal_service = MinimalService()
try:
rclpy.spin(minimal_service)
except KeyboardInterrupt:
pass
finally:
minimal_service.destroy_node()
rclpy.shutdown()
Creating a Service Client
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class MinimalClient(Node):
def __init__(self):
super().__init__('minimal_client')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting again...')
self.req = AddTwoInts.Request()
def send_request(self, a, b):
self.req.a = a
self.req.b = b
self.future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()
def main(args=None):
rclpy.init(args=args)
minimal_client = MinimalClient()
response = minimal_client.send_request(1, 2)
minimal_client.get_logger().info(f'Result of add_two_ints: {response.sum}')
minimal_client.destroy_node()
rclpy.shutdown()
AI Service Integration
Here's an example of how AI agents can use services for command-and-control:
import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
from example_interfaces.srv import Trigger
from std_msgs.msg import String
class AICommandNode(Node):
def __init__(self):
super().__init__('ai_command')
# Service clients for robot control
self.emergency_stop_client = self.create_client(
SetBool, '/emergency_stop')
self.navigation_start_client = self.create_client(
Trigger, '/start_navigation')
self.navigation_stop_client = self.create_client(
Trigger, '/stop_navigation')
# Publisher for AI state
self.state_publisher = self.create_publisher(String, '/ai_state', 10)
# Wait for services to be available
while not self.emergency_stop_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Emergency stop service not available, waiting...')
self.get_logger().info('AI Command node ready')
def request_emergency_stop(self):
request = SetBool.Request()
request.data = True # Activate emergency stop
future = self.emergency_stop_client.call_async(request)
return future
def start_navigation(self):
request = Trigger.Request()
future = self.navigation_start_client.call_async(request)
return future
def stop_navigation(self):
request = Trigger.Request()
future = self.navigation_stop_client.call_async(request)
return future
def main(args=None):
rclpy.init(args=args)
ai_command = AICommandNode()
# Example: Start navigation
future = ai_command.start_navigation()
rclpy.spin_until_future_complete(ai_command, future)
if future.result() is not None:
result = future.result()
ai_command.get_logger().info(f'Navigation start result: {result.success}')
else:
ai_command.get_logger().error('Navigation start service call failed')
ai_command.destroy_node()
rclpy.shutdown()
Bridging LLM/AI Logic to ROS-Based Robot Controllers
Simple LLM Integration Example
Here's an example of how to integrate Large Language Model (LLM) responses with ROS-based robot controllers:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist
import requests # For API calls to LLM services
class LLMRobotController(Node):
def __init__(self):
super().__init__('llm_robot_controller')
# Publisher for robot commands
self.cmd_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
# Publisher for AI decisions
self.decision_publisher = self.create_publisher(String, '/llm_decision', 10)
# Subscriber for commands or queries
self.command_subscription = self.create_subscription(
String,
'/user_command',
self.command_callback,
10)
self.get_logger().info('LLM Robot Controller node started')
def command_callback(self, msg):
# Process user command through LLM
llm_response = self.query_llm(msg.data)
# Convert LLM response to robot action
robot_action = self.llm_response_to_robot_action(llm_response)
# Execute robot action
self.execute_robot_action(robot_action)
# Publish decision
decision_msg = String()
decision_msg.data = f'Action: {robot_action}, Command: {msg.data}'
self.decision_publisher.publish(decision_msg)
def query_llm(self, user_command):
# This is a simplified example - in practice you'd use an actual LLM API
# like OpenAI, Hugging Face, or a local LLM service
mock_responses = {
'move forward': 'FORWARD',
'turn left': 'LEFT',
'turn right': 'RIGHT',
'stop': 'STOP',
'dance': 'DANCE'
}
command_lower = user_command.lower()
for key, value in mock_responses.items():
if key in command_lower:
return value
return 'STOP' # Default action
def llm_response_to_robot_action(self, llm_response):
action_map = {
'FORWARD': 'move_forward',
'LEFT': 'turn_left',
'RIGHT': 'turn_right',
'STOP': 'stop',
'DANCE': 'dance'
}
return action_map.get(llm_response, 'stop')
def execute_robot_action(self, action):
cmd = Twist()
if action == 'move_forward':
cmd.linear.x = 0.5
elif action == 'turn_left':
cmd.angular.z = 0.5
elif action == 'turn_right':
cmd.angular.z = -0.5
elif action == 'dance':
cmd.linear.x = 0.3
cmd.angular.z = 1.0
else: # stop
cmd.linear.x = 0.0
cmd.angular.z = 0.0
self.cmd_publisher.publish(cmd)
self.get_logger().info(f'Executing action: {action}')
def main(args=None):
rclpy.init(args=args)
llm_controller = LLMRobotController()
try:
rclpy.spin(llm_controller)
except KeyboardInterrupt:
pass
finally:
llm_controller.destroy_node()
rclpy.shutdown()
Advanced AI Integration Pattern
For more complex AI integration, consider this pattern:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32
import numpy as np
import time
class AdvancedAIController(Node):
def __init__(self):
super().__init__('advanced_ai_controller')
# Publishers
self.cmd_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
self.state_publisher = self.create_publisher(String, '/ai_state', 10)
self.confidence_publisher = self.create_publisher(Float32, '/ai_confidence', 10)
# Subscribers
self.scan_subscription = self.create_subscription(
LaserScan, '/scan', self.scan_callback, 10)
self.goal_subscription = self.create_subscription(
String, '/navigation_goal', self.goal_callback, 10)
# Internal state
self.scan_data = None
self.current_goal = None
self.ai_state = "IDLE"
# AI processing timer
self.ai_timer = self.create_timer(0.1, self.ai_processing_callback)
self.get_logger().info('Advanced AI Controller node started')
def scan_callback(self, msg):
self.scan_data = msg.ranges
def goal_callback(self, msg):
self.current_goal = msg.data
self.ai_state = "NAVIGATING"
def ai_processing_callback(self):
if self.scan_data is not None and self.current_goal is not None:
# Simulate AI processing
ai_decision = self.process_ai_logic()
# Publish command based on AI decision
cmd = self.decision_to_command(ai_decision)
self.cmd_publisher.publish(cmd)
# Publish AI state and confidence
state_msg = String()
state_msg.data = f'State: {self.ai_state}, Goal: {self.current_goal}'
self.state_publisher.publish(state_msg)
confidence_msg = Float32()
confidence_msg.data = 0.95 # Simulated high confidence
self.confidence_publisher.publish(confidence_msg)
def process_ai_logic(self):
# This is where complex AI logic would go
# For this example, we'll implement simple obstacle avoidance
if self.scan_data:
min_distance = min(self.scan_data) if self.scan_data else float('inf')
if min_distance < 0.5: # Obstacle detected
return "AVOID_OBSTACLE"
elif self.current_goal == "explore":
return "EXPLORE_FORWARD"
else:
return "MOVE_TO_GOAL"
return "IDLE"
def decision_to_command(self, decision):
cmd = Twist()
if decision == "AVOID_OBSTACLE":
cmd.angular.z = 0.5 # Turn to avoid
elif decision == "EXPLORE_FORWARD":
cmd.linear.x = 0.3 # Move forward
elif decision == "MOVE_TO_GOAL":
cmd.linear.x = 0.5 # Move toward goal
else: # IDLE
cmd.linear.x = 0.0
cmd.angular.z = 0.0
return cmd
def main(args=None):
rclpy.init(args=args)
ai_controller = AdvancedAIController()
try:
rclpy.spin(ai_controller)
except KeyboardInterrupt:
pass
finally:
ai_controller.destroy_node()
rclpy.shutdown()
Best Practices for AI-ROS Integration
Error Handling
- Always check if services are available before calling them
- Implement timeouts for service calls
- Handle message serialization/deserialization errors
- Gracefully degrade when AI components fail
Performance Considerations
- Use appropriate QoS settings for different message types
- Consider message frequency and bandwidth requirements
- Implement caching for expensive AI computations
- Use threading for computationally intensive AI tasks
Safety Considerations
- Implement safety checks before executing AI decisions
- Use emergency stop services when needed
- Validate AI outputs before sending to robot controllers
- Implement monitoring and logging for AI decisions
Summary
This chapter has covered the essential techniques for integrating Python-based AI agents with ROS-based robot controllers. You've learned how to create ROS 2 nodes using rclpy, implement publisher-subscriber patterns for sensor integration and command output, use services for synchronous command-and-control workflows, and bridge LLM/AI logic to robot controllers.
In the next chapter, we'll explore how to model humanoid robots using URDF (Unified Robot Description Format).