Skip to content

Latest commit

 

History

History

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 
 
 
 
 

README.md

Examples

Practical examples and use cases for RouteMQ applications.

Topics

Complete IoT Example

This example shows a complete IoT device management system:

Device Router

# app/routers/devices.py
from core.router import Router
from app.controllers.device_controller import DeviceController
from app.middleware.auth import AuthMiddleware
from app.middleware.rate_limit import RateLimitMiddleware

router = Router()

auth = AuthMiddleware()
rate_limit = RateLimitMiddleware(max_requests=100, window_seconds=60)

with router.group(prefix="devices", middleware=[auth, rate_limit]) as devices:
    devices.on("register/{device_id}", DeviceController.register_device, qos=1)
    devices.on("heartbeat/{device_id}", DeviceController.heartbeat, qos=0)
    devices.on("data/{device_id}", DeviceController.receive_data, qos=1)
    devices.on("control/{device_id}", DeviceController.control_device, qos=2, shared=True)

Device Controller

# app/controllers/device_controller.py
from core.controller import Controller
from core.redis_manager import redis_manager
from app.models.device import Device
import json
import time

class DeviceController(Controller):
    @staticmethod
    async def register_device(device_id, payload, client):
        """Register a new device"""
        device_info = {
            "device_id": device_id,
            "name": payload.get("name"),
            "type": payload.get("type"),
            "firmware": payload.get("firmware"),
            "registered_at": time.time()
        }
        
        # Store in Redis for quick access
        await redis_manager.set_json(f"device:{device_id}", device_info, ex=86400)
        
        # Store in database for persistence
        device = Device(**device_info)
        await device.save()
        
        # Publish registration confirmation
        response_topic = f"devices/{device_id}/register/response"
        client.publish(response_topic, json.dumps({
            "status": "registered",
            "device_id": device_id,
            "timestamp": time.time()
        }))
        
        return {"status": "registered"}
    
    @staticmethod
    async def receive_data(device_id, payload, client):
        """Receive and process device data"""
        # Update last seen timestamp
        await redis_manager.set(f"device:{device_id}:last_seen", time.time(), ex=3600)
        
        # Process sensor data
        if "temperature" in payload:
            await DeviceController._process_temperature(device_id, payload["temperature"])
        
        if "humidity" in payload:
            await DeviceController._process_humidity(device_id, payload["humidity"])
        
        return {"status": "processed"}

Sensor Data Pipeline

# app/routers/sensors.py
from core.router import Router
from app.controllers.sensor_controller import SensorController
from app.middleware.validation import ValidationMiddleware

router = Router()

validation = ValidationMiddleware(schema="sensor_data.json")

with router.group(prefix="sensors", middleware=[validation]) as sensors:
    sensors.on("temperature/{sensor_id}", SensorController.handle_temperature, qos=1)
    sensors.on("batch/{location}", SensorController.handle_batch, qos=2, shared=True, worker_count=3)

Real-time Chat System

# app/routers/chat.py
from core.router import Router
from app.controllers.chat_controller import ChatController
from app.middleware.auth import AuthMiddleware

router = Router()

auth = AuthMiddleware()

with router.group(prefix="chat", middleware=[auth]) as chat:
    chat.on("message/{room_id}", ChatController.handle_message, qos=1)
    chat.on("join/{room_id}", ChatController.join_room, qos=1)
    chat.on("leave/{room_id}", ChatController.leave_room, qos=1)

Performance Monitoring Example

# app/middleware/performance.py
from core.middleware import Middleware
from core.redis_manager import redis_manager
import time

class PerformanceMiddleware(Middleware):
    async def handle(self, context, next_handler):
        start_time = time.time()
        topic = context['topic']
        
        # Track request count
        await redis_manager.incr(f"stats:requests:{topic}:count")
        await redis_manager.incr("stats:requests:total")
        
        # Execute handler
        result = await next_handler(context)
        
        # Track processing time
        processing_time = time.time() - start_time
        await redis_manager.set_json(f"stats:requests:{topic}:last_time", {
            "duration": processing_time,
            "timestamp": time.time()
        }, ex=3600)
        
        # Track slow requests
        if processing_time > 1.0:  # Over 1 second
            await redis_manager.incr(f"stats:slow_requests:{topic}")
        
        return result

Next Steps