FX-aware PnL & NAV in EUR for traders — clean prices, correct conversions, and risk alerts.
Who is this for? Advanced retail / prop traders who track crypto in USD/USDT but report in EUR and need clean prices, correct EUR conversions, and actionable alerts.
- Problem: Crypto price feeds are noisy (spikes, gaps, duplicates). Exchanges quote in USD/USDT, but accounting and risk are in EUR.
- Solution: An Apache Airflow pipeline that ingests crypto prices and official EUR FX, cleans and normalizes them (raw → silver), converts correctly to EUR, and publishes gold models (PnL/NAV, FX sensitivity) for BI and alerts.
Coingecko (crypto) ECB (EUR FX)
JSON/REST JSON/XML
\ /
\ /
v v
[ BRONZE ] -> raw/ (JSONL on S3/MinIO)
v
[ SILVER ] -> normalized Parquet (partitioned by dt)
v
[ GOLD ] -> business models:
- pnl_daily_asset (EUR)
- nav_daily (EUR)
- fx_sensitivity (±1%)
- clean OHLC daily
- Bronze: raw JSON as received from Coingecko (prices/ohlc) and ECB (EUR FX).
- Silver: normalized, deduplicated prices (USD) and FX rates (EUR base) in Parquet.
- Gold: business metrics → PnL/NAV in EUR, FX sensitivity (±1%), and clean OHLC daily.
Schema sentinel
-
Dimensions
dim_asset(id_asset PK, symbol, name, source),UNIQUE(symbol, source)dim_currency(id_currency PK, code, name),UNIQUE(code)
-
Facts
fact_price_minute(ts, id_asset, id_currency, price, market_cap, volume, ingest_ts)
PK(ts, id_asset, id_currency)(idempotent upsert)fact_ohlc_daily(date, id_asset, id_currency, open, high, low, close, ingest_ts)
PK(date, id_asset, id_currency)fact_fx_daily(date, id_currency, eur_rate, ingest_ts)
PK(date, id_currency)— eur_rate = USD per EUR (EURUSD)
-
Portfolio (simple)
positions(asset, qty, start_date, end_date NULL)— current quantities for unrealized PnL.
Gold (views/tables)
gold.pnl_daily_asset(date, asset, qty, price_usd, eurusd, price_eur, pnl_eur, pnl_pct)gold.nav_daily(date, nav_eur)gold.fx_sensitivity(date, asset, nav_eur, nav_up1pct, nav_down1pct, delta_up, delta_down)
Correct conversion: if
EURUSD = 1.10⇒1 USD = 1/1.10 EUR.
price_eur = price_usd * (1 / eurusd).
-
pnl_sentinel_intraday— every 15 min (*/15 * * * *, TZ Europe/Lisbon)
extract_prices_raw→extract_fx_raw→silver_transform→upsert_dimensions→
load_fact_price_minute→gold_compute_intraday→dq_checks_intraday -
pnl_sentinel_eod— 23:55 Europe/Lisbon
extract_ohlc_raw→build_ohlc_daily→gold_pnl_daily→gold_fx_sensitivity→publish_bi
SLA target: each run < 2 min. Retries: 2 × 5 min.
- Minimum checks:
price > 0,eur_rate > 0, no duplicates by PK in facts/gold. - Outliers: flag if
|Δclose| > 30%(store insentinel.data_quality_issues). - Idempotency:
ON CONFLICT DO UPDATEon facts; gold recomputable from silver. - Lineage: store
ingest_ts,dag_run_id,source_keyin silver/gold.
Requirements: Docker & Docker Compose.
- Clone and prepare environment
cp .env.example .env
# Generate a Fernet key and paste it into FERNET_KEY:
python - <<'PY'
from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())
PY- Bring services up
docker compose up -d
# Airflow UI: http://localhost:8080 (admin / admin)
# MinIO UI: http://localhost:9001 (minio / minio123)
# Postgres: localhost:5432 (airflow/airflow)- Create schema/tables and seed positions
# Use the same DB for Airflow metadata and business data in dev (simple setup)
psql postgresql://airflow:airflow@localhost:5432/airflow -f sql/ddl_sentinel.sql
# (optional) seed a small portfolio
psql postgresql://airflow:airflow@localhost:5432/airflow -f sql/seed_positions.sql- Enable DAGs in Airflow
Turn onpnl_sentinel_intradayandpnl_sentinel_eod. Verify runs < 2 min.
TZ=Europe/Lisbon
FERNET_KEY=REPLACE_ME
# Postgres (dev)
PG_USER=airflow
PG_PASSWORD=airflow
PG_HOST=postgres
PG_DB_AIRFLOW=airflow # reuse in dev for simplicity
PG_DB_SENTINEL=airflow # reuse in dev for simplicity
# MinIO / S3 local
S3_ENDPOINT_URL=http://minio:9000
S3_ACCESS_KEY=minio
S3_SECRET_KEY=minio123
S3_REGION=us-east-1
S3_BUCKET_RAW=pnl-sentinel-raw
S3_BUCKET_SILVER=pnl-sentinel-silver
S3_BUCKET_GOLD=pnl-sentinel-goldIn AWS, you will remove
S3_ENDPOINT_URLand use IAM roles.
Daily PnL (last 30 days)
SELECT date, asset, qty, price_eur, pnl_eur, pnl_pct
FROM gold.pnl_daily_asset
WHERE date >= CURRENT_DATE - INTERVAL '30 days'
ORDER BY date, asset;Top PnL contributors (7 days)
WITH w AS (
SELECT date, asset, pnl_eur
FROM gold.pnl_daily_asset
WHERE date >= CURRENT_DATE - INTERVAL '7 days'
)
SELECT asset,
SUM(pnl_eur) AS pnl_week_eur,
100.0 * SUM(pnl_eur) / NULLIF(SUM(SUM(pnl_eur)) OVER (), 0) AS contrib_pct
FROM w
GROUP BY asset
ORDER BY pnl_week_eur DESC;FX sensitivity ±1% (today)
SELECT asset, nav_eur, nav_up1pct, nav_down1pct,
nav_up1pct - nav_eur AS delta_up_eur,
nav_down1pct - nav_eur AS delta_down_eur
FROM gold.fx_sensitivity
WHERE date = CURRENT_DATE;- Data quality:
price>0andeur_rate>0= 100%; duplicates in gold < 0.1%. - Runtime: each run < 2 min; retries effective.
- Demo volume: ~N rows/day in
fact_price_minute, ~N infact_ohlc_daily.
- Unrealized PnL via static
positions(notransactions/FIFO yet). - Base currency: EUR with EURUSD; additional FX pairs in roadmap.
- Demo assets: BTC/ETH/SOL (extendable).
- Data tests: Great Expectations (3–5 tests) + HTML report.
- Modeling: dbt for
silver → goldand auto-docs. - Alerts: daily drawdown, intraday spike, FX risk threshold (Telegram/Slack).
- Portfolio realism:
transactions+ average cost (FIFO/AVG) → realized/unrealized PnL. - Dashboards: Metabase/Superset/QuickSight (NAV + PnL + contributions).
- RDS PostgreSQL 16 (schemas
airflow_meta&sentinel). - S3:
pnl-sentinel-raw|silver|gold|dagswith SSE-KMS + lifecycle. - ECR + ECS Fargate: two services (webserver, scheduler) using the same image as dev.
- IAM minimal to buckets/Secrets; Secrets Manager for DB credentials.
- CloudWatch Logs + Budget/alarms.
Production image
FROM apache/airflow:2.9.2
COPY airflow/requirements.txt /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt
COPY airflow/dags /opt/airflow/dagsFor educational purposes only — not investment advice.