A compact, async event pipeline built with Python: FastAPI -> Kafka -> Processor -> PostgreSQL. Designed with interfaces (Messaging / Database), factories, Pydantic v2 validation, and idempotent writes.
- `receiver/` — FastAPI app that accepts events and publishes them to Kafka.
- `processor/` — worker that consumes from Kafka, validates events, and writes them to Postgres.
- `docker-compose.yml` / `docker-compose.kraft.yml` — Compose stacks (ZooKeeper-based and KRaft-based Kafka).
[Client] -> POST /api/v1/events -> [Receiver (FastAPI, producer)] -> Kafka topic -> [Processor (consumer)] -> PostgreSQL (events)
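Two of the design points above are easiest to show in code: Pydantic v2 validation of incoming events, and idempotent writes in the processor. A minimal sketch, assuming an `events` table keyed by a client-supplied `event_id` — all field, table, and function names here are illustrative, not the repo's actual schema:

```python
# Illustrative sketch only: field/table names are assumptions, not the repo's schema.
from decimal import Decimal
from uuid import UUID

import asyncpg
from pydantic import BaseModel


class Event(BaseModel):
    """Payload accepted by POST /api/v1/events (Pydantic v2 model)."""
    event_id: UUID      # client-supplied id, used as the idempotency key
    event_type: str
    amount: Decimal     # Decimal, not float, for monetary values
    payload: dict


# Idempotent write: redelivering the same Kafka message is a no-op,
# because the duplicate primary key is silently ignored.
INSERT_SQL = """
INSERT INTO events (event_id, event_type, amount, payload)
VALUES ($1, $2, $3, $4)
ON CONFLICT (event_id) DO NOTHING
"""


async def save_event(conn: asyncpg.Connection, event: Event) -> None:
    await conn.execute(
        INSERT_SQL,
        event.event_id,
        event.event_type,
        event.amount,
        event.model_dump_json(),  # store the raw event as JSON text
    )
```

This is what makes at-least-once delivery safe: the processor can commit offsets only after a successful write, and redelivered messages cannot create duplicate rows.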
For a local run you need:
- Python 3.13+
- uv
- Docker & Docker Compose (for Kafka + Postgres)
```bash
git clone <repo-url>
cd <repo-root>
```

Create a `.env.dev` file:

```env
POSTGRES_DB=processor
POSTGRES_USER=admin
POSTGRES_PASSWORD=admin
POSTGRES_PORT=5432
PROJECT_NAME="receiver"
KAFKA_BOOTSTRAP_SERVERS="kafka:9092"
KAFKA_TOPIC="incoming_events"
```

Start the stack:

```bash
docker compose -f docker-compose.yml up -d
```

OR

```bash
docker compose -f docker-compose.kraft.yml up -d
```

Note: the first KRaft run must initialize storage.
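The services presumably read these variables through Pydantic settings; a minimal sketch with pydantic-settings — the class, field names, and defaults below are assumptions, not the repo's actual settings module:

```python
# Hypothetical sketch of loading .env.dev values with pydantic-settings.
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env.dev")

    project_name: str = "receiver"
    kafka_bootstrap_servers: str = "kafka:9092"
    kafka_topic: str = "incoming_events"
    postgres_db: str = "processor"
    postgres_user: str = "admin"
    postgres_password: str = "admin"
    postgres_port: int = 5432


settings = Settings()  # values from .env.dev override the defaults above
```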
The services start automatically with Docker Compose, but you can also run them locally:
- Create `.env.dev`
- Create a virtual environment and install the dependencies for each service:

```bash
cd receiver
uv sync
cd ../processor
uv sync
```

- Use the VS Code debug configurations (don't forget to select the virtual environment in VS Code), or run manually:
```bash
# Receiver
cd receiver
source .venv/bin/activate
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
```

```bash
# Processor
cd processor
source .venv/bin/activate
python app/main.py
```
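In outline, the processor's `app/main.py` is a consume -> validate -> write loop. A sketch with aiokafka, reusing the hypothetical `Event` and `save_event` from the earlier sketch; the group id and connection strings are illustrative:

```python
# Hypothetical outline of the processor loop: consume -> validate -> write.
import asyncio

import asyncpg
from aiokafka import AIOKafkaConsumer
from pydantic import ValidationError


async def run() -> None:
    conn = await asyncpg.connect("postgresql://admin:admin@localhost:5432/processor")
    consumer = AIOKafkaConsumer(
        "incoming_events",
        bootstrap_servers="localhost:9092",  # kafka:9092 inside the compose network
        group_id="processor",
        enable_auto_commit=False,  # commit offsets only after a successful write
    )
    await consumer.start()
    try:
        async for msg in consumer:
            try:
                event = Event.model_validate_json(msg.value)
            except ValidationError:
                await consumer.commit()  # skip malformed messages (or dead-letter them)
                continue
            await save_event(conn, event)  # idempotent insert, safe on redelivery
            await consumer.commit()
    finally:
        await consumer.stop()
        await conn.close()


if __name__ == "__main__":
    asyncio.run(run())
```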
Dev tasks run via poe:

```bash
poe lint
poe format
poe mypy
poe test
```

A GitHub Actions / GitLab CI pipeline covers these stages:

- Linting & formatting: ruff.
- Static types: mypy.
- Tests: pytest.
- Build: build Docker images and push them to a registry.
Modern Kafka distributions support running without ZooKeeper (KRaft mode).
- Add a Schema Registry.
- I used Kafdrop (a Kafka web UI) for simplicity.
- Use multiple nodes (3+) for Kafka, ZooKeeper, and Postgres.
- Use a reverse proxy in front of the web server.
- Prefer the Decimal type for monetary values (see the snippet after this list).
- I used only one branch, but in production I would use a feature -> dev -> main branching strategy.
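The Decimal point in one snippet: binary floats cannot represent most decimal fractions exactly, so monetary sums drift.

```python
# float arithmetic drifts; Decimal stays exact for decimal fractions
from decimal import Decimal

print(0.1 + 0.2)                        # 0.30000000000000004
print(Decimal("0.1") + Decimal("0.2"))  # 0.3
```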
- Add authentication to the receiver API.
- Metrics: expose Prometheus metrics (FastAPI + aiokafka exporter; Kafka JMX exporter; Postgres exporter); a receiver-side sketch follows this list.
- Dashboards: Grafana dashboards for application metrics, Kafka lag, consumer lag, broker health.
- Logging: structured logs, shipped to centralized system (ELK/EFK, Loki).
- Database backups and schema migrations.
- Expand tests: unit, integration, end-to-end.
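For the metrics item above, a minimal receiver-side sketch with prometheus-client; the metric name and the shape of the handler are assumptions, not the repo's actual code:

```python
# Hypothetical sketch: count accepted events and expose /metrics for Prometheus.
from fastapi import FastAPI
from prometheus_client import Counter, make_asgi_app

app = FastAPI()
events_received = Counter("events_received_total", "Events accepted by the receiver")

# Scrape endpoint; Prometheus pulls from http://receiver:8000/metrics
app.mount("/metrics", make_asgi_app())


@app.post("/api/v1/events")
async def create_event() -> dict:
    events_received.inc()
    # ... validate and publish to Kafka as before ...
    return {"status": "accepted"}
```

Kafka and Postgres would be scraped by their own exporters (JMX exporter, Postgres exporter), as listed above.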