Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Normalizer/polymarket_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from Normalizer.schema import EventTypes, Exchanges, NormalizedEvent
import uuid


def parse_polymarket_message(raw_msg: dict) -> NormalizedEvent:
    """
    Convert a raw Polymarket message into a NormalizedEvent.

    The message is always emitted as a single "snapshot" event; it is not
    expanded per price level here.

    Args:
        raw_msg: Decoded Polymarket message. Must contain "market"; the
            "timestamp", "asset_id" and "hash" keys are optional.

    Returns:
        NormalizedEvent with a freshly generated UUID as its event_id and the
        untouched raw message attached as raw_data.

    Raises:
        KeyError: If "market" is missing from the message.
        ValueError: If "timestamp" is present but not an integer value.
    """
    # raw_msg is a plain dict, so fields must be read by key; attribute access
    # (raw_msg.market) raises AttributeError on a dict.
    raw_timestamp = raw_msg.get("timestamp")

    # Polymarket documents `timestamp` as Unix milliseconds (sent as a string),
    # while NormalizedEvent stores integer nanoseconds. A missing/empty value
    # becomes 0, which NormalizedEvent replaces with the current time.
    timestamp_ns = int(raw_timestamp) * 1_000_000 if raw_timestamp else 0

    return NormalizedEvent(
        event_id=str(uuid.uuid4()),
        exchange=Exchanges.POLYMARKET,
        market_id=raw_msg["market"],
        timestamp=timestamp_ns,
        event_type=EventTypes.SNAPSHOT,
        asset_id=raw_msg.get("asset_id"),
        hash=raw_msg.get("hash"),
        raw_data=raw_msg,
    )
249 changes: 249 additions & 0 deletions Normalizer/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
"""
Unified data schema for orderbook events from all exchanges.

This module defines the NormalizedEvent dataclass for representing
order events across different prediction market exchanges (Kalshi, Polymarket).

File: Normalizer/schema.py
"""

from dataclasses import dataclass, field
from typing import Optional, Any
import time
import uuid


@dataclass
class NormalizedEvent:
    """
    Unified orderbook event for all exchanges.

    Supports events from:
    - Kalshi: REST API and WebSocket
    - Polymarket: User Channel (trades, orders) and Market Channel (book, price_change)

    Note: Some Polymarket messages (book, price_change) should be expanded into
    multiple NormalizedEvent objects (one per price level).

    Fields:
        event_id: Unique identifier for this event (UUID format). A falsy value
            (e.g. "" or None) is replaced with a generated UUID4.
        exchange: Exchange name ("kalshi" or "polymarket")
        market_id: Market identifier from the exchange
            - Kalshi: Ticker format (e.g., "PRES2028")
            - Polymarket: Condition ID (0x format, e.g., "0xbd31dc8a...")
        timestamp: Event timestamp in nanoseconds (for precise ordering). A
            falsy or non-positive value is replaced with the current time.
        event_type: Type of event:
            - "snapshot": Full orderbook snapshot (from book message)
            - "insert": New order placed (from PLACEMENT or price_change with new level)
            - "update": Order/price level updated (from UPDATE or price_change)
            - "delete": Order cancelled (from CANCELLATION or price_change with size=0)
            - "trade": Trade execution
            - "metadata": Non-order event (e.g., tick_size_change)
        side: Order side ("bid" or "ask"), None for metadata events
        price: Price level (0.0 - 1.0 scale), None for metadata events
        quantity: Quantity at this price level, None for metadata events
        asset_id: Exchange-specific asset identifier (optional)
            - Polymarket: ERC1155 token ID
        order_id: Exchange-specific order identifier (optional)
        trade_id: Exchange-specific trade identifier (optional)
        status: Order/trade status (optional)
            - Polymarket trades: "MATCHED", "MINED", "CONFIRMED", "RETRYING", "FAILED"
        size_matched: Amount of order already filled (optional, for partial fills)
        outcome: Market outcome this order is for (optional)
            - Polymarket: "YES" or "NO"
        owner: Anonymized user identifier (optional)
        hash: Orderbook hash for integrity checking (optional, from Polymarket)
        raw_data: Original event data from exchange for debugging

    Raises:
        ValueError: If exchange, event_type or side is not a supported value,
            if price is outside [0, 1], or if quantity is negative or NaN.

    Example:
        >>> event = NormalizedEvent(
        ...     event_id="550e8400-e29b-41d4-a716-446655440000",
        ...     exchange="polymarket",
        ...     market_id="0xbd31dc8a20211944f6b70f31557f1001557b59905b7738480ca09bd4532f84af",
        ...     timestamp=1672290701000000000,
        ...     event_type="trade",
        ...     side="bid",
        ...     price=0.57,
        ...     quantity=10.0
        ... )
        >>> assert isinstance(event.price, float)
        >>> assert event.exchange == "polymarket"
        >>> assert 0 <= event.price <= 1
    """

    # Required fields
    event_id: str
    exchange: str
    market_id: str
    timestamp: int
    event_type: str

    # Optional fields - None for metadata events
    side: Optional[str] = None
    price: Optional[float] = None
    quantity: Optional[float] = None

    # Additional context fields
    asset_id: Optional[str] = None
    order_id: Optional[str] = None
    trade_id: Optional[str] = None
    status: Optional[str] = None
    size_matched: Optional[float] = None
    outcome: Optional[str] = None
    owner: Optional[str] = None
    hash: Optional[str] = None

    # Metadata
    raw_data: Optional[dict[str, Any]] = field(default_factory=dict)

    def __post_init__(self):
        """Fill in missing identifiers and validate field values.

        Raises:
            ValueError: If any validated field holds an unsupported value.
        """

        # Auto-generate event_id if not provided
        if not self.event_id:
            self.event_id = str(uuid.uuid4())

        # Auto-generate timestamp if not provided or invalid
        if not self.timestamp or self.timestamp <= 0:
            self.timestamp = time.time_ns()

        # Validate exchange
        if self.exchange not in {"kalshi", "polymarket"}:
            raise ValueError(f"Invalid exchange: {self.exchange}. Must be 'kalshi' or 'polymarket'")

        # Validate event_type
        valid_event_types = {"snapshot", "insert", "update", "delete", "trade", "metadata"}
        if self.event_type not in valid_event_types:
            # Sort the choices: formatting the set directly gives an ordering
            # that changes between interpreter runs.
            raise ValueError(
                f"Invalid event_type: {self.event_type}. "
                f"Must be one of {sorted(valid_event_types)}"
            )

        # Validate side if present
        if self.side is not None and self.side not in {"bid", "ask"}:
            raise ValueError(f"Invalid side: {self.side}. Must be 'bid' or 'ask'")

        # Validate price range if present (probability scale).
        # The chained comparison is False for NaN, so NaN is rejected too.
        if self.price is not None and not (0 <= self.price <= 1):
            raise ValueError(f"Price must be between 0 and 1, got {self.price}")

        # Validate quantity if present (allow 0 for delete events).
        # Written as `not (q >= 0)` rather than `q < 0` so NaN is rejected,
        # matching how the price check treats NaN.
        if self.quantity is not None and not (self.quantity >= 0):
            raise ValueError(f"Quantity must be non-negative, got {self.quantity}")

    def to_dict(self) -> dict[str, Any]:
        """
        Convert to dictionary for serialization.

        Returns:
            Dictionary with one key per field. Values are the field values
            themselves (raw_data is the same object, not a copy).
        """
        return {
            'event_id': self.event_id,
            'exchange': self.exchange,
            'market_id': self.market_id,
            'timestamp': self.timestamp,
            'event_type': self.event_type,
            'side': self.side,
            'price': self.price,
            'quantity': self.quantity,
            'asset_id': self.asset_id,
            'order_id': self.order_id,
            'trade_id': self.trade_id,
            'status': self.status,
            'size_matched': self.size_matched,
            'outcome': self.outcome,
            'owner': self.owner,
            'hash': self.hash,
            'raw_data': self.raw_data,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> 'NormalizedEvent':
        """
        Create NormalizedEvent from dictionary.

        Args:
            data: Dictionary whose keys are field names; it is passed to the
                constructor as keyword arguments, so unknown keys raise
                TypeError and the usual validation applies.

        Returns:
            NormalizedEvent instance
        """
        return cls(**data)

    def __repr__(self) -> str:
        """Short string representation for debugging (event_id truncated to 8 chars)."""
        return (
            f"NormalizedEvent(event_id='{self.event_id[:8]}...', "
            f"exchange='{self.exchange}', "
            f"event_type='{self.event_type}', "
            # !r keeps 'bid'/'ask' quoted but shows a missing side as None
            # instead of the misleading string 'None'.
            f"side={self.side!r}, "
            f"price={self.price}, "
            f"quantity={self.quantity})"
        )


# Constants for event type mapping
class EventTypes:
    """String values accepted for an event's ``event_type``."""

    # Order book state: full snapshot plus per-level changes.
    SNAPSHOT, INSERT, UPDATE, DELETE = "snapshot", "insert", "update", "delete"
    # Executions and non-order events.
    TRADE, METADATA = "trade", "metadata"


class Sides:
    """String values accepted for an order's ``side``."""

    BID, ASK = "bid", "ask"


class Exchanges:
    """String identifiers of the supported exchanges."""

    KALSHI, POLYMARKET = "kalshi", "polymarket"


# Validation helpers
def validate_price(price: float) -> None:
    """
    Check that a price lies within the closed interval [0, 1].

    Args:
        price: Price to check

    Raises:
        ValueError: If price is not within [0, 1]
    """
    in_range = 0 <= price <= 1
    if in_range:
        return
    raise ValueError(f"Price must be between 0 and 1, got {price}")


def validate_quantity(quantity: float) -> None:
    """
    Validate quantity is non-negative.

    Args:
        quantity: Quantity to validate

    Raises:
        ValueError: If quantity is negative or NaN
    """
    # `not (quantity >= 0)` instead of `quantity < 0`: every comparison with
    # NaN is False, so the latter would silently accept NaN.
    if not (quantity >= 0):
        raise ValueError(f"Quantity must be non-negative, got {quantity}")


def validate_exchange(exchange: str) -> None:
    """
    Check that an exchange name is one of the supported exchanges.

    Args:
        exchange: Exchange name to check

    Raises:
        ValueError: If exchange is neither the Kalshi nor the Polymarket constant
    """
    supported = {Exchanges.KALSHI, Exchanges.POLYMARKET}
    if exchange in supported:
        return
    raise ValueError(f"Invalid exchange: {exchange}")