"""
This file, simulator.py, defines the SumoSimulator class,
which manages the interaction between driver agents and the SUMO traffic simulator
by handling simulation setup, path generation, vehicle management, and simulation control.
"""
import os
import janux as jx
import logging
import random
import pandas as pd
import traci
from routerl.keychain import Keychain as kc
from routerl.utilities import confirm_env_variable
logger = logging.getLogger()
logger.setLevel(logging.WARNING)
[docs]
class SumoSimulator():
"""Sumo simulator class
A class responsible for managing the communication between our learning agents and the SUMO traffic simulator.
SUMO provides the traffic environment where vehicles travel between designated origins and destinations,
and it returns the corresponding travel times for these vehicles.
Args:
params (dict):
Dictionary of parameters for the SUMO environment. Specified `here <https://coexistence-project.github.io/RouteRL/documentation/pz_env.html#>`_.
path_gen_params (dict):
Dictionary of parameters for the SUMO environment. specified `here <https://coexistence-project.github.io/RouteRL/documentation/pz_env.html#>`_.
seed (int):
Random seed for reproducibility.
Attributes:
network_name: Network name.
simulation_length: Simulation length.
sumo_id: SUMO connection id.
sumo_connection: Traci-SUMO connection object.
timestep: Time step being simulated within the day.
"""
def __init__(self, params: dict, path_gen_params: dict, seed: int = 23423):
self.network_name = params[kc.NETWORK_NAME]
self.sumo_type = params[kc.SUMO_TYPE]
self.number_of_paths = params[kc.NUMBER_OF_PATHS]
self.simulation_length = params[kc.SIMULATION_TIMESTEPS]
curr_dir = os.path.dirname(os.path.abspath(__file__))
self.network_folder = os.path.join(curr_dir,
kc.NETWORK_FOLDER).replace("$net$", self.network_name)
self.sumo_config_path = os.path.join(curr_dir,
kc.SUMO_CONFIG_PATH).replace("$net$", self.network_name)
self.routes_xml_path = os.path.join(curr_dir,
kc.ROU_FILE_PATH).replace("$net$", self.network_name)
self.sumo_fcd = os.path.join(curr_dir,
kc.SUMO_FCD).replace("$net$", self.network_name)
self.detector_save_path = os.path.join(curr_dir,
kc.DETECTORS_CSV_PATH).replace("$net$", self.network_name)
self.conn_file_path = os.path.join(curr_dir,
kc.CONNECTION_FILE_PATH).replace("$net$", self.network_name)
self.edge_file_path = os.path.join(curr_dir,
kc.EDGE_FILE_PATH).replace("$net$", self.network_name)
self.nod_file_path = os.path.join(curr_dir,
kc.NOD_FILE_PATH).replace("$net$", self.network_name)
self.rou_xml_save_path = os.path.join(curr_dir,
kc.ROUTE_XML_PATH).replace("$net$", self.network_name)
self.det_xml_save_path = os.path.join(curr_dir,
kc.DETECTORS_XML_PATH).replace("$net$", self.network_name)
self.default_od_path = os.path.join(curr_dir,
kc.DEFAULT_ODS_PATH)
self.paths_csv_file_path = os.path.join(params[kc.RECORDS_FOLDER], kc.PATHS_CSV_FILE_NAME)
random.seed(seed)
self.seed = seed
self.sumo_id = f"{random.randint(0, 1000)}"
self.sumo_connection = None
confirm_env_variable(kc.ENV_VAR, append="tools")
if path_gen_params is not None:
self._get_paths(params, path_gen_params)
logging.info("[SUCCESS] Path generation completed.")
self._check_paths_ready()
self.detectors_name = self._get_detectors()
self.timestep = 0
self.route_id_cache = dict()
logging.info("[SUCCESS] Simulator is ready to simulate!")
################################
######## CONFIG CHECKS #########
################################
def _check_paths_ready(self) -> None:
if os.path.isfile(self.rou_xml_save_path):
logging.info("[CONFIRMED] Paths file is ready.")
else:
raise FileNotFoundError(
"Paths file is not ready. Please generate paths first.\n"
"To do this, define arguments in params.json and pass it "
"to the environment under path_generation_parameters"
)
def _get_paths(self, params: dict, path_gen_params: dict) -> None:
# Build the network
network = jx.build_digraph(self.conn_file_path, self.edge_file_path, self.routes_xml_path)
# Get origins and destinations
origins = path_gen_params[kc.ORIGINS]
destinations = path_gen_params[kc.DESTINATIONS]
# Generate paths
path_gen_kwargs = {
"number_of_paths": path_gen_params[kc.NUMBER_OF_PATHS],
"random_seed": self.seed,
"num_samples": path_gen_params[kc.NUM_SAMPLES],
"beta": path_gen_params[kc.BETA],
"weight": path_gen_params[kc.WEIGHT],
"verbose": False
}
routes = jx.basic_generator(network, origins, destinations, as_df=True, calc_free_flow=True, **path_gen_kwargs)
self._save_paths_to_disc(routes, origins, destinations)
# Save paths visualizations
path_visuals_path = params[kc.PLOTS_FOLDER]
os.makedirs(path_visuals_path, exist_ok=True)
# Visualize paths and save figures
for origin_idx, origin in enumerate(origins):
for dest_idx, destination in enumerate(destinations):
# Filter routes for the current origin-destination pair
routes_to_show = (routes[(routes["origins"] == origin_idx)
& (routes["destinations"] == dest_idx)]['path'])
routes_to_show = [route.split(" ") for route in routes_to_show]
# Specify the save path and title for the figure
fig_save_path = os.path.join(path_visuals_path, f"{origin_idx}_{dest_idx}.png")
title=f"Origin: {origin_idx} ({origin}), Destination: {dest_idx} ({destination})"
# Show the routes
jx.show_multi_routes(self.nod_file_path, self.edge_file_path,
routes_to_show, origin, destination,
show=False, save_file_path=fig_save_path, title=title)
def _save_paths_to_disc(self, routes_df: pd.DataFrame, origins: list, destinations: list) -> None:
origins = {node_name: idx for idx, node_name in enumerate(origins)}
destinations = {node_name: idx for idx, node_name in enumerate(destinations)}
# Format routes dataframe
routes_df["origins"] = routes_df["origins"].apply(lambda x: origins[x])
routes_df["destinations"] = routes_df["destinations"].apply(lambda x: destinations[x])
routes_df["path"] = routes_df["path"].apply(lambda x: x.replace(",", " "))
routes_df.sort_values(by=["origins", "destinations"], inplace=True)
# Save paths to csv
routes_df.to_csv(self.paths_csv_file_path, index=False)
# Convert routes dataframe to a dictionary with od indices
paths_dict = dict()
for origin_idx in origins.values():
for destination_idx in destinations.values():
rows = routes_df[(routes_df["origins"] == origin_idx) & (routes_df["destinations"] == destination_idx)]
paths = rows["path"].to_list()
paths_dict[(origin_idx, destination_idx)] = paths
# Save paths to xml
with open(self.rou_xml_save_path, "w") as rou:
print("""<routes>""", file=rou)
# TODO: Following two lines are hardcoded. Change them to be dynamic.
print("<vType id=\"Human\" color=\"red\" guiShape=\"passenger/sedan\"/>", file=rou)
print("<vType id=\"AV\" color=\"yellow\"/>", file=rou)
for origin_idx in origins.values():
for destination_idx in destinations.values():
paths = (routes_df[(routes_df["origins"] == origin_idx)
& (routes_df["destinations"] == destination_idx)]["path"].values)
for idx, path in enumerate(paths):
print(f'<route id="{origin_idx}_{destination_idx}_{idx}" edges="', file=rou)
print(path, file=rou)
print('" />',file=rou)
print("</routes>", file=rou)
def _get_detectors(self):
paths_df = pd.read_csv(self.paths_csv_file_path)
paths_list = [path.split(" ") for path in paths_df["path"].values]
detectors_name = sorted(list(set([node for path in paths_list for node in path])))
detectors_df = pd.DataFrame({"name": detectors_name})
detectors_df.to_csv(self.detector_save_path, index=False)
with open(self.det_xml_save_path, "w") as det:
print("""<additional>""", file=det)
for det_id in detectors_name:
print(f"<inductionLoop id=\"{det_id}_det\" lane=\"{det_id}_0\" pos=\"-5\" file=\"NUL\" friendlyPos=\"True\"/>",
file=det)
print("</additional>", file=det)
return detectors_name
################################
######## SUMO CONTROL ##########
################################
[docs]
def start(self) -> None:
"""Starts the SUMO simulation with the specified configuration.
Returns:
None
"""
sumo_cmd = [self.sumo_type,"--seed",
str(self.seed),
"--fcd-output",
self.sumo_fcd,
"-c", self.sumo_config_path]
traci.start(sumo_cmd, label=self.sumo_id)
self.sumo_connection = traci.getConnection(self.sumo_id)
[docs]
def stop(self) -> None:
"""Stops and closes the SUMO simulation.
Returns:
None
"""
self.sumo_connection.close()
[docs]
def reset(self) -> dict:
""" Resets the SUMO simulation to its initial state.
Reads detector data.
Returns:
det_dict (dict[str, float]): dictionary with detector data.
"""
det_dict = {name: None for name in self.detectors_name}
for det_name in self.detectors_name:
det_dict[det_name] = self.sumo_connection.inductionloop.getIntervalVehicleNumber(f"{det_name}_det")
self.sumo_connection.load(["--seed",
str(self.seed),
"--fcd-output",
self.sumo_fcd,
'-c',
self.sumo_config_path])
self.timestep = 0
return det_dict
################################
######### SIMULATION ###########
################################
[docs]
def add_vehicle(self, act_dict: dict) -> None:
"""Adds a vehicle to the SUMO simulation environment with the specified route and parameters.
Args:
act_dict (dict): A dictionary containing key vehicle attributes.
Returns:
None
"""
route_id = (
self.route_id_cache.setdefault((
act_dict[kc.AGENT_ORIGIN],
act_dict[kc.AGENT_DESTINATION],
act_dict[kc.ACTION]),
f'{act_dict[kc.AGENT_ORIGIN]}_{act_dict[kc.AGENT_DESTINATION]}_{act_dict[kc.ACTION]}'))
kind = act_dict[kc.AGENT_KIND]
self.sumo_connection.vehicle.add(vehID=str(act_dict[kc.AGENT_ID]),
routeID=route_id,
depart=str(act_dict[kc.AGENT_START_TIME]),
typeID=kind)
[docs]
def step(self) -> tuple:
"""Advances the SUMO simulation by one timestep and retrieves information
about vehicle arrivals and detector data.
Returns:
self.timestep (int): The current simulation timestep.
arrivals (list): List of vehicle IDs that arrived at their destinations during the current timestep.
"""
arrivals = self.sumo_connection.simulation.getArrivedIDList()
self.sumo_connection.simulationStep()
self.timestep += 1
return self.timestep, arrivals