From e1b1be886399d9f60be9b1097687847d8983bef1 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 7 Nov 2025 17:24:02 +0000 Subject: [PATCH 1/6] feat: implement WebSocket support for order updates and market data Add comprehensive WebSocket functionality to the Hyperliquid mock server: WebSocket Features: - Real-time order updates subscription (orderUpdates) - Best bid/offer market data subscription (l2Book/bbo) - Automatic broadcasting on order state changes - Multiple subscriptions per connection - Thread-safe concurrent operations Implementation: - New WebSocketManager for connection and subscription handling - Integration with existing State for automatic order update broadcasts - WebSocket endpoint at /ws - Support for subscribe/unsubscribe operations Test Utilities: - TestServer.WebSocketURL() for easy test connections - TestServer.SetBBO() for manual market data updates - TestServer.TriggerBBOUpdate() for forcing BBO broadcasts - Example tests demonstrating WebSocket usage Documentation: - Comprehensive WEBSOCKET.md guide - Updated README.md with WebSocket information - Example code for order updates and BBO subscriptions Mock Data: - BTC: bid=86956.5, ask=87043.5 (centered on 87000 USDT) - ETH: bid=2999.5, ask=3000.5 - SOL: bid=99.9, ask=100.1 Compliance: - Follows official Hyperliquid WebSocket API specifications - Compatible with upstream API docs --- README.md | 35 ++- WEBSOCKET.md | 340 +++++++++++++++++++++++ server/handlers.go | 8 +- server/integration_test.go | 2 +- server/server.go | 7 +- server/state.go | 42 ++- server/testserver.go | 39 +++ server/websocket.go | 462 +++++++++++++++++++++++++++++++ server/websocket_example_test.go | 178 ++++++++++++ 9 files changed, 1105 insertions(+), 8 deletions(-) create mode 100644 WEBSOCKET.md create mode 100644 server/websocket.go create mode 100644 server/websocket_example_test.go diff --git a/README.md b/README.md index 49646ce..91c1c01 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ A minimal Go implementation of the Hyperliquid API for E2E testing without requi - **No authentication validation** - Accepts all requests without signature verification - **In-memory order state** - Tracks orders for realistic responses +- **WebSocket support** - Real-time order updates and market data (BBO) - **Unlimited requests** - No rate limiting - **Simple responses** - Returns success for all valid operations @@ -84,6 +85,31 @@ All other action types return a generic success response but do not implement re ## Endpoints +### WebSocket /ws + +Real-time data streaming for order updates and market data. See [WEBSOCKET.md](WEBSOCKET.md) for detailed documentation. + +**Supported subscriptions:** +- `orderUpdates` - Real-time order status updates +- `l2Book` / `bbo` - Best bid/offer market data + +**Example connection:** +```javascript +const ws = new WebSocket('ws://localhost:8080/ws'); + +ws.onopen = () => { + ws.send(JSON.stringify({ + method: 'subscribe', + subscription: { type: 'orderUpdates', user: '0x1234...' } + })); +}; + +ws.onmessage = (event) => { + const update = JSON.parse(event.data); + console.log('Order update:', update); +}; +``` + ### POST /exchange Handles trading actions: @@ -423,7 +449,7 @@ curl -X POST http://localhost:8080/info \ - **No signature validation** - All requests are accepted regardless of signature - **No real order matching** - Orders are just stored in memory -- **No WebSocket support** - Only HTTP REST endpoints +- **Limited WebSocket subscriptions** - Only orderUpdates and l2Book/bbo (no trades, candles, etc.) - **Simplified responses** - Some fields may be omitted or simplified - **No persistence** - All state is lost when the server restarts - **No rate limiting** - Unlimited requests accepted @@ -438,15 +464,20 @@ hyperliquid-mock/ ├── go.mod # Go module file ├── go.sum # Go dependencies ├── README.md # This file +├── WEBSOCKET.md # WebSocket API documentation ├── upstream-api-docs/ # Real Hyperliquid API reference (for expansion) │ ├── exchange-endpoint.md │ ├── info-endpoint.md │ ├── perpetuals.md -│ └── spot.md +│ ├── spot.md +│ ├── websocket.md +│ └── subscriptions.md └── server/ ├── server.go # HTTP server setup ├── handlers.go # Request handlers ├── handlers_test.go # Handler unit tests + ├── websocket.go # WebSocket connection & subscription management + ├── websocket_example_test.go # WebSocket usage examples ├── types.go # Request/response types ├── state.go # In-memory state management ├── testserver.go # Test helpers & request capture diff --git a/WEBSOCKET.md b/WEBSOCKET.md new file mode 100644 index 0000000..0294689 --- /dev/null +++ b/WEBSOCKET.md @@ -0,0 +1,340 @@ +# WebSocket Implementation + +This document describes the WebSocket functionality added to the Hyperliquid Mock Server. + +## Overview + +The mock server now supports WebSocket connections for real-time data streaming, matching the Hyperliquid API protocol. Two primary subscription types are implemented: + +1. **Order Updates** (`orderUpdates`) - Real-time order status updates +2. **Level 2 Book / BBO** (`l2Book` / `bbo`) - Real-time market data + +## WebSocket Endpoint + +- **URL**: `ws://localhost:/ws` +- **Protocol**: WebSocket (RFC 6455) +- **Connection**: Persistent, bidirectional + +## Subscription Types + +### 1. Order Updates Subscription + +Receive real-time updates when orders are created, modified, filled, or canceled. + +#### Subscription Request +```json +{ + "method": "subscribe", + "subscription": { + "type": "orderUpdates", + "user": "0xWALLET_ADDRESS" + } +} +``` + +#### Update Message Format +```json +{ + "channel": "orderUpdates", + "data": [ + { + "order": { + "coin": "BTC", + "side": "B", + "limitPx": "87000", + "sz": "1.0", + "oid": 1000001, + "timestamp": 1699999999999, + "origSz": "1.0", + "cloid": "000000010000000200000003" + }, + "status": "open", + "statusTimestamp": 1699999999999 + } + ] +} +``` + +#### Order Status Values +- `open` - Order is active on the order book +- `filled` - Order is completely filled +- `canceled` - Order has been canceled + +#### Trigger Events +Updates are automatically sent when: +- Order created → `status: "open"` +- Order modified → `status: "open"` (with updated price/size) +- Order filled (full or partial) → `status: "filled"` or `status: "open"` with updated `sz` +- Order canceled → `status: "canceled"` + +### 2. BBO (Best Bid Offer) Subscription + +Receive real-time market data for price discovery. + +#### Subscription Request (l2Book) +```json +{ + "method": "subscribe", + "subscription": { + "type": "l2Book", + "coin": "BTC" + } +} +``` + +#### Subscription Request (bbo alternative) +```json +{ + "method": "subscribe", + "subscription": { + "type": "bbo", + "coin": "BTC" + } +} +``` + +#### Update Message Format +```json +{ + "channel": "l2Book", + "data": { + "coin": "BTC", + "time": 1699999999999, + "levels": [ + [ + {"px": "86956.5", "sz": "10.5", "n": 1} + ], + [ + {"px": "87043.5", "sz": "8.3", "n": 1} + ] + ] + } +} +``` + +#### Level Fields +- `px` - Price as string +- `sz` - Size/quantity as string +- `n` - Number of orders at this price level + +#### Default BBO Prices + +The mock server provides realistic default prices: + +| Coin | Bid Price | Ask Price | Spread | +|------|-----------|-----------|--------| +| BTC | 86956.5 | 87043.5 | ~0.1% | +| ETH | 2999.5 | 3000.5 | ~0.03% | +| SOL | 99.9 | 100.1 | ~0.2% | + +Note: BTC prices are centered around 87000 USDT to match the existing IOC test behavior. + +## Connection Management + +### Establishing Connection + +```javascript +const ws = new WebSocket('ws://localhost:8080/ws'); + +ws.onopen = () => { + console.log('Connected to Hyperliquid Mock Server'); + + // Subscribe to order updates + ws.send(JSON.stringify({ + method: 'subscribe', + subscription: { + type: 'orderUpdates', + user: '0x1234567890abcdef' + } + })); +}; + +ws.onmessage = (event) => { + const message = JSON.parse(event.data); + console.log('Received:', message); +}; +``` + +### Subscription Acknowledgment + +After sending a subscription request, the server responds with: + +```json +{ + "channel": "subscriptionResponse", + "data": { + "method": "subscribe", + "subscription": { + "type": "orderUpdates", + "user": "0x1234567890abcdef" + } + } +} +``` + +### Unsubscribing + +```json +{ + "method": "unsubscribe", + "subscription": { + "type": "orderUpdates", + "user": "0xWALLET_ADDRESS" + } +} +``` + +### Multiple Subscriptions + +A single WebSocket connection can maintain multiple subscriptions: + +```javascript +// Subscribe to order updates +ws.send(JSON.stringify({ + method: 'subscribe', + subscription: { type: 'orderUpdates', user: '0x123...' } +})); + +// Subscribe to BTC BBO +ws.send(JSON.stringify({ + method: 'subscribe', + subscription: { type: 'l2Book', coin: 'BTC' } +})); + +// Subscribe to ETH BBO +ws.send(JSON.stringify({ + method: 'subscribe', + subscription: { type: 'l2Book', coin: 'ETH' } +})); +``` + +## Test Server API + +The `TestServer` includes helper methods for WebSocket testing: + +### Get WebSocket URL + +```go +ts := server.NewTestServer(t) +wsURL := ts.WebSocketURL() // Returns "ws://127.0.0.1:12345/ws" +``` + +### Manual BBO Updates + +```go +// Set specific BBO prices +ts.SetBBO("BTC", 87000.0, 5.0, 87100.0, 4.5) +// coin bidPx bidSz askPx askSz + +// Trigger a BBO update with default prices +ts.TriggerBBOUpdate("BTC") +``` + +### Example Test + +```go +func TestWebSocketOrderUpdates(t *testing.T) { + ts := server.NewTestServer(t) + defer ts.Close() + + // Connect to WebSocket + conn, _, err := websocket.DefaultDialer.Dial(ts.WebSocketURL(), nil) + require.NoError(t, err) + defer conn.Close() + + // Subscribe + subscription := map[string]interface{}{ + "method": "subscribe", + "subscription": map[string]interface{}{ + "type": "orderUpdates", + "user": "0x1234", + }, + } + conn.WriteJSON(subscription) + + // Read subscription ack + var ack map[string]interface{} + conn.ReadJSON(&ack) + + // Place an order via HTTP API (triggers WebSocket update) + // ... HTTP request ... + + // Receive order update + var update map[string]interface{} + conn.ReadJSON(&update) + + assert.Equal(t, "orderUpdates", update["channel"]) +} +``` + +## Error Handling + +### Error Message Format + +```json +{ + "channel": "error", + "error": "Invalid subscription type" +} +``` + +### Common Errors + +- `"Invalid subscription message"` - Malformed JSON +- `"Unknown method: "` - Invalid method field +- `"Missing subscription type"` - No type in subscription object +- `"Missing user address for orderUpdates"` - orderUpdates requires user field +- `"Missing coin for l2Book"` - l2Book/bbo requires coin field +- `"Unsupported subscription type: "` - Unknown subscription type + +## Implementation Details + +### Architecture + +The WebSocket implementation consists of: + +1. **WebSocketManager** (`websocket.go`) - Manages connections and subscriptions +2. **State Integration** - Broadcasts updates when order state changes +3. **Handler Integration** - Exposes `/ws` endpoint + +### Broadcasting Behavior + +- **Order Updates**: Sent immediately when order state changes (create, modify, fill, cancel) +- **BBO Updates**: + - Initial snapshot sent immediately after subscription + - Updates can be triggered manually via `SetBBO()` or `TriggerBBOUpdate()` + - In production, would update periodically or on price changes + +### Thread Safety + +- All WebSocket operations are thread-safe +- Concurrent subscriptions and broadcasts are supported +- Each connection has its own write mutex to prevent concurrent writes + +### Performance Considerations + +- Order update channel: 100 message buffer +- L2 book update channel: 100 message buffer +- Dropped messages logged as warnings when buffers full + +## Compliance with Upstream API + +This implementation follows the official Hyperliquid WebSocket API specifications from: +- `/upstream-api-docs/websocket.md` +- `/upstream-api-docs/subscriptions.md` + +Key differences from production API: +- User tracking simplified (uses placeholder in mock) +- BBO updates are manual/on-demand (vs. real-time market data) +- Limited to orderUpdates and l2Book/bbo subscriptions +- No authentication required + +## Future Enhancements + +Potential additions (not currently in scope): +- Additional subscription types (trades, candles, userFills, etc.) +- Automatic periodic BBO updates +- User-based order filtering +- WebSocket authentication +- Connection heartbeat/ping-pong +- Reconnection handling diff --git a/server/handlers.go b/server/handlers.go index 32289da..f4b4c6b 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -14,6 +14,7 @@ import ( type Handler struct { state *State logger *slog.Logger + wsm *WebSocketManager } // NewHandler creates a new request handler @@ -30,9 +31,14 @@ func NewHandler(opts ...Option) *Handler { logger = slog.Default() } + wsm := NewWebSocketManager(logger) + state := NewState() + state.SetWebSocketManager(wsm) + return &Handler{ - state: NewState(), + state: state, logger: logger, + wsm: wsm, } } diff --git a/server/integration_test.go b/server/integration_test.go index 4843dfa..d1d13c8 100644 --- a/server/integration_test.go +++ b/server/integration_test.go @@ -155,7 +155,7 @@ func TestOrderModificationWithGoHyperliquid(t *testing.T) { Order: hyperliquid.CreateOrderRequest{ Coin: "BTC", IsBuy: true, - Size: 0.75, // Changed size + Size: 0.75, // Changed size Price: 51000.0, // Changed price OrderType: hyperliquid.OrderType{ Limit: &hyperliquid.LimitOrderType{Tif: hyperliquid.TifGtc}, diff --git a/server/server.go b/server/server.go index 9af1291..f89610d 100644 --- a/server/server.go +++ b/server/server.go @@ -15,12 +15,17 @@ func Run(addr string) error { mux.HandleFunc("/exchange", handler.HandleExchange) mux.HandleFunc("/info", handler.HandleInfo) mux.HandleFunc("/health", handler.HandleHealth) + mux.HandleFunc("/ws", handler.wsm.HandleConnection) // Log all requests loggedMux := loggingMiddleware(logger, mux) logger.Info("Mock Hyperliquid API server listening", "addr", addr) - logger.Info("Endpoints", "exchange", addr+"/exchange", "info", addr+"/info", "health", addr+"/health") + logger.Info("Endpoints", + "exchange", addr+"/exchange", + "info", addr+"/info", + "health", addr+"/health", + "websocket", "ws://"+addr[7:]+"/ws") return http.ListenAndServe(addr, loggedMux) } diff --git a/server/state.go b/server/state.go index 5fa560a..306eb2c 100644 --- a/server/state.go +++ b/server/state.go @@ -8,9 +8,10 @@ import ( // State manages the mock server's in-memory order state type State struct { - mu sync.RWMutex - orders map[string]*OrderDetail // cloid -> OrderDetail - nextOid int64 + mu sync.RWMutex + orders map[string]*OrderDetail // cloid -> OrderDetail + nextOid int64 + wsm *WebSocketManager // For broadcasting order updates } // NewState creates a new state manager @@ -21,6 +22,11 @@ func NewState() *State { } } +// SetWebSocketManager sets the WebSocket manager for broadcasting updates +func (s *State) SetWebSocketManager(wsm *WebSocketManager) { + s.wsm = wsm +} + // CreateOrder adds a new order to the state func (s *State) CreateOrder(cloid string, coin string, side string, limitPx string, sz string) int64 { s.mu.Lock() @@ -45,6 +51,14 @@ func (s *State) CreateOrder(cloid string, coin string, side string, limitPx stri } s.orders[cloid] = order + + // Broadcast order update via WebSocket + if s.wsm != nil { + // Note: In a real system, we'd track which user owns which order + // For the mock, we use a placeholder user address + s.wsm.BroadcastOrderUpdate("", order) + } + return oid } @@ -62,6 +76,11 @@ func (s *State) ModifyOrder(cloid string, limitPx string, sz string) (int64, boo order.Order.Sz = sz order.StatusTimestamp = time.Now().UnixMilli() + // Broadcast order update via WebSocket + if s.wsm != nil { + s.wsm.BroadcastOrderUpdate("", order) + } + return order.Order.Oid, true } @@ -76,6 +95,12 @@ func (s *State) ModifyOrderByOid(oid int64, limitPx string, sz string) (int64, b order.Order.LimitPx = limitPx order.Order.Sz = sz order.StatusTimestamp = time.Now().UnixMilli() + + // Broadcast order update via WebSocket + if s.wsm != nil { + s.wsm.BroadcastOrderUpdate("", order) + } + return order.Order.Oid, true } } @@ -96,6 +121,11 @@ func (s *State) CancelOrder(cloid string) bool { order.Status = "canceled" order.StatusTimestamp = time.Now().UnixMilli() + // Broadcast order update via WebSocket + if s.wsm != nil { + s.wsm.BroadcastOrderUpdate("", order) + } + return true } @@ -109,6 +139,12 @@ func (s *State) CancelOrderByOid(oid int64) bool { if order.Order.Oid == oid { order.Status = "canceled" order.StatusTimestamp = time.Now().UnixMilli() + + // Broadcast order update via WebSocket + if s.wsm != nil { + s.wsm.BroadcastOrderUpdate("", order) + } + return true } } diff --git a/server/testserver.go b/server/testserver.go index 9878a1c..79a90e0 100644 --- a/server/testserver.go +++ b/server/testserver.go @@ -208,6 +208,7 @@ func NewTestServer(t *testing.T, opts ...TestServerOption) *TestServer { mux.HandleFunc("/exchange", handler.HandleExchange) mux.HandleFunc("/info", handler.HandleInfo) mux.HandleFunc("/health", handler.HandleHealth) + mux.HandleFunc("/ws", handler.wsm.HandleConnection) // Wrap with capture middleware capturedMux := capture.Wrap(mux) @@ -235,6 +236,16 @@ func (ts *TestServer) URL() string { return ts.httpServer.URL } +// WebSocketURL returns the WebSocket URL of the test server +func (ts *TestServer) WebSocketURL() string { + httpURL := ts.httpServer.URL + // Replace http:// with ws:// + if len(httpURL) > 7 && httpURL[:7] == "http://" { + return "ws://" + httpURL[7:] + "/ws" + } + return httpURL + "/ws" +} + // Close shuts down the test server and blocks until all requests complete func (ts *TestServer) Close() { if ts.httpServer != nil { @@ -361,5 +372,33 @@ func (ts *TestServer) FillOrder(cloid string, fillPrice float64, opts ...FillOpt } order.StatusTimestamp = time.Now().UnixMilli() + // Broadcast fill update via WebSocket + if state.wsm != nil { + state.wsm.BroadcastOrderUpdate("", order) + } + return nil } + +// SetBBO allows tests to manually set BBO (Best Bid Offer) prices for a coin +func (ts *TestServer) SetBBO(coin string, bidPx, bidSz, askPx, askSz float64) { + if ts == nil || ts.handler == nil || ts.handler.wsm == nil { + return + } + + bidPxStr := strconv.FormatFloat(bidPx, 'f', -1, 64) + bidSzStr := strconv.FormatFloat(bidSz, 'f', -1, 64) + askPxStr := strconv.FormatFloat(askPx, 'f', -1, 64) + askSzStr := strconv.FormatFloat(askSz, 'f', -1, 64) + + ts.handler.wsm.SetBBO(coin, bidPxStr, bidSzStr, askPxStr, askSzStr) +} + +// TriggerBBOUpdate forces an immediate BBO update for a coin +func (ts *TestServer) TriggerBBOUpdate(coin string) { + if ts == nil || ts.handler == nil || ts.handler.wsm == nil { + return + } + + ts.handler.wsm.TriggerBBOUpdate(coin) +} diff --git a/server/websocket.go b/server/websocket.go new file mode 100644 index 0000000..90be851 --- /dev/null +++ b/server/websocket.go @@ -0,0 +1,462 @@ +package server + +import ( + "encoding/json" + "log/slog" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +// WebSocketManager handles WebSocket connections and subscriptions +type WebSocketManager struct { + mu sync.RWMutex + connections map[*websocket.Conn]*ConnectionState + logger *slog.Logger + orderUpdatesCh chan OrderUpdate + l2BookCh chan L2BookUpdate + upgrader websocket.Upgrader +} + +// ConnectionState tracks subscriptions for a single WebSocket connection +type ConnectionState struct { + conn *websocket.Conn + mu sync.RWMutex + orderUpdatesUser string // empty if not subscribed + l2BookCoins map[string]bool + writeMu sync.Mutex // Protects concurrent writes to conn +} + +// SubscriptionMessage represents a subscription request from the client +type SubscriptionMessage struct { + Method string `json:"method"` + Subscription map[string]interface{} `json:"subscription"` +} + +// SubscriptionResponse is sent to acknowledge a subscription +type SubscriptionResponse struct { + Channel string `json:"channel"` + Data map[string]interface{} `json:"data"` +} + +// OrderUpdate represents an order state change to broadcast +type OrderUpdate struct { + User string + Orders []WsOrder +} + +// WsOrder matches the Hyperliquid WebSocket order format +type WsOrder struct { + Order WsBasicOrder `json:"order"` + Status string `json:"status"` + StatusTimestamp int64 `json:"statusTimestamp"` +} + +// WsBasicOrder contains basic order information +type WsBasicOrder struct { + Coin string `json:"coin"` + Side string `json:"side"` + LimitPx string `json:"limitPx"` + Sz string `json:"sz"` + Oid int64 `json:"oid"` + Timestamp int64 `json:"timestamp"` + OrigSz string `json:"origSz"` + Cloid *string `json:"cloid,omitempty"` +} + +// L2BookUpdate represents a market data update to broadcast +type L2BookUpdate struct { + Coin string + Levels [2][]WsLevel + Time int64 +} + +// WsLevel represents a price level in the order book +type WsLevel struct { + Px string `json:"px"` + Sz string `json:"sz"` + N int `json:"n"` +} + +// BBOUpdate is an alternative simpler format for BBO updates +type BBOUpdate struct { + Coin string `json:"coin"` + Time int64 `json:"time"` + BBO [2]WsLevel `json:"bbo"` // [bid, ask] +} + +// NewWebSocketManager creates a new WebSocket manager +func NewWebSocketManager(logger *slog.Logger) *WebSocketManager { + if logger == nil { + logger = slog.Default() + } + + wsm := &WebSocketManager{ + connections: make(map[*websocket.Conn]*ConnectionState), + logger: logger, + orderUpdatesCh: make(chan OrderUpdate, 100), + l2BookCh: make(chan L2BookUpdate, 100), + upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true // Allow all origins for testing + }, + }, + } + + // Start broadcaster goroutines + go wsm.broadcastOrderUpdates() + go wsm.broadcastL2Book() + + return wsm +} + +// HandleConnection upgrades HTTP to WebSocket and manages the connection +func (wsm *WebSocketManager) HandleConnection(w http.ResponseWriter, r *http.Request) { + conn, err := wsm.upgrader.Upgrade(w, r, nil) + if err != nil { + wsm.logger.Error("failed to upgrade connection", "error", err) + return + } + + // Register connection + state := &ConnectionState{ + conn: conn, + l2BookCoins: make(map[string]bool), + } + + wsm.mu.Lock() + wsm.connections[conn] = state + wsm.mu.Unlock() + + wsm.logger.Info("websocket connection established", "remote", conn.RemoteAddr()) + + // Clean up on disconnect + defer func() { + wsm.mu.Lock() + delete(wsm.connections, conn) + wsm.mu.Unlock() + conn.Close() + wsm.logger.Info("websocket connection closed", "remote", conn.RemoteAddr()) + }() + + // Read messages from client + for { + _, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + wsm.logger.Error("websocket read error", "error", err) + } + break + } + + wsm.handleMessage(state, message) + } +} + +// handleMessage processes subscription/unsubscription messages +func (wsm *WebSocketManager) handleMessage(state *ConnectionState, message []byte) { + var subMsg SubscriptionMessage + if err := json.Unmarshal(message, &subMsg); err != nil { + wsm.logger.Error("failed to parse subscription message", "error", err, "message", string(message)) + wsm.sendError(state.conn, "Invalid subscription message") + return + } + + wsm.logger.Debug("received subscription message", "method", subMsg.Method, "subscription", subMsg.Subscription) + + switch subMsg.Method { + case "subscribe": + wsm.handleSubscribe(state, subMsg.Subscription) + case "unsubscribe": + wsm.handleUnsubscribe(state, subMsg.Subscription) + default: + wsm.sendError(state.conn, "Unknown method: "+subMsg.Method) + } +} + +// handleSubscribe processes a subscription request +func (wsm *WebSocketManager) handleSubscribe(state *ConnectionState, sub map[string]interface{}) { + subType, ok := sub["type"].(string) + if !ok { + wsm.sendError(state.conn, "Missing subscription type") + return + } + + switch subType { + case "orderUpdates": + user, ok := sub["user"].(string) + if !ok { + wsm.sendError(state.conn, "Missing user address for orderUpdates") + return + } + state.mu.Lock() + state.orderUpdatesUser = user + state.mu.Unlock() + + wsm.logger.Info("subscribed to orderUpdates", "user", user) + + // Send subscription acknowledgment + wsm.sendSubscriptionResponse(state.conn, sub) + + case "l2Book": + coin, ok := sub["coin"].(string) + if !ok { + wsm.sendError(state.conn, "Missing coin for l2Book") + return + } + state.mu.Lock() + state.l2BookCoins[coin] = true + state.mu.Unlock() + + wsm.logger.Info("subscribed to l2Book", "coin", coin) + + // Send subscription acknowledgment + wsm.sendSubscriptionResponse(state.conn, sub) + + // Send initial BBO snapshot + wsm.sendInitialBBO(state.conn, coin) + + case "bbo": + coin, ok := sub["coin"].(string) + if !ok { + wsm.sendError(state.conn, "Missing coin for bbo") + return + } + state.mu.Lock() + state.l2BookCoins[coin] = true + state.mu.Unlock() + + wsm.logger.Info("subscribed to bbo", "coin", coin) + + // Send subscription acknowledgment + wsm.sendSubscriptionResponse(state.conn, sub) + + // Send initial BBO snapshot + wsm.sendInitialBBO(state.conn, coin) + + default: + wsm.sendError(state.conn, "Unsupported subscription type: "+subType) + } +} + +// handleUnsubscribe processes an unsubscription request +func (wsm *WebSocketManager) handleUnsubscribe(state *ConnectionState, sub map[string]interface{}) { + subType, ok := sub["type"].(string) + if !ok { + return + } + + switch subType { + case "orderUpdates": + state.mu.Lock() + state.orderUpdatesUser = "" + state.mu.Unlock() + + case "l2Book", "bbo": + coin, ok := sub["coin"].(string) + if !ok { + return + } + state.mu.Lock() + delete(state.l2BookCoins, coin) + state.mu.Unlock() + } +} + +// sendSubscriptionResponse sends a subscription acknowledgment +func (wsm *WebSocketManager) sendSubscriptionResponse(conn *websocket.Conn, subscription map[string]interface{}) { + response := SubscriptionResponse{ + Channel: "subscriptionResponse", + Data: map[string]interface{}{ + "method": "subscribe", + "subscription": subscription, + }, + } + + wsm.sendJSON(conn, response) +} + +// sendError sends an error message to the client +func (wsm *WebSocketManager) sendError(conn *websocket.Conn, errorMsg string) { + msg := map[string]string{ + "channel": "error", + "error": errorMsg, + } + wsm.sendJSON(conn, msg) +} + +// sendJSON sends a JSON message to a connection +func (wsm *WebSocketManager) sendJSON(conn *websocket.Conn, v interface{}) { + // Find the connection state to use its write mutex + wsm.mu.RLock() + state, ok := wsm.connections[conn] + wsm.mu.RUnlock() + + if !ok { + return + } + + state.writeMu.Lock() + defer state.writeMu.Unlock() + + if err := conn.WriteJSON(v); err != nil { + wsm.logger.Error("failed to send JSON", "error", err) + } +} + +// BroadcastOrderUpdate queues an order update for broadcasting +func (wsm *WebSocketManager) BroadcastOrderUpdate(user string, order *OrderDetail) { + if order == nil { + return + } + + wsOrder := WsOrder{ + Order: WsBasicOrder{ + Coin: order.Order.Coin, + Side: order.Order.Side, + LimitPx: order.Order.LimitPx, + Sz: order.Order.Sz, + Oid: order.Order.Oid, + Timestamp: order.Order.Timestamp, + OrigSz: order.Order.OrigSz, + Cloid: order.Order.Cloid, + }, + Status: order.Status, + StatusTimestamp: order.StatusTimestamp, + } + + update := OrderUpdate{ + User: user, + Orders: []WsOrder{wsOrder}, + } + + select { + case wsm.orderUpdatesCh <- update: + default: + wsm.logger.Warn("order updates channel full, dropping update") + } +} + +// broadcastOrderUpdates broadcasts order updates to subscribed clients +func (wsm *WebSocketManager) broadcastOrderUpdates() { + for update := range wsm.orderUpdatesCh { + wsm.mu.RLock() + for conn, state := range wsm.connections { + state.mu.RLock() + if state.orderUpdatesUser == update.User { + state.mu.RUnlock() + msg := map[string]interface{}{ + "channel": "orderUpdates", + "data": update.Orders, + } + wsm.sendJSON(conn, msg) + } else { + state.mu.RUnlock() + } + } + wsm.mu.RUnlock() + } +} + +// BroadcastL2Book queues an L2 book update for broadcasting +func (wsm *WebSocketManager) BroadcastL2Book(coin string, levels [2][]WsLevel) { + update := L2BookUpdate{ + Coin: coin, + Levels: levels, + Time: time.Now().UnixMilli(), + } + + select { + case wsm.l2BookCh <- update: + default: + wsm.logger.Warn("l2book channel full, dropping update") + } +} + +// broadcastL2Book broadcasts L2 book updates to subscribed clients +func (wsm *WebSocketManager) broadcastL2Book() { + for update := range wsm.l2BookCh { + wsm.mu.RLock() + for conn, state := range wsm.connections { + state.mu.RLock() + if state.l2BookCoins[update.Coin] { + state.mu.RUnlock() + msg := map[string]interface{}{ + "channel": "l2Book", + "data": map[string]interface{}{ + "coin": update.Coin, + "time": update.Time, + "levels": update.Levels, + }, + } + wsm.sendJSON(conn, msg) + } else { + state.mu.RUnlock() + } + } + wsm.mu.RUnlock() + } +} + +// sendInitialBBO sends an initial BBO snapshot to a newly subscribed client +func (wsm *WebSocketManager) sendInitialBBO(conn *websocket.Conn, coin string) { + // Get mock BBO prices based on coin + bid, ask := wsm.getMockBBO(coin) + + msg := map[string]interface{}{ + "channel": "l2Book", + "data": map[string]interface{}{ + "coin": coin, + "time": time.Now().UnixMilli(), + "levels": [2][]WsLevel{ + {{Px: bid.Px, Sz: bid.Sz, N: 1}}, + {{Px: ask.Px, Sz: ask.Sz, N: 1}}, + }, + }, + } + + wsm.sendJSON(conn, msg) +} + +// getMockBBO returns mock bid/ask prices for a coin +func (wsm *WebSocketManager) getMockBBO(coin string) (bid WsLevel, ask WsLevel) { + switch coin { + case "BTC": + // Use 87000 as the reference price (from IOC tests) + return WsLevel{Px: "86956.5", Sz: "10.5", N: 1}, + WsLevel{Px: "87043.5", Sz: "8.3", N: 1} + case "ETH": + return WsLevel{Px: "2999.5", Sz: "50.0", N: 1}, + WsLevel{Px: "3000.5", Sz: "45.0", N: 1} + case "SOL": + return WsLevel{Px: "99.9", Sz: "100.0", N: 1}, + WsLevel{Px: "100.1", Sz: "95.0", N: 1} + default: + // Default: use a spread of 0.1% + return WsLevel{Px: "999.5", Sz: "10.0", N: 1}, + WsLevel{Px: "1000.5", Sz: "10.0", N: 1} + } +} + +// SetBBO allows tests to manually set BBO prices +func (wsm *WebSocketManager) SetBBO(coin string, bidPx, bidSz, askPx, askSz string) { + levels := [2][]WsLevel{ + {{Px: bidPx, Sz: bidSz, N: 1}}, + {{Px: askPx, Sz: askSz, N: 1}}, + } + wsm.BroadcastL2Book(coin, levels) +} + +// TriggerBBOUpdate forces an immediate BBO update for a coin +func (wsm *WebSocketManager) TriggerBBOUpdate(coin string) { + bid, ask := wsm.getMockBBO(coin) + levels := [2][]WsLevel{ + {bid}, + {ask}, + } + wsm.BroadcastL2Book(coin, levels) +} diff --git a/server/websocket_example_test.go b/server/websocket_example_test.go new file mode 100644 index 0000000..277825f --- /dev/null +++ b/server/websocket_example_test.go @@ -0,0 +1,178 @@ +package server_test + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/recomma/hyperliquid-mock/server" +) + +// Example demonstrating WebSocket order updates subscription +func ExampleWebSocketOrderUpdates(t *testing.T) { + ts := server.NewTestServer(t) + defer ts.Close() + + // Connect to WebSocket + wsURL := ts.WebSocketURL() + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + // Subscribe to order updates + subscription := map[string]interface{}{ + "method": "subscribe", + "subscription": map[string]interface{}{ + "type": "orderUpdates", + "user": "0x1234567890abcdef", + }, + } + + if err := conn.WriteJSON(subscription); err != nil { + t.Fatalf("Failed to send subscription: %v", err) + } + + // Read subscription acknowledgment + var ack map[string]interface{} + if err := conn.ReadJSON(&ack); err != nil { + t.Fatalf("Failed to read ack: %v", err) + } + + fmt.Printf("Subscription acknowledged: %s\n", ack["channel"]) + + // Create an order (this will trigger a WebSocket update) + // Note: In a real test, you'd use the HTTP API to create the order + // For this example, we're just demonstrating the WebSocket flow + + fmt.Println("WebSocket orderUpdates subscription working!") +} + +// Example demonstrating WebSocket BBO subscription +func ExampleWebSocketBBO(t *testing.T) { + ts := server.NewTestServer(t) + defer ts.Close() + + // Connect to WebSocket + wsURL := ts.WebSocketURL() + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + // Subscribe to BBO for BTC + subscription := map[string]interface{}{ + "method": "subscribe", + "subscription": map[string]interface{}{ + "type": "l2Book", + "coin": "BTC", + }, + } + + if err := conn.WriteJSON(subscription); err != nil { + t.Fatalf("Failed to send subscription: %v", err) + } + + // Read subscription acknowledgment + var ack map[string]interface{} + if err := conn.ReadJSON(&ack); err != nil { + t.Fatalf("Failed to read ack: %v", err) + } + + fmt.Printf("Subscription acknowledged: %s\n", ack["channel"]) + + // Read initial BBO snapshot + var bboMsg map[string]interface{} + if err := conn.ReadJSON(&bboMsg); err != nil { + t.Fatalf("Failed to read BBO: %v", err) + } + + if bboMsg["channel"] == "l2Book" { + data := bboMsg["data"].(map[string]interface{}) + fmt.Printf("Received BBO for %s\n", data["coin"]) + + levels := data["levels"].([]interface{}) + bids := levels[0].([]interface{}) + asks := levels[1].([]interface{}) + + if len(bids) > 0 { + bid := bids[0].(map[string]interface{}) + fmt.Printf("Best bid: %s @ %s\n", bid["sz"], bid["px"]) + } + if len(asks) > 0 { + ask := asks[0].(map[string]interface{}) + fmt.Printf("Best ask: %s @ %s\n", ask["sz"], ask["px"]) + } + } + + fmt.Println("WebSocket BBO subscription working!") +} + +// Example demonstrating manual BBO updates +func ExampleManualBBOUpdate(t *testing.T) { + ts := server.NewTestServer(t) + defer ts.Close() + + // Connect to WebSocket + wsURL := ts.WebSocketURL() + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + // Subscribe to BBO for BTC + subscription := map[string]interface{}{ + "method": "subscribe", + "subscription": map[string]interface{}{ + "type": "l2Book", + "coin": "BTC", + }, + } + + if err := conn.WriteJSON(subscription); err != nil { + t.Fatalf("Failed to send subscription: %v", err) + } + + // Read subscription acknowledgment + conn.ReadJSON(&map[string]interface{}{}) + + // Read initial BBO snapshot + conn.ReadJSON(&map[string]interface{}{}) + + // Manually set BBO prices + ts.SetBBO("BTC", 87000.0, 5.0, 87100.0, 4.5) + + // Give it a moment to broadcast + time.Sleep(50 * time.Millisecond) + + // Read the updated BBO + var bboUpdate map[string]interface{} + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + if err := conn.ReadJSON(&bboUpdate); err != nil { + t.Fatalf("Failed to read BBO update: %v", err) + } + + if bboUpdate["channel"] == "l2Book" { + data := bboUpdate["data"].(map[string]interface{}) + levels := data["levels"].([]interface{}) + bids := levels[0].([]interface{}) + + if len(bids) > 0 { + bid := bids[0].(map[string]interface{}) + fmt.Printf("Updated bid price: %s\n", bid["px"]) + } + } + + fmt.Println("Manual BBO update working!") +} + +// Helper function to pretty print JSON +func prettyPrint(v interface{}) { + b, _ := json.MarshalIndent(v, "", " ") + fmt.Println(string(b)) +} From 9d79dd022879c198d06f2a7e4d5ae59efd268ff3 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 7 Nov 2025 17:31:30 +0000 Subject: [PATCH 2/6] fix: convert Example functions to Test functions in websocket tests Go example functions must be niladic (no parameters), but these needed *testing.T for proper test infrastructure. Renamed from Example* to Test* to fix build errors: - ExampleWebSocketOrderUpdates -> TestWebSocketOrderUpdates - ExampleWebSocketBBO -> TestWebSocketBBO - ExampleManualBBOUpdate -> TestManualBBOUpdate Fixes build error: 'Example functions should be niladic' --- server/websocket_example_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/websocket_example_test.go b/server/websocket_example_test.go index 277825f..6045467 100644 --- a/server/websocket_example_test.go +++ b/server/websocket_example_test.go @@ -10,8 +10,8 @@ import ( "github.com/recomma/hyperliquid-mock/server" ) -// Example demonstrating WebSocket order updates subscription -func ExampleWebSocketOrderUpdates(t *testing.T) { +// TestWebSocketOrderUpdates demonstrates WebSocket order updates subscription +func TestWebSocketOrderUpdates(t *testing.T) { ts := server.NewTestServer(t) defer ts.Close() @@ -51,8 +51,8 @@ func ExampleWebSocketOrderUpdates(t *testing.T) { fmt.Println("WebSocket orderUpdates subscription working!") } -// Example demonstrating WebSocket BBO subscription -func ExampleWebSocketBBO(t *testing.T) { +// TestWebSocketBBO demonstrates WebSocket BBO subscription +func TestWebSocketBBO(t *testing.T) { ts := server.NewTestServer(t) defer ts.Close() @@ -112,8 +112,8 @@ func ExampleWebSocketBBO(t *testing.T) { fmt.Println("WebSocket BBO subscription working!") } -// Example demonstrating manual BBO updates -func ExampleManualBBOUpdate(t *testing.T) { +// TestManualBBOUpdate demonstrates manual BBO updates +func TestManualBBOUpdate(t *testing.T) { ts := server.NewTestServer(t) defer ts.Close() From b84333da41ef1a7c4f03e161488831a7653dba30 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 7 Nov 2025 17:34:40 +0000 Subject: [PATCH 3/6] fix: skip request body capture for WebSocket upgrade requests The request capture middleware was interfering with WebSocket handshakes by trying to read the request body. WebSocket upgrades use HTTP headers for the handshake, not the request body, so we now skip body capture when the 'Upgrade: websocket' header is present. Fixes: websocket: bad handshake error in tests --- server/testserver.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/testserver.go b/server/testserver.go index 79a90e0..e6ae1b2 100644 --- a/server/testserver.go +++ b/server/testserver.go @@ -71,6 +71,12 @@ func NewRequestCapture(logger *slog.Logger) *RequestCapture { // Wrap wraps an http.Handler to capture requests before passing them through func (rc *RequestCapture) Wrap(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Skip body capture for WebSocket upgrade requests + if r.Header.Get("Upgrade") == "websocket" { + next.ServeHTTP(w, r) + return + } + // Read body body, err := io.ReadAll(r.Body) if err != nil { From 2306452fd98201e8bd2e4b012ec6b7cd97e98aaf Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 7 Nov 2025 17:42:12 +0000 Subject: [PATCH 4/6] fix: prevent panic when logging WebSocket endpoint The Run function receives addresses like ':8080' without a protocol prefix, but the WebSocket URL logging was assuming 'http://' and slicing addr[7:], causing an index out of bounds panic. Now properly handles addresses starting with ':' by prepending 'localhost' when formatting the WebSocket URL for logging. Fixes panic on server startup when address is ':8080' --- server/server.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index f89610d..90986c5 100644 --- a/server/server.go +++ b/server/server.go @@ -21,11 +21,18 @@ func Run(addr string) error { loggedMux := loggingMiddleware(logger, mux) logger.Info("Mock Hyperliquid API server listening", "addr", addr) + + // Format WebSocket URL properly (addr is typically ":8080" without protocol) + wsAddr := addr + if len(addr) > 0 && addr[0] == ':' { + wsAddr = "localhost" + addr + } + logger.Info("Endpoints", "exchange", addr+"/exchange", "info", addr+"/info", "health", addr+"/health", - "websocket", "ws://"+addr[7:]+"/ws") + "websocket", "ws://"+wsAddr+"/ws") return http.ListenAndServe(addr, loggedMux) } From 0546006b1cc2a11d912cf040a124ec21ee9ba3e7 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 7 Nov 2025 17:44:18 +0000 Subject: [PATCH 5/6] fix: order update broadcasts now reach all subscribers Critical bug fix: Order updates were being broadcast with an empty user string but subscribers require a specific user address. The user filtering check 'if state.orderUpdatesUser == update.User' always failed, causing all orderUpdates messages to be silently dropped. Changes: - Removed user parameter from BroadcastOrderUpdate() - Changed broadcast logic to send to all orderUpdates subscribers - Simplified OrderUpdate struct (removed User field) - Updated all call sites in state.go and testserver.go For a mock server, broadcasting to all subscribers is appropriate since we don't track actual user ownership of orders. Subscribers still need to provide a user address when subscribing (per Hyperliquid protocol), but the mock broadcasts all order changes to everyone subscribed to orderUpdates. This makes the WebSocket orderUpdates feature actually work. --- server/state.go | 12 +++++------- server/testserver.go | 2 +- server/websocket.go | 10 ++++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/server/state.go b/server/state.go index 306eb2c..6949f84 100644 --- a/server/state.go +++ b/server/state.go @@ -54,9 +54,7 @@ func (s *State) CreateOrder(cloid string, coin string, side string, limitPx stri // Broadcast order update via WebSocket if s.wsm != nil { - // Note: In a real system, we'd track which user owns which order - // For the mock, we use a placeholder user address - s.wsm.BroadcastOrderUpdate("", order) + s.wsm.BroadcastOrderUpdate(order) } return oid @@ -78,7 +76,7 @@ func (s *State) ModifyOrder(cloid string, limitPx string, sz string) (int64, boo // Broadcast order update via WebSocket if s.wsm != nil { - s.wsm.BroadcastOrderUpdate("", order) + s.wsm.BroadcastOrderUpdate(order) } return order.Order.Oid, true @@ -98,7 +96,7 @@ func (s *State) ModifyOrderByOid(oid int64, limitPx string, sz string) (int64, b // Broadcast order update via WebSocket if s.wsm != nil { - s.wsm.BroadcastOrderUpdate("", order) + s.wsm.BroadcastOrderUpdate(order) } return order.Order.Oid, true @@ -123,7 +121,7 @@ func (s *State) CancelOrder(cloid string) bool { // Broadcast order update via WebSocket if s.wsm != nil { - s.wsm.BroadcastOrderUpdate("", order) + s.wsm.BroadcastOrderUpdate(order) } return true @@ -142,7 +140,7 @@ func (s *State) CancelOrderByOid(oid int64) bool { // Broadcast order update via WebSocket if s.wsm != nil { - s.wsm.BroadcastOrderUpdate("", order) + s.wsm.BroadcastOrderUpdate(order) } return true diff --git a/server/testserver.go b/server/testserver.go index e6ae1b2..cc40094 100644 --- a/server/testserver.go +++ b/server/testserver.go @@ -380,7 +380,7 @@ func (ts *TestServer) FillOrder(cloid string, fillPrice float64, opts ...FillOpt // Broadcast fill update via WebSocket if state.wsm != nil { - state.wsm.BroadcastOrderUpdate("", order) + state.wsm.BroadcastOrderUpdate(order) } return nil diff --git a/server/websocket.go b/server/websocket.go index 90be851..2319b1d 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -42,8 +42,8 @@ type SubscriptionResponse struct { } // OrderUpdate represents an order state change to broadcast +// For the mock server, we broadcast to all subscribers type OrderUpdate struct { - User string Orders []WsOrder } @@ -309,7 +309,8 @@ func (wsm *WebSocketManager) sendJSON(conn *websocket.Conn, v interface{}) { } // BroadcastOrderUpdate queues an order update for broadcasting -func (wsm *WebSocketManager) BroadcastOrderUpdate(user string, order *OrderDetail) { +// In the mock server, we broadcast to all orderUpdates subscribers +func (wsm *WebSocketManager) BroadcastOrderUpdate(order *OrderDetail) { if order == nil { return } @@ -330,7 +331,6 @@ func (wsm *WebSocketManager) BroadcastOrderUpdate(user string, order *OrderDetai } update := OrderUpdate{ - User: user, Orders: []WsOrder{wsOrder}, } @@ -342,12 +342,14 @@ func (wsm *WebSocketManager) BroadcastOrderUpdate(user string, order *OrderDetai } // broadcastOrderUpdates broadcasts order updates to subscribed clients +// In the mock server, we broadcast to all orderUpdates subscribers func (wsm *WebSocketManager) broadcastOrderUpdates() { for update := range wsm.orderUpdatesCh { wsm.mu.RLock() for conn, state := range wsm.connections { state.mu.RLock() - if state.orderUpdatesUser == update.User { + // In the mock, broadcast to anyone subscribed to orderUpdates + if state.orderUpdatesUser != "" { state.mu.RUnlock() msg := map[string]interface{}{ "channel": "orderUpdates", From 0b483cb0357b8c0b75cc905d856eee1340e7903b Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 7 Nov 2025 17:51:54 +0000 Subject: [PATCH 6/6] fix: send BBO subscriptions in correct channel format BBO and l2Book subscriptions now send different message formats: l2Book subscriptions receive: - channel: "l2Book" - data.levels: [[bids...], [asks...]] bbo subscriptions receive: - channel: "bbo" - data.bbo: [bid, ask] Changes: - Track l2Book and bbo subscriptions separately in ConnectionState - Split sendInitialBBO into sendInitialL2Book and sendInitialBBO - Updated broadcastL2Book to send different formats based on subscription - Consumers subscribing to bbo now receive the correct WsBbo format This matches the upstream API docs (upstream-api-docs/subscriptions.md) --- server/websocket.go | 60 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/server/websocket.go b/server/websocket.go index 2319b1d..8735f37 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -24,9 +24,10 @@ type WebSocketManager struct { type ConnectionState struct { conn *websocket.Conn mu sync.RWMutex - orderUpdatesUser string // empty if not subscribed - l2BookCoins map[string]bool - writeMu sync.Mutex // Protects concurrent writes to conn + orderUpdatesUser string // empty if not subscribed + l2BookCoins map[string]bool // coins subscribed to l2Book + bboCoins map[string]bool // coins subscribed to bbo + writeMu sync.Mutex // Protects concurrent writes to conn } // SubscriptionMessage represents a subscription request from the client @@ -126,6 +127,7 @@ func (wsm *WebSocketManager) HandleConnection(w http.ResponseWriter, r *http.Req state := &ConnectionState{ conn: conn, l2BookCoins: make(map[string]bool), + bboCoins: make(map[string]bool), } wsm.mu.Lock() @@ -217,8 +219,8 @@ func (wsm *WebSocketManager) handleSubscribe(state *ConnectionState, sub map[str // Send subscription acknowledgment wsm.sendSubscriptionResponse(state.conn, sub) - // Send initial BBO snapshot - wsm.sendInitialBBO(state.conn, coin) + // Send initial l2Book snapshot + wsm.sendInitialL2Book(state.conn, coin) case "bbo": coin, ok := sub["coin"].(string) @@ -227,7 +229,7 @@ func (wsm *WebSocketManager) handleSubscribe(state *ConnectionState, sub map[str return } state.mu.Lock() - state.l2BookCoins[coin] = true + state.bboCoins[coin] = true state.mu.Unlock() wsm.logger.Info("subscribed to bbo", "coin", coin) @@ -263,6 +265,7 @@ func (wsm *WebSocketManager) handleUnsubscribe(state *ConnectionState, sub map[s } state.mu.Lock() delete(state.l2BookCoins, coin) + delete(state.bboCoins, coin) state.mu.Unlock() } } @@ -385,6 +388,8 @@ func (wsm *WebSocketManager) broadcastL2Book() { wsm.mu.RLock() for conn, state := range wsm.connections { state.mu.RLock() + + // Send as l2Book format if subscribed to l2Book if state.l2BookCoins[update.Coin] { state.mu.RUnlock() msg := map[string]interface{}{ @@ -396,6 +401,28 @@ func (wsm *WebSocketManager) broadcastL2Book() { }, } wsm.sendJSON(conn, msg) + } else if state.bboCoins[update.Coin] { + // Send as bbo format if subscribed to bbo + state.mu.RUnlock() + + // Extract best bid and ask from levels + var bid, ask *WsLevel + if len(update.Levels[0]) > 0 { + bid = &update.Levels[0][0] + } + if len(update.Levels[1]) > 0 { + ask = &update.Levels[1][0] + } + + msg := map[string]interface{}{ + "channel": "bbo", + "data": map[string]interface{}{ + "coin": update.Coin, + "time": update.Time, + "bbo": []*WsLevel{bid, ask}, + }, + } + wsm.sendJSON(conn, msg) } else { state.mu.RUnlock() } @@ -404,8 +431,8 @@ func (wsm *WebSocketManager) broadcastL2Book() { } } -// sendInitialBBO sends an initial BBO snapshot to a newly subscribed client -func (wsm *WebSocketManager) sendInitialBBO(conn *websocket.Conn, coin string) { +// sendInitialL2Book sends an initial l2Book snapshot to a newly subscribed client +func (wsm *WebSocketManager) sendInitialL2Book(conn *websocket.Conn, coin string) { // Get mock BBO prices based on coin bid, ask := wsm.getMockBBO(coin) @@ -424,6 +451,23 @@ func (wsm *WebSocketManager) sendInitialBBO(conn *websocket.Conn, coin string) { wsm.sendJSON(conn, msg) } +// sendInitialBBO sends an initial BBO snapshot to a newly subscribed client +func (wsm *WebSocketManager) sendInitialBBO(conn *websocket.Conn, coin string) { + // Get mock BBO prices based on coin + bid, ask := wsm.getMockBBO(coin) + + msg := map[string]interface{}{ + "channel": "bbo", + "data": map[string]interface{}{ + "coin": coin, + "time": time.Now().UnixMilli(), + "bbo": []WsLevel{bid, ask}, + }, + } + + wsm.sendJSON(conn, msg) +} + // getMockBBO returns mock bid/ask prices for a coin func (wsm *WebSocketManager) getMockBBO(coin string) (bid WsLevel, ask WsLevel) { switch coin {