14 changes: 14 additions & 0 deletions profile/Dockerfile
@@ -0,0 +1,14 @@
FROM python:3.11-slim

WORKDIR /app

RUN pip install --no-cache-dir uv

COPY pyproject.toml README.md ./
COPY src ./src

RUN uv pip install --system -e ".[dev]"

ENV DATABASE_URL=postgresql://postgres:postgres@db:5432/orderbook

CMD ["orderbook-app"]
75 changes: 74 additions & 1 deletion profile/README.md
@@ -1 +1,74 @@
![brand-image](https://raw.githubusercontent.com/codatta/assets/refs/heads/main/brand-v3.png)
# Orderbook Ingestion (Binance + OKX)

This repository provides a Python (uv-managed) foundation to ingest **real-time** and **historical** orderbook data for **ETH/USDT** from **Binance** and **OKX**, store snapshots/updates in **PostgreSQL**, and prepare for future order placement.

## Goals

- Real-time orderbook streaming (WebSocket) for ETH/USDT (spot + perp as needed).
- Historical snapshot pulls (REST) for ETH/USDT.
- Store snapshots and updates in PostgreSQL.
- Prepare for order placement (spot/perp) with API keys.
- Provide simulated data generators for fast tests.
- Docker + docker-compose for easy deployment.

## Required APIs & Credentials

You can prepare these API permissions up front:

### Binance

**Public (no key required)**
- **REST snapshot (spot)**: `GET /api/v3/depth?symbol=ETHUSDT&limit=1000`
- **REST snapshot (perp)**: `GET /fapi/v1/depth?symbol=ETHUSDT&limit=1000`
- **WebSocket stream (spot)**: `wss://stream.binance.com:9443/ws/ethusdt@depth@100ms`
- **WebSocket stream (perp)**: `wss://fstream.binance.com/ws/ethusdt@depth@100ms`

**Private (key required)**
- **Spot order placement**: `POST /api/v3/order`
- **Futures order placement**: `POST /fapi/v1/order`

Required permissions: **Spot Trading** and/or **Futures Trading**; enable **Read** for account and order status.
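
As a rough illustration of how the public payloads above might be consumed, the sketch below keeps one side of a local book as a price-to-size mapping and applies a diff-depth style event to it. It assumes the `b`/`a` fields carry `[price, quantity]` string pairs and that a quantity of zero removes the level. `apply_levels` is a hypothetical helper, not part of this repository, and the sketch leaves out the update-ID sequencing a production client would need.

```python
from decimal import Decimal


def apply_levels(side: dict[Decimal, Decimal], levels: list[list[str]]) -> None:
    """Apply [price, quantity] pairs to one side of a book, in place."""
    for price_str, qty_str in levels:
        price, qty = Decimal(price_str), Decimal(qty_str)
        if qty == 0:
            # Assumption: a zero quantity means the price level is removed.
            side.pop(price, None)
        else:
            # Otherwise the quantity is the new absolute size at that price.
            side[price] = qty


# Hypothetical event shaped like a diff-depth message (values are made up).
event = {
    "e": "depthUpdate",
    "E": 1700000000000,
    "b": [["2999.50", "1.2"], ["2999.00", "0"]],
    "a": [],
}
bids = {Decimal("2999.00"): Decimal("3.0")}
apply_levels(bids, event["b"])
```

`Decimal` is used here only to avoid float rounding when prices serve as dictionary keys; the models in this repository store levels as floats.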

### OKX

**Public (no key required)**
- **REST snapshot**: `GET /api/v5/market/books?instId=ETH-USDT&sz=400` (spot; the perpetual swap uses `instId=ETH-USDT-SWAP`)
- **WebSocket stream**: `wss://ws.okx.com:8443/ws/v5/public` with subscribe message:
```json
{"op": "subscribe", "args": [{"channel": "books", "instId": "ETH-USDT"}]}
```

**Private (key required)**
- **Order placement (spot/perp)**: `POST /api/v5/trade/order`

Required permissions: **Trade** and **Read** for account/order status. OKX API keys also carry a passphrase, which must be supplied together with the key and secret.
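
The subscribe message and the level format can be sketched in a few lines. This is a minimal example, assuming OKX book levels arrive as arrays of strings whose first two entries are price and size, with further trailing fields that are ignored here. `build_subscribe` and `okx_levels` are hypothetical names, not functions from this repository.

```python
import json


def build_subscribe(inst_id: str, channel: str = "books") -> str:
    """Build the JSON text sent after the public WebSocket connects."""
    return json.dumps({"op": "subscribe", "args": [{"channel": channel, "instId": inst_id}]})


def okx_levels(raw: list[list[str]]) -> list[tuple[float, float]]:
    """Keep (price, size) and drop any trailing fields in each level."""
    return [(float(price), float(size)) for price, size, *_ in raw]


message = build_subscribe("ETH-USDT")
# Hypothetical level with two extra trailing fields (values are made up).
levels = okx_levels([["3000.1", "2.5", "0", "3"]])
```

Unpacking with `*_` keeps the same helper usable for two-field levels such as Binance's.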

## Quick Start (uv)

```bash
uv venv
source .venv/bin/activate
uv pip install -e ".[dev]"
```

## Docker

```bash
docker compose up --build
```

## Project Layout

```
src/orderbook_app/
  connectors/   # Binance + OKX parsers
  services/     # Simulated data generators
  storage/      # PostgreSQL helpers
```

## Notes

- This code currently focuses on parsing and schema setup for ETH/USDT orderbooks.
- Use the simulated data generator to validate ingestion logic without API calls.
- Extend `storage/db.py` for historical storage policies (rollups, TTL, etc.).
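
One way to validate ingestion logic without API calls is to seed the generator and check a few book invariants before anything is written. The sketch below is standalone and does not import this package: `simulated_levels` and `check_book` are hypothetical helpers, and the one-unit offset on each side of `base_price` is an assumption made so the two sides never touch.

```python
import json
from random import Random


def simulated_levels(rng: Random, base_price: float = 3000.0, depth: int = 3):
    """Return (bids, asks) as lists of {"price", "size"} dicts."""
    bids = [{"price": base_price - 1 - i, "size": 1 + rng.random()} for i in range(depth)]
    asks = [{"price": base_price + 1 + i, "size": 1 + rng.random()} for i in range(depth)]
    return bids, asks


def check_book(bids: list[dict], asks: list[dict]) -> None:
    """Raise AssertionError if sizes are non-positive or the book is crossed."""
    assert all(level["size"] > 0 for level in bids + asks)
    assert max(level["price"] for level in bids) < min(level["price"] for level in asks)


# A fixed seed makes the generated sizes reproducible across test runs.
bids, asks = simulated_levels(Random(42))
check_book(bids, asks)
# JSON text of the kind a jsonb column would store.
payload = json.dumps(bids)
```

Passing a `Random` instance rather than using the module-level `random()` keeps tests deterministic without touching global state.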

19 changes: 19 additions & 0 deletions profile/docker-compose.yml
@@ -0,0 +1,19 @@
services:
  db:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_USER: postgres
      POSTGRES_DB: orderbook
    ports:
      - "5432:5432"
    volumes:
      - db_data:/var/lib/postgresql/data
  app:
    build: .
    depends_on:
      - db
    environment:
      DATABASE_URL: postgresql://postgres:postgres@db:5432/orderbook
volumes:
  db_data:
31 changes: 31 additions & 0 deletions profile/pyproject.toml
@@ -0,0 +1,31 @@
[project]
name = "orderbook-app"
version = "0.1.0"
description = "Orderbook ingestion for Binance and OKX."
readme = "README.md"
requires-python = ">=3.11"
license = { text = "MIT" }
authors = [{ name = "Codatta" }]
dependencies = [
"asyncpg>=0.29.0",
"httpx>=0.27.0",
"pydantic>=2.7.0",
"websockets>=12.0",
]

[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
]

[project.scripts]
orderbook-app = "orderbook_app.__main__:main"

[tool.pytest.ini_options]
addopts = "-q"
asyncio_mode = "auto"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
1 change: 1 addition & 0 deletions profile/src/orderbook_app/__init__.py
@@ -0,0 +1 @@
"""Orderbook ingestion package."""
30 changes: 30 additions & 0 deletions profile/src/orderbook_app/__main__.py
@@ -0,0 +1,30 @@
from __future__ import annotations

import asyncio

from orderbook_app.config import AppConfig
from orderbook_app.services.simulated import generate_simulated_update
from orderbook_app.storage.db import init_db, insert_update


async def run_simulated_ingest() -> None:
    config = AppConfig.from_env()
    await init_db(config.db_dsn)
    update = generate_simulated_update("sim", "ETH/USDT")
    # Serialize once; mode="json" yields plain lists and dicts for the levels.
    payload = update.model_dump(mode="json")
    await insert_update(
        config.db_dsn,
        update.venue,
        update.symbol,
        update.ts,
        payload["bids"],
        payload["asks"],
    )


def main() -> None:
    asyncio.run(run_simulated_ingest())


if __name__ == "__main__":
    main()
15 changes: 15 additions & 0 deletions profile/src/orderbook_app/config.py
@@ -0,0 +1,15 @@
from __future__ import annotations

from dataclasses import dataclass
import os


@dataclass(frozen=True)
class AppConfig:
    db_dsn: str

    @staticmethod
    def from_env() -> "AppConfig":
        return AppConfig(
            db_dsn=os.getenv("DATABASE_URL", "postgresql://postgres:postgres@db:5432/orderbook"),
        )
27 changes: 27 additions & 0 deletions profile/src/orderbook_app/connectors/binance.py
@@ -0,0 +1,27 @@
from __future__ import annotations

from datetime import datetime, timezone

from orderbook_app.models import OrderBookSnapshot, OrderBookUpdate, parse_levels


def parse_depth_snapshot(payload: dict, symbol: str, venue: str) -> OrderBookSnapshot:
    return OrderBookSnapshot(
        venue=venue,
        symbol=symbol,
        ts=datetime.now(timezone.utc),
        bids=parse_levels(payload.get("bids", [])),
        asks=parse_levels(payload.get("asks", [])),
    )


def parse_depth_update(payload: dict, symbol: str, venue: str) -> OrderBookUpdate:
    event_time = payload.get("E")
    ts = datetime.fromtimestamp(event_time / 1000, tz=timezone.utc) if event_time else datetime.now(timezone.utc)
    return OrderBookUpdate(
        venue=venue,
        symbol=symbol,
        ts=ts,
        bids=parse_levels(payload.get("b", [])),
        asks=parse_levels(payload.get("a", [])),
    )
31 changes: 31 additions & 0 deletions profile/src/orderbook_app/connectors/okx.py
@@ -0,0 +1,31 @@
from __future__ import annotations

from datetime import datetime, timezone

from orderbook_app.models import OrderBookSnapshot, OrderBookUpdate, parse_levels


def parse_snapshot(payload: dict, symbol: str) -> OrderBookSnapshot:
    # Fall back to an empty record when "data" is missing or an empty list.
    data = (payload.get("data") or [{}])[0]
    ts_ms = int(data.get("ts", 0))
    ts = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc) if ts_ms else datetime.now(timezone.utc)
    return OrderBookSnapshot(
        venue="okx",
        symbol=symbol,
        ts=ts,
        bids=parse_levels(data.get("bids", [])),
        asks=parse_levels(data.get("asks", [])),
    )


def parse_update(payload: dict, symbol: str) -> OrderBookUpdate:
    # Fall back to an empty record when "data" is missing or an empty list.
    data = (payload.get("data") or [{}])[0]
    ts_ms = int(data.get("ts", 0))
    ts = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc) if ts_ms else datetime.now(timezone.utc)
    return OrderBookUpdate(
        venue="okx",
        symbol=symbol,
        ts=ts,
        bids=parse_levels(data.get("bids", [])),
        asks=parse_levels(data.get("asks", [])),
    )
31 changes: 31 additions & 0 deletions profile/src/orderbook_app/models.py
@@ -0,0 +1,31 @@
from __future__ import annotations

from datetime import datetime
from typing import Iterable

from pydantic import BaseModel, Field


class OrderBookLevel(BaseModel):
    price: float
    size: float


class OrderBookSnapshot(BaseModel):
    venue: str
    symbol: str
    ts: datetime
    bids: list[OrderBookLevel] = Field(default_factory=list)
    asks: list[OrderBookLevel] = Field(default_factory=list)


class OrderBookUpdate(BaseModel):
    venue: str
    symbol: str
    ts: datetime
    bids: list[OrderBookLevel] = Field(default_factory=list)
    asks: list[OrderBookLevel] = Field(default_factory=list)


def parse_levels(levels: Iterable[Iterable[str | float]]) -> list[OrderBookLevel]:
    # Binance levels are [price, size]; OKX levels carry extra trailing fields,
    # so unpack the first two values and ignore the rest.
    return [OrderBookLevel(price=float(price), size=float(size)) for price, size, *_ in levels]
19 changes: 19 additions & 0 deletions profile/src/orderbook_app/services/simulated.py
@@ -0,0 +1,19 @@
from __future__ import annotations

from datetime import datetime, timezone
from random import random

from orderbook_app.models import OrderBookUpdate


def generate_simulated_update(venue: str, symbol: str) -> OrderBookUpdate:
    base_price = 3000.0
    # Three levels per side, one price unit apart, with sizes in [1, 2).
    # Note: at i == 0 the best bid and best ask are both base_price.
    bids = [[base_price - i, 1 + random()] for i in range(3)]
    asks = [[base_price + i, 1 + random()] for i in range(3)]
    return OrderBookUpdate(
        venue=venue,
        symbol=symbol,
        ts=datetime.now(timezone.utc),
        bids=[{"price": price, "size": size} for price, size in bids],
        asks=[{"price": price, "size": size} for price, size in asks],
    )
41 changes: 41 additions & 0 deletions profile/src/orderbook_app/storage/db.py
@@ -0,0 +1,41 @@
from __future__ import annotations

import json
from datetime import datetime

import asyncpg


CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS orderbook_updates (
    id bigserial PRIMARY KEY,
    venue text NOT NULL,
    symbol text NOT NULL,
    ts timestamptz NOT NULL,
    bids jsonb NOT NULL,
    asks jsonb NOT NULL
);
"""


async def init_db(dsn: str) -> None:
    conn = await asyncpg.connect(dsn)
    try:
        await conn.execute(CREATE_TABLE_SQL)
    finally:
        await conn.close()


async def insert_update(dsn: str, venue: str, symbol: str, ts: datetime, bids: list, asks: list) -> None:
    conn = await asyncpg.connect(dsn)
    try:
        await conn.execute(
            """
            INSERT INTO orderbook_updates (venue, symbol, ts, bids, asks)
            VALUES ($1, $2, $3, $4, $5)
            """,
            venue,
            symbol,
            ts,
            # asyncpg's default jsonb codec expects JSON text, not Python lists.
            json.dumps(bids),
            json.dumps(asks),
        )
    finally:
        await conn.close()