diff --git a/README.md b/README.md index 49646ce..91c1c01 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ A minimal Go implementation of the Hyperliquid API for E2E testing without requi - **No authentication validation** - Accepts all requests without signature verification - **In-memory order state** - Tracks orders for realistic responses +- **WebSocket support** - Real-time order updates and market data (BBO) - **Unlimited requests** - No rate limiting - **Simple responses** - Returns success for all valid operations @@ -84,6 +85,31 @@ All other action types return a generic success response but do not implement re ## Endpoints +### WebSocket /ws + +Real-time data streaming for order updates and market data. See [WEBSOCKET.md](WEBSOCKET.md) for detailed documentation. + +**Supported subscriptions:** +- `orderUpdates` - Real-time order status updates +- `l2Book` / `bbo` - Best bid/offer market data + +**Example connection:** +```javascript +const ws = new WebSocket('ws://localhost:8080/ws'); + +ws.onopen = () => { + ws.send(JSON.stringify({ + method: 'subscribe', + subscription: { type: 'orderUpdates', user: '0x1234...' } + })); +}; + +ws.onmessage = (event) => { + const update = JSON.parse(event.data); + console.log('Order update:', update); +}; +``` + ### POST /exchange Handles trading actions: @@ -423,7 +449,7 @@ curl -X POST http://localhost:8080/info \ - **No signature validation** - All requests are accepted regardless of signature - **No real order matching** - Orders are just stored in memory -- **No WebSocket support** - Only HTTP REST endpoints +- **Limited WebSocket subscriptions** - Only orderUpdates and l2Book/bbo (no trades, candles, etc.) - **Simplified responses** - Some fields may be omitted or simplified - **No persistence** - All state is lost when the server restarts - **No rate limiting** - Unlimited requests accepted @@ -438,15 +464,20 @@ hyperliquid-mock/ ├── go.mod # Go module file ├── go.sum # Go dependencies ├── README.md # This file +├── WEBSOCKET.md # WebSocket API documentation ├── upstream-api-docs/ # Real Hyperliquid API reference (for expansion) │ ├── exchange-endpoint.md │ ├── info-endpoint.md │ ├── perpetuals.md -│ └── spot.md +│ ├── spot.md +│ ├── websocket.md +│ └── subscriptions.md └── server/ ├── server.go # HTTP server setup ├── handlers.go # Request handlers ├── handlers_test.go # Handler unit tests + ├── websocket.go # WebSocket connection & subscription management + ├── websocket_example_test.go # WebSocket usage examples ├── types.go # Request/response types ├── state.go # In-memory state management ├── testserver.go # Test helpers & request capture diff --git a/WEBSOCKET.md b/WEBSOCKET.md new file mode 100644 index 0000000..0294689 --- /dev/null +++ b/WEBSOCKET.md @@ -0,0 +1,340 @@ +# WebSocket Implementation + +This document describes the WebSocket functionality added to the Hyperliquid Mock Server. + +## Overview + +The mock server now supports WebSocket connections for real-time data streaming, matching the Hyperliquid API protocol. Two primary subscription types are implemented: + +1. **Order Updates** (`orderUpdates`) - Real-time order status updates +2. **Level 2 Book / BBO** (`l2Book` / `bbo`) - Real-time market data + +## WebSocket Endpoint + +- **URL**: `ws://localhost:/ws` +- **Protocol**: WebSocket (RFC 6455) +- **Connection**: Persistent, bidirectional + +## Subscription Types + +### 1. Order Updates Subscription + +Receive real-time updates when orders are created, modified, filled, or canceled. + +#### Subscription Request +```json +{ + "method": "subscribe", + "subscription": { + "type": "orderUpdates", + "user": "0xWALLET_ADDRESS" + } +} +``` + +#### Update Message Format +```json +{ + "channel": "orderUpdates", + "data": [ + { + "order": { + "coin": "BTC", + "side": "B", + "limitPx": "87000", + "sz": "1.0", + "oid": 1000001, + "timestamp": 1699999999999, + "origSz": "1.0", + "cloid": "000000010000000200000003" + }, + "status": "open", + "statusTimestamp": 1699999999999 + } + ] +} +``` + +#### Order Status Values +- `open` - Order is active on the order book +- `filled` - Order is completely filled +- `canceled` - Order has been canceled + +#### Trigger Events +Updates are automatically sent when: +- Order created → `status: "open"` +- Order modified → `status: "open"` (with updated price/size) +- Order filled (full or partial) → `status: "filled"` or `status: "open"` with updated `sz` +- Order canceled → `status: "canceled"` + +### 2. BBO (Best Bid Offer) Subscription + +Receive real-time market data for price discovery. + +#### Subscription Request (l2Book) +```json +{ + "method": "subscribe", + "subscription": { + "type": "l2Book", + "coin": "BTC" + } +} +``` + +#### Subscription Request (bbo alternative) +```json +{ + "method": "subscribe", + "subscription": { + "type": "bbo", + "coin": "BTC" + } +} +``` + +#### Update Message Format +```json +{ + "channel": "l2Book", + "data": { + "coin": "BTC", + "time": 1699999999999, + "levels": [ + [ + {"px": "86956.5", "sz": "10.5", "n": 1} + ], + [ + {"px": "87043.5", "sz": "8.3", "n": 1} + ] + ] + } +} +``` + +#### Level Fields +- `px` - Price as string +- `sz` - Size/quantity as string +- `n` - Number of orders at this price level + +#### Default BBO Prices + +The mock server provides realistic default prices: + +| Coin | Bid Price | Ask Price | Spread | +|------|-----------|-----------|--------| +| BTC | 86956.5 | 87043.5 | ~0.1% | +| ETH | 2999.5 | 3000.5 | ~0.03% | +| SOL | 99.9 | 100.1 | ~0.2% | + +Note: BTC prices are centered around 87000 USDT to match the existing IOC test behavior. + +## Connection Management + +### Establishing Connection + +```javascript +const ws = new WebSocket('ws://localhost:8080/ws'); + +ws.onopen = () => { + console.log('Connected to Hyperliquid Mock Server'); + + // Subscribe to order updates + ws.send(JSON.stringify({ + method: 'subscribe', + subscription: { + type: 'orderUpdates', + user: '0x1234567890abcdef' + } + })); +}; + +ws.onmessage = (event) => { + const message = JSON.parse(event.data); + console.log('Received:', message); +}; +``` + +### Subscription Acknowledgment + +After sending a subscription request, the server responds with: + +```json +{ + "channel": "subscriptionResponse", + "data": { + "method": "subscribe", + "subscription": { + "type": "orderUpdates", + "user": "0x1234567890abcdef" + } + } +} +``` + +### Unsubscribing + +```json +{ + "method": "unsubscribe", + "subscription": { + "type": "orderUpdates", + "user": "0xWALLET_ADDRESS" + } +} +``` + +### Multiple Subscriptions + +A single WebSocket connection can maintain multiple subscriptions: + +```javascript +// Subscribe to order updates +ws.send(JSON.stringify({ + method: 'subscribe', + subscription: { type: 'orderUpdates', user: '0x123...' } +})); + +// Subscribe to BTC BBO +ws.send(JSON.stringify({ + method: 'subscribe', + subscription: { type: 'l2Book', coin: 'BTC' } +})); + +// Subscribe to ETH BBO +ws.send(JSON.stringify({ + method: 'subscribe', + subscription: { type: 'l2Book', coin: 'ETH' } +})); +``` + +## Test Server API + +The `TestServer` includes helper methods for WebSocket testing: + +### Get WebSocket URL + +```go +ts := server.NewTestServer(t) +wsURL := ts.WebSocketURL() // Returns "ws://127.0.0.1:12345/ws" +``` + +### Manual BBO Updates + +```go +// Set specific BBO prices +ts.SetBBO("BTC", 87000.0, 5.0, 87100.0, 4.5) +// coin bidPx bidSz askPx askSz + +// Trigger a BBO update with default prices +ts.TriggerBBOUpdate("BTC") +``` + +### Example Test + +```go +func TestWebSocketOrderUpdates(t *testing.T) { + ts := server.NewTestServer(t) + defer ts.Close() + + // Connect to WebSocket + conn, _, err := websocket.DefaultDialer.Dial(ts.WebSocketURL(), nil) + require.NoError(t, err) + defer conn.Close() + + // Subscribe + subscription := map[string]interface{}{ + "method": "subscribe", + "subscription": map[string]interface{}{ + "type": "orderUpdates", + "user": "0x1234", + }, + } + conn.WriteJSON(subscription) + + // Read subscription ack + var ack map[string]interface{} + conn.ReadJSON(&ack) + + // Place an order via HTTP API (triggers WebSocket update) + // ... HTTP request ... + + // Receive order update + var update map[string]interface{} + conn.ReadJSON(&update) + + assert.Equal(t, "orderUpdates", update["channel"]) +} +``` + +## Error Handling + +### Error Message Format + +```json +{ + "channel": "error", + "error": "Invalid subscription type" +} +``` + +### Common Errors + +- `"Invalid subscription message"` - Malformed JSON +- `"Unknown method: "` - Invalid method field +- `"Missing subscription type"` - No type in subscription object +- `"Missing user address for orderUpdates"` - orderUpdates requires user field +- `"Missing coin for l2Book"` - l2Book/bbo requires coin field +- `"Unsupported subscription type: "` - Unknown subscription type + +## Implementation Details + +### Architecture + +The WebSocket implementation consists of: + +1. **WebSocketManager** (`websocket.go`) - Manages connections and subscriptions +2. **State Integration** - Broadcasts updates when order state changes +3. **Handler Integration** - Exposes `/ws` endpoint + +### Broadcasting Behavior + +- **Order Updates**: Sent immediately when order state changes (create, modify, fill, cancel) +- **BBO Updates**: + - Initial snapshot sent immediately after subscription + - Updates can be triggered manually via `SetBBO()` or `TriggerBBOUpdate()` + - In production, would update periodically or on price changes + +### Thread Safety + +- All WebSocket operations are thread-safe +- Concurrent subscriptions and broadcasts are supported +- Each connection has its own write mutex to prevent concurrent writes + +### Performance Considerations + +- Order update channel: 100 message buffer +- L2 book update channel: 100 message buffer +- Dropped messages logged as warnings when buffers full + +## Compliance with Upstream API + +This implementation follows the official Hyperliquid WebSocket API specifications from: +- `/upstream-api-docs/websocket.md` +- `/upstream-api-docs/subscriptions.md` + +Key differences from production API: +- User tracking simplified (uses placeholder in mock) +- BBO updates are manual/on-demand (vs. real-time market data) +- Limited to orderUpdates and l2Book/bbo subscriptions +- No authentication required + +## Future Enhancements + +Potential additions (not currently in scope): +- Additional subscription types (trades, candles, userFills, etc.) +- Automatic periodic BBO updates +- User-based order filtering +- WebSocket authentication +- Connection heartbeat/ping-pong +- Reconnection handling diff --git a/server/handlers.go b/server/handlers.go index 32289da..f4b4c6b 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -14,6 +14,7 @@ import ( type Handler struct { state *State logger *slog.Logger + wsm *WebSocketManager } // NewHandler creates a new request handler @@ -30,9 +31,14 @@ func NewHandler(opts ...Option) *Handler { logger = slog.Default() } + wsm := NewWebSocketManager(logger) + state := NewState() + state.SetWebSocketManager(wsm) + return &Handler{ - state: NewState(), + state: state, logger: logger, + wsm: wsm, } } diff --git a/server/integration_test.go b/server/integration_test.go index 4843dfa..d1d13c8 100644 --- a/server/integration_test.go +++ b/server/integration_test.go @@ -155,7 +155,7 @@ func TestOrderModificationWithGoHyperliquid(t *testing.T) { Order: hyperliquid.CreateOrderRequest{ Coin: "BTC", IsBuy: true, - Size: 0.75, // Changed size + Size: 0.75, // Changed size Price: 51000.0, // Changed price OrderType: hyperliquid.OrderType{ Limit: &hyperliquid.LimitOrderType{Tif: hyperliquid.TifGtc}, diff --git a/server/server.go b/server/server.go index 9af1291..90986c5 100644 --- a/server/server.go +++ b/server/server.go @@ -15,12 +15,24 @@ func Run(addr string) error { mux.HandleFunc("/exchange", handler.HandleExchange) mux.HandleFunc("/info", handler.HandleInfo) mux.HandleFunc("/health", handler.HandleHealth) + mux.HandleFunc("/ws", handler.wsm.HandleConnection) // Log all requests loggedMux := loggingMiddleware(logger, mux) logger.Info("Mock Hyperliquid API server listening", "addr", addr) - logger.Info("Endpoints", "exchange", addr+"/exchange", "info", addr+"/info", "health", addr+"/health") + + // Format WebSocket URL properly (addr is typically ":8080" without protocol) + wsAddr := addr + if len(addr) > 0 && addr[0] == ':' { + wsAddr = "localhost" + addr + } + + logger.Info("Endpoints", + "exchange", addr+"/exchange", + "info", addr+"/info", + "health", addr+"/health", + "websocket", "ws://"+wsAddr+"/ws") return http.ListenAndServe(addr, loggedMux) } diff --git a/server/state.go b/server/state.go index 5fa560a..6949f84 100644 --- a/server/state.go +++ b/server/state.go @@ -8,9 +8,10 @@ import ( // State manages the mock server's in-memory order state type State struct { - mu sync.RWMutex - orders map[string]*OrderDetail // cloid -> OrderDetail - nextOid int64 + mu sync.RWMutex + orders map[string]*OrderDetail // cloid -> OrderDetail + nextOid int64 + wsm *WebSocketManager // For broadcasting order updates } // NewState creates a new state manager @@ -21,6 +22,11 @@ func NewState() *State { } } +// SetWebSocketManager sets the WebSocket manager for broadcasting updates +func (s *State) SetWebSocketManager(wsm *WebSocketManager) { + s.wsm = wsm +} + // CreateOrder adds a new order to the state func (s *State) CreateOrder(cloid string, coin string, side string, limitPx string, sz string) int64 { s.mu.Lock() @@ -45,6 +51,12 @@ func (s *State) CreateOrder(cloid string, coin string, side string, limitPx stri } s.orders[cloid] = order + + // Broadcast order update via WebSocket + if s.wsm != nil { + s.wsm.BroadcastOrderUpdate(order) + } + return oid } @@ -62,6 +74,11 @@ func (s *State) ModifyOrder(cloid string, limitPx string, sz string) (int64, boo order.Order.Sz = sz order.StatusTimestamp = time.Now().UnixMilli() + // Broadcast order update via WebSocket + if s.wsm != nil { + s.wsm.BroadcastOrderUpdate(order) + } + return order.Order.Oid, true } @@ -76,6 +93,12 @@ func (s *State) ModifyOrderByOid(oid int64, limitPx string, sz string) (int64, b order.Order.LimitPx = limitPx order.Order.Sz = sz order.StatusTimestamp = time.Now().UnixMilli() + + // Broadcast order update via WebSocket + if s.wsm != nil { + s.wsm.BroadcastOrderUpdate(order) + } + return order.Order.Oid, true } } @@ -96,6 +119,11 @@ func (s *State) CancelOrder(cloid string) bool { order.Status = "canceled" order.StatusTimestamp = time.Now().UnixMilli() + // Broadcast order update via WebSocket + if s.wsm != nil { + s.wsm.BroadcastOrderUpdate(order) + } + return true } @@ -109,6 +137,12 @@ func (s *State) CancelOrderByOid(oid int64) bool { if order.Order.Oid == oid { order.Status = "canceled" order.StatusTimestamp = time.Now().UnixMilli() + + // Broadcast order update via WebSocket + if s.wsm != nil { + s.wsm.BroadcastOrderUpdate(order) + } + return true } } diff --git a/server/testserver.go b/server/testserver.go index 9878a1c..cc40094 100644 --- a/server/testserver.go +++ b/server/testserver.go @@ -71,6 +71,12 @@ func NewRequestCapture(logger *slog.Logger) *RequestCapture { // Wrap wraps an http.Handler to capture requests before passing them through func (rc *RequestCapture) Wrap(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Skip body capture for WebSocket upgrade requests + if r.Header.Get("Upgrade") == "websocket" { + next.ServeHTTP(w, r) + return + } + // Read body body, err := io.ReadAll(r.Body) if err != nil { @@ -208,6 +214,7 @@ func NewTestServer(t *testing.T, opts ...TestServerOption) *TestServer { mux.HandleFunc("/exchange", handler.HandleExchange) mux.HandleFunc("/info", handler.HandleInfo) mux.HandleFunc("/health", handler.HandleHealth) + mux.HandleFunc("/ws", handler.wsm.HandleConnection) // Wrap with capture middleware capturedMux := capture.Wrap(mux) @@ -235,6 +242,16 @@ func (ts *TestServer) URL() string { return ts.httpServer.URL } +// WebSocketURL returns the WebSocket URL of the test server +func (ts *TestServer) WebSocketURL() string { + httpURL := ts.httpServer.URL + // Replace http:// with ws:// + if len(httpURL) > 7 && httpURL[:7] == "http://" { + return "ws://" + httpURL[7:] + "/ws" + } + return httpURL + "/ws" +} + // Close shuts down the test server and blocks until all requests complete func (ts *TestServer) Close() { if ts.httpServer != nil { @@ -361,5 +378,33 @@ func (ts *TestServer) FillOrder(cloid string, fillPrice float64, opts ...FillOpt } order.StatusTimestamp = time.Now().UnixMilli() + // Broadcast fill update via WebSocket + if state.wsm != nil { + state.wsm.BroadcastOrderUpdate(order) + } + return nil } + +// SetBBO allows tests to manually set BBO (Best Bid Offer) prices for a coin +func (ts *TestServer) SetBBO(coin string, bidPx, bidSz, askPx, askSz float64) { + if ts == nil || ts.handler == nil || ts.handler.wsm == nil { + return + } + + bidPxStr := strconv.FormatFloat(bidPx, 'f', -1, 64) + bidSzStr := strconv.FormatFloat(bidSz, 'f', -1, 64) + askPxStr := strconv.FormatFloat(askPx, 'f', -1, 64) + askSzStr := strconv.FormatFloat(askSz, 'f', -1, 64) + + ts.handler.wsm.SetBBO(coin, bidPxStr, bidSzStr, askPxStr, askSzStr) +} + +// TriggerBBOUpdate forces an immediate BBO update for a coin +func (ts *TestServer) TriggerBBOUpdate(coin string) { + if ts == nil || ts.handler == nil || ts.handler.wsm == nil { + return + } + + ts.handler.wsm.TriggerBBOUpdate(coin) +} diff --git a/server/websocket.go b/server/websocket.go new file mode 100644 index 0000000..8735f37 --- /dev/null +++ b/server/websocket.go @@ -0,0 +1,508 @@ +package server + +import ( + "encoding/json" + "log/slog" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +// WebSocketManager handles WebSocket connections and subscriptions +type WebSocketManager struct { + mu sync.RWMutex + connections map[*websocket.Conn]*ConnectionState + logger *slog.Logger + orderUpdatesCh chan OrderUpdate + l2BookCh chan L2BookUpdate + upgrader websocket.Upgrader +} + +// ConnectionState tracks subscriptions for a single WebSocket connection +type ConnectionState struct { + conn *websocket.Conn + mu sync.RWMutex + orderUpdatesUser string // empty if not subscribed + l2BookCoins map[string]bool // coins subscribed to l2Book + bboCoins map[string]bool // coins subscribed to bbo + writeMu sync.Mutex // Protects concurrent writes to conn +} + +// SubscriptionMessage represents a subscription request from the client +type SubscriptionMessage struct { + Method string `json:"method"` + Subscription map[string]interface{} `json:"subscription"` +} + +// SubscriptionResponse is sent to acknowledge a subscription +type SubscriptionResponse struct { + Channel string `json:"channel"` + Data map[string]interface{} `json:"data"` +} + +// OrderUpdate represents an order state change to broadcast +// For the mock server, we broadcast to all subscribers +type OrderUpdate struct { + Orders []WsOrder +} + +// WsOrder matches the Hyperliquid WebSocket order format +type WsOrder struct { + Order WsBasicOrder `json:"order"` + Status string `json:"status"` + StatusTimestamp int64 `json:"statusTimestamp"` +} + +// WsBasicOrder contains basic order information +type WsBasicOrder struct { + Coin string `json:"coin"` + Side string `json:"side"` + LimitPx string `json:"limitPx"` + Sz string `json:"sz"` + Oid int64 `json:"oid"` + Timestamp int64 `json:"timestamp"` + OrigSz string `json:"origSz"` + Cloid *string `json:"cloid,omitempty"` +} + +// L2BookUpdate represents a market data update to broadcast +type L2BookUpdate struct { + Coin string + Levels [2][]WsLevel + Time int64 +} + +// WsLevel represents a price level in the order book +type WsLevel struct { + Px string `json:"px"` + Sz string `json:"sz"` + N int `json:"n"` +} + +// BBOUpdate is an alternative simpler format for BBO updates +type BBOUpdate struct { + Coin string `json:"coin"` + Time int64 `json:"time"` + BBO [2]WsLevel `json:"bbo"` // [bid, ask] +} + +// NewWebSocketManager creates a new WebSocket manager +func NewWebSocketManager(logger *slog.Logger) *WebSocketManager { + if logger == nil { + logger = slog.Default() + } + + wsm := &WebSocketManager{ + connections: make(map[*websocket.Conn]*ConnectionState), + logger: logger, + orderUpdatesCh: make(chan OrderUpdate, 100), + l2BookCh: make(chan L2BookUpdate, 100), + upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true // Allow all origins for testing + }, + }, + } + + // Start broadcaster goroutines + go wsm.broadcastOrderUpdates() + go wsm.broadcastL2Book() + + return wsm +} + +// HandleConnection upgrades HTTP to WebSocket and manages the connection +func (wsm *WebSocketManager) HandleConnection(w http.ResponseWriter, r *http.Request) { + conn, err := wsm.upgrader.Upgrade(w, r, nil) + if err != nil { + wsm.logger.Error("failed to upgrade connection", "error", err) + return + } + + // Register connection + state := &ConnectionState{ + conn: conn, + l2BookCoins: make(map[string]bool), + bboCoins: make(map[string]bool), + } + + wsm.mu.Lock() + wsm.connections[conn] = state + wsm.mu.Unlock() + + wsm.logger.Info("websocket connection established", "remote", conn.RemoteAddr()) + + // Clean up on disconnect + defer func() { + wsm.mu.Lock() + delete(wsm.connections, conn) + wsm.mu.Unlock() + conn.Close() + wsm.logger.Info("websocket connection closed", "remote", conn.RemoteAddr()) + }() + + // Read messages from client + for { + _, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + wsm.logger.Error("websocket read error", "error", err) + } + break + } + + wsm.handleMessage(state, message) + } +} + +// handleMessage processes subscription/unsubscription messages +func (wsm *WebSocketManager) handleMessage(state *ConnectionState, message []byte) { + var subMsg SubscriptionMessage + if err := json.Unmarshal(message, &subMsg); err != nil { + wsm.logger.Error("failed to parse subscription message", "error", err, "message", string(message)) + wsm.sendError(state.conn, "Invalid subscription message") + return + } + + wsm.logger.Debug("received subscription message", "method", subMsg.Method, "subscription", subMsg.Subscription) + + switch subMsg.Method { + case "subscribe": + wsm.handleSubscribe(state, subMsg.Subscription) + case "unsubscribe": + wsm.handleUnsubscribe(state, subMsg.Subscription) + default: + wsm.sendError(state.conn, "Unknown method: "+subMsg.Method) + } +} + +// handleSubscribe processes a subscription request +func (wsm *WebSocketManager) handleSubscribe(state *ConnectionState, sub map[string]interface{}) { + subType, ok := sub["type"].(string) + if !ok { + wsm.sendError(state.conn, "Missing subscription type") + return + } + + switch subType { + case "orderUpdates": + user, ok := sub["user"].(string) + if !ok { + wsm.sendError(state.conn, "Missing user address for orderUpdates") + return + } + state.mu.Lock() + state.orderUpdatesUser = user + state.mu.Unlock() + + wsm.logger.Info("subscribed to orderUpdates", "user", user) + + // Send subscription acknowledgment + wsm.sendSubscriptionResponse(state.conn, sub) + + case "l2Book": + coin, ok := sub["coin"].(string) + if !ok { + wsm.sendError(state.conn, "Missing coin for l2Book") + return + } + state.mu.Lock() + state.l2BookCoins[coin] = true + state.mu.Unlock() + + wsm.logger.Info("subscribed to l2Book", "coin", coin) + + // Send subscription acknowledgment + wsm.sendSubscriptionResponse(state.conn, sub) + + // Send initial l2Book snapshot + wsm.sendInitialL2Book(state.conn, coin) + + case "bbo": + coin, ok := sub["coin"].(string) + if !ok { + wsm.sendError(state.conn, "Missing coin for bbo") + return + } + state.mu.Lock() + state.bboCoins[coin] = true + state.mu.Unlock() + + wsm.logger.Info("subscribed to bbo", "coin", coin) + + // Send subscription acknowledgment + wsm.sendSubscriptionResponse(state.conn, sub) + + // Send initial BBO snapshot + wsm.sendInitialBBO(state.conn, coin) + + default: + wsm.sendError(state.conn, "Unsupported subscription type: "+subType) + } +} + +// handleUnsubscribe processes an unsubscription request +func (wsm *WebSocketManager) handleUnsubscribe(state *ConnectionState, sub map[string]interface{}) { + subType, ok := sub["type"].(string) + if !ok { + return + } + + switch subType { + case "orderUpdates": + state.mu.Lock() + state.orderUpdatesUser = "" + state.mu.Unlock() + + case "l2Book", "bbo": + coin, ok := sub["coin"].(string) + if !ok { + return + } + state.mu.Lock() + delete(state.l2BookCoins, coin) + delete(state.bboCoins, coin) + state.mu.Unlock() + } +} + +// sendSubscriptionResponse sends a subscription acknowledgment +func (wsm *WebSocketManager) sendSubscriptionResponse(conn *websocket.Conn, subscription map[string]interface{}) { + response := SubscriptionResponse{ + Channel: "subscriptionResponse", + Data: map[string]interface{}{ + "method": "subscribe", + "subscription": subscription, + }, + } + + wsm.sendJSON(conn, response) +} + +// sendError sends an error message to the client +func (wsm *WebSocketManager) sendError(conn *websocket.Conn, errorMsg string) { + msg := map[string]string{ + "channel": "error", + "error": errorMsg, + } + wsm.sendJSON(conn, msg) +} + +// sendJSON sends a JSON message to a connection +func (wsm *WebSocketManager) sendJSON(conn *websocket.Conn, v interface{}) { + // Find the connection state to use its write mutex + wsm.mu.RLock() + state, ok := wsm.connections[conn] + wsm.mu.RUnlock() + + if !ok { + return + } + + state.writeMu.Lock() + defer state.writeMu.Unlock() + + if err := conn.WriteJSON(v); err != nil { + wsm.logger.Error("failed to send JSON", "error", err) + } +} + +// BroadcastOrderUpdate queues an order update for broadcasting +// In the mock server, we broadcast to all orderUpdates subscribers +func (wsm *WebSocketManager) BroadcastOrderUpdate(order *OrderDetail) { + if order == nil { + return + } + + wsOrder := WsOrder{ + Order: WsBasicOrder{ + Coin: order.Order.Coin, + Side: order.Order.Side, + LimitPx: order.Order.LimitPx, + Sz: order.Order.Sz, + Oid: order.Order.Oid, + Timestamp: order.Order.Timestamp, + OrigSz: order.Order.OrigSz, + Cloid: order.Order.Cloid, + }, + Status: order.Status, + StatusTimestamp: order.StatusTimestamp, + } + + update := OrderUpdate{ + Orders: []WsOrder{wsOrder}, + } + + select { + case wsm.orderUpdatesCh <- update: + default: + wsm.logger.Warn("order updates channel full, dropping update") + } +} + +// broadcastOrderUpdates broadcasts order updates to subscribed clients +// In the mock server, we broadcast to all orderUpdates subscribers +func (wsm *WebSocketManager) broadcastOrderUpdates() { + for update := range wsm.orderUpdatesCh { + wsm.mu.RLock() + for conn, state := range wsm.connections { + state.mu.RLock() + // In the mock, broadcast to anyone subscribed to orderUpdates + if state.orderUpdatesUser != "" { + state.mu.RUnlock() + msg := map[string]interface{}{ + "channel": "orderUpdates", + "data": update.Orders, + } + wsm.sendJSON(conn, msg) + } else { + state.mu.RUnlock() + } + } + wsm.mu.RUnlock() + } +} + +// BroadcastL2Book queues an L2 book update for broadcasting +func (wsm *WebSocketManager) BroadcastL2Book(coin string, levels [2][]WsLevel) { + update := L2BookUpdate{ + Coin: coin, + Levels: levels, + Time: time.Now().UnixMilli(), + } + + select { + case wsm.l2BookCh <- update: + default: + wsm.logger.Warn("l2book channel full, dropping update") + } +} + +// broadcastL2Book broadcasts L2 book updates to subscribed clients +func (wsm *WebSocketManager) broadcastL2Book() { + for update := range wsm.l2BookCh { + wsm.mu.RLock() + for conn, state := range wsm.connections { + state.mu.RLock() + + // Send as l2Book format if subscribed to l2Book + if state.l2BookCoins[update.Coin] { + state.mu.RUnlock() + msg := map[string]interface{}{ + "channel": "l2Book", + "data": map[string]interface{}{ + "coin": update.Coin, + "time": update.Time, + "levels": update.Levels, + }, + } + wsm.sendJSON(conn, msg) + } else if state.bboCoins[update.Coin] { + // Send as bbo format if subscribed to bbo + state.mu.RUnlock() + + // Extract best bid and ask from levels + var bid, ask *WsLevel + if len(update.Levels[0]) > 0 { + bid = &update.Levels[0][0] + } + if len(update.Levels[1]) > 0 { + ask = &update.Levels[1][0] + } + + msg := map[string]interface{}{ + "channel": "bbo", + "data": map[string]interface{}{ + "coin": update.Coin, + "time": update.Time, + "bbo": []*WsLevel{bid, ask}, + }, + } + wsm.sendJSON(conn, msg) + } else { + state.mu.RUnlock() + } + } + wsm.mu.RUnlock() + } +} + +// sendInitialL2Book sends an initial l2Book snapshot to a newly subscribed client +func (wsm *WebSocketManager) sendInitialL2Book(conn *websocket.Conn, coin string) { + // Get mock BBO prices based on coin + bid, ask := wsm.getMockBBO(coin) + + msg := map[string]interface{}{ + "channel": "l2Book", + "data": map[string]interface{}{ + "coin": coin, + "time": time.Now().UnixMilli(), + "levels": [2][]WsLevel{ + {{Px: bid.Px, Sz: bid.Sz, N: 1}}, + {{Px: ask.Px, Sz: ask.Sz, N: 1}}, + }, + }, + } + + wsm.sendJSON(conn, msg) +} + +// sendInitialBBO sends an initial BBO snapshot to a newly subscribed client +func (wsm *WebSocketManager) sendInitialBBO(conn *websocket.Conn, coin string) { + // Get mock BBO prices based on coin + bid, ask := wsm.getMockBBO(coin) + + msg := map[string]interface{}{ + "channel": "bbo", + "data": map[string]interface{}{ + "coin": coin, + "time": time.Now().UnixMilli(), + "bbo": []WsLevel{bid, ask}, + }, + } + + wsm.sendJSON(conn, msg) +} + +// getMockBBO returns mock bid/ask prices for a coin +func (wsm *WebSocketManager) getMockBBO(coin string) (bid WsLevel, ask WsLevel) { + switch coin { + case "BTC": + // Use 87000 as the reference price (from IOC tests) + return WsLevel{Px: "86956.5", Sz: "10.5", N: 1}, + WsLevel{Px: "87043.5", Sz: "8.3", N: 1} + case "ETH": + return WsLevel{Px: "2999.5", Sz: "50.0", N: 1}, + WsLevel{Px: "3000.5", Sz: "45.0", N: 1} + case "SOL": + return WsLevel{Px: "99.9", Sz: "100.0", N: 1}, + WsLevel{Px: "100.1", Sz: "95.0", N: 1} + default: + // Default: use a spread of 0.1% + return WsLevel{Px: "999.5", Sz: "10.0", N: 1}, + WsLevel{Px: "1000.5", Sz: "10.0", N: 1} + } +} + +// SetBBO allows tests to manually set BBO prices +func (wsm *WebSocketManager) SetBBO(coin string, bidPx, bidSz, askPx, askSz string) { + levels := [2][]WsLevel{ + {{Px: bidPx, Sz: bidSz, N: 1}}, + {{Px: askPx, Sz: askSz, N: 1}}, + } + wsm.BroadcastL2Book(coin, levels) +} + +// TriggerBBOUpdate forces an immediate BBO update for a coin +func (wsm *WebSocketManager) TriggerBBOUpdate(coin string) { + bid, ask := wsm.getMockBBO(coin) + levels := [2][]WsLevel{ + {bid}, + {ask}, + } + wsm.BroadcastL2Book(coin, levels) +} diff --git a/server/websocket_example_test.go b/server/websocket_example_test.go new file mode 100644 index 0000000..6045467 --- /dev/null +++ b/server/websocket_example_test.go @@ -0,0 +1,178 @@ +package server_test + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/recomma/hyperliquid-mock/server" +) + +// TestWebSocketOrderUpdates demonstrates WebSocket order updates subscription +func TestWebSocketOrderUpdates(t *testing.T) { + ts := server.NewTestServer(t) + defer ts.Close() + + // Connect to WebSocket + wsURL := ts.WebSocketURL() + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + // Subscribe to order updates + subscription := map[string]interface{}{ + "method": "subscribe", + "subscription": map[string]interface{}{ + "type": "orderUpdates", + "user": "0x1234567890abcdef", + }, + } + + if err := conn.WriteJSON(subscription); err != nil { + t.Fatalf("Failed to send subscription: %v", err) + } + + // Read subscription acknowledgment + var ack map[string]interface{} + if err := conn.ReadJSON(&ack); err != nil { + t.Fatalf("Failed to read ack: %v", err) + } + + fmt.Printf("Subscription acknowledged: %s\n", ack["channel"]) + + // Create an order (this will trigger a WebSocket update) + // Note: In a real test, you'd use the HTTP API to create the order + // For this example, we're just demonstrating the WebSocket flow + + fmt.Println("WebSocket orderUpdates subscription working!") +} + +// TestWebSocketBBO demonstrates WebSocket BBO subscription +func TestWebSocketBBO(t *testing.T) { + ts := server.NewTestServer(t) + defer ts.Close() + + // Connect to WebSocket + wsURL := ts.WebSocketURL() + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + // Subscribe to BBO for BTC + subscription := map[string]interface{}{ + "method": "subscribe", + "subscription": map[string]interface{}{ + "type": "l2Book", + "coin": "BTC", + }, + } + + if err := conn.WriteJSON(subscription); err != nil { + t.Fatalf("Failed to send subscription: %v", err) + } + + // Read subscription acknowledgment + var ack map[string]interface{} + if err := conn.ReadJSON(&ack); err != nil { + t.Fatalf("Failed to read ack: %v", err) + } + + fmt.Printf("Subscription acknowledged: %s\n", ack["channel"]) + + // Read initial BBO snapshot + var bboMsg map[string]interface{} + if err := conn.ReadJSON(&bboMsg); err != nil { + t.Fatalf("Failed to read BBO: %v", err) + } + + if bboMsg["channel"] == "l2Book" { + data := bboMsg["data"].(map[string]interface{}) + fmt.Printf("Received BBO for %s\n", data["coin"]) + + levels := data["levels"].([]interface{}) + bids := levels[0].([]interface{}) + asks := levels[1].([]interface{}) + + if len(bids) > 0 { + bid := bids[0].(map[string]interface{}) + fmt.Printf("Best bid: %s @ %s\n", bid["sz"], bid["px"]) + } + if len(asks) > 0 { + ask := asks[0].(map[string]interface{}) + fmt.Printf("Best ask: %s @ %s\n", ask["sz"], ask["px"]) + } + } + + fmt.Println("WebSocket BBO subscription working!") +} + +// TestManualBBOUpdate demonstrates manual BBO updates +func TestManualBBOUpdate(t *testing.T) { + ts := server.NewTestServer(t) + defer ts.Close() + + // Connect to WebSocket + wsURL := ts.WebSocketURL() + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + // Subscribe to BBO for BTC + subscription := map[string]interface{}{ + "method": "subscribe", + "subscription": map[string]interface{}{ + "type": "l2Book", + "coin": "BTC", + }, + } + + if err := conn.WriteJSON(subscription); err != nil { + t.Fatalf("Failed to send subscription: %v", err) + } + + // Read subscription acknowledgment + conn.ReadJSON(&map[string]interface{}{}) + + // Read initial BBO snapshot + conn.ReadJSON(&map[string]interface{}{}) + + // Manually set BBO prices + ts.SetBBO("BTC", 87000.0, 5.0, 87100.0, 4.5) + + // Give it a moment to broadcast + time.Sleep(50 * time.Millisecond) + + // Read the updated BBO + var bboUpdate map[string]interface{} + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + if err := conn.ReadJSON(&bboUpdate); err != nil { + t.Fatalf("Failed to read BBO update: %v", err) + } + + if bboUpdate["channel"] == "l2Book" { + data := bboUpdate["data"].(map[string]interface{}) + levels := data["levels"].([]interface{}) + bids := levels[0].([]interface{}) + + if len(bids) > 0 { + bid := bids[0].(map[string]interface{}) + fmt.Printf("Updated bid price: %s\n", bid["px"]) + } + } + + fmt.Println("Manual BBO update working!") +} + +// Helper function to pretty print JSON +func prettyPrint(v interface{}) { + b, _ := json.MarshalIndent(v, "", " ") + fmt.Println(string(b)) +}