A distributed platform for intelligent urban traffic management, built with Python and ZeroMQ. The system simulates a city grid with smart sensors, real-time analytics, automated semaphore control, and a monitoring interface -- all running across three networked machines communicating via asynchronous messaging patterns.
The system simulates a city represented as a 4x4 grid (rows A-D, columns 1-4) with 16 intersections. Each intersection can have traffic sensors and a semaphore. Three types of sensors generate real-time events:
| Sensor | Event Type | What It Measures |
|---|---|---|
| Camera (CAM) | Queue Length (Lq) | Vehicles waiting + average speed |
| Inductive Loop (ESP) | Vehicle Count (Cv) | Vehicles passing over the loop in a time interval |
| GPS | Traffic Density (Dt) | Average speed + derived congestion level (ALTA/NORMAL/BAJA) |
Sensor data flows through a ZeroMQ broker to an analytics engine that evaluates traffic rules, makes decisions (extend green, trigger green wave, etc.), controls semaphores, and persists all data to both a primary and replica database. A monitoring service provides an interactive CLI for queries and direct commands (e.g., prioritizing an ambulance route).
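For orientation, here is a minimal sketch of what a camera event payload could look like. The field names `volumen` and `velocidad_promedio` are taken from the rule variables described later; the envelope (JSON serialization, `timestamp` field) is an assumption for illustration, not the project's exact wire format.

```python
import json
import time

def make_camera_event(sensor_id: str, interseccion: str,
                      volumen: int, velocidad_promedio: float) -> str:
    """Serialize one camera (Lq) event for publication over ZeroMQ.

    Envelope fields other than `volumen` and `velocidad_promedio`
    are assumptions about the message format.
    """
    event = {
        "sensor_id": sensor_id,                    # e.g. "CAM-A1"
        "interseccion": interseccion,              # e.g. "INT-A1"
        "volumen": volumen,                        # vehicles waiting (Q)
        "velocidad_promedio": velocidad_promedio,  # average speed, km/h
        "timestamp": time.time(),
    }
    return json.dumps(event)
```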
The system handles fault tolerance transparently: if the primary database machine (PC3) goes down, all operations automatically fail over to the replica on PC2.
The system is distributed across 3 machines (Docker containers), each with distinct responsibilities:
COMPUTER 1 (PC1) COMPUTER 2 (PC2) COMPUTER 3 (PC3)
======================== ============================== =======================
+------------------+ +------------------------+ +-------------------+
| Camera Sensors |--PUB--+ | Analytics Service | | Monitoring & |
| (8 cameras) | | | - Receives events | | Query Service |
+------------------+ | | - Evaluates rules | REQ/ | - Interactive CLI |
| | - Makes decisions |<-REP-->| - Queries state |
+------------------+ | | - Sends DB writes | | - Direct commands |
| Inductive Loops |--PUB--+ +---+----+----+----------+ +-------------------+
| (8 loops) | | | | | |
+------------------+ | | | | |
| | | PUSH REQ/REP
+------------------+ | | | | |
| GPS Sensors |--PUB--+ | | v v
| (8 sensors) | | | | +-----------------+ +-------------------+
+------------------+ | | | | Semaphore | | Primary Database |
| | | | Control Service | | (SQLite) |
PUB/SUB | PUSH | | - State mgmt | +-------------------+
| | | | | - Light changes |
v | | | +-----------------+
+------------------+ | | |
| ZMQ Broker | SUB | | PUSH
| - Subscribes to |<------+ | |
| all sensors | | v
| - Forwards to |--PUB------->| +-----------------+
| PC2 | SUB | | DB Replica |
+------------------+ | | (SQLite backup) |
| +-----------------+
|
v
PUSH/PULL to
both databases
| Connection | Pattern | Direction | Purpose |
|---|---|---|---|
| Sensors --> Broker | PUB/SUB | PC1 internal | Sensors publish events, broker subscribes |
| Broker --> Analytics | PUB/SUB | PC1 --> PC2 | Broker forwards all events to analytics |
| Analytics --> Semaphore Control | PUSH/PULL | PC2 internal | Semaphore change commands |
| Analytics --> Primary DB | PUSH/PULL | PC2 --> PC3 | Persist sensor data and decisions |
| Analytics --> Replica DB | PUSH/PULL | PC2 internal | Replicate data for fault tolerance |
| Monitoring <--> Analytics | REQ/REP | PC3 --> PC2 | Queries and direct commands |
| Health Check | REQ/REP | PC2 --> PC3 | Periodic heartbeat to detect PC3 failure |
The analytics service evaluates sensor data against these rules to determine the traffic state at each intersection:
| State | Condition | Action | Timing |
|---|---|---|---|
| Normal | Q < 5 AND Vp > 35 AND D < 20 | Standard semaphore cycle | 15s red/green |
| Congestion | Q >= 10 OR Vp <= 20 OR D >= 40 | Extend green phase on congested direction | +10s green extension |
| Green Wave | User command (e.g., ambulance) | Force all semaphores on a route to GREEN | 30s forced green |
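The sensor-driven rules above can be sketched as a small classifier. This is illustrative only: it assumes the thresholds from the table and that borderline readings (between the Normal and Congestion cutoffs) keep the standard cycle, which may differ from the real analytics service.

```python
def classify_state(q: int, vp: float, d: int) -> str:
    """Classify an intersection from the rule table.

    q:  queue length from cameras (vehicles waiting)
    vp: average speed in km/h
    d:  vehicle count from the inductive loop
    """
    if q >= 10 or vp <= 20 or d >= 40:
        return "CONGESTION"   # extend green phase (+10s)
    if q < 5 and vp > 35 and d < 20:
        return "NORMAL"       # standard 15s cycle
    return "NORMAL"           # borderline readings: assume standard cycle
```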
Variables:
- Q -- Queue length: number of vehicles waiting (from camera sensor field `volumen`)
- Vp -- Average speed in km/h (from camera and GPS sensor field `velocidad_promedio`)
- D -- Traffic density proxy: vehicle count (from inductive loop sensor field `vehiculos_contados`)
GPS Congestion Levels:
- ALTA (High): average speed < 10 km/h
- NORMAL: 10 <= average speed <= 40 km/h
- BAJA (Low): average speed > 40 km/h
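These thresholds translate directly into a small classifier. A sketch, assuming boundary values (exactly 10 or 40 km/h) map to NORMAL per the ranges above:

```python
def congestion_level(avg_speed_kmh: float) -> str:
    """Map average speed to the GPS congestion level (Dt)."""
    if avg_speed_kmh < 10:
        return "ALTA"     # high congestion
    if avg_speed_kmh <= 40:
        return "NORMAL"
    return "BAJA"         # free-flowing traffic
```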
- Docker and Docker Compose (recommended)
- Python 3.11+ (for local development)
- Git
This launches all three machines as containers on a shared network:
# Build and start the entire system
docker compose up --build
# Run in detached mode
docker compose up --build -d
# View logs for a specific service (use service names, not container names)
docker compose logs -f pc1
docker compose logs -f pc2
docker compose logs -f pc3
# Stop the system
docker compose down
# Stop and remove volumes (clears databases)
docker compose down -v

The monitoring service on PC3 provides an interactive CLI. To attach to it:
docker attach pc3-monitoring

To run with the multithreaded broker (for performance experiments), set the environment variable in `docker-compose.yml` or pass it directly:
# Add to pc1 service in docker-compose.yml:
# environment:
# - BROKER_MODE=threaded
# - SENSOR_INTERVAL=5
# Or override at runtime:
BROKER_MODE=threaded SENSOR_INTERVAL=5 docker compose up pc1

Note: `SENSOR_INTERVAL=0` (the default in `docker-compose.yml`) means "use each sensor type's configured default interval" -- 10s for cameras and GPS, 30s for inductive loops. Setting a non-zero value overrides the interval for all sensor types.
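The fallback logic can be pictured as follows. This is a sketch: `DEFAULT_INTERVALS` mirrors the per-type defaults from `city_config.json`, and the function name is illustrative, not project code.

```python
# Per-type defaults, mirroring sensor_default_interval_sec (10s)
# and inductive_interval_sec (30s) from the configuration.
DEFAULT_INTERVALS = {"camera": 10, "gps": 10, "inductive": 30}

def effective_interval(sensor_type: str, sensor_interval: int) -> int:
    """Resolve the interval a sensor should use.

    sensor_interval is the SENSOR_INTERVAL value; 0 means
    "fall back to the per-type default".
    """
    if sensor_interval > 0:
        return sensor_interval          # global override for all types
    return DEFAULT_INTERVALS[sensor_type]
```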
For local development and debugging without Docker:
# 1. Install dependencies
pip install -r requirements.txt
# 2. Set PYTHONPATH to project root
# Linux/macOS:
export PYTHONPATH=$(pwd)
# Windows (PowerShell):
$env:PYTHONPATH = (Get-Location).Path
# 3. Start PC1 (sensors + broker)
python pc1/start_pc1.py
# Options: --broker-mode standard|threaded (default: standard, env: BROKER_MODE)
# --interval N (seconds, 0 = use config defaults, env: SENSOR_INTERVAL)
# --sensor-count N (sensors per type, 0 = all from config, env: SENSOR_COUNT)
# Or start individual components:
python -m pc1.broker --mode standard
python -m pc1.sensors.camera_sensor --all --interval 10 --count 2
python -m pc1.sensors.inductive_sensor --all --interval 30 --count 2
python -m pc1.sensors.gps_sensor --all --interval 10 --count 2
# --count N: launch only the first N sensors of that type (0 or omit = all)
# 4. Start PC2 (analytics + semaphore control + replica DB)
# Note: --replica-db-path defaults to /data/traffic_replica.db (Docker path).
# Override for local development:
python pc2/start_pc2.py --replica-db-path ./traffic_replica.db
# 5. Start PC3 (monitoring CLI + primary DB)
# Note: --db-path defaults to /data/traffic_primary.db (Docker path).
# Override for local development:
python pc3/start_pc3.py --db-path ./traffic_primary.db

Each sensor supports both `--all` (all sensors of that type from config) and single-sensor mode:
# Single camera sensor at intersection INT-A1
python -m pc1.sensors.camera_sensor --sensor-id CAM-A1 --intersection INT-A1 --interval 10
# Single GPS sensor
python -m pc1.sensors.gps_sensor --sensor-id GPS-B2 --intersection INT-B2 --interval 10
# Single inductive loop (30s default interval)
python -m pc1.sensors.inductive_sensor --sensor-id ESP-C4 --intersection INT-C4

# Install dev dependencies
pip install -r requirements-dev.txt
# Run all tests
pytest tests/ -v
# Run with coverage (requires the pytest-cov plugin)
pytest tests/ -v --cov --tb=short

All system parameters are defined in `config/city_config.json`. This single file controls the entire system behavior.
"city": {
"grid": {
"rows": ["A", "B", "C", "D"],
"columns": [1, 2, 3, 4]
}
}

This creates 16 intersections: `INT-A1` through `INT-D4`. To change the grid size, modify the `rows` and `columns` arrays and update the `intersections`, `sensors`, and `semaphores` sections accordingly.
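The ID expansion follows directly from the grid arrays; a sketch (function name illustrative, not project code):

```python
def intersection_ids(rows, columns):
    """Expand grid rows/columns into intersection IDs (INT-A1 ... INT-D4)."""
    return [f"INT-{row}{col}" for row in rows for col in columns]

grid = intersection_ids(["A", "B", "C", "D"], [1, 2, 3, 4])
# 16 IDs: INT-A1, INT-A2, ..., INT-D4
```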
Each sensor entry maps a sensor ID to an intersection:
"sensors": {
"cameras": [
{"sensor_id": "CAM-A1", "interseccion": "INT-A1"},
...
],
"inductive_loops": [
{"sensor_id": "ESP-A2", "interseccion": "INT-A2"},
...
],
"gps": [
{"sensor_id": "GPS-A1", "interseccion": "INT-A1"},
...
]
}

The default configuration has 8 sensors of each type (24 total) distributed across the 16 intersections.
| Parameter | Default | Description |
|---|---|---|
| `normal_cycle_sec` | 15 | Standard red/green cycle duration |
| `congestion_extension_sec` | 10 | Extra green time during congestion |
| `green_wave_duration_sec` | 30 | Duration of forced green wave |
| `sensor_default_interval_sec` | 10 | Camera and GPS event generation interval |
| `inductive_interval_sec` | 30 | Inductive loop measurement interval |
| `health_check_interval_sec` | 5 | How often PC2 pings PC3 |
| `health_check_timeout_ms` | 2000 | Timeout for each health check ping |
| `health_check_max_retries` | 3 | Failed pings before declaring PC3 down |
| Variable | Default | Description |
|---|---|---|
| `BROKER_MODE` | `standard` | Broker type: `standard` (single-thread) or `threaded` (multi-thread) |
| `SENSOR_INTERVAL` | `0` | Sensor event interval in seconds (0 = use config defaults per sensor type) |
| `SENSOR_COUNT` | `0` | Number of sensors per type to launch (0 = all from config) |
| Port | Name | Used By |
|---|---|---|
| 5555 | `sensor_camera_pub` | Camera sensors PUB |
| 5556 | `sensor_inductive_pub` | Inductive sensors PUB |
| 5557 | `sensor_gps_pub` | GPS sensors PUB |
| 5560 | `broker_pub` | Broker PUB (to PC2) |
| 5561 | `analytics_rep` | Analytics REP (from PC3 monitoring) |
| 5562 | `semaphore_control_pull` | Semaphore control PULL |
| 5563 | `db_primary_pull` | Primary DB PULL (on PC3) |
| 5564 | `db_replica_pull` | Replica DB PULL (on PC2) |
| 5565 | `health_check_rep` | Health check REP (on PC3) |
"network": {
"pc1_host": "pc1",
"pc2_host": "pc2",
"pc3_host": "pc3"
}

These match the Docker Compose service names. For local development, change them to `localhost` or `127.0.0.1`.
The system is designed to handle the failure of PC3 (primary database + monitoring) transparently.
Normal Operation:
Analytics (PC2) --PUSH--> Primary DB (PC3) [writes go to both]
Analytics (PC2) --PUSH--> Replica DB (PC2)
Health Check:
Analytics (PC2) --REQ "PING"--> PC3
Analytics (PC2) <--REP "PONG"-- PC3 [every 5 seconds]
PC3 Failure Detected:
Analytics (PC2) --REQ "PING"--> PC3
Analytics (PC2) ... timeout (2s) ... retry x3
[FAILOVER] PC3 is down. Using replica DB on PC2.
After Failover:
Analytics (PC2) --PUSH--> Replica DB (PC2) [writes only to replica]
Monitoring queries redirected to Replica DB
- The analytics service on PC2 sends a `PING` heartbeat to PC3 every 5 seconds
- If no `PONG` response is received within 2 seconds, it counts as a failed attempt
- After 3 consecutive failures (configurable), PC3 is declared down
- All database writes are redirected exclusively to the PC2 replica
- Monitoring queries are served from the replica database
- The system continues operating without interruption
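The detection logic amounts to a small counter-based state machine. A sketch under stated assumptions: the class name is hypothetical, and the default mirrors `health_check_max_retries`.

```python
class FailoverMonitor:
    """Track consecutive health-check failures and flip to failover mode.

    max_retries mirrors health_check_max_retries (default 3).
    """
    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
        self.failures = 0
        self.primary_up = True

    def record(self, got_pong: bool) -> bool:
        """Feed one health-check result; return True if the primary is usable."""
        if got_pong:
            self.failures = 0
            self.primary_up = True       # recovery: resume dual writes
        else:
            self.failures += 1
            if self.failures >= self.max_retries:
                self.primary_up = False  # failover: replica-only writes
        return self.primary_up
```

The real service additionally reconnects the PUSH socket and redirects queries; this sketch only captures the up/down decision.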
When PC3 is down, the monitoring CLI on PC3 is unavailable. A fallback CLI on PC2 provides the same functionality:
docker exec -it pc2-analytics python -m pc2.monitoring_fallback

This connects to the analytics REP socket on `localhost:5561` and queries the replica DB directly.
When PC3 comes back online, the health checker detects the restored PONG response and automatically:
- Reconnects the primary PUSH socket to PC3
- Resumes dual writes (primary + replica)
- Logs `[RECOVERY] PC3 is back`
No manual intervention is required. Data written during the failover period only exists in the replica (no DB resync).
# Start the full system
docker compose up -d
# Simulate PC3 failure
docker stop pc3-monitoring
# Observe failover in PC2 logs (use service name, not container name)
docker compose logs -f pc2
# Should show: [FAILOVER] PC3 is down. Using replica DB on PC2.
# Verify system continues operating
# (sensors still generate data, analytics still processes, DB replica still receives writes)
# Use fallback monitoring CLI on PC2
docker exec -it pc2-analytics python -m pc2.monitoring_fallback
# Restore PC3 - auto-recovery kicks in
docker start pc3-monitoring
# PC2 logs should show: [RECOVERY] PC3 is back

The project requires comparing two broker designs under different load conditions.
| Scenario | Sensors | Generation Interval | Broker Design |
|---|---|---|---|
| 1A | 1 of each type (3 total) | 10 seconds | Standard (single-thread) |
| 1B | 1 of each type (3 total) | 10 seconds | Multithreaded |
| 2A | 2 of each type (6 total) | 5 seconds | Standard (single-thread) |
| 2B | 2 of each type (6 total) | 5 seconds | Multithreaded |
Note: Use `SENSOR_COUNT` to control how many sensors of each type are launched; `SENSOR_COUNT=0` (default) launches all 8 of each type from config. `SENSOR_INTERVAL` controls the generation interval for all sensor types simultaneously; `SENSOR_INTERVAL=0` (default) means "use each sensor type's configured default".
Dependent variables (what we measure):
- Throughput: Number of events stored in the database within a 2-minute window
- Latency: Time from when a user sends a semaphore change command to when the semaphore actually changes state
Independent variables (what we control):
- Number of sensors generating data
- Time interval between event generations
- Broker architecture (single-threaded vs multithreaded)
The scenario runner automates all 4 experiments, collecting throughput and latency:
# Build images first
docker compose build
# Run all 4 scenarios (2 minutes each by default)
python -m perf.run_scenarios
# Or with a custom duration
python -m perf.run_scenarios --duration 60
# Generate graphs from results
python -m perf.generate_graphs

Results are saved to `benchmark_results/results.json` and graphs to `benchmark_results/graphs/`.
# Start pc2 and pc3 in the background
docker compose up -d pc2 pc3
# Scenario 1A: 1 sensor/type, 10s interval, standard broker
BROKER_MODE=standard SENSOR_INTERVAL=10 SENSOR_COUNT=1 docker compose up pc1
# Scenario 1B: 1 sensor/type, 10s interval, threaded broker
BROKER_MODE=threaded SENSOR_INTERVAL=10 SENSOR_COUNT=1 docker compose up pc1
# Scenario 2A: 2 sensors/type, 5s interval, standard broker
BROKER_MODE=standard SENSOR_INTERVAL=5 SENSOR_COUNT=2 docker compose up pc1
# Scenario 2B: 2 sensors/type, 5s interval, threaded broker
BROKER_MODE=threaded SENSOR_INTERVAL=5 SENSOR_COUNT=2 docker compose up pc1
# Stop pc1 between scenarios (pc2 and pc3 stay running)
docker compose stop pc1
# Tear down everything when done
docker compose down

- Standard (`BROKER_MODE=standard`): single-threaded broker using `zmq.Poller` to monitor all three sensor SUB sockets in a loop. Simple and predictable.
- Threaded (`BROKER_MODE=threaded`): each sensor topic gets its own subscriber thread. Threads forward messages via an `inproc://` PUSH/PULL pipeline to a collector thread that publishes to PC2. Higher concurrency but more overhead.
Latency is measured end-to-end across processes using wall-clock timestamps. Each `SemaphoreCommand` carries a `created_at` field (`time.time()` at creation). When `traffic_light_control` applies the state change, it computes `latency_ms = (time.time() - command.created_at) * 1000` and logs it as `[LATENCY] INT-XX: N.NN ms`.
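In outline, the measurement looks like this. The `created_at` field and the formula come from the description above; the dataclass layout and function name are assumptions for illustration.

```python
import time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class SemaphoreCommand:
    intersection: str
    new_state: str
    created_at: float = field(default_factory=time.time)  # stamped at creation

def latency_ms(command: SemaphoreCommand,
               applied_at: Optional[float] = None) -> float:
    """Latency from command creation to the moment the light changes."""
    if applied_at is None:
        applied_at = time.time()
    return (applied_at - command.created_at) * 1000
    # logged as e.g. [LATENCY] INT-A1: 250.00 ms
```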
After running all 4 scenarios, collect results in tables, generate graphs (throughput and latency as functions of sensor count and interval), and analyze:
- Which design handles higher load better?
- How does latency scale with increased sensor count?
- Which architecture is more scalable and why?