A high-performance, real-time market data processing system for low-latency trading applications. It ingests, processes, and distributes market data with sub-millisecond latency.
Market Data Sources → WebSocket Client → Kafka → Stream Processor → InfluxDB/Redis → API/Dashboard
- Data Ingestion: WebSocket clients for multiple exchanges
- Message Queue: Apache Kafka for reliable data streaming
- Stream Processing: Real-time data normalization and enrichment
- Storage: InfluxDB for time-series data, Redis for low-latency access
- API: FastAPI for real-time data access
- Monitoring: Prometheus metrics and Grafana dashboards
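To make the ingestion path concrete, the sketch below shows the first two hops (exchange WebSocket → Kafka), assuming the third-party `websockets` and `aiokafka` libraries; the URL and topic name are placeholders, not the pipeline's actual configuration.

```python
import asyncio
import json

import websockets                      # third-party: websockets
from aiokafka import AIOKafkaProducer  # third-party: aiokafka

# Placeholder endpoint and topic for illustration only.
EXCHANGE_WS_URL = "wss://stream.binance.com:9443/ws/btcusdt@trade"
KAFKA_TOPIC = "market-data.raw"

async def ingest() -> None:
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        async with websockets.connect(EXCHANGE_WS_URL) as ws:
            async for raw in ws:  # each WebSocket message is one market event
                event = json.loads(raw)
                # Hand the raw event to Kafka; normalization and enrichment
                # happen downstream in the stream processor.
                await producer.send_and_wait(KAFKA_TOPIC, json.dumps(event).encode())
    finally:
        await producer.stop()

if __name__ == "__main__":
    asyncio.run(ingest())
```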
- Sub-millisecond processing latency
- 1M+ messages per second throughput
- Horizontal scaling capabilities
- Memory-optimized data structures
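On the memory point: hot-path records can avoid per-instance `__dict__` overhead with slotted classes. A minimal sketch (the field names are hypothetical):

```python
from dataclasses import dataclass

@dataclass(slots=True)  # slots=True (Python 3.10+) drops the per-instance __dict__
class Trade:
    symbol: str
    price: float
    size: float
    ts_ns: int   # event timestamp, nanoseconds since epoch
    is_buy: bool
```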
- Multi-exchange data normalization
- Real-time trade and quote processing
- Order book reconstruction (see the sketch after this list)
- Market data quality validation
- Anomaly detection and alerting
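Order book reconstruction amounts to applying incremental depth updates to per-side price levels; a minimal sketch, assuming a generic (side, price, size) update format rather than any specific exchange's schema:

```python
class OrderBook:
    """Rebuilds an L2 order book from incremental depth updates."""

    def __init__(self) -> None:
        self.bids: dict[float, float] = {}  # price -> size
        self.asks: dict[float, float] = {}

    def apply(self, side: str, price: float, size: float) -> None:
        book = self.bids if side == "bid" else self.asks
        if size == 0.0:
            book.pop(price, None)  # size 0 signals the level was removed
        else:
            book[price] = size     # otherwise overwrite the level's size

    def best_bid(self) -> float | None:
        return max(self.bids) if self.bids else None

    def best_ask(self) -> float | None:
        return min(self.asks) if self.asks else None
```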
- Python 3.11+: Core application logic
- uv: Fast Python package and project manager
- Apache Kafka: Message streaming platform
- InfluxDB: Time-series database
- Redis: In-memory data store
- FastAPI: REST API framework
- asyncio: Asynchronous programming
- Docker: Containerization
- Docker and Docker Compose
- Python 3.11+
- uv (fast Python package manager)
- 8GB+ RAM recommended
```bash
# Clone and setup
cd real_time_pipeline

# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install dependencies
uv sync

# Start infrastructure
docker-compose up -d

# Initialize databases
uv run python scripts/setup_db.py

# Generate test data (optional)
uv run python scripts/generate_test_data.py

# Start data pipeline
uv run python main.py

# View dashboard
open http://localhost:8080
```

The pipeline requires InfluxDB and Redis to be properly initialized before starting:
```bash
# Initialize all databases
uv run python scripts/setup_db.py

# Or initialize specific databases
uv run python scripts/setup_db.py --influxdb-only
uv run python scripts/setup_db.py --redis-only
```

This script will:
- Create InfluxDB buckets with proper retention policies
- Set up Redis data structures for symbols and metadata
- Test connectivity to both databases
- Provide detailed logging of the initialization process
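For orientation, the core of such an initialization looks roughly like the following, assuming the official `influxdb-client` and `redis` Python packages; the bucket name, token, and key layout are placeholders rather than the script's actual values.

```python
import redis
from influxdb_client import InfluxDBClient, BucketRetentionRules

# Placeholder connection settings for illustration.
influx = InfluxDBClient(url="http://localhost:8086", token="dev-token", org="market-data")
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Create a time-series bucket with a 7-day retention policy.
retention = BucketRetentionRules(type="expire", every_seconds=7 * 24 * 3600)
influx.buckets_api().create_bucket(bucket_name="trades", retention_rules=retention)

# Seed Redis structures for symbols and per-symbol metadata.
r.sadd("symbols", "BTC-USD", "ETH-USD")
r.hset("meta:BTC-USD", mapping={"exchange": "coinbase", "tick_size": "0.01"})

# Verify connectivity to both databases.
assert influx.ping() and r.ping()
```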
For development and testing, you can generate sample market data:
```bash
# Generate all test data
uv run python scripts/generate_test_data.py

# Generate data for specific symbol
uv run python scripts/generate_test_data.py --symbol BTC-USD --count 1000

# Custom output directory
uv run python scripts/generate_test_data.py --output-dir my_test_data
```

Generated data includes:
- Trade Data: Realistic trade records with price movements
- Order Books: Sample bid/ask data for each symbol
- Ticker Data: 24h/30d volume and price statistics
- Configuration: Summary of generated data
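A plausible shape for the generated trades is a small random walk per symbol; the sketch below is illustrative and not the script's exact schema:

```python
import random
import time

def generate_trades(symbol: str, count: int, start_price: float = 50_000.0) -> list[dict]:
    """Random-walk trade generator: each price drifts from the previous one."""
    price, trades = start_price, []
    for _ in range(count):
        price *= 1 + random.gauss(0, 0.0005)  # ~0.05% standard deviation per tick
        trades.append({
            "symbol": symbol,
            "price": round(price, 2),
            "size": round(random.uniform(0.001, 2.0), 6),
            "side": random.choice(["buy", "sell"]),
            "timestamp": time.time_ns(),
        })
    return trades

sample = generate_trades("BTC-USD", count=1000)
```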
| Metric | Value |
|---|---|
| Processing Latency | <500μs |
| Throughput | 1.2M msg/sec |
| Memory Usage | <2GB |
| CPU Usage | <50% (4 cores) |
Key configuration options in config.yaml:
- Exchange connections
- Kafka topics and partitions
- Processing batch sizes
- Storage retention policies
- Alert thresholds
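A hedged sketch of loading those options with PyYAML; the key names are assumptions about the file's layout, not its actual schema:

```python
import yaml  # third-party: PyYAML

with open("config.yaml") as f:
    cfg = yaml.safe_load(f)

# Hypothetical keys mirroring the option groups listed above.
exchanges = cfg["exchanges"]                       # e.g. [{"name": "binance", "symbols": [...]}]
partitions = cfg["kafka"]["partitions"]            # Kafka topic partition count
batch_size = cfg["processing"]["batch_size"]       # messages per processing batch
retention_days = cfg["storage"]["retention_days"]  # InfluxDB retention policy
latency_alert_us = cfg["alerts"]["latency_us"]     # alert threshold, microseconds
```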
- Message processing latency (p50, p95, p99)
- Throughput (messages/second)
- Error rates and types
- Memory and CPU utilization
- Queue depths and processing lags
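With the `prometheus_client` library, the latency and throughput metrics map naturally onto a histogram and a counter; the metric names, buckets, and port below are illustrative:

```python
from prometheus_client import Counter, Histogram, start_http_server

# Buckets in seconds, tuned for sub-millisecond latencies;
# p50/p95/p99 are computed from these buckets at query time.
PROCESSING_LATENCY = Histogram(
    "pipeline_processing_latency_seconds",
    "End-to-end message processing latency",
    buckets=(0.0001, 0.00025, 0.0005, 0.001, 0.0025, 0.005),
)
MESSAGES_TOTAL = Counter("pipeline_messages_total", "Messages processed", ["exchange"])

start_http_server(8001)  # placeholder port exposing /metrics for Prometheus to scrape

def process(message: dict, exchange: str) -> None:
    with PROCESSING_LATENCY.time():  # records elapsed time when the block exits
        MESSAGES_TOTAL.labels(exchange=exchange).inc()
        # ... normalization / enrichment work would go here ...
```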
- High latency alerts (>1ms)
- Message loss detection (see the sketch after this list)
- Exchange disconnections
- System resource limits
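Message loss detection is commonly implemented by watching per-stream sequence numbers for gaps; a minimal sketch under that generic assumption:

```python
class GapDetector:
    """Flags dropped messages by tracking per-stream sequence numbers."""

    def __init__(self) -> None:
        self.last_seq: dict[str, int] = {}

    def observe(self, stream: str, seq: int) -> int:
        """Return how many messages were missed since the last one seen."""
        prev = self.last_seq.get(stream)
        self.last_seq[stream] = seq
        if prev is None or seq == prev + 1:
            return 0                   # first message or contiguous sequence
        return max(seq - prev - 1, 0)  # gap size; 0 for duplicates/replays
```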
```bash
# Unit tests
uv run pytest tests/unit/

# Integration tests
uv run pytest tests/integration/

# Performance tests
uv run python tests/performance/load_test.py

# Generate test data for testing
uv run python scripts/generate_test_data.py

# Test database connectivity
uv run python scripts/setup_db.py --influxdb-only
uv run python scripts/setup_db.py --redis-only
```

Supported exchanges:
- Binance (WebSocket)
- Coinbase Advanced Trade (WebSocket) - using coinbase-advanced-py
- Kraken (WebSocket)
- Custom exchange adapters
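Custom adapters generally share a small surface: connect, subscribe, and translate exchange-specific payloads into the pipeline's normalized schema. A hypothetical base class, not the project's actual interface:

```python
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator

class ExchangeAdapter(ABC):
    """Hypothetical interface a custom exchange adapter would implement."""

    @abstractmethod
    async def connect(self) -> None:
        """Open the WebSocket connection and authenticate if required."""

    @abstractmethod
    async def subscribe(self, symbols: list[str]) -> None:
        """Subscribe to trade/quote channels for the given symbols."""

    @abstractmethod
    def normalize(self, raw: dict) -> dict:
        """Map an exchange-specific payload onto the pipeline's common schema."""

    @abstractmethod
    def stream(self) -> AsyncIterator[dict]:
        """Yield normalized messages as they arrive."""
```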
- `GET /health` - System health check
- `GET /metrics` - Prometheus metrics
- `GET /symbols` - Available trading symbols
- `GET /trades/{symbol}` - Recent trades
- `GET /orderbook/{symbol}` - Current order book
- `WebSocket /ws` - Real-time data stream
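A hedged sketch of how two of these endpoints and the WebSocket stream could be wired up in FastAPI, with Redis as the low-latency read path; the route bodies, key names, and pub/sub channel are illustrative:

```python
from fastapi import FastAPI, WebSocket
import redis.asyncio as aioredis

app = FastAPI()
r = aioredis.Redis(host="localhost", port=6379, decode_responses=True)

@app.get("/health")
async def health() -> dict:
    return {"status": "ok", "redis": await r.ping()}

@app.get("/trades/{symbol}")
async def recent_trades(symbol: str, limit: int = 100) -> list[str]:
    # Assumes recent trades are kept in a capped Redis list per symbol.
    return await r.lrange(f"trades:{symbol}", 0, limit - 1)

@app.websocket("/ws")
async def ws_stream(ws: WebSocket) -> None:
    await ws.accept()
    pubsub = r.pubsub()
    await pubsub.subscribe("ticks")  # hypothetical pub/sub channel for live data
    async for msg in pubsub.listen():
        if msg["type"] == "message":
            await ws.send_text(msg["data"])
```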
```bash
# Build images
docker build -t market-data-pipeline .

# Deploy with Kubernetes
kubectl apply -f k8s/

# Scale components
kubectl scale deployment processor --replicas=5
```

- Environment-specific configs
- Secrets management with Vault
- Feature flags for gradual rollouts
```bash
# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install dependencies including dev tools
uv sync --dev

# Run tests
uv run pytest

# Format code
uv run black .
uv run isort .

# Type checking
uv run mypy src/

# Linting
uv run flake8 src/
```

Alternatively, set up with pip and a virtual environment:

```bash
# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -e ".[dev,test]"

# Run tests
pytest
```

**Database Connection Errors:**
```bash
# Check if databases are running
docker-compose ps

# Restart databases
docker-compose restart influxdb redis

# Reinitialize databases
uv run python scripts/setup_db.py
```

**Dependency Conflicts:**
```bash
# Clean and reinstall dependencies
rm -rf .venv uv.lock
uv sync
```

**Port Conflicts:**
```bash
# Check what's using ports
lsof -i :8000  # API port
lsof -i :9090  # Prometheus port
lsof -i :3000  # Grafana port
```

**Docker Issues:**
```bash
# Clean Docker resources
docker-compose down -v
docker system prune -f
docker-compose up -d
```

To view logs and debug:

```bash
# View application logs
uv run python main.py --log-level DEBUG

# View Docker logs
docker-compose logs -f kafka
docker-compose logs -f influxdb
docker-compose logs -f redis

# Check Prometheus targets
open http://localhost:9090/targets
```

- Follow PEP 8 style guidelines
- Add unit tests for new features
- Update documentation
- Performance test critical paths
- Monitor resource usage
- Use uv for dependency management