-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimulation.py
More file actions
47 lines (37 loc) · 1.52 KB
/
simulation.py
File metadata and controls
47 lines (37 loc) · 1.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# simulation.py — Dynamic time-based traffic simulation for UrbanFlow
def run_simulation(network, traffic_input, time_steps=10):
    """Simulate traffic flow over discrete time steps and return the total delay.

    At each step:
      1. New vehicles enter at every lane listed in ``traffic_input``.
      2. Vehicles move one hop, limited by the capacity of each outgoing edge;
         higher-capacity edges are filled first.
      3. Vehicles that could not move stay queued at their node.
      4. The step delay is the total number of queued vehicles at the end of
         the step.

    Progress is printed to stdout for every step, followed by the overall total.

    Args:
        network: Mapping of node -> {destination: capacity}. A destination
            does not need its own entry in ``network``; such a node has no
            outgoing edges, so vehicles that reach it stay queued there.
        traffic_input: Mapping of entry node -> number of vehicles injected at
            that node on every time step. An entry node that is absent from
            ``network`` is treated as a node without outgoing edges.
        time_steps: Number of steps to simulate.

    Returns:
        The sum of the per-step delays over the whole simulation.
    """
    # Register every node that can ever hold vehicles. Seeding only from the
    # keys of `network` raises KeyError as soon as a vehicle is routed to a
    # destination-only node or injected at a lane the network does not list.
    queues = {node: 0 for node in network}
    for connections in network.values():
        for dest in connections:
            queues.setdefault(dest, 0)
    for lane in traffic_input:
        queues.setdefault(lane, 0)

    total_delay = 0
    for t in range(time_steps):
        print(f"\n--- Time Step {t+1} ---")

        # New vehicles enter
        for lane, vehicles in traffic_input.items():
            queues[lane] += vehicles

        # Outflow is computed from the start-of-step snapshot (`queues`) and
        # applied to a copy, so a vehicle moves at most one hop per step.
        next_queues = dict(queues)

        # Process flow: distribute across all outgoing edges, largest first
        for node, connections in network.items():
            remaining = queues[node]
            if remaining == 0:
                continue
            for dest, capacity in sorted(connections.items(), key=lambda x: -x[1]):
                if remaining == 0:
                    break
                flow = min(remaining, capacity)
                next_queues[node] -= flow
                next_queues[dest] += flow
                remaining -= flow

        queues = next_queues
        step_delay = sum(v for v in queues.values() if v > 0)
        total_delay += step_delay
        print(f"Queues: {queues}")
        print(f"Step Delay: {step_delay}")

    print(f"\nTotal delay over simulation: {total_delay}")
    return total_delay