"""
Unified data schema for orderbook events from all exchanges, plus the
Polymarket message parser.

This module defines the NormalizedEvent dataclass for representing
order events across different prediction market exchanges (Kalshi, Polymarket).

Reconstructed from a two-commit patch series:
    [PATCH 1/2] "added normalized schema"
        -> Normalizer/schema.py (constants, validators, NormalizedEvent)
    [PATCH 2/2] "feat(normalizer): add parser for polymarket messages"
        -> Normalizer/polymarket_parser.py (parse_polymarket_message)

In the package layout the parser obtains EventTypes, Exchanges and
NormalizedEvent via ``from Normalizer.schema import ...``; here they are
defined in the same module, so that import is not needed.
"""

from dataclasses import dataclass, field, fields
from typing import Any, Optional
import time
import uuid


# Constants for event type mapping
class EventTypes:
    """Constants for event types."""
    SNAPSHOT = "snapshot"
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"
    TRADE = "trade"
    METADATA = "metadata"


class Sides:
    """Constants for order sides."""
    BID = "bid"
    ASK = "ask"


class Exchanges:
    """Constants for supported exchanges."""
    KALSHI = "kalshi"
    POLYMARKET = "polymarket"


# Ordered tuples (not sets) so that error messages built from them are
# deterministic from run to run.
_VALID_EXCHANGES = (Exchanges.KALSHI, Exchanges.POLYMARKET)
_VALID_EVENT_TYPES = (
    EventTypes.SNAPSHOT,
    EventTypes.INSERT,
    EventTypes.UPDATE,
    EventTypes.DELETE,
    EventTypes.TRADE,
    EventTypes.METADATA,
)
_VALID_SIDES = (Sides.BID, Sides.ASK)

# Nanoseconds per millisecond, used when converting exchange timestamps.
_NS_PER_MS = 1_000_000


# Validation helpers
def validate_price(price: float) -> None:
    """
    Validate price is in valid range (0-1).

    NaN is rejected as well, because every comparison against NaN is false.

    Args:
        price: Price to validate

    Raises:
        ValueError: If price is out of range
    """
    if not (0 <= price <= 1):
        raise ValueError(f"Price must be between 0 and 1, got {price}")


def validate_quantity(quantity: float) -> None:
    """
    Validate quantity is non-negative.

    Args:
        quantity: Quantity to validate

    Raises:
        ValueError: If quantity is negative or NaN
    """
    # Written as "not >= 0" rather than "< 0" so that NaN fails the check,
    # matching how validate_price treats NaN.
    if not quantity >= 0:
        raise ValueError(f"Quantity must be non-negative, got {quantity}")


def validate_exchange(exchange: str) -> None:
    """
    Validate exchange is supported.

    Args:
        exchange: Exchange name to validate

    Raises:
        ValueError: If exchange is not supported
    """
    if exchange not in _VALID_EXCHANGES:
        raise ValueError(f"Invalid exchange: {exchange}")


@dataclass
class NormalizedEvent:
    """
    Unified orderbook event for all exchanges.

    Supports events from:
    - Kalshi: REST API and WebSocket
    - Polymarket: User Channel (trades, orders) and Market Channel (book, price_change)

    Note: Some Polymarket messages (book, price_change) should be expanded into
    multiple NormalizedEvent objects (one per price level).

    Fields:
        event_id: Unique identifier for this event (UUID format)
        exchange: Exchange name ("kalshi" or "polymarket")
        market_id: Market identifier from the exchange
            - Kalshi: Ticker format (e.g., "PRES2028")
            - Polymarket: Condition ID (0x format, e.g., "0xbd31dc8a...")
        timestamp: Event timestamp in nanoseconds (for precise ordering)
        event_type: Type of event:
            - "snapshot": Full orderbook snapshot (from book message)
            - "insert": New order placed (from PLACEMENT or price_change with new level)
            - "update": Order/price level updated (from UPDATE or price_change)
            - "delete": Order cancelled (from CANCELLATION or price_change with size=0)
            - "trade": Trade execution
            - "metadata": Non-order event (e.g., tick_size_change)
        side: Order side ("bid" or "ask"), None for metadata events
        price: Price level (0.0 - 1.0 scale), None for metadata events
        quantity: Quantity at this price level, None for metadata events
        asset_id: Exchange-specific asset identifier (optional)
            - Polymarket: ERC1155 token ID
        order_id: Exchange-specific order identifier (optional)
        trade_id: Exchange-specific trade identifier (optional)
        status: Order/trade status (optional)
            - Polymarket trades: "MATCHED", "MINED", "CONFIRMED", "RETRYING", "FAILED"
        size_matched: Amount of order already filled (optional, for partial fills)
        outcome: Market outcome this order is for (optional)
            - Polymarket: "YES" or "NO"
        owner: Anonymized user identifier (optional)
        hash: Orderbook hash for integrity checking (optional, from Polymarket)
        raw_data: Original event data from exchange for debugging

    Example:
        >>> event = NormalizedEvent(
        ...     event_id="550e8400-e29b-41d4-a716-446655440000",
        ...     exchange="polymarket",
        ...     market_id="0xbd31dc8a20211944f6b70f31557f1001557b59905b7738480ca09bd4532f84af",
        ...     timestamp=1672290701000000000,
        ...     event_type="trade",
        ...     side="bid",
        ...     price=0.57,
        ...     quantity=10.0
        ... )
        >>> assert isinstance(event.price, float)
        >>> assert event.exchange == "polymarket"
        >>> assert 0 <= event.price <= 1
    """

    # Required fields
    event_id: str
    exchange: str
    market_id: str
    timestamp: int
    event_type: str

    # Optional fields - None for metadata events
    side: Optional[str] = None
    price: Optional[float] = None
    quantity: Optional[float] = None

    # Additional context fields
    asset_id: Optional[str] = None
    order_id: Optional[str] = None
    trade_id: Optional[str] = None
    status: Optional[str] = None
    size_matched: Optional[float] = None
    outcome: Optional[str] = None
    owner: Optional[str] = None
    hash: Optional[str] = None

    # Metadata
    raw_data: Optional[dict[str, Any]] = field(default_factory=dict)

    def __post_init__(self):
        """
        Fill in missing identifiers and validate the event.

        A falsy event_id is replaced with a fresh UUID4 string, and a falsy
        or non-positive timestamp is replaced with the current time in
        nanoseconds.

        Raises:
            ValueError: If exchange, event_type, side, price or quantity
                holds an unsupported value.
        """
        # Auto-generate event_id if not provided
        if not self.event_id:
            self.event_id = str(uuid.uuid4())

        # Auto-generate timestamp if not provided or invalid
        if not self.timestamp or self.timestamp <= 0:
            self.timestamp = time.time_ns()

        # Validate exchange (message is more detailed than validate_exchange's)
        if self.exchange not in _VALID_EXCHANGES:
            raise ValueError(
                f"Invalid exchange: {self.exchange}. Must be 'kalshi' or 'polymarket'"
            )

        # Validate event_type; the allowed values are listed in a fixed order
        if self.event_type not in _VALID_EVENT_TYPES:
            allowed = ", ".join(repr(name) for name in _VALID_EVENT_TYPES)
            raise ValueError(
                f"Invalid event_type: {self.event_type}. Must be one of {{{allowed}}}"
            )

        # Validate side if present
        if self.side is not None and self.side not in _VALID_SIDES:
            raise ValueError(f"Invalid side: {self.side}. Must be 'bid' or 'ask'")

        # Validate price range if present (probability scale)
        if self.price is not None:
            validate_price(self.price)

        # Validate quantity if present (allow 0 for delete events)
        if self.quantity is not None:
            validate_quantity(self.quantity)

    def to_dict(self) -> dict[str, Any]:
        """
        Convert to dictionary for serialization.

        Keys follow the dataclass field order. Values are not copied, so
        ``raw_data`` in the result is the same object held by the event.

        Returns:
            Dictionary with one entry per dataclass field.
        """
        # Driven by fields() so the mapping cannot drift from the definition.
        return {spec.name: getattr(self, spec.name) for spec in fields(self)}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> 'NormalizedEvent':
        """
        Create NormalizedEvent from dictionary.

        Args:
            data: Dictionary whose keys are NormalizedEvent field names

        Returns:
            NormalizedEvent instance

        Raises:
            TypeError: If a required field is missing or a key is not a field
            ValueError: If a value fails the checks in __post_init__
        """
        return cls(**data)

    def __repr__(self) -> str:
        """Short string representation for debugging (event_id is truncated)."""
        # side uses !r so that a missing side prints as None, not 'None'.
        return (
            f"NormalizedEvent(event_id='{self.event_id[:8]}...', "
            f"exchange='{self.exchange}', "
            f"event_type='{self.event_type}', "
            f"side={self.side!r}, "
            f"price={self.price}, "
            f"quantity={self.quantity})"
        )


def parse_polymarket_message(raw_msg: dict) -> NormalizedEvent:
    """
    Build a snapshot NormalizedEvent from a raw Polymarket message.

    Only message-level fields are mapped; side, price and quantity are left
    as None, and the event_type is always "snapshot".

    Args:
        raw_msg: Decoded message dict. ``market`` is required; ``timestamp``,
            ``asset_id`` and ``hash`` are optional.

    Returns:
        NormalizedEvent with a freshly generated UUID4 event_id and
        ``raw_data`` set to ``raw_msg`` itself (not a copy).

    Raises:
        KeyError: If ``market`` is missing from the message
        ValueError: If ``timestamp`` is present but not integer-like
    """
    raw_timestamp = raw_msg.get("timestamp")
    if raw_timestamp is None or raw_timestamp == "":
        # 0 makes NormalizedEvent.__post_init__ substitute the current time.
        timestamp_ns = 0
    else:
        # NOTE(review): assumes the message timestamp is Unix time in
        # milliseconds (string or number) - confirm against the Polymarket
        # market-channel documentation before relying on ordering.
        timestamp_ns = int(raw_timestamp) * _NS_PER_MS

    return NormalizedEvent(
        event_id=str(uuid.uuid4()),
        exchange=Exchanges.POLYMARKET,
        market_id=raw_msg["market"],
        timestamp=timestamp_ns,
        event_type=EventTypes.SNAPSHOT,
        asset_id=raw_msg.get("asset_id"),
        hash=raw_msg.get("hash"),
        raw_data=raw_msg,
    )