From 8057b31639c951c3c299666eb34b14de741a4191 Mon Sep 17 00:00:00 2001 From: "anthropic-code-agent[bot]" <242468646+Claude@users.noreply.github.com> Date: Thu, 12 Feb 2026 05:56:35 +0000 Subject: [PATCH 1/3] Initial plan From 6b8e96d4be22d4ec86f4188e759e897e5a27f5ce Mon Sep 17 00:00:00 2001 From: "anthropic-code-agent[bot]" <242468646+Claude@users.noreply.github.com> Date: Thu, 12 Feb 2026 06:03:12 +0000 Subject: [PATCH 2/3] Add WebSocket real-time quote subscription support - Create subscription manager for WebSocket-based quote push - Add subscribe/unsubscribe directories for file-based subscription control - Integrate subscription processing into controller main loop - Update README with WebSocket subscription documentation Co-authored-by: JetSquirrel <20291255+JetSquirrel@users.noreply.github.com> --- README.md | 65 ++++++++- cmd/longbridge-fs/main.go | 18 ++- internal/market/subscribe.go | 265 +++++++++++++++++++++++++++++++++++ 3 files changed, 346 insertions(+), 2 deletions(-) create mode 100644 internal/market/subscribe.go diff --git a/README.md b/README.md index 3b86f71..1ccad46 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Longbridge FS 是一个基于文件系统的股票交易框架,通过读写文 - **文件驱动** — 通过文件读写完成所有交易操作,AI Agent 无需学习 API - **AI 友好** — JSON 输出,天然适配 AI Agent 的文件操作能力 +- **实时行情** — 支持 WebSocket 订阅,高效获取实时行情推送 - **审计追踪** — 所有交易记录在 beancount 格式的 append-only 账本中 - **盈亏追踪** — 自动生成 `pnl.json` 和 `portfolio.json`,实时追踪持仓盈亏 - **风控止损** — 配置 `risk_control.json` 实现自动止损/止盈 @@ -58,7 +59,9 @@ longbridge-fs/ │ │ └── blocks/ # 归档区块 │ └── quote/ │ ├── hold/{SYMBOL}/ # 行情数据 (.txt + .json) -│ ├── track/ # 行情触发器 +│ ├── track/ # 行情触发器(一次性拉取) +│ ├── subscribe/ # WebSocket 订阅请求 +│ ├── unsubscribe/ # WebSocket 取消订阅请求 │ └── portfolio.json # 组合总览 ├── Makefile ├── go.mod @@ -140,6 +143,32 @@ Controller 会自动检测新 ORDER 并执行,结果追加为 EXECUTION 或 RE ### 查询行情 +#### 方式一:WebSocket 实时订阅(推荐) + +通过 WebSocket 订阅后,行情数据会自动实时更新,无需手动触发: + +```bash +# 订阅实时行情 +touch fs/quote/subscribe/AAPL.US +touch fs/quote/subscribe/700.HK + +# Controller 会自动处理订阅请求 +# 订阅成功后,overview.json 和 overview.txt 会自动更新 +cat fs/quote/hold/AAPL.US/overview.json # 实时更新的行情 + +# 取消订阅 +touch fs/quote/unsubscribe/AAPL.US +``` + +**优势**: +- 高效:WebSocket 长连接,延迟低 +- 实时:价格变动时自动推送更新 +- 省资源:无需轮询,减少 API 调用 + +#### 方式二:一次性拉取(按需获取) + +适合需要历史 K 线、分时等完整行情数据的场景: + ```bash # 触发行情获取 touch fs/quote/track/AAPL.US @@ -153,6 +182,11 @@ cat fs/quote/hold/AAPL.US/5D.json # 5分钟K线 (JSON) cat fs/quote/hold/AAPL.US/intraday.json # 分时数据 (JSON) ``` +**使用建议**: +- 使用 `subscribe/` 订阅需要实时监控的股票(如持仓股票、关注列表) +- 使用 `track/` 按需获取历史数据(如回测、分析) +- 两种方式可以同时使用,互不影响 + ### 查看盈亏 ```bash @@ -195,6 +229,35 @@ make deps # 下载依赖 ## 进阶功能 +### WebSocket 实时行情订阅 + +系统支持两种行情获取方式: + +1. **WebSocket 订阅**:适合需要实时监控的场景 + - 订阅后自动推送更新,无需轮询 + - 低延迟,高效率 + - 适合监控持仓、关注列表 + +2. **一次性拉取**:适合按需查询的场景 + - 通过 `track/` 目录触发 + - 获取完整数据(K线、分时等) + - 适合数据分析、回测 + +订阅示例: + +```bash +# 订阅实时行情 +touch fs/quote/subscribe/AAPL.US + +# 订阅成功后,每当 AAPL.US 价格变化时 +# fs/quote/hold/AAPL.US/overview.json 会自动更新 + +# 取消订阅 +touch fs/quote/unsubscribe/AAPL.US +``` + +**注意**:WebSocket 订阅仅更新 `overview.json` 和 `overview.txt`。如需 K 线、分时等数据,请使用 `track/` 方式。 + ### 批量订单处理 Controller 会批量处理所有未执行的 ORDER,每个订单处理完成后追加结果: diff --git a/cmd/longbridge-fs/main.go b/cmd/longbridge-fs/main.go index 8c6c096..b4986d4 100644 --- a/cmd/longbridge-fs/main.go +++ b/cmd/longbridge-fs/main.go @@ -80,6 +80,8 @@ func cmdInit() { filepath.Join(*root, "trade", "blocks"), filepath.Join(*root, "quote", "hold"), filepath.Join(*root, "quote", "track"), + filepath.Join(*root, "quote", "subscribe"), + filepath.Join(*root, "quote", "unsubscribe"), filepath.Join(*root, "quote", "market"), } for _, d := range dirs { @@ -133,6 +135,7 @@ func cmdController() { var tc *trade.TradeContext var qc *quote.QuoteContext + var subManager *market.SubscriptionManager useMock := *mock if !useMock { @@ -168,6 +171,12 @@ func cmdController() { log.Println("running in MOCK mode (no API calls)") } + // Initialize subscription manager + subManager = market.NewSubscriptionManager(qc, *root) + if qc != nil { + log.Println("WebSocket subscription manager initialized") + } + log.Printf("controller started: root=%s interval=%s compact-after=%d", *root, *interval, *compactAfter) executedCount := 0 @@ -202,7 +211,14 @@ func cmdController() { } } - // Refresh quotes (only with real API) + // Process WebSocket subscription requests (subscribe/unsubscribe) + if subManager != nil { + if err := subManager.ProcessSubscriptions(ctx); err != nil { + log.Printf("subscription processing failed: %v", err) + } + } + + // Refresh quotes via track files (one-shot poll-based) if qc != nil { market.RefreshQuotes(ctx, qc, *root) } diff --git a/internal/market/subscribe.go b/internal/market/subscribe.go new file mode 100644 index 0000000..21774b8 --- /dev/null +++ b/internal/market/subscribe.go @@ -0,0 +1,265 @@ +package market + +import ( + "context" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "longbridge-fs/internal/model" + + "github.com/longportapp/openapi-go/quote" +) + +// SubscriptionManager manages WebSocket-based quote subscriptions +type SubscriptionManager struct { + qc *quote.QuoteContext + root string + subscriptions map[string]bool // symbol -> subscribed + mu sync.RWMutex + subscribeDir string + unsubscribeDir string +} + +// NewSubscriptionManager creates a new subscription manager +func NewSubscriptionManager(qc *quote.QuoteContext, root string) *SubscriptionManager { + sm := &SubscriptionManager{ + qc: qc, + root: root, + subscriptions: make(map[string]bool), + subscribeDir: filepath.Join(root, "quote", "subscribe"), + unsubscribeDir: filepath.Join(root, "quote", "unsubscribe"), + } + + // Set up quote push callback + if qc != nil { + qc.OnQuote(sm.handleQuotePush) + } + + return sm +} + +// ProcessSubscriptions scans for new subscribe/unsubscribe requests +func (sm *SubscriptionManager) ProcessSubscriptions(ctx context.Context) error { + if sm.qc == nil { + return nil // Skip if no quote context (mock mode) + } + + // Process subscribe requests + if err := sm.processSubscribeRequests(ctx); err != nil { + return fmt.Errorf("process subscribe: %w", err) + } + + // Process unsubscribe requests + if err := sm.processUnsubscribeRequests(ctx); err != nil { + return fmt.Errorf("process unsubscribe: %w", err) + } + + return nil +} + +// processSubscribeRequests handles new subscription requests +func (sm *SubscriptionManager) processSubscribeRequests(ctx context.Context) error { + entries, err := os.ReadDir(sm.subscribeDir) + if err != nil { + return nil // Directory might not exist yet + } + + var toSubscribe []string + var filesToRemove []string + + sm.mu.RLock() + for _, e := range entries { + if e.IsDir() || strings.HasPrefix(e.Name(), ".") { + continue + } + symbol := e.Name() + if !sm.subscriptions[symbol] { + toSubscribe = append(toSubscribe, symbol) + filesToRemove = append(filesToRemove, filepath.Join(sm.subscribeDir, symbol)) + } else { + // Already subscribed, just remove the file + filesToRemove = append(filesToRemove, filepath.Join(sm.subscribeDir, symbol)) + } + } + sm.mu.RUnlock() + + if len(toSubscribe) == 0 { + // Clean up any leftover files + for _, f := range filesToRemove { + os.Remove(f) + } + return nil + } + + // Subscribe to new symbols + err = sm.qc.Subscribe(ctx, toSubscribe, []quote.SubType{quote.SubTypeQuote}, true) + if err != nil { + log.Printf("subscribe failed for %v: %v", toSubscribe, err) + return err + } + + sm.mu.Lock() + for _, symbol := range toSubscribe { + sm.subscriptions[symbol] = true + } + sm.mu.Unlock() + + log.Printf("subscribed to real-time quotes: %v", toSubscribe) + + // Remove subscribe request files + for _, f := range filesToRemove { + os.Remove(f) + } + + return nil +} + +// processUnsubscribeRequests handles unsubscription requests +func (sm *SubscriptionManager) processUnsubscribeRequests(ctx context.Context) error { + entries, err := os.ReadDir(sm.unsubscribeDir) + if err != nil { + return nil // Directory might not exist yet + } + + var toUnsubscribe []string + var filesToRemove []string + + sm.mu.RLock() + for _, e := range entries { + if e.IsDir() || strings.HasPrefix(e.Name(), ".") { + continue + } + symbol := e.Name() + if sm.subscriptions[symbol] { + toUnsubscribe = append(toUnsubscribe, symbol) + } + filesToRemove = append(filesToRemove, filepath.Join(sm.unsubscribeDir, symbol)) + } + sm.mu.RUnlock() + + if len(toUnsubscribe) == 0 { + // Clean up any leftover files + for _, f := range filesToRemove { + os.Remove(f) + } + return nil + } + + // Unsubscribe from symbols + err = sm.qc.Unsubscribe(ctx, false, toUnsubscribe, []quote.SubType{quote.SubTypeQuote}) + if err != nil { + log.Printf("unsubscribe failed for %v: %v", toUnsubscribe, err) + return err + } + + sm.mu.Lock() + for _, symbol := range toUnsubscribe { + delete(sm.subscriptions, symbol) + } + sm.mu.Unlock() + + log.Printf("unsubscribed from real-time quotes: %v", toUnsubscribe) + + // Remove unsubscribe request files + for _, f := range filesToRemove { + os.Remove(f) + } + + return nil +} + +// handleQuotePush is called when a real-time quote update is received +func (sm *SubscriptionManager) handleQuotePush(push *quote.PushQuote) { + symbol := push.Symbol + holdSymbolDir := filepath.Join(sm.root, "quote", "hold", symbol) + + if err := os.MkdirAll(holdSymbolDir, 0755); err != nil { + log.Printf("quote push mkdir %s: %v", symbol, err) + return + } + + // Update overview.json with real-time data + if err := sm.writeRealtimeOverview(holdSymbolDir, push); err != nil { + log.Printf("quote push write %s: %v", symbol, err) + return + } + + log.Printf("quote push updated: %s -> hold/%s/overview.json", symbol, symbol) +} + +// writeRealtimeOverview writes real-time quote data to overview.json +func (sm *SubscriptionManager) writeRealtimeOverview(dir string, push *quote.PushQuote) error { + last := decFloat(push.LastDone) + + // Try to read previous overview to get prev_close for calculating change + prevOverview := ReadOverview(dir) + prev := 0.0 + if prevOverview != nil { + prev = prevOverview.PrevClose + } + + change := 0.0 + changePct := 0.0 + if prev != 0 { + change = last - prev + changePct = change / prev * 100 + } + + ov := model.QuoteOverview{ + Symbol: push.Symbol, + Last: last, + Open: decFloat(push.Open), + High: decFloat(push.High), + Low: decFloat(push.Low), + PrevClose: prev, + Volume: push.Volume, + Turnover: decFloat(push.Turnover), + Change: roundN(change, 4), + ChangePct: roundN(changePct, 2), + UpdatedAt: time.Unix(push.Timestamp, 0).UTC().Format(time.RFC3339), + } + + // Also write text format for human readability + var sb strings.Builder + sb.WriteString(fmt.Sprintf("Symbol: %s\n", push.Symbol)) + sb.WriteString(fmt.Sprintf("Last: %s\n", decStr(push.LastDone))) + sb.WriteString(fmt.Sprintf("Open: %s\n", decStr(push.Open))) + sb.WriteString(fmt.Sprintf("High: %s\n", decStr(push.High))) + sb.WriteString(fmt.Sprintf("Low: %s\n", decStr(push.Low))) + if prev != 0 { + sb.WriteString(fmt.Sprintf("Prev: %.2f\n", prev)) + } + sb.WriteString(fmt.Sprintf("Volume: %d\n", push.Volume)) + sb.WriteString(fmt.Sprintf("Turnover: %s\n", decStr(push.Turnover))) + sb.WriteString(fmt.Sprintf("Updated: %s\n", time.Unix(push.Timestamp, 0).UTC().Format(time.RFC3339))) + + os.WriteFile(filepath.Join(dir, "overview.txt"), []byte(sb.String()), 0644) + + return writeJSON(filepath.Join(dir, "overview.json"), ov) +} + +// GetSubscriptions returns the list of currently subscribed symbols +func (sm *SubscriptionManager) GetSubscriptions() []string { + sm.mu.RLock() + defer sm.mu.RUnlock() + + symbols := make([]string, 0, len(sm.subscriptions)) + for symbol := range sm.subscriptions { + symbols = append(symbols, symbol) + } + return symbols +} + +// Close cleans up the subscription manager +func (sm *SubscriptionManager) Close() { + sm.mu.Lock() + defer sm.mu.Unlock() + + // Clear subscriptions map + sm.subscriptions = make(map[string]bool) +} From 9d894ab1827b77afb513949bb93e60211f3d36e7 Mon Sep 17 00:00:00 2001 From: "anthropic-code-agent[bot]" <242468646+Claude@users.noreply.github.com> Date: Fri, 13 Feb 2026 15:20:05 +0000 Subject: [PATCH 3/3] Add pagination examples for viewing large data files - Add jq slicing examples for K-line and historical data - Add tail/head/sed examples for beancount.txt ledger - Add pagination for pnl.json and portfolio.json when many positions - Clarify which files are small vs large to guide appropriate viewing methods Co-authored-by: JetSquirrel <20291255+JetSquirrel@users.noreply.github.com> --- README.md | 49 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 1ccad46..f21b71c 100644 --- a/README.md +++ b/README.md @@ -154,7 +154,7 @@ touch fs/quote/subscribe/700.HK # Controller 会自动处理订阅请求 # 订阅成功后,overview.json 和 overview.txt 会自动更新 -cat fs/quote/hold/AAPL.US/overview.json # 实时更新的行情 +cat fs/quote/hold/AAPL.US/overview.json # 实时更新的行情(文件较小,适合直接查看) # 取消订阅 touch fs/quote/unsubscribe/AAPL.US @@ -174,12 +174,19 @@ touch fs/quote/unsubscribe/AAPL.US touch fs/quote/track/AAPL.US # Controller 获取后数据在 hold 目录,track 文件被自动删除 -cat fs/quote/hold/AAPL.US/overview.json # 实时报价 (JSON) -cat fs/quote/hold/AAPL.US/overview.txt # 实时报价 (文本) -cat fs/quote/hold/AAPL.US/D.json # 日K (120天, JSON) -cat fs/quote/hold/AAPL.US/W.json # 周K (52周, JSON) -cat fs/quote/hold/AAPL.US/5D.json # 5分钟K线 (JSON) -cat fs/quote/hold/AAPL.US/intraday.json # 分时数据 (JSON) +cat fs/quote/hold/AAPL.US/overview.json # 实时报价 (JSON,小文件) +cat fs/quote/hold/AAPL.US/overview.txt # 实时报价 (文本,小文件) + +# 查看历史 K 线数据(使用分页避免输出过多) +head -20 fs/quote/hold/AAPL.US/D.json # 查看前 20 行 +tail -20 fs/quote/hold/AAPL.US/D.json # 查看后 20 行 +jq '.[0:10]' fs/quote/hold/AAPL.US/D.json # 查看前 10 条 K 线数据 +jq '.[-10:]' fs/quote/hold/AAPL.US/D.json # 查看最后 10 条 K 线数据 + +# 其他 K 线文件 +jq '.[0:10]' fs/quote/hold/AAPL.US/W.json # 周K 前 10 条 +jq '.[0:10]' fs/quote/hold/AAPL.US/5D.json # 5分钟K 前 10 条 +jq '.[0:10]' fs/quote/hold/AAPL.US/intraday.json # 分时数据前 10 条 ``` **使用建议**: @@ -193,8 +200,15 @@ cat fs/quote/hold/AAPL.US/intraday.json # 分时数据 (JSON) # PnL 报告 (持仓 + 当前价格 → 未实现盈亏) cat fs/account/pnl.json +# 如果持仓较多,使用 jq 分页查看 +jq '.positions[0:5]' fs/account/pnl.json # 查看前 5 个持仓 +jq '.positions[-5:]' fs/account/pnl.json # 查看最后 5 个持仓 + # 组合总览 (所有持仓 + 行情) cat fs/quote/portfolio.json + +# 如果持仓较多,使用 jq 分页查看 +jq '.holdings[0:10]' fs/quote/portfolio.json # 查看前 10 个持仓 ``` ### 风控止损 @@ -313,6 +327,27 @@ fs/trade/blocks/ 主账本文件 `beancount.txt` 只保留未执行的订单,保持文件精简。 +### 查看账本记录(分页) + +由于 `beancount.txt` 是 append-only 账本,随着交易增加会不断增长。建议使用分页方式查看: + +```bash +# 查看最后 50 行(最近的订单) +tail -50 fs/trade/beancount.txt + +# 查看前 50 行(最早的订单) +head -50 fs/trade/beancount.txt + +# 查看特定范围的行(例如第 100-150 行) +sed -n '100,150p' fs/trade/beancount.txt + +# 搜索特定股票的订单 +grep "AAPL.US" fs/trade/beancount.txt | tail -20 + +# 查看归档的历史区块 +tail -50 fs/trade/blocks/block_0001.txt +``` + ### 行情数据格式 行情数据同时提供 JSON 和文本两种格式: