Planner Integration
This guide shows how to integrate your MAPF planner with SMART.
Overview
SMART is designed to work with any MAPF planner that produces timestamped paths. The integration process consists of four steps:
Run your planner on a map/scenario
Convert output to SMART path format
Execute in SMART simulator
Analyze results
Planner Output Format
Your MAPF planner should output one path per agent in the following format, where each waypoint is (x,y,t):
Agent 0:(5,16,0)->(5,17,1)->(5,18,2)->(6,18,3)->...
Agent 1:(10,5,0)->(11,5,1)->(12,5,2)->...
Integration Template
Create a wrapper script for your planner:
# my_planner_wrapper.py
import subprocess
import json
from smart_client import SMARTClient
def run_my_planner(map_file, scen_file, num_agents):
    """
    Run your MAPF planner and load the paths it produced.

    Invokes the ``./my_planner`` executable, asking it to write its result to
    ``paths.json``, then parses that file as JSON and converts it with
    ``convert_to_smart_format``.

    Args:
        map_file: Map file passed to the planner via ``--map``.
        scen_file: Scenario file passed to the planner via ``--scen``.
        num_agents: Number of agents, passed via ``--agents``.

    Returns:
        List of paths for each agent

    Raises:
        subprocess.CalledProcessError: If the planner exits with a non-zero
            status.
    """
    output_file = 'paths.json'

    # Call your planner executable. check=True turns a non-zero exit status
    # into an exception, so a failed run cannot be mistaken for a fresh result
    # by silently reading a paths.json left over from an earlier run.
    subprocess.run([
        './my_planner',
        '--map', map_file,
        '--scen', scen_file,
        '--agents', str(num_agents),
        '--output', output_file,
    ], capture_output=True, check=True)

    # Parse planner output
    with open(output_file, encoding='utf-8') as f:
        planner_output = json.load(f)

    # Convert to SMART format
    return convert_to_smart_format(planner_output)
def convert_to_smart_format(planner_output):
    """
    Translate the planner's parsed output into SMART paths.

    Reads ``planner_output['agents']`` and, for each agent, turns every
    three-element waypoint in ``agent['path']`` into an ``(x, y, t)`` tuple.

    Returns:
        One list of ``(x, y, t)`` tuples per agent, in input order.
    """
    return [
        [(x, y, t) for x, y, t in agent['path']]
        for agent in planner_output['agents']
    ]
def evaluate_planner(map_file, scen_file, num_agents):
    """
    Plan with the wrapped planner, replay the plan in SMART, and print stats.

    The paths come from ``run_my_planner``; they are loaded into a
    ``SMARTClient`` created with ``port=8182`` and executed with
    ``headless=True``.

    Returns:
        The stats object returned by ``client.run``; its ``'makespan'``,
        ``'success_rate'`` and ``'throughput'`` entries are printed here.
    """
    # Stage 1: planning
    print(f"Running planner on {num_agents} agents...")
    planned_paths = run_my_planner(map_file, scen_file, num_agents)

    # Stage 2: hand the same problem instance and the plan to the simulator
    simulator = SMARTClient(port=8182)
    simulator.load_simulation(map_file, scen_file, num_agents)
    simulator.set_paths(planned_paths)

    # Stage 3: execute without a display
    print("Executing in SMART simulator...")
    stats = simulator.run(headless=True)

    # Stage 4: report
    print(f"Makespan: {stats['makespan']}")
    print(f"Success rate: {stats['success_rate']}")
    print(f"Throughput: {stats['throughput']:.2f} agents/sec")
    return stats
if __name__ == '__main__':
    # Demo run: 50 agents on the first random scenario of the 32x32 map.
    stats = evaluate_planner(
        map_file='random-32-32-20.map',
        scen_file='random-32-32-20-random-1.scen',
        num_agents=50,
    )
Common Planners
CBS (Conflict-Based Search)
# Assumes CBS returns one path per agent, each a list of (x, y) cells ordered by timestep
def convert_cbs_output(cbs_paths):
    """
    Attach timesteps to CBS paths.

    Each agent path is a sequence of ``(x, y)`` pairs; the position of a pair
    within its path becomes its timestep ``t``.

    Returns:
        One list of ``(x, y, t)`` tuples per agent, in input order.
    """
    return [
        [(x, y, step) for step, (x, y) in enumerate(agent_path)]
        for agent_path in cbs_paths
    ]
EECBS (Enhanced CBS)
from eecbs import EECBS
# Run EECBS
eecbs = EECBS()
solution = eecbs.solve(map_file, scen_file, num_agents)

# Convert to SMART format: the index of each (x, y) cell within an agent's
# solution is used as its timestep. The agent index itself is not needed, so
# the solutions are iterated directly rather than enumerated.
paths = [
    [(x, y, t) for t, (x, y) in enumerate(agent_solution)]
    for agent_solution in solution
]
MAPF-LNS (Large Neighborhood Search)
from mapf_lns import MAPFLNS
# Solve with MAPF-LNS. time_limit=60 is forwarded to the solver unchanged
# (presumably seconds -- confirm against the mapf_lns documentation).
lns = MAPFLNS()
solution = lns.solve(map_file=map_file, scen_file=scen_file,
                     num_agents=num_agents, time_limit=60)

# Already in correct format: the 'paths' entry is used without conversion
paths = solution['paths']
Batch Evaluation
Evaluate your planner across multiple scenarios:
import pandas as pd
# Each entry is (map file, scenario file, number of agents).
scenarios = [
    ('random-32-32-20.map', 'random-32-32-20-random-1.scen', 50),
    ('random-32-32-20.map', 'random-32-32-20-random-2.scen', 100),
    ('maze-32-32-4.map', 'maze-32-32-4-random-1.scen', 50),
]

# One row per scenario, collected for the CSV summary below.
results = []
for map_file, scen_file, num_agents in scenarios:
    # A real newline escape: the doubled backslash previously printed the two
    # characters backslash-n instead of starting a new line.
    print(f"\nEvaluating: {scen_file} with {num_agents} agents")
    stats = evaluate_planner(map_file, scen_file, num_agents)
    results.append({
        'map': map_file,
        'scenario': scen_file,
        'agents': num_agents,
        'makespan': stats['makespan'],
        'success_rate': stats['success_rate'],
        'throughput': stats['throughput'],
    })

# Save results
df = pd.DataFrame(results)
df.to_csv('planner_evaluation.csv', index=False)
print("\nResults saved to planner_evaluation.csv")
Coordinate System Handling
If your planner emits waypoints in row-column order, i.e. (y, x, t), swap the first two components before passing the paths to SMART:
def convert_yx_to_xy(yx_paths):
    """
    Swap the first two components of every waypoint.

    Each waypoint is unpacked as ``(y, x, t)`` and re-emitted as
    ``(x, y, t)``; the timestep is left untouched.

    Returns:
        One list of ``(x, y, t)`` tuples per agent, in input order.
    """
    return [
        [(col, row, t) for row, col, t in agent_path]
        for agent_path in yx_paths
    ]
Alternatively, pass the --flip_coord=1 flag when running SMART directly instead of converting the paths yourself.
Dealing with Continuous Time
If your planner outputs continuous-time paths, round each timestamp to the nearest multiple of the timestep:
def discretize_paths(continuous_paths, timestep=1.0):
    """
    Snap continuous timestamps onto an integer timestep grid.

    For every ``(x, y, t)`` waypoint the time becomes
    ``int(round(t / timestep))``; ``x`` and ``y`` pass through unchanged.

    Returns:
        One list of ``(x, y, discrete_t)`` tuples per agent, in input order.
    """
    return [
        [(x, y, int(round(t / timestep))) for x, y, t in agent_path]
        for agent_path in continuous_paths
    ]
Performance Tips
Use headless mode for batch experiments
Precompute and save paths so that repeated simulations do not re-run the planner
Use appropriate robot parameters matching your assumptions
Run multiple trials for statistical significance
Example: Full Integration
Complete example integrating a custom planner:
# integrate_my_planner.py
import argparse
from pathlib import Path
from smart_client import SMARTClient
from my_planner import MyMAPFPlanner
def main():
    """
    Command-line entry point: plan, simulate in SMART, and save the stats.

    Parses ``--map``, ``--scen``, ``--agents``, ``--headless`` and
    ``--output``; runs ``MyMAPFPlanner.solve``; and, if a solution is
    returned, loads its paths into a ``SMARTClient``, runs the simulation,
    prints a summary and dumps the stats as JSON to the ``--output`` path.

    Returns:
        None. Returns early, after printing a message, when the planner
        yields a falsy solution.
    """
    # Kept as a function-local import, as in the original sample, so the
    # snippet does not depend on the surrounding file importing json.
    import json

    parser = argparse.ArgumentParser()
    parser.add_argument('--map', required=True)
    parser.add_argument('--scen', required=True)
    parser.add_argument('--agents', type=int, required=True)
    parser.add_argument('--headless', action='store_true')
    # NOTE(review): the default keeps its historical .csv name for backward
    # compatibility, but the file content is JSON (see json.dump below).
    parser.add_argument('--output', default='results.csv',
                        help='file the simulation stats are written to, as JSON')
    args = parser.parse_args()

    # Initialize planner
    planner = MyMAPFPlanner()

    # Solve MAPF problem
    print(f"Planning for {args.agents} agents...")
    solution = planner.solve(
        map_file=args.map,
        scen_file=args.scen,
        num_agents=args.agents
    )
    if not solution:
        print("Planner failed to find solution!")
        return

    # Convert to SMART format
    paths = solution.get_paths()

    # Execute in SMART
    print("Running simulation...")
    client = SMARTClient()
    client.load_simulation(args.map, args.scen, args.agents)
    client.set_paths(paths)
    stats = client.run(headless=args.headless)

    # Report results. The "\n" escapes are real newlines: the doubled
    # backslash previously printed the two characters backslash-n.
    print("\nResults:")
    print(f" Makespan: {stats['makespan']}")
    print(f" Sum of costs: {stats['sum_of_costs']}")
    print(f" Success rate: {stats['success_rate']*100:.1f}%")
    print(f" Throughput: {stats['throughput']:.2f} agents/sec")

    # Save detailed stats
    with open(args.output, 'w', encoding='utf-8') as f:
        json.dump(stats, f, indent=2)
    print(f"\nDetailed stats saved to {args.output}")
# Run the integration only when this file is executed as a script.
if __name__ == "__main__":
    main()
Next Steps
Core APIs - Python API reference
Examples - Example integrations
Usage Guide - Running simulations