diff --git a/cmd/analytics-mock/main.go b/cmd/analytics-mock/main.go new file mode 100644 index 00000000..90da73cd --- /dev/null +++ b/cmd/analytics-mock/main.go @@ -0,0 +1,114 @@ +package main + +import ( + "encoding/json" + "io" + "log" + "net/http" + "sync" +) + +type AnalyticsMock struct { + mu sync.RWMutex + receivedEvents []map[string]interface{} + totalEvents int +} + +func NewAnalyticsMock() *AnalyticsMock { + return &AnalyticsMock{ + receivedEvents: make([]map[string]interface{}, 0), + } +} + +func (m *AnalyticsMock) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/health" { + w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte("OK")); err != nil { + log.Printf("Failed to write health response: %v", err) + } + return + } + + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + if r.URL.Path != "/events" { + w.WriteHeader(http.StatusNotFound) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + log.Printf("Failed to read body: %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + var events []map[string]interface{} + if err := json.Unmarshal(body, &events); err != nil { + log.Printf("Failed to unmarshal events: %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + m.mu.Lock() + m.receivedEvents = append(m.receivedEvents, events...) + m.totalEvents += len(events) + total := m.totalEvents + m.mu.Unlock() + + log.Printf("✅ Received batch of %d events (total: %d)", len(events), total) + for _, event := range events { + if eventName, ok := event["event_name"].(string); ok { + log.Printf(" - %s", eventName) + } + } + + // Return 202 Accepted like the real analytics server + w.WriteHeader(http.StatusAccepted) +} + +func (m *AnalyticsMock) GetStats(w http.ResponseWriter, r *http.Request) { + m.mu.RLock() + defer m.mu.RUnlock() + + eventTypes := make(map[string]int) + for _, event := range m.receivedEvents { + if eventName, ok := event["event_name"].(string); ok { + eventTypes[eventName]++ + } + } + + stats := map[string]interface{}{ + "total_events": m.totalEvents, + "event_types": eventTypes, + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(stats); err != nil { + log.Printf("Failed to encode stats: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } +} + +func main() { + mock := NewAnalyticsMock() + + http.HandleFunc("/events", mock.ServeHTTP) + http.HandleFunc("/health", mock.ServeHTTP) + http.HandleFunc("/stats", mock.GetStats) + + port := ":9090" + log.Printf("🚀 Analytics Mock Server starting on %s", port) + log.Printf("📊 Endpoints:") + log.Printf(" POST /events - Receive analytics events") + log.Printf(" GET /health - Health check") + log.Printf(" GET /stats - Get statistics") + log.Printf("") + if err := http.ListenAndServe(port, nil); err != nil { + log.Fatal(err) + } +} diff --git a/cmd/bridge/main.go b/cmd/bridge/main.go index 221da65e..eb5280a1 100644 --- a/cmd/bridge/main.go +++ b/cmd/bridge/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "net/http" "net/http/pprof" @@ -12,12 +13,14 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "github.com/ton-connect/bridge/internal" + "github.com/ton-connect/bridge/internal/analytics" "github.com/ton-connect/bridge/internal/app" "github.com/ton-connect/bridge/internal/config" bridge_middleware "github.com/ton-connect/bridge/internal/middleware" "github.com/ton-connect/bridge/internal/utils" handlerv1 "github.com/ton-connect/bridge/internal/v1/handler" "github.com/ton-connect/bridge/internal/v1/storage" + "github.com/ton-connect/bridge/tonmetrics" "golang.org/x/exp/slices" "golang.org/x/time/rate" ) @@ -32,7 +35,20 @@ func main() { app.SetBridgeInfo("bridgev1", "postgres") } - dbConn, err := storage.NewStorage(config.Config.PostgresURI) + tonAnalytics := tonmetrics.NewAnalyticsClient() + + collector := analytics.NewCollector(200, tonAnalytics, 500*time.Millisecond) + go collector.Run(context.Background()) + + analyticsBuilder := analytics.NewEventBuilder( + config.Config.TonAnalyticsBridgeURL, + "bridge", + "bridge", + config.Config.TonAnalyticsBridgeVersion, + config.Config.TonAnalyticsNetworkId, + ) + + dbConn, err := storage.NewStorage(config.Config.PostgresURI, collector, analyticsBuilder) if err != nil { log.Fatalf("db connection %v", err) } @@ -93,7 +109,7 @@ func main() { e.Use(corsConfig) } - h := handlerv1.NewHandler(dbConn, time.Duration(config.Config.HeartbeatInterval)*time.Second, extractor) + h := handlerv1.NewHandler(dbConn, time.Duration(config.Config.HeartbeatInterval)*time.Second, extractor, collector, analyticsBuilder) e.GET("/bridge/events", h.EventRegistrationHandler) e.POST("/bridge/message", h.SendMessageHandler) diff --git a/cmd/bridge3/main.go b/cmd/bridge3/main.go index 773cb39c..4c20f5b5 100644 --- a/cmd/bridge3/main.go +++ b/cmd/bridge3/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "net/http" "net/http/pprof" @@ -12,12 +13,15 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "github.com/ton-connect/bridge/internal" + "github.com/ton-connect/bridge/internal/analytics" "github.com/ton-connect/bridge/internal/app" "github.com/ton-connect/bridge/internal/config" bridge_middleware "github.com/ton-connect/bridge/internal/middleware" + "github.com/ton-connect/bridge/internal/ntp" "github.com/ton-connect/bridge/internal/utils" handlerv3 "github.com/ton-connect/bridge/internal/v3/handler" storagev3 "github.com/ton-connect/bridge/internal/v3/storage" + "github.com/ton-connect/bridge/tonmetrics" "golang.org/x/exp/slices" "golang.org/x/time/rate" ) @@ -27,6 +31,27 @@ func main() { config.LoadConfig() app.InitMetrics() + var timeProvider ntp.TimeProvider + if config.Config.NTPEnabled { + ntpClient := ntp.NewClient(ntp.Options{ + Servers: config.Config.NTPServers, + SyncInterval: time.Duration(config.Config.NTPSyncInterval) * time.Second, + QueryTimeout: time.Duration(config.Config.NTPQueryTimeout) * time.Second, + }) + ctx := context.Background() + ntpClient.Start(ctx) + defer ntpClient.Stop() + timeProvider = ntpClient + log.WithFields(log.Fields{ + "servers": config.Config.NTPServers, + "sync_interval": config.Config.NTPSyncInterval, + }).Info("NTP synchronization enabled") + } else { + timeProvider = ntp.NewLocalTimeProvider() + log.Info("NTP synchronization disabled, using local time") + } + tonAnalytics := tonmetrics.NewAnalyticsClient() + dbURI := "" store := "memory" if config.Config.Storage != "" { @@ -45,7 +70,18 @@ func main() { // No URI needed for memory storage } - dbConn, err := storagev3.NewStorage(store, dbURI) + collector := analytics.NewCollector(200, tonAnalytics, 500*time.Millisecond) + go collector.Run(context.Background()) + + analyticsBuilder := analytics.NewEventBuilder( + config.Config.TonAnalyticsBridgeURL, + "bridge", + "bridge", + config.Config.TonAnalyticsBridgeVersion, + config.Config.TonAnalyticsNetworkId, + ) + + dbConn, err := storagev3.NewStorage(store, dbURI, collector, analyticsBuilder) if err != nil { log.Fatalf("failed to create storage: %v", err) @@ -116,7 +152,7 @@ func main() { e.Use(corsConfig) } - h := handlerv3.NewHandler(dbConn, time.Duration(config.Config.HeartbeatInterval)*time.Second, extractor) + h := handlerv3.NewHandler(dbConn, time.Duration(config.Config.HeartbeatInterval)*time.Second, extractor, timeProvider, collector, analyticsBuilder) e.GET("/bridge/events", h.EventRegistrationHandler) e.POST("/bridge/message", h.SendMessageHandler) diff --git a/docker/Dockerfile.analytics-mock b/docker/Dockerfile.analytics-mock new file mode 100644 index 00000000..ed9e4821 --- /dev/null +++ b/docker/Dockerfile.analytics-mock @@ -0,0 +1,16 @@ +FROM golang:1.24-alpine AS builder + +WORKDIR /app + +# Copy the standalone mock server +COPY cmd/analytics-mock/main.go ./main.go + +# Build the server +RUN go build -o analytics-mock main.go + +FROM alpine:latest +RUN apk --no-cache add ca-certificates curl +WORKDIR /root/ +COPY --from=builder /app/analytics-mock . +EXPOSE 9090 +CMD ["./analytics-mock"] diff --git a/docker/docker-compose.cluster-valkey.yml b/docker/docker-compose.cluster-valkey.yml index 32b0b63e..fd810e2d 100644 --- a/docker/docker-compose.cluster-valkey.yml +++ b/docker/docker-compose.cluster-valkey.yml @@ -80,6 +80,23 @@ services: " restart: "no" + analytics-mock: + build: + context: .. + dockerfile: docker/Dockerfile.analytics-mock + container_name: analytics-mock + ports: + - "9090:9090" + networks: + bridge_network: + ipv4_address: 172.20.0.25 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9090/health"] + interval: 5s + timeout: 3s + retries: 3 + restart: unless-stopped + bridge: build: context: .. @@ -93,6 +110,11 @@ services: HEARTBEAT_INTERVAL: 10 RPS_LIMIT: 1000 CONNECTIONS_LIMIT: 200 + TON_ANALYTICS_ENABLED: "true" + TON_ANALYTICS_URL: "http://analytics-mock:9090/events" + TON_ANALYTICS_NETWORK_ID: "-239" + TON_ANALYTICS_BRIDGE_VERSION: "test-1.0.0" + TON_ANALYTICS_BRIDGE_URL: "http://bridge:8081" ports: - "8081:8081" # bridge - "9103:9103" # pprof and metrics @@ -105,6 +127,8 @@ services: condition: service_healthy valkey-cluster-init: condition: service_completed_successfully + analytics-mock: + condition: service_healthy networks: bridge_network: ipv4_address: 172.20.0.30 diff --git a/docker/docker-compose.dnsmasq.yml b/docker/docker-compose.dnsmasq.yml index 8e4c4457..044bcb54 100644 --- a/docker/docker-compose.dnsmasq.yml +++ b/docker/docker-compose.dnsmasq.yml @@ -138,6 +138,23 @@ services: " restart: "no" + analytics-mock: + build: + context: .. + dockerfile: docker/Dockerfile.analytics-mock + container_name: analytics-mock + ports: + - "9090:9090" + networks: + bridge_network: + ipv4_address: 172.20.0.25 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9090/health"] + interval: 5s + timeout: 3s + retries: 3 + restart: unless-stopped + bridge1: build: context: .. @@ -151,11 +168,18 @@ services: HEARTBEAT_INTERVAL: 10 RPS_LIMIT: 1000 CONNECTIONS_LIMIT: 200 + TON_ANALYTICS_ENABLED: "true" + TON_ANALYTICS_URL: "http://analytics-mock:9090/events" + TON_ANALYTICS_NETWORK_ID: "-239" + TON_ANALYTICS_BRIDGE_VERSION: "test-1.0.0" + TON_ANALYTICS_BRIDGE_URL: "http://bridge-instance1:8080" ports: - "9103:9103" # pprof and metrics depends_on: valkey-cluster-init: condition: service_completed_successfully + analytics-mock: + condition: service_healthy dnsmasq: condition: service_started networks: @@ -178,9 +202,17 @@ services: HEARTBEAT_INTERVAL: 10 RPS_LIMIT: 1000 CONNECTIONS_LIMIT: 200 + # Analytics configuration + TON_ANALYTICS_ENABLED: "true" + TON_ANALYTICS_URL: "http://analytics-mock:9090/events" + TON_ANALYTICS_NETWORK_ID: "-239" + TON_ANALYTICS_BRIDGE_VERSION: "test-1.0.0" + TON_ANALYTICS_BRIDGE_URL: "http://bridge-instance2:8080" depends_on: valkey-cluster-init: condition: service_completed_successfully + analytics-mock: + condition: service_healthy dnsmasq: condition: service_started networks: @@ -202,9 +234,16 @@ services: HEARTBEAT_INTERVAL: 10 RPS_LIMIT: 1000 CONNECTIONS_LIMIT: 200 + TON_ANALYTICS_ENABLED: "true" + TON_ANALYTICS_URL: "http://analytics-mock:9090/events" + TON_ANALYTICS_NETWORK_ID: "-239" + TON_ANALYTICS_BRIDGE_VERSION: "test-1.0.0" + TON_ANALYTICS_BRIDGE_URL: "http://bridge-instance2:8080" depends_on: valkey-cluster-init: condition: service_completed_successfully + analytics-mock: + condition: service_healthy dnsmasq: condition: service_started networks: diff --git a/docker/docker-compose.memory.yml b/docker/docker-compose.memory.yml index f1d38978..3d9036cb 100644 --- a/docker/docker-compose.memory.yml +++ b/docker/docker-compose.memory.yml @@ -8,6 +8,11 @@ services: HEARTBEAT_INTERVAL: 10 RPS_LIMIT: 1000 CONNECTIONS_LIMIT: 200 + TON_ANALYTICS_ENABLED: "true" + TON_ANALYTICS_URL: "http://analytics-mock:9090/events" + TON_ANALYTICS_NETWORK_ID: "-239" + TON_ANALYTICS_BRIDGE_VERSION: "test-1.0.0" + TON_ANALYTICS_BRIDGE_URL: "http://bridge-instance1:8080" ports: - "8081:8081" # bridge - "9103:9103" # pprof and metrics diff --git a/docker/docker-compose.nginx.yml b/docker/docker-compose.nginx.yml index 98e883d3..ec682a57 100644 --- a/docker/docker-compose.nginx.yml +++ b/docker/docker-compose.nginx.yml @@ -129,6 +129,11 @@ services: HEARTBEAT_INTERVAL: 10 RPS_LIMIT: 1000 CONNECTIONS_LIMIT: 200 + TON_ANALYTICS_ENABLED: "true" + TON_ANALYTICS_URL: "http://analytics-mock:9090/events" + TON_ANALYTICS_NETWORK_ID: "-239" + TON_ANALYTICS_BRIDGE_VERSION: "test-1.0.0" + TON_ANALYTICS_BRIDGE_URL: "http://bridge-instance1:8080" ports: - "9103:9103" # pprof and metrics depends_on: diff --git a/docker/docker-compose.postgres.yml b/docker/docker-compose.postgres.yml index 24744d69..9fc230de 100644 --- a/docker/docker-compose.postgres.yml +++ b/docker/docker-compose.postgres.yml @@ -28,6 +28,11 @@ services: HEARTBEAT_INTERVAL: 10 RPS_LIMIT: 1000 CONNECTIONS_LIMIT: 200 + TON_ANALYTICS_ENABLED: "true" + TON_ANALYTICS_URL: "http://analytics-mock:9090/events" + TON_ANALYTICS_NETWORK_ID: "-239" + TON_ANALYTICS_BRIDGE_VERSION: "test-1.0.0" + TON_ANALYTICS_BRIDGE_URL: "http://bridge-instance1:8080" ports: - "8081:8081" # bridge - "9103:9103" # pprof and metrics diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 06697bfd..f50bba8f 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -43,10 +43,30 @@ TON Connect Bridge uses pub/sub architecture to synchronize state across multipl **Message Sending Flow:** 1. Client sends message via `POST /bridge/message` -2. Bridge publishes message to Redis pub/sub channel (instant delivery to all instances) -3. Bridge stores message in Redis sorted set (for offline clients) -4. All bridge instances with subscribed clients receive the message via pub/sub -5. Bridge instances deliver message to their connected clients via SSE +2. Bridge generates a monotonic event ID using time-based generation +3. Bridge publishes message to Redis pub/sub channel (instant delivery to all instances) +4. Bridge stores message in Redis sorted set (for offline clients) +5. All bridge instances with subscribed clients receive the message via pub/sub +6. Bridge instances deliver message to their connected clients via SSE + +## Time Synchronization + +**Event ID Generation:** +- Bridge uses time-based event IDs to ensure monotonic ordering across instances +- Format: `(timestamp_ms << 11) | local_counter` (53 bits total for JavaScript compatibility) +- 42 bits for timestamp (supports dates up to year 2100), 11 bits for counter +- Provides ~2K events per millisecond per instance + +**NTP Synchronization (Optional):** +- When enabled, all bridge instances synchronize their clocks with NTP servers +- Improves event ordering consistency across distributed instances +- Fallback to local system time if NTP is unavailable +- Configuration: `NTP_ENABLED`, `NTP_SERVERS`, `NTP_SYNC_INTERVAL` + +**Time Provider Architecture:** +- Uses `TimeProvider` interface for clock abstraction +- `ntp.Client`: NTP-synchronized time (recommended for multi-instance deployments) +- `ntp.LocalTimeProvider`: Local system time (single instance or testing) ## Scaling Requirements diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 9ab83961..af35b965 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -54,14 +54,26 @@ Complete reference for all environment variables supported by TON Connect Bridge TODO where to read more about it? +| Variable | Type | Default | Description | +|--------------------------------|--------|---------|--------------------------------------------------------------| +| `TON_ANALYTICS_ENABLED` | bool | `false` | Enable TonConnect analytics | +| `TON_ANALYTICS_URL` | string | `https://analytics.ton.org/events` | TON Analytics endpoint URL | +| `TON_ANALYTICS_BRIDGE_VERSION` | string | `1.0.0` | Bridge version for analytics tracking (auto-set during build) | +| `TON_ANALYTICS_BRIDGE_URL` | string | `localhost` | Public bridge URL for analytics | +| `TON_ANALYTICS_NETWORK_ID` | string | `-239` | TON network: `-239` (mainnet), `-3` (testnet) | + +## NTP Time Synchronization + +Bridge v3 supports NTP time synchronization for consistent `event_id` generation across multiple instances. This ensures monotonic event ordering even when bridge instances run on different servers. + | Variable | Type | Default | Description | |----------|------|---------|-------------| -| `TF_ANALYTICS_ENABLED` | bool | `false` | Enable TonConnect analytics | -| `BRIDGE_NAME` | string | `ton-connect-bridge` | Instance name for metrics/logging | -| `BRIDGE_VERSION` | string | `1.0.0` | Version (auto-set during build) | -| `BRIDGE_URL` | string | `localhost` | Public bridge URL | -| `ENVIRONMENT` | string | `production` | Environment name (`dev`, `staging`, `production`) | -| `NETWORK_ID` | string | `-239` | TON network: `-239` (mainnet), `-3` (testnet) | +| `NTP_ENABLED` | bool | `true` | Enable NTP time synchronization | +| `NTP_SERVERS` | string | `time.google.com,time.cloudflare.com,pool.ntp.org` | Comma-separated NTP server list | +| `NTP_SYNC_INTERVAL` | int | `300` | NTP sync interval (seconds) | +| `NTP_QUERY_TIMEOUT` | int | `5` | NTP query timeout (seconds) | + +**Note:** NTP synchronization is only available in bridge v3. Bridge v1 uses local system time. ## Configuration Presets @@ -74,6 +86,7 @@ CORS_ENABLE=true HEARTBEAT_INTERVAL=10 RPS_LIMIT=50 CONNECTIONS_LIMIT=50 +NTP_ENABLED=true ``` ### 🚀 Production (Redis/Valkey) @@ -89,6 +102,9 @@ CONNECT_CACHE_SIZE=500000 TRUSTED_PROXY_RANGES="10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,{use_your_own_please}" ENVIRONMENT=production BRIDGE_URL="https://use-your-own-bridge.myapp.com" +NTP_ENABLED=true +NTP_SERVERS=time.google.com,time.cloudflare.com,pool.ntp.org +NTP_SYNC_INTERVAL=300 ``` ## Using Environment Files diff --git a/go.mod b/go.mod index 9e340a91..ed0bf86b 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/ton-connect/bridge go 1.24.6 require ( + github.com/beevik/ntp v1.5.0 github.com/caarlos0/env/v6 v6.10.1 github.com/golang-migrate/migrate/v4 v4.19.0 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 10e18feb..dfb67e53 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/beevik/ntp v1.5.0 h1:y+uj/JjNwlY2JahivxYvtmv4ehfi3h74fAuABB9ZSM4= +github.com/beevik/ntp v1.5.0/go.mod h1:mJEhBrwT76w9D+IfOEGvuzyuudiW9E52U2BaTrMOYow= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= diff --git a/internal/analytics/collector.go b/internal/analytics/collector.go new file mode 100644 index 00000000..0d478820 --- /dev/null +++ b/internal/analytics/collector.go @@ -0,0 +1,150 @@ +package analytics + +import ( + "context" + "sync/atomic" + "time" + + "github.com/sirupsen/logrus" + "github.com/ton-connect/bridge/tonmetrics" +) + +// EventCollector is a non-blocking analytics producer API. +type EventCollector interface { + // TryAdd attempts to enqueue the event. Returns true if enqueued, false if dropped. + TryAdd(interface{}) bool +} + +// Collector provides bounded, non-blocking storage for analytics events and +// periodically flushes them to a backend. When buffer is full, new events are dropped. +type Collector struct { + // Buffer fields + eventCh chan interface{} + notifyCh chan struct{} + + capacity int + triggerCapacity int + dropped atomic.Uint64 + + // Sender fields + sender tonmetrics.AnalyticsClient + flushInterval time.Duration +} + +// NewCollector builds a collector with a periodic flush. +func NewCollector(capacity int, client tonmetrics.AnalyticsClient, flushInterval time.Duration) *Collector { + triggerCapacity := capacity + if capacity > 10 { + triggerCapacity = capacity - 10 + } + return &Collector{ + eventCh: make(chan interface{}, capacity), // channel for events + notifyCh: make(chan struct{}, 1), // channel to trigger flushing + capacity: capacity, + triggerCapacity: triggerCapacity, + sender: client, + flushInterval: flushInterval, + } +} + +// TryAdd enqueues without blocking. If full, returns false and increments drop count. +func (c *Collector) TryAdd(event interface{}) bool { + result := false + select { + case c.eventCh <- event: + result = true + default: + c.dropped.Add(1) + result = false + } + + if len(c.eventCh) >= c.triggerCapacity { + select { + case c.notifyCh <- struct{}{}: + default: + } + } + return result +} + +// PopAll drains all pending events. +// If there are 100 or more events, only reads 100 elements. +func (c *Collector) PopAll() []interface{} { + channelLen := len(c.eventCh) + if channelLen == 0 { + return nil + } + + limit := channelLen + if channelLen >= 100 { + limit = 100 + } + + result := make([]interface{}, 0, limit) + for i := 0; i < limit; i++ { + select { + case event := <-c.eventCh: + result = append(result, event) + default: + return result + } + } + return result +} + +// Dropped returns the number of events that were dropped due to buffer being full. +func (c *Collector) Dropped() uint64 { + return c.dropped.Load() +} + +// IsFull returns true if the buffer is at capacity. +func (c *Collector) IsFull() bool { + return len(c.eventCh) >= c.capacity +} + +// Len returns the current number of events in the buffer. +func (c *Collector) Len() int { + return len(c.eventCh) +} + +// Run periodically flushes events until the context is canceled. +// Flushes occur when: +// 1. The flush interval (500ms) has elapsed and there are events +// 2. The buffer has reached the trigger capacity (capacity - 10) +func (c *Collector) Run(ctx context.Context) { + flushTicker := time.NewTicker(c.flushInterval) + defer flushTicker.Stop() + + logrus.WithField("prefix", "analytics").Debugf("analytics collector started with flush interval %v", c.flushInterval) + + for { + select { + case <-ctx.Done(): + logrus.WithField("prefix", "analytics").Debug("analytics collector stopping, performing final flush") + // Use fresh context for final flush since ctx is already cancelled + flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + c.Flush(flushCtx) + cancel() + logrus.WithField("prefix", "analytics").Debug("analytics collector stopped") + return + case <-flushTicker.C: + if c.Len() > 0 { + logrus.WithField("prefix", "analytics").Debug("analytics collector ticker fired") + c.Flush(ctx) + } + case <-c.notifyCh: + logrus.WithField("prefix", "analytics").Debugf("analytics collector buffer reached %d events, flushing", c.Len()) + c.Flush(ctx) + } + } +} + +func (c *Collector) Flush(ctx context.Context) { + events := c.PopAll() + if len(events) > 0 { + logrus.WithField("prefix", "analytics").Debugf("flushing %d events from collector", len(events)) + if err := c.sender.SendBatch(ctx, events); err != nil { + logrus.WithError(err).Warnf("analytics: failed to send batch of %d events", len(events)) + } + } +} diff --git a/internal/analytics/collector_test.go b/internal/analytics/collector_test.go new file mode 100644 index 00000000..ebbb61e0 --- /dev/null +++ b/internal/analytics/collector_test.go @@ -0,0 +1,622 @@ +package analytics + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" +) + +// mockAnalyticsClient is a mock implementation of tonmetrics.AnalyticsClient +type mockAnalyticsClient struct { + batches [][]interface{} + mu sync.Mutex + sendErr error + sendDelay time.Duration + callCount atomic.Int32 +} + +func (m *mockAnalyticsClient) SendBatch(ctx context.Context, events []interface{}) error { + m.callCount.Add(1) + if m.sendDelay > 0 { + time.Sleep(m.sendDelay) + } + if m.sendErr != nil { + return m.sendErr + } + m.mu.Lock() + defer m.mu.Unlock() + m.batches = append(m.batches, events) + return nil +} + +func (m *mockAnalyticsClient) getBatches() [][]interface{} { + m.mu.Lock() + defer m.mu.Unlock() + result := make([][]interface{}, len(m.batches)) + copy(result, m.batches) + return result +} + +func (m *mockAnalyticsClient) totalEvents() int { + m.mu.Lock() + defer m.mu.Unlock() + total := 0 + for _, batch := range m.batches { + total += len(batch) + } + return total +} + +func TestCollector_TryAdd_Basic(t *testing.T) { + rc := NewCollector(3, nil, 0) + + // First add should succeed + if !rc.TryAdd("event1") { + t.Error("expected first TryAdd to succeed") + } + + if rc.Len() != 1 { + t.Errorf("expected length 1, got %d", rc.Len()) + } + + // Second add should succeed + if !rc.TryAdd("event2") { + t.Error("expected second TryAdd to succeed") + } + + if rc.Len() != 2 { + t.Errorf("expected length 2, got %d", rc.Len()) + } + + // Third add should succeed (buffer full) + if !rc.TryAdd("event3") { + t.Error("expected third TryAdd to succeed") + } + + if rc.Len() != 3 { + t.Errorf("expected length 3, got %d", rc.Len()) + } + + // Fourth add should fail (buffer full, drop newest) + if rc.TryAdd("event4") { + t.Error("expected fourth TryAdd to fail (buffer full, drop newest)") + } + + if rc.Len() != 3 { + t.Errorf("expected length to remain 3, got %d", rc.Len()) + } + + if rc.Dropped() != 1 { + t.Errorf("expected dropped count 1, got %d", rc.Dropped()) + } + + // Verify the events in buffer + events := rc.PopAll() + if len(events) != 3 { + t.Errorf("expected 3 events, got %d", len(events)) + } + + expected := []interface{}{"event1", "event2", "event3"} + for i, event := range events { + if event != expected[i] { + t.Errorf("expected event %v at position %d, got %v", expected[i], i, event) + } + } +} + +func TestCollector_TryAdd_DropNewest(t *testing.T) { + rc := NewCollector(2, nil, 0) + + // Fill the buffer + rc.TryAdd("event1") + rc.TryAdd("event2") + + // These should be dropped (buffer full) + if rc.TryAdd("event3") { + t.Error("expected third TryAdd to fail (buffer full)") + } + + if rc.TryAdd("event4") { + t.Error("expected fourth TryAdd to fail (buffer full)") + } + + if rc.Dropped() != 2 { + t.Errorf("expected dropped count 2, got %d", rc.Dropped()) + } + + // Verify only first two events are in buffer + events := rc.PopAll() + if len(events) != 2 { + t.Errorf("expected 2 events, got %d", len(events)) + } + + expected := []interface{}{"event1", "event2"} + for i, event := range events { + if event != expected[i] { + t.Errorf("expected event %v at position %d, got %v", expected[i], i, event) + } + } +} + +func TestCollector_ZeroCapacity(t *testing.T) { + rc := NewCollector(0, nil, 0) + + // All adds should fail + if rc.TryAdd("event1") { + t.Error("expected TryAdd to fail with zero capacity") + } + + if rc.Dropped() != 1 { + t.Errorf("expected dropped count 1, got %d", rc.Dropped()) + } + + if rc.TryAdd("event2") { + t.Error("expected TryAdd to fail with zero capacity") + } + + if rc.Dropped() != 2 { + t.Errorf("expected dropped count 2, got %d", rc.Dropped()) + } + + events := rc.PopAll() + if events != nil { + t.Errorf("expected nil events, got %v", events) + } +} + +func TestCollector_PopAll(t *testing.T) { + rc := NewCollector(5, nil, 0) + + // Add events + for i := 1; i <= 5; i++ { + rc.TryAdd(i) + } + + // Pop all and verify order + events := rc.PopAll() + if len(events) != 5 { + t.Errorf("expected 5 events, got %d", len(events)) + } + + for i, event := range events { + if event.(int) != i+1 { + t.Errorf("expected event %d at position %d, got %v", i+1, i, event) + } + } + + // Buffer should be empty now + if rc.Len() != 0 { + t.Errorf("expected length 0 after PopAll, got %d", rc.Len()) + } + + // Second PopAll should return nil + events = rc.PopAll() + if events != nil { + t.Errorf("expected nil for second PopAll, got %v", events) + } +} + +func TestCollector_IsFull(t *testing.T) { + rc := NewCollector(2, nil, 0) + + if rc.IsFull() { + t.Error("expected buffer not to be full initially") + } + + rc.TryAdd("event1") + if rc.IsFull() { + t.Error("expected buffer not to be full with 1 event") + } + + rc.TryAdd("event2") + if !rc.IsFull() { + t.Error("expected buffer to be full with 2 events") + } + + // Pop all and check again + rc.PopAll() + if rc.IsFull() { + t.Error("expected buffer not to be full after PopAll") + } +} + +func TestCollector_Dropped(t *testing.T) { + rc := NewCollector(2, nil, 0) + + if rc.Dropped() != 0 { + t.Errorf("expected initial dropped count 0, got %d", rc.Dropped()) + } + + rc.TryAdd("event1") + rc.TryAdd("event2") + + if rc.Dropped() != 0 { + t.Errorf("expected dropped count 0 after filling buffer, got %d", rc.Dropped()) + } + + // These should be dropped + rc.TryAdd("event3") + rc.TryAdd("event4") + + if rc.Dropped() != 2 { + t.Errorf("expected dropped count 2, got %d", rc.Dropped()) + } +} + +func TestCollector_Concurrent(t *testing.T) { + rc := NewCollector(1000, nil, 0) + var wg sync.WaitGroup + + // Spawn multiple goroutines to add events + numGoroutines := 10 + eventsPerGoroutine := 100 + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for j := 0; j < eventsPerGoroutine; j++ { + rc.TryAdd(id*1000 + j) + } + }(i) + } + + wg.Wait() + + // PopAll should return at most 100 events per call when buffer has >= 100 + // So we need to call it multiple times to drain all 1000 events + totalEvents := 0 + for { + events := rc.PopAll() + if events == nil { + break + } + totalEvents += len(events) + // Each batch should be at most 100 events + if len(events) > 100 { + t.Errorf("expected at most 100 events per batch, got %d", len(events)) + } + } + + if totalEvents != numGoroutines*eventsPerGoroutine { + t.Errorf("expected %d total events, got %d", numGoroutines*eventsPerGoroutine, totalEvents) + } + + if rc.Dropped() != 0 { + t.Errorf("expected no dropped events, got %d", rc.Dropped()) + } +} + +func TestCollector_ConcurrentPopAndAdd(t *testing.T) { + rc := NewCollector(100, nil, 0) + var wg sync.WaitGroup + + // Goroutine adding events + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 200; i++ { + rc.TryAdd(i) + } + }() + + // Goroutine popping events + wg.Add(1) + totalPopped := 0 + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + events := rc.PopAll() + totalPopped += len(events) + } + }() + + wg.Wait() + + // Final pop to get remaining events + events := rc.PopAll() + totalPopped += len(events) + + // We should have received all 200 events (either popped or dropped) + totalReceived := totalPopped + int(rc.Dropped()) + if totalReceived != 200 { + t.Errorf("expected 200 total events (popped + dropped), got %d", totalReceived) + } +} + +func TestCollector_TriggerCapacity(t *testing.T) { + // For capacity > 10, triggerCapacity should be capacity - 10 + rc := NewCollector(20, nil, 0) + + // Add 9 events (below triggerCapacity of 10) + for i := 0; i < 9; i++ { + rc.TryAdd(i) + } + + // Check that notifyCh is empty (no notification yet) + select { + case <-rc.notifyCh: + t.Error("expected no notification before reaching triggerCapacity") + default: + // Expected - no notification + } + + // Add event to reach triggerCapacity (10) + rc.TryAdd(9) + + // Check that notifyCh has a notification + select { + case <-rc.notifyCh: + // Expected - notification received + default: + t.Error("expected notification when reaching triggerCapacity") + } +} + +func TestCollector_TriggerCapacitySmallBuffer(t *testing.T) { + // For capacity <= 10, triggerCapacity equals capacity + rc := NewCollector(5, nil, 0) + + // Fill the buffer to capacity + for i := 0; i < 5; i++ { + rc.TryAdd(i) + } + + // Check that notifyCh has a notification + select { + case <-rc.notifyCh: + // Expected - notification received + default: + t.Error("expected notification when reaching triggerCapacity") + } +} + +func TestCollector_Flush(t *testing.T) { + mock := &mockAnalyticsClient{} + rc := NewCollector(10, mock, time.Second) + + // Add some events + for i := 0; i < 5; i++ { + rc.TryAdd(i) + } + + // Flush manually + ctx := context.Background() + rc.Flush(ctx) + + // Check that events were sent + batches := mock.getBatches() + if len(batches) != 1 { + t.Errorf("expected 1 batch, got %d", len(batches)) + } + + if len(batches[0]) != 5 { + t.Errorf("expected 5 events in batch, got %d", len(batches[0])) + } + + // Buffer should be empty now + if rc.Len() != 0 { + t.Errorf("expected buffer to be empty after flush, got %d", rc.Len()) + } +} + +func TestCollector_FlushEmpty(t *testing.T) { + mock := &mockAnalyticsClient{} + rc := NewCollector(10, mock, time.Second) + + // Flush empty buffer + ctx := context.Background() + rc.Flush(ctx) + + // Check that no batches were sent + batches := mock.getBatches() + if len(batches) != 0 { + t.Errorf("expected 0 batches for empty buffer, got %d", len(batches)) + } +} + +func TestCollector_FlushWithError(t *testing.T) { + mock := &mockAnalyticsClient{ + sendErr: context.DeadlineExceeded, + } + rc := NewCollector(10, mock, time.Second) + + // Add some events + for i := 0; i < 5; i++ { + rc.TryAdd(i) + } + + // Flush should not panic even with error + ctx := context.Background() + rc.Flush(ctx) + + // Events should have been popped (even though send failed) + if rc.Len() != 0 { + t.Errorf("expected buffer to be empty after flush, got %d", rc.Len()) + } +} + +func TestCollector_Run_PeriodicFlush(t *testing.T) { + mock := &mockAnalyticsClient{} + flushInterval := 50 * time.Millisecond + rc := NewCollector(100, mock, flushInterval) + + ctx, cancel := context.WithCancel(context.Background()) + + // Start the collector in a goroutine + done := make(chan struct{}) + go func() { + rc.Run(ctx) + close(done) + }() + + // Add some events + for i := 0; i < 5; i++ { + rc.TryAdd(i) + } + + // Wait for at least one flush interval + time.Sleep(flushInterval * 2) + + // Check that events were flushed + if mock.totalEvents() < 5 { + t.Errorf("expected at least 5 events to be flushed, got %d", mock.totalEvents()) + } + + // Cancel and wait for Run to exit + cancel() + select { + case <-done: + // Expected + case <-time.After(time.Second): + t.Error("Run did not exit after context cancellation") + } +} + +func TestCollector_Run_TriggerCapacityFlush(t *testing.T) { + mock := &mockAnalyticsClient{} + // Use a long flush interval so we know flush is triggered by capacity, not timer + flushInterval := 10 * time.Second + capacity := 20 + rc := NewCollector(capacity, mock, flushInterval) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start the collector in a goroutine + go rc.Run(ctx) + + // Add events to reach triggerCapacity (capacity - 10 = 10) + for i := 0; i < 10; i++ { + rc.TryAdd(i) + } + + // Wait a bit for the notification to be processed + time.Sleep(100 * time.Millisecond) + + // Check that events were flushed due to trigger capacity + if mock.totalEvents() < 10 { + t.Errorf("expected at least 10 events to be flushed by trigger capacity, got %d", mock.totalEvents()) + } +} + +func TestCollector_Run_FinalFlushOnCancel(t *testing.T) { + mock := &mockAnalyticsClient{} + flushInterval := 10 * time.Second // Long interval to ensure final flush is tested + rc := NewCollector(100, mock, flushInterval) + + ctx, cancel := context.WithCancel(context.Background()) + + // Start the collector in a goroutine + done := make(chan struct{}) + go func() { + rc.Run(ctx) + close(done) + }() + + // Add some events + for i := 0; i < 5; i++ { + rc.TryAdd(i) + } + + // Cancel immediately (before flush interval) + cancel() + + // Wait for Run to exit + select { + case <-done: + // Expected + case <-time.After(time.Second): + t.Error("Run did not exit after context cancellation") + } + + // Check that final flush happened + if mock.totalEvents() != 5 { + t.Errorf("expected 5 events from final flush, got %d", mock.totalEvents()) + } +} + +func TestCollector_Run_NoFlushWhenEmpty(t *testing.T) { + mock := &mockAnalyticsClient{} + flushInterval := 50 * time.Millisecond + rc := NewCollector(100, mock, flushInterval) + + ctx, cancel := context.WithCancel(context.Background()) + + // Start the collector in a goroutine + done := make(chan struct{}) + go func() { + rc.Run(ctx) + close(done) + }() + + // Wait for a few flush intervals without adding events + time.Sleep(flushInterval * 3) + + // Cancel and wait for Run to exit + cancel() + select { + case <-done: + // Expected + case <-time.After(time.Second): + t.Error("Run did not exit after context cancellation") + } + + // Check that no batches were sent (buffer was always empty) + if mock.totalEvents() != 0 { + t.Errorf("expected 0 events (empty buffer), got %d", mock.totalEvents()) + } +} + +func TestCollector_PopAllLimit100(t *testing.T) { + rc := NewCollector(200, nil, 0) + + // Add 150 events + for i := 0; i < 150; i++ { + rc.TryAdd(i) + } + + // First PopAll should return exactly 100 events + events := rc.PopAll() + if len(events) != 100 { + t.Errorf("expected 100 events (limit), got %d", len(events)) + } + + // Second PopAll should return remaining 50 events + events = rc.PopAll() + if len(events) != 50 { + t.Errorf("expected 50 remaining events, got %d", len(events)) + } + + // Third PopAll should return nil + events = rc.PopAll() + if events != nil { + t.Errorf("expected nil for empty buffer, got %v", events) + } +} + +func TestNewCollector_TriggerCapacityCalculation(t *testing.T) { + tests := []struct { + name string + capacity int + expectedTriggerCapacity int + }{ + {"zero capacity", 0, 0}, + {"capacity 1", 1, 1}, + {"capacity 10", 10, 10}, + {"capacity 11", 11, 1}, + {"capacity 20", 20, 10}, + {"capacity 100", 100, 90}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rc := NewCollector(tt.capacity, nil, 0) + if rc.triggerCapacity != tt.expectedTriggerCapacity { + t.Errorf("expected triggerCapacity %d, got %d", tt.expectedTriggerCapacity, rc.triggerCapacity) + } + }) + } +} diff --git a/internal/analytics/event.go b/internal/analytics/event.go new file mode 100644 index 00000000..ab6c2d5c --- /dev/null +++ b/internal/analytics/event.go @@ -0,0 +1,250 @@ +package analytics + +import ( + "fmt" + "time" + + "github.com/google/uuid" + "github.com/sirupsen/logrus" + "github.com/ton-connect/bridge/tonmetrics" +) + +// EventBuilder defines methods to create various analytics events. +type EventBuilder interface { + NewBridgeEventsClientSubscribedEvent(clientID, traceID string) tonmetrics.BridgeEventsClientSubscribedEvent + NewBridgeEventsClientUnsubscribedEvent(clientID, traceID string) tonmetrics.BridgeEventsClientUnsubscribedEvent + NewBridgeMessageSentEvent(clientID, traceID string, messageID int64, messageHash string) tonmetrics.BridgeMessageSentEvent + NewBridgeMessageReceivedEvent(clientID, traceID, requestType string, messageID int64, messageHash string) tonmetrics.BridgeMessageReceivedEvent + NewBridgeMessageExpiredEvent(clientID, traceID string, messageID int64, messageHash string) tonmetrics.BridgeMessageExpiredEvent + NewBridgeMessageValidationFailedEvent(clientID, traceID, requestType, messageHash string) tonmetrics.BridgeMessageValidationFailedEvent + NewBridgeVerifyEvent(clientID, traceID, verificationResult string) tonmetrics.BridgeVerifyEvent + NewBridgeVerifyValidationFailedEvent(clientID, traceID string, errorCode int, errorMessage string) tonmetrics.BridgeVerifyValidationFailedEvent +} + +type AnalyticEventBuilder struct { + bridgeURL string + environment string + subsystem string + version string + networkId string +} + +func NewEventBuilder(bridgeURL, environment, subsystem, version, networkId string) EventBuilder { + return &AnalyticEventBuilder{ + bridgeURL: bridgeURL, + environment: environment, + subsystem: subsystem, + version: version, + networkId: networkId, + } +} + +// NewBridgeEventsClientSubscribedEvent builds a bridge-events-client-subscribed event. +func (a *AnalyticEventBuilder) NewBridgeEventsClientSubscribedEvent(clientID, traceID string) tonmetrics.BridgeEventsClientSubscribedEvent { + timestamp := int(time.Now().Unix()) + eventName := tonmetrics.BridgeEventsClientSubscribedEventEventNameBridgeEventsClientSubscribed + environment := tonmetrics.BridgeEventsClientSubscribedEventClientEnvironment(a.environment) + subsystem := tonmetrics.BridgeEventsClientSubscribedEventSubsystem(a.subsystem) + + return tonmetrics.BridgeEventsClientSubscribedEvent{ + BridgeUrl: &a.bridgeURL, + ClientEnvironment: &environment, + ClientId: &clientID, + TraceId: &traceID, + ClientTimestamp: ×tamp, + EventId: newAnalyticsEventID(), + EventName: &eventName, + NetworkId: &a.networkId, + Subsystem: &subsystem, + Version: &a.version, + } +} + +// NewBridgeEventsClientUnsubscribedEvent builds a bridge-events-client-unsubscribed event. +func (a *AnalyticEventBuilder) NewBridgeEventsClientUnsubscribedEvent(clientID, traceID string) tonmetrics.BridgeEventsClientUnsubscribedEvent { + timestamp := int(time.Now().Unix()) + eventName := tonmetrics.BridgeEventsClientUnsubscribedEventEventNameBridgeEventsClientUnsubscribed + environment := tonmetrics.BridgeEventsClientUnsubscribedEventClientEnvironment(a.environment) + subsystem := tonmetrics.BridgeEventsClientUnsubscribedEventSubsystem(a.subsystem) + + return tonmetrics.BridgeEventsClientUnsubscribedEvent{ + BridgeUrl: &a.bridgeURL, + ClientEnvironment: &environment, + ClientId: &clientID, + TraceId: &traceID, + ClientTimestamp: ×tamp, + EventId: newAnalyticsEventID(), + EventName: &eventName, + NetworkId: &a.networkId, + Subsystem: &subsystem, + Version: &a.version, + } +} + +// NewBridgeMessageSentEvent builds a bridge-message-sent event. +func (a *AnalyticEventBuilder) NewBridgeMessageSentEvent(clientID, traceID string, messageID int64, messageHash string) tonmetrics.BridgeMessageSentEvent { + timestamp := int(time.Now().Unix()) + eventName := tonmetrics.BridgeMessageSentEventEventNameBridgeMessageSent + environment := tonmetrics.BridgeMessageSentEventClientEnvironment(a.environment) + subsystem := tonmetrics.BridgeMessageSentEventSubsystem(a.subsystem) + messageIDStr := fmt.Sprintf("%d", messageID) + + return tonmetrics.BridgeMessageSentEvent{ + BridgeUrl: &a.bridgeURL, + ClientEnvironment: &environment, + ClientId: &clientID, + ClientTimestamp: ×tamp, + EncryptedMessageHash: &messageHash, + EventId: newAnalyticsEventID(), + EventName: &eventName, + MessageId: &messageIDStr, + NetworkId: &a.networkId, + Subsystem: &subsystem, + TraceId: optionalString(traceID), + Version: &a.version, + } +} + +// NewBridgeMessageReceivedEvent builds a bridge message received event. +func (a *AnalyticEventBuilder) NewBridgeMessageReceivedEvent(clientID, traceID, requestType string, messageID int64, messageHash string) tonmetrics.BridgeMessageReceivedEvent { + timestamp := int(time.Now().Unix()) + eventName := tonmetrics.BridgeMessageReceivedEventEventNameBridgeMessageReceived + environment := tonmetrics.BridgeMessageReceivedEventClientEnvironment(a.environment) + subsystem := tonmetrics.BridgeMessageReceivedEventSubsystem(a.subsystem) + messageIDStr := fmt.Sprintf("%d", messageID) + + event := tonmetrics.BridgeMessageReceivedEvent{ + BridgeUrl: &a.bridgeURL, + ClientEnvironment: &environment, + ClientId: &clientID, + ClientTimestamp: ×tamp, + EventId: newAnalyticsEventID(), + EventName: &eventName, + MessageId: &messageIDStr, + EncryptedMessageHash: &messageHash, + NetworkId: &a.networkId, + Subsystem: &subsystem, + TraceId: optionalString(traceID), + Version: &a.version, + } + if requestType != "" { + event.RequestType = &requestType + } + return event +} + +// NewBridgeMessageExpiredEvent builds a bridge-message-expired event. +func (a *AnalyticEventBuilder) NewBridgeMessageExpiredEvent(clientID, traceID string, messageID int64, messageHash string) tonmetrics.BridgeMessageExpiredEvent { + timestamp := int(time.Now().Unix()) + eventName := tonmetrics.BridgeMessageExpiredEventEventNameBridgeMessageExpired + environment := tonmetrics.BridgeMessageExpiredEventClientEnvironment(a.environment) + subsystem := tonmetrics.BridgeMessageExpiredEventSubsystem(a.subsystem) + messageIdStr := fmt.Sprintf("%d", messageID) + + return tonmetrics.BridgeMessageExpiredEvent{ + BridgeUrl: &a.bridgeURL, + ClientEnvironment: &environment, + ClientId: &clientID, + ClientTimestamp: ×tamp, + EncryptedMessageHash: &messageHash, + EventId: newAnalyticsEventID(), + EventName: &eventName, + MessageId: &messageIdStr, + NetworkId: &a.networkId, + Subsystem: &subsystem, + TraceId: optionalString(traceID), + Version: &a.version, + } +} + +// NewBridgeMessageValidationFailedEvent builds a bridge-message-validation-failed event. +func (a *AnalyticEventBuilder) NewBridgeMessageValidationFailedEvent(clientID, traceID, requestType, messageHash string) tonmetrics.BridgeMessageValidationFailedEvent { + timestamp := int(time.Now().Unix()) + eventName := tonmetrics.BridgeMessageValidationFailedEventEventNameBridgeMessageValidationFailed + environment := tonmetrics.BridgeMessageValidationFailedEventClientEnvironment(a.environment) + subsystem := tonmetrics.BridgeMessageValidationFailedEventSubsystem(a.subsystem) + + event := tonmetrics.BridgeMessageValidationFailedEvent{ + BridgeUrl: &a.bridgeURL, + ClientEnvironment: &environment, + ClientId: &clientID, + ClientTimestamp: ×tamp, + EncryptedMessageHash: optionalString(messageHash), + EventId: newAnalyticsEventID(), + EventName: &eventName, + NetworkId: &a.networkId, + Subsystem: &subsystem, + TraceId: optionalString(traceID), + Version: &a.version, + } + if requestType != "" { + event.RequestType = &requestType + } + return event +} + +// NewBridgeVerifyEvent builds a bridge-verify event. +func (a *AnalyticEventBuilder) NewBridgeVerifyEvent(clientID, traceID, verificationResult string) tonmetrics.BridgeVerifyEvent { + timestamp := int(time.Now().Unix()) + eventName := tonmetrics.BridgeVerifyEventEventNameBridgeVerify + environment := tonmetrics.BridgeVerifyEventClientEnvironment(a.environment) + subsystem := tonmetrics.BridgeVerifyEventSubsystem(a.subsystem) + verifyType := tonmetrics.BridgeVerifyEventVerifyTypeConnect + + return tonmetrics.BridgeVerifyEvent{ + BridgeUrl: &a.bridgeURL, + ClientEnvironment: &environment, + ClientId: &clientID, + ClientTimestamp: ×tamp, + EventId: newAnalyticsEventID(), + EventName: &eventName, + NetworkId: &a.networkId, + Subsystem: &subsystem, + VerificationResult: optionalString(verificationResult), + VerifyType: &verifyType, + Version: &a.version, + } +} + +// NewBridgeVerifyValidationFailedEvent builds a bridge-verify-validation-failed event. +func (a *AnalyticEventBuilder) NewBridgeVerifyValidationFailedEvent(clientID, traceID string, errorCode int, errorMessage string) tonmetrics.BridgeVerifyValidationFailedEvent { + timestamp := int(time.Now().Unix()) + eventName := tonmetrics.BridgeVerifyValidationFailed + environment := tonmetrics.BridgeVerifyValidationFailedEventClientEnvironment(a.environment) + subsystem := tonmetrics.Bridge + verifyType := tonmetrics.BridgeVerifyValidationFailedEventVerifyTypeConnect + + return tonmetrics.BridgeVerifyValidationFailedEvent{ + BridgeUrl: &a.bridgeURL, + ClientEnvironment: &environment, + ClientId: &clientID, + ClientTimestamp: ×tamp, + ErrorCode: &errorCode, + ErrorMessage: &errorMessage, + EventId: newAnalyticsEventID(), + EventName: &eventName, + NetworkId: &a.networkId, + Subsystem: &subsystem, + TraceId: optionalString(traceID), + VerifyType: &verifyType, + Version: &a.version, + } +} + +func optionalString(value string) *string { + if value == "" { + return nil + } + return &value +} + +func newAnalyticsEventID() *string { + id, err := uuid.NewV7() + if err != nil { + logrus.WithError(err).Warn("Failed to generate UUIDv7, falling back to UUIDv4") + str := uuid.New().String() + return &str + } + str := id.String() + return &str +} diff --git a/internal/config/config.go b/internal/config/config.go index 5d35edef..6ea7f542 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -53,13 +53,18 @@ var Config = struct { WebhookURL string `env:"WEBHOOK_URL"` CopyToURL string `env:"COPY_TO_URL"` + // NTP Configuration + NTPEnabled bool `env:"NTP_ENABLED" envDefault:"true"` + NTPServers []string `env:"NTP_SERVERS" envSeparator:"," envDefault:"time.google.com,time.cloudflare.com,pool.ntp.org"` + NTPSyncInterval int `env:"NTP_SYNC_INTERVAL" envDefault:"300"` + NTPQueryTimeout int `env:"NTP_QUERY_TIMEOUT" envDefault:"5"` + // TON Analytics - TFAnalyticsEnabled bool `env:"TF_ANALYTICS_ENABLED" envDefault:"false"` - BridgeName string `env:"BRIDGE_NAME" envDefault:"ton-connect-bridge"` - BridgeVersion string `env:"BRIDGE_VERSION" envDefault:"1.0.0"` // TODO start using build version - BridgeURL string `env:"BRIDGE_URL" envDefault:"localhost"` - Environment string `env:"ENVIRONMENT" envDefault:"production"` - NetworkId string `env:"NETWORK_ID" envDefault:"-239"` + TONAnalyticsEnabled bool `env:"TON_ANALYTICS_ENABLED" envDefault:"false"` + TonAnalyticsURL string `env:"TON_ANALYTICS_URL" envDefault:"https://analytics.ton.org/events"` + TonAnalyticsBridgeVersion string `env:"TON_ANALYTICS_BRIDGE_VERSION" envDefault:"1.0.0"` // TODO start using build version + TonAnalyticsBridgeURL string `env:"TON_ANALYTICS_BRIDGE_URL" envDefault:"localhost"` + TonAnalyticsNetworkId string `env:"TON_ANALYTICS_NETWORK_ID" envDefault:"-239"` }{} func LoadConfig() { diff --git a/internal/handler/trace.go b/internal/handler/trace.go new file mode 100644 index 00000000..66c67d06 --- /dev/null +++ b/internal/handler/trace.go @@ -0,0 +1,31 @@ +package handler + +import ( + "github.com/google/uuid" + "github.com/sirupsen/logrus" +) + +func ParseOrGenerateTraceID(traceIdParam string, ok bool) string { + log := logrus.WithField("prefix", "CreateSession") + traceId := "unknown" + if ok { + uuids, err := uuid.Parse(traceIdParam) + if err != nil { + log.WithFields(logrus.Fields{ + "error": err, + "invalid_trace_id": traceIdParam[0], + }).Warn("generating a new trace_id") + } else { + traceId = uuids.String() + } + } + if traceId == "unknown" { + uuids, err := uuid.NewV7() + if err != nil { + log.Error(err) + } else { + traceId = uuids.String() + } + } + return traceId +} diff --git a/internal/ntp/client.go b/internal/ntp/client.go new file mode 100644 index 00000000..f3e219f1 --- /dev/null +++ b/internal/ntp/client.go @@ -0,0 +1,155 @@ +package ntp + +import ( + "context" + "sync/atomic" + "time" + + "github.com/beevik/ntp" + "github.com/sirupsen/logrus" +) + +type Client struct { + servers []string + syncInterval time.Duration + queryTimeout time.Duration + offset atomic.Int64 // stored as nanoseconds (time.Duration) + lastSync atomic.Int64 + stopCh chan struct{} + stopped atomic.Bool +} + +type Options struct { + Servers []string + SyncInterval time.Duration + QueryTimeout time.Duration +} + +func NewClient(opts Options) *Client { + if len(opts.Servers) == 0 { + opts.Servers = []string{ + "time.google.com", + "time.cloudflare.com", + "pool.ntp.org", + } + } + + if opts.SyncInterval == 0 { + opts.SyncInterval = 5 * time.Minute + } + + if opts.QueryTimeout == 0 { + opts.QueryTimeout = 5 * time.Second + } + + client := &Client{ + servers: opts.Servers, + syncInterval: opts.SyncInterval, + queryTimeout: opts.QueryTimeout, + stopCh: make(chan struct{}), + } + + return client +} + +func (c *Client) Start(ctx context.Context) { + if !c.stopped.CompareAndSwap(true, false) { + logrus.Warn("NTP client already started") + return + } + + logrus.WithFields(logrus.Fields{ + "servers": c.servers, + "sync_interval": c.syncInterval, + }).Info("Starting NTP client") + + c.syncOnce() + + go c.syncLoop(ctx) +} + +func (c *Client) Stop() { + if !c.stopped.CompareAndSwap(false, true) { + return + } + close(c.stopCh) + logrus.Info("NTP client stopped") +} + +func (c *Client) syncLoop(ctx context.Context) { + ticker := time.NewTicker(c.syncInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-c.stopCh: + return + case <-ticker.C: + c.syncOnce() + } + } +} + +func (c *Client) syncOnce() { + for _, server := range c.servers { + if c.trySyncWithServer(server) { + return + } + } + + logrus.Warn("Failed to synchronize with any NTP server, using local time") +} + +func (c *Client) trySyncWithServer(server string) bool { + ctx, cancel := context.WithTimeout(context.Background(), c.queryTimeout) + defer cancel() + + options := ntp.QueryOptions{ + Timeout: c.queryTimeout, + } + + response, err := ntp.QueryWithOptions(server, options) + if err != nil { + logrus.WithFields(logrus.Fields{ + "server": server, + "error": err, + }).Debug("Failed to query NTP server") + return false + } + + if err := response.Validate(); err != nil { + logrus.WithFields(logrus.Fields{ + "server": server, + "error": err, + }).Debug("Invalid response from NTP server") + return false + } + + c.offset.Store(int64(response.ClockOffset)) + c.lastSync.Store(time.Now().Unix()) + + logrus.WithFields(logrus.Fields{ + "server": server, + "offset": response.ClockOffset, + "precision": response.RTT / 2, + "rtt": response.RTT, + }).Info("Successfully synchronized with NTP server") + + select { + case <-ctx.Done(): + return false + default: + return true + } +} + +func (c *Client) now() time.Time { + offset := time.Duration(c.offset.Load()) + return time.Now().Add(offset) +} + +func (c *Client) NowUnixMilli() int64 { + return c.now().UnixMilli() +} diff --git a/internal/ntp/interfaces.go b/internal/ntp/interfaces.go new file mode 100644 index 00000000..572b304a --- /dev/null +++ b/internal/ntp/interfaces.go @@ -0,0 +1,7 @@ +package ntp + +// TimeProvider provides the current time in milliseconds. +// This interface allows using either local time or NTP-synchronized time. +type TimeProvider interface { + NowUnixMilli() int64 +} diff --git a/internal/ntp/local.go b/internal/ntp/local.go new file mode 100644 index 00000000..6672f696 --- /dev/null +++ b/internal/ntp/local.go @@ -0,0 +1,16 @@ +package ntp + +import "time" + +// LocalTimeProvider provides time based on the local system clock. +type LocalTimeProvider struct{} + +// NewLocalTimeProvider creates a new local time provider. +func NewLocalTimeProvider() *LocalTimeProvider { + return &LocalTimeProvider{} +} + +// NowUnixMilli returns the current local system time in Unix milliseconds. +func (l *LocalTimeProvider) NowUnixMilli() int64 { + return time.Now().UnixMilli() +} diff --git a/internal/v1/handler/handler.go b/internal/v1/handler/handler.go index dbf57387..7bd8f665 100644 --- a/internal/v1/handler/handler.go +++ b/internal/v1/handler/handler.go @@ -17,17 +17,16 @@ import ( "sync/atomic" "time" - "github.com/google/uuid" "github.com/labstack/echo/v4" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/sirupsen/logrus" + "github.com/ton-connect/bridge/internal/analytics" "github.com/ton-connect/bridge/internal/config" handler_common "github.com/ton-connect/bridge/internal/handler" "github.com/ton-connect/bridge/internal/models" "github.com/ton-connect/bridge/internal/utils" "github.com/ton-connect/bridge/internal/v1/storage" - "github.com/ton-connect/bridge/tonmetrics" ) var validHeartbeatTypes = map[string]string{ @@ -82,10 +81,11 @@ type handler struct { heartbeatInterval time.Duration connectionCache *ConnectionCache realIP *utils.RealIPExtractor - analytics tonmetrics.AnalyticsClient + eventCollector analytics.EventCollector + eventBuilder analytics.EventBuilder } -func NewHandler(db storage.Storage, heartbeatInterval time.Duration, extractor *utils.RealIPExtractor) *handler { +func NewHandler(db storage.Storage, heartbeatInterval time.Duration, extractor *utils.RealIPExtractor, collector analytics.EventCollector, builder analytics.EventBuilder) *handler { connectionCache := NewConnectionCache(config.Config.ConnectCacheSize, time.Duration(config.Config.ConnectCacheTTL)*time.Second) connectionCache.StartBackgroundCleanup(nil) @@ -97,7 +97,8 @@ func NewHandler(db storage.Storage, heartbeatInterval time.Duration, extractor * heartbeatInterval: heartbeatInterval, connectionCache: connectionCache, realIP: extractor, - analytics: tonmetrics.NewAnalyticsClient(), + eventCollector: collector, + eventBuilder: builder, } return &h } @@ -122,9 +123,13 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { if err != nil { badRequestMetric.Inc() log.Error(err) + h.logEventRegistrationValidationFailure("", "", "NewParamsStorage error: ") return c.JSON(utils.HttpResError(err.Error(), http.StatusBadRequest)) } + traceIdParam, ok := paramsStore.Get("trace_id") + traceId := handler_common.ParseOrGenerateTraceID(traceIdParam, ok) + heartbeatType := "legacy" if heartbeatParam, exists := paramsStore.Get("heartbeat"); exists { heartbeatType = heartbeatParam @@ -135,6 +140,7 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { badRequestMetric.Inc() errorMsg := "invalid heartbeat type. Supported: legacy and message" log.Error(errorMsg) + h.logEventRegistrationValidationFailure("", traceId, errorMsg) return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) } @@ -151,6 +157,7 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { badRequestMetric.Inc() errorMsg := "Last-Event-ID should be int" log.Error(errorMsg) + h.logEventRegistrationValidationFailure("", traceId, errorMsg) return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) } } @@ -161,6 +168,7 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { badRequestMetric.Inc() errorMsg := "last_event_id should be int" log.Error(errorMsg) + h.logEventRegistrationValidationFailure("", traceId, errorMsg) return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) } } @@ -169,13 +177,15 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { badRequestMetric.Inc() errorMsg := "param \"client_id\" not present" log.Error(errorMsg) + h.logEventRegistrationValidationFailure("", traceId, errorMsg) return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) } + clientIds := strings.Split(clientId, ",") clientIdsPerConnectionMetric.Observe(float64(len(clientIds))) connectIP := h.realIP.Extract(c.Request()) - session := h.CreateSession(clientIds, lastEventId) + session := h.CreateSession(clientIds, lastEventId, traceId) ip := h.realIP.Extract(c.Request()) origin := utils.ExtractOrigin(c.Request().Header.Get("Origin")) @@ -189,7 +199,7 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { go func() { <-notify close(session.Closer) - h.removeConnection(session) + h.removeConnection(session, traceId) log.Infof("connection: %v closed with error %v", session.ClientIds, ctx.Err()) }() @@ -199,8 +209,12 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { // Parse the message, add BridgeConnectSource, keep it for later logging var bridgeMsg models.BridgeMessage + fromID := "unknown" + traceID := "" messageToSend := msg.Message if err := json.Unmarshal(msg.Message, &bridgeMsg); err == nil { + fromID = bridgeMsg.From + traceID = bridgeMsg.TraceId bridgeMsg.BridgeConnectSource = models.BridgeConnectSource{ IP: connectIP, } @@ -228,22 +242,20 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { logrus.WithFields(logrus.Fields{ "hash": messageHash, - "from": bridgeMsg.From, + "from": fromID, "to": msg.To, "event_id": msg.EventId, - "trace_id": bridgeMsg.TraceId, + "trace_id": traceID, }).Debug("message sent") - go h.analytics.SendEvent(tonmetrics.CreateBridgeRequestReceivedEvent( - config.Config.BridgeURL, - msg.To, - bridgeMsg.TraceId, - config.Config.Environment, - config.Config.BridgeVersion, - config.Config.NetworkId, - msg.EventId, - )) - + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeMessageSentEvent( + msg.To, + traceID, + msg.EventId, + messageHash, + )) + } deliveredMessagesMetric.Inc() storage.ExpiredCache.Mark(msg.EventId) } @@ -258,20 +270,29 @@ func (h *handler) SendMessageHandler(c echo.Context) error { log := logrus.WithContext(ctx).WithField("prefix", "SendMessageHandler") params := c.QueryParams() - clientId, ok := params["client_id"] + + traceIdParam, ok := params["trace_id"] + traceIdValue := "" + if ok && len(traceIdParam) > 0 { + traceIdValue = traceIdParam[0] + } + traceId := handler_common.ParseOrGenerateTraceID(traceIdValue, ok && len(traceIdParam) > 0) + + clientIdValues, ok := params["client_id"] if !ok { badRequestMetric.Inc() errorMsg := "param \"client_id\" not present" log.Error(errorMsg) - return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) + return h.logMessageSentValidationFailure(c, errorMsg, "", traceId, "", "") } + clientID := clientIdValues[0] toId, ok := params["to"] if !ok { badRequestMetric.Inc() errorMsg := "param \"to\" not present" log.Error(errorMsg) - return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) + return h.logMessageSentValidationFailure(c, errorMsg, clientID, traceId, "", "") } ttlParam, ok := params["ttl"] @@ -279,28 +300,28 @@ func (h *handler) SendMessageHandler(c echo.Context) error { badRequestMetric.Inc() errorMsg := "param \"ttl\" not present" log.Error(errorMsg) - return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) + return h.logMessageSentValidationFailure(c, errorMsg, clientID, traceId, "", "") } ttl, err := strconv.ParseInt(ttlParam[0], 10, 32) if err != nil { badRequestMetric.Inc() log.Error(err) - return c.JSON(utils.HttpResError(err.Error(), http.StatusBadRequest)) + return h.logMessageSentValidationFailure(c, err.Error(), clientID, traceId, "", "") } if ttl > 300 { // TODO: config badRequestMetric.Inc() errorMsg := "param \"ttl\" too high" log.Error(errorMsg) - return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) + return h.logMessageSentValidationFailure(c, errorMsg, clientID, traceId, "", "") } message, err := io.ReadAll(c.Request().Body) if err != nil { badRequestMetric.Inc() log.Error(err) - return c.JSON(utils.HttpResError(err.Error(), http.StatusBadRequest)) + return h.logMessageSentValidationFailure(c, err.Error(), clientID, traceId, "", "") } - data := append(message, []byte(clientId[0])...) + data := append(message, []byte(clientID)...) sum := sha256.Sum256(data) messageId := int64(binary.BigEndian.Uint64(sum[:8])) if ok := storage.TransferedCache.MarkIfNotExists(messageId); ok { @@ -327,29 +348,7 @@ func (h *handler) SendMessageHandler(c echo.Context) error { topic = topicParam[0] go func(clientID, topic, message string) { handler_common.SendWebhook(clientID, handler_common.WebhookData{Topic: topic, Hash: message}) - }(clientId[0], topic, string(message)) - } - - traceIdParam, ok := params["trace_id"] - traceId := "unknown" - if ok { - uuids, err := uuid.Parse(traceIdParam[0]) - if err != nil { - log.WithFields(logrus.Fields{ - "error": err, - "invalid_trace_id": traceIdParam[0], - }).Warn("generating a new trace_id") - } else { - traceId = uuids.String() - } - } - if traceId == "unknown" { - uuids, err := uuid.NewV7() - if err != nil { - log.Error(err) - } else { - traceId = uuids.String() - } + }(clientID, topic, string(message)) } var requestSource string @@ -373,13 +372,20 @@ func (h *handler) SendMessageHandler(c echo.Context) error { if err != nil { badRequestMetric.Inc() log.Error(err) - return c.JSON(utils.HttpResError(fmt.Sprintf("failed to encrypt request source: %v", err), http.StatusBadRequest)) + return h.logMessageSentValidationFailure( + c, + fmt.Sprintf("failed to encrypt request source: %v", err), + clientID, + traceId, + topic, + "", + ) } requestSource = encryptedRequestSource } mes, err := json.Marshal(models.BridgeMessage{ - From: clientId[0], + From: clientID, Message: string(message), BridgeRequestSource: requestSource, TraceId: traceId, @@ -387,7 +393,7 @@ func (h *handler) SendMessageHandler(c echo.Context) error { if err != nil { badRequestMetric.Inc() log.Error(err) - return c.JSON(utils.HttpResError(err.Error(), http.StatusBadRequest)) + return h.logMessageSentValidationFailure(c, err.Error(), clientID, traceId, topic, "") } if topic == "disconnect" && len(mes) < config.Config.DisconnectEventMaxSize { @@ -435,19 +441,18 @@ func (h *handler) SendMessageHandler(c echo.Context) error { "from": fromId, "to": toId[0], "event_id": sseMessage.EventId, - "trace_id": bridgeMsg.TraceId, + "trace_id": traceId, }).Debug("message received") - go h.analytics.SendEvent(tonmetrics.CreateBridgeRequestSentEvent( - config.Config.BridgeURL, - clientId[0], - traceId, - topic, - config.Config.Environment, - config.Config.BridgeVersion, - config.Config.NetworkId, - sseMessage.EventId, - )) + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeMessageReceivedEvent( + clientID, + traceId, + topic, + sseMessage.EventId, + messageHash, + )) + } transferedMessagesNumMetric.Inc() return c.JSON(http.StatusOK, utils.HttpResOk()) @@ -460,17 +465,44 @@ func (h *handler) ConnectVerifyHandler(c echo.Context) error { paramsStore, err := handler_common.NewParamsStorage(c, config.Config.MaxBodySize) if err != nil { badRequestMetric.Inc() + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeVerifyValidationFailedEvent( + "", + "", + http.StatusBadRequest, + err.Error(), + )) + } return c.JSON(utils.HttpResError(err.Error(), http.StatusBadRequest)) } + traceIdParam, ok := paramsStore.Get("trace_id") + traceId := handler_common.ParseOrGenerateTraceID(traceIdParam, ok) + clientId, ok := paramsStore.Get("client_id") if !ok { badRequestMetric.Inc() + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeVerifyValidationFailedEvent( + "", + traceId, + http.StatusBadRequest, + "param \"client_id\" not present", + )) + } return c.JSON(utils.HttpResError("param \"client_id\" not present", http.StatusBadRequest)) } url, ok := paramsStore.Get("url") if !ok { badRequestMetric.Inc() + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeVerifyValidationFailedEvent( + clientId, + traceId, + http.StatusBadRequest, + "param \"url\" not present", + )) + } return c.JSON(utils.HttpResError("param \"url\" not present", http.StatusBadRequest)) } qtype, ok := paramsStore.Get("type") @@ -481,14 +513,25 @@ func (h *handler) ConnectVerifyHandler(c echo.Context) error { switch strings.ToLower(qtype) { case "connect": status := h.connectionCache.Verify(clientId, ip, utils.ExtractOrigin(url)) + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeVerifyEvent(clientId, traceId, status)) + } return c.JSON(http.StatusOK, verifyResponse{Status: status}) default: badRequestMetric.Inc() + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeVerifyValidationFailedEvent( + clientId, + traceId, + http.StatusBadRequest, + "param \"type\" must be one of: connect, message", + )) + } return c.JSON(utils.HttpResError("param \"type\" must be one of: connect, message", http.StatusBadRequest)) } } -func (h *handler) removeConnection(ses *Session) { +func (h *handler) removeConnection(ses *Session, traceID string) { log := logrus.WithField("prefix", "removeConnection") log.Infof("remove session: %v", ses.ClientIds) for _, id := range ses.ClientIds { @@ -515,10 +558,13 @@ func (h *handler) removeConnection(ses *Session) { h.Mux.Unlock() } activeSubscriptionsMetric.Dec() + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeEventsClientUnsubscribedEvent(id, traceID)) + } } } -func (h *handler) CreateSession(clientIds []string, lastEventId int64) *Session { +func (h *handler) CreateSession(clientIds []string, lastEventId int64, traceId string) *Session { log := logrus.WithField("prefix", "CreateSession") log.Infof("make new session with ids: %v", clientIds) session := NewSession(h.storage, clientIds, lastEventId) @@ -541,6 +587,9 @@ func (h *handler) CreateSession(clientIds []string, lastEventId int64) *Session } activeSubscriptionsMetric.Inc() + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeEventsClientSubscribedEvent(id, traceId)) + } } return session } @@ -548,3 +597,34 @@ func (h *handler) CreateSession(clientIds []string, lastEventId int64) *Session func (h *handler) nextID() int64 { return atomic.AddInt64(&h._eventIDs, 1) } + +func (h *handler) logEventRegistrationValidationFailure(clientID, traceID, errorMsg string) { + if h.eventCollector == nil { + return + } + h.eventCollector.TryAdd(h.eventBuilder.NewBridgeMessageValidationFailedEvent( + clientID, + traceID, + "", + errorMsg, + )) +} + +func (h *handler) logMessageSentValidationFailure( + c echo.Context, + msg string, + clientID string, + traceID string, + topic string, + messageHash string, +) error { + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeMessageValidationFailedEvent( + clientID, + traceID, + topic, + messageHash, + )) + } + return c.JSON(utils.HttpResError(msg, http.StatusBadRequest)) +} diff --git a/internal/v1/storage/mem.go b/internal/v1/storage/mem.go index c355b073..66537f38 100644 --- a/internal/v1/storage/mem.go +++ b/internal/v1/storage/mem.go @@ -9,12 +9,15 @@ import ( "time" log "github.com/sirupsen/logrus" + "github.com/ton-connect/bridge/internal/analytics" "github.com/ton-connect/bridge/internal/models" ) type MemStorage struct { - db map[string][]message - lock sync.Mutex + db map[string][]message + lock sync.Mutex + analytics analytics.EventCollector + eventBuilder analytics.EventBuilder } type message struct { @@ -26,20 +29,39 @@ func (m message) IsExpired(now time.Time) bool { return m.expireAt.Before(now) } -func NewMemStorage() *MemStorage { +func NewMemStorage(collector analytics.EventCollector, builder analytics.EventBuilder) *MemStorage { s := MemStorage{ - db: map[string][]message{}, + db: map[string][]message{}, + analytics: collector, + eventBuilder: builder, } go s.watcher() return &s } -func removeExpiredMessages(ms []message, now time.Time, clientID string) []message { - log := log.WithField("prefix", "removeExpiredMessages") +func removeExpiredMessages(ms []message, now time.Time) ([]message, []message) { results := make([]message, 0) + expired := make([]message, 0) for _, m := range ms { if m.IsExpired(now) { if !ExpiredCache.IsMarked(m.EventId) { + expired = append(expired, m) + } + } else { + results = append(results, m) + } + } + return results, expired +} + +func (s *MemStorage) watcher() { + for { + s.lock.Lock() + for key, msgs := range s.db { + actual, expired := removeExpiredMessages(msgs, time.Now()) + s.db[key] = actual + + for _, m := range expired { var bridgeMsg models.BridgeMessage fromID := "unknown" hash := sha256.Sum256(m.Message) @@ -50,27 +72,21 @@ func removeExpiredMessages(ms []message, now time.Time, clientID string) []messa contentHash := sha256.Sum256([]byte(bridgeMsg.Message)) messageHash = hex.EncodeToString(contentHash[:]) } - log.WithFields(map[string]interface{}{ "hash": messageHash, "from": fromID, - "to": clientID, + "to": key, "event_id": m.EventId, "trace_id": bridgeMsg.TraceId, }).Debug("message expired") - } - } else { - results = append(results, m) - } - } - return results -} -func (s *MemStorage) watcher() { - for { - s.lock.Lock() - for key, msgs := range s.db { - s.db[key] = removeExpiredMessages(msgs, time.Now(), key) + _ = s.analytics.TryAdd(s.eventBuilder.NewBridgeMessageExpiredEvent( + key, + bridgeMsg.TraceId, + m.EventId, + messageHash, + )) + } } s.lock.Unlock() diff --git a/internal/v1/storage/mem_test.go b/internal/v1/storage/mem_test.go index 312a2ab9..60c02c72 100644 --- a/internal/v1/storage/mem_test.go +++ b/internal/v1/storage/mem_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/ton-connect/bridge/internal/analytics" "github.com/ton-connect/bridge/internal/models" ) @@ -17,7 +18,6 @@ func newMessage(expire time.Time, i int) message { } func Test_removeExpiredMessages(t *testing.T) { - now := time.Now() tests := []struct { name string @@ -55,7 +55,7 @@ func Test_removeExpiredMessages(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := removeExpiredMessages(tt.ms, tt.now, "test-key"); !reflect.DeepEqual(got, tt.want) { + if got, _ := removeExpiredMessages(tt.ms, tt.now); !reflect.DeepEqual(got, tt.want) { t.Errorf("removeExpiredMessages() = %v, want %v", got, tt.want) } }) @@ -63,7 +63,12 @@ func Test_removeExpiredMessages(t *testing.T) { } func TestStorage(t *testing.T) { - s := &MemStorage{db: map[string][]message{}} + builder := analytics.NewEventBuilder("http://test", "test", "bridge", "1.0.0", "-239") + s := &MemStorage{ + db: map[string][]message{}, + analytics: analytics.NewCollector(10, nil, 0), + eventBuilder: builder, + } _ = s.Add(context.Background(), models.SseMessage{EventId: 1, To: "1"}, 2) _ = s.Add(context.Background(), models.SseMessage{EventId: 2, To: "2"}, 2) _ = s.Add(context.Background(), models.SseMessage{EventId: 3, To: "2"}, 2) @@ -145,7 +150,12 @@ func TestStorage_watcher(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := &MemStorage{db: tt.db} + builder := analytics.NewEventBuilder("http://test", "test", "bridge", "1.0.0", "-239") + s := &MemStorage{ + db: tt.db, + analytics: analytics.NewCollector(10, nil, 0), + eventBuilder: builder, + } go s.watcher() time.Sleep(500 * time.Millisecond) s.lock.Lock() diff --git a/internal/v1/storage/pg.go b/internal/v1/storage/pg.go index 65bfc322..6a41dd11 100644 --- a/internal/v1/storage/pg.go +++ b/internal/v1/storage/pg.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/sirupsen/logrus" + "github.com/ton-connect/bridge/internal/analytics" "github.com/ton-connect/bridge/internal/config" "github.com/ton-connect/bridge/internal/models" ) @@ -38,7 +39,9 @@ var ( type Message []byte type PgStorage struct { - postgres *pgxpool.Pool + postgres *pgxpool.Pool + analytics analytics.EventCollector + eventBuilder analytics.EventBuilder } //go:embed migrations/*.sql @@ -112,7 +115,7 @@ func configurePoolSettings(postgresURI string) (*pgxpool.Config, error) { return poolConfig, nil } -func NewPgStorage(postgresURI string) (*PgStorage, error) { +func NewPgStorage(postgresURI string, collector analytics.EventCollector, builder analytics.EventBuilder) (*PgStorage, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) log := logrus.WithField("prefix", "NewStorage") defer cancel() @@ -135,7 +138,9 @@ func NewPgStorage(postgresURI string) (*PgStorage, error) { return nil, err } s := PgStorage{ - postgres: c, + postgres: c, + analytics: collector, + eventBuilder: builder, } go s.worker() return &s, nil @@ -203,6 +208,13 @@ func (s *PgStorage) worker() { "event_id": eventID, "trace_id": traceID, }).Debug("message expired") + + _ = s.analytics.TryAdd(s.eventBuilder.NewBridgeMessageExpiredEvent( + clientID, + traceID, + eventID, + messageHash, + )) } } rows.Close() diff --git a/internal/v1/storage/storage.go b/internal/v1/storage/storage.go index f8b9ea1a..6a012421 100644 --- a/internal/v1/storage/storage.go +++ b/internal/v1/storage/storage.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/ton-connect/bridge/internal/analytics" "github.com/ton-connect/bridge/internal/config" "github.com/ton-connect/bridge/internal/models" common_storage "github.com/ton-connect/bridge/internal/storage" @@ -20,9 +21,9 @@ type Storage interface { HealthCheck() error } -func NewStorage(dbURI string) (Storage, error) { +func NewStorage(dbURI string, collector analytics.EventCollector, builder analytics.EventBuilder) (Storage, error) { if dbURI != "" { - return NewPgStorage(dbURI) + return NewPgStorage(dbURI, collector, builder) } - return NewMemStorage(), nil + return NewMemStorage(collector, builder), nil } diff --git a/internal/v3/handler/events.go b/internal/v3/handler/events.go index 48440d84..34960bf6 100644 --- a/internal/v3/handler/events.go +++ b/internal/v3/handler/events.go @@ -3,43 +3,54 @@ package handlerv3 import ( "math/rand" "sync/atomic" - "time" + + "github.com/ton-connect/bridge/internal/ntp" ) // EventIDGenerator generates monotonically increasing event IDs across multiple bridge instances. // Uses time-based ID generation with local sequence counter to ensure uniqueness and ordering. -// Format: (timestamp_ms << 16) | local_counter -// This provides ~65K events per millisecond per bridge instance. +// Format: (timestamp_ms << 11) | local_counter (53 bits total for JavaScript compatibility) +// This provides ~2K events per millisecond per bridge instance. // // Note: Due to concurrent generation and potential clock skew between instances, // up to 5% of events may not be in strict monotonic sequence, which is acceptable // for the bridge's event ordering requirements. type EventIDGenerator struct { - counter int64 // Local sequence counter, incremented atomically - offset int64 // Random offset per instance to avoid collisions + counter int64 // Local sequence counter, incremented atomically + offset int64 // Random offset per instance to avoid collisions (11 bits) + timeProvider ntp.TimeProvider // Time source (local or NTP-synchronized) } // NewEventIDGenerator creates a new event ID generator with counter starting from 0. -func NewEventIDGenerator() *EventIDGenerator { +// The timeProvider parameter determines the time source: +// - Use ntp.Client for NTP-synchronized time (better consistency across bridge instances) +// - Use ntp.LocalTimeProvider for local system time (fallback when NTP is unavailable) +func NewEventIDGenerator(timeProvider ntp.TimeProvider) *EventIDGenerator { return &EventIDGenerator{ - counter: 0, - offset: rand.Int63() & 0xFFFF, // Random offset to avoid collisions between instances + counter: 0, + offset: rand.Int63() & 0x7FF, // Random offset (11 bits) to avoid collisions between instances + timeProvider: timeProvider, } } // NextID generates the next monotonic event ID. // -// The ID format combines: -// - Upper 48 bits: Unix timestamp in milliseconds (provides time-based ordering) -// - Lower 16 bits: Local counter masked to 16 bits (handles multiple events per millisecond) +// The ID format combines (53 bits total for JavaScript Number.MAX_SAFE_INTEGER compatibility): +// - Upper 42 bits: Unix timestamp in milliseconds (supports dates up to year 2100) +// - Lower 11 bits: Local counter masked to 11 bits (handles multiple events per millisecond) // // This approach ensures: // - IDs are mostly monotonic if bridge instances have synchronized clocks (NTP) // - No central coordination needed → scalable across multiple bridge instances -// - Unique IDs even with high event rates (65K events/ms per instance) +// - Unique IDs even with high event rates (~2K events/ms per instance) // - Works well with SSE last_event_id for client reconnection +// - Safe integer representation in JavaScript (< 2^53) func (g *EventIDGenerator) NextID() int64 { - timestamp := time.Now().UnixMilli() + timestamp := g.timeProvider.NowUnixMilli() counter := atomic.AddInt64(&g.counter, 1) - return (timestamp << 16) | ((counter + g.offset) & 0xFFFF) + return getIdFromParams(timestamp, counter+g.offset) +} + +func getIdFromParams(timestamp int64, nonce int64) int64 { + return ((timestamp << 11) | (nonce & 0x7FF)) & 0x1FFFFFFFFFFFFF } diff --git a/internal/v3/handler/events_test.go b/internal/v3/handler/events_test.go index a4bb119b..1b121cf1 100644 --- a/internal/v3/handler/events_test.go +++ b/internal/v3/handler/events_test.go @@ -1,12 +1,16 @@ package handlerv3 import ( + "math/rand" "sync" "testing" + "time" + + "github.com/ton-connect/bridge/internal/ntp" ) func TestEventIDGenerator_NextID(t *testing.T) { - gen := NewEventIDGenerator() + gen := NewEventIDGenerator(ntp.NewLocalTimeProvider()) id1 := gen.NextID() id2 := gen.NextID() @@ -20,8 +24,8 @@ func TestEventIDGenerator_NextID(t *testing.T) { } func TestEventIDGenerator_RandomOffset(t *testing.T) { - gen1 := NewEventIDGenerator() - gen2 := NewEventIDGenerator() + gen1 := NewEventIDGenerator(ntp.NewLocalTimeProvider()) + gen2 := NewEventIDGenerator(ntp.NewLocalTimeProvider()) // Generators should have different offsets if gen1.offset == gen2.offset { @@ -38,7 +42,7 @@ func TestEventIDGenerator_RandomOffset(t *testing.T) { } func TestEventIDGenerator_SingleGenerators_Ordering(t *testing.T) { - gen := NewEventIDGenerator() + gen := NewEventIDGenerator(ntp.NewLocalTimeProvider()) const numIDs = 1000 idsChan := make(chan int64, numIDs) @@ -89,3 +93,160 @@ func TestEventIDGenerator_SingleGenerators_Ordering(t *testing.T) { t.Errorf("Too many out-of-order IDs: %d (max allowed: %d)", reversedOrderCount, maxOutOfOrder) } } + +func TestGetIdFromParams_TimestampEncoding(t *testing.T) { + // Test timestamps that fit within 42 bits (up to ~year 2109 from Unix epoch) + // With 53 bits total and 11 bits for nonce, we have 42 bits for timestamp + // These should round-trip exactly + testCasesExact := []struct { + name string + timestamp int64 + }{ + {"Unix epoch", 0}, + {"Year 1970", time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()}, + {"Year 2000", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()}, + {"Year 2024", time.Date(2024, 12, 2, 0, 0, 0, 0, time.UTC).UnixMilli()}, + {"Year 2050", time.Date(2050, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()}, + {"Year 2100", time.Date(2100, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli()}, + {"Max 42-bit timestamp", int64(1<<42 - 1)}, + } + + for _, tc := range testCasesExact { + t.Run(tc.name, func(t *testing.T) { + id := getIdFromParams(tc.timestamp, 0) + + // Extract timestamp from ID (upper 42 bits) + extractedTimestamp := id >> 11 + + if extractedTimestamp != tc.timestamp { + t.Errorf("Timestamp mismatch: got %d, want %d", extractedTimestamp, tc.timestamp) + } + + // Verify ID is within 53-bit range + max53Bit := int64(0x1FFFFFFFFFFFFF) // 2^53 - 1 + if id > max53Bit { + t.Errorf("ID %d exceeds 53-bit limit %d", id, max53Bit) + } + }) + } +} + +func TestGetIdFromParams_NonceEncoding(t *testing.T) { + timestamp := time.Date(2024, 12, 2, 0, 0, 0, 0, time.UTC).UnixMilli() + + testCases := []struct { + name string + nonce int64 + expectedNonce int64 // Expected after 11-bit masking + }{ + {"Zero nonce", 0, 0}, + {"Nonce 1", 1, 1}, + {"Nonce 100", 100, 100}, + {"Nonce 1000", 1000, 1000}, + {"Max 11-bit nonce", 0x7FF, 0x7FF}, + {"Overflow nonce 0x800", 0x800, 0}, // Should wrap to 0 + {"Overflow nonce 0x801", 0x801, 1}, // Should wrap to 1 + {"Large nonce", 123456789, 123456789 & 0x7FF}, // Should be masked + {"Very large nonce", 9999999999, 9999999999 & 0x7FF}, + {"Random big number", 0x7FFFFFFFFFFFFFFF, 0x7FFFFFFFFFFFFFFF & 0x7FF}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + id := getIdFromParams(timestamp, tc.nonce) + + // Extract nonce from ID (lower 11 bits) + extractedNonce := id & 0x7FF + + if extractedNonce != tc.expectedNonce { + t.Errorf("Nonce mismatch: got %d, want %d", extractedNonce, tc.expectedNonce) + } + + // Verify timestamp is preserved + extractedTimestamp := id >> 11 + if extractedTimestamp != timestamp { + t.Errorf("Timestamp was corrupted: got %d, want %d", extractedTimestamp, timestamp) + } + }) + } +} + +func TestGetIdFromParams_RoundTrip(t *testing.T) { + // Test that we can encode and decode many random combinations + r := rand.New(rand.NewSource(42)) + + // Maximum timestamp that fits in 42 bits (2^42 - 1 ms ≈ 139 years from epoch) + // After 53-bit masking, timestamps up to 42 bits are preserved exactly + max42BitTimestamp := int64(1<<42 - 1) + + for i := 0; i < 10; i++ { + // Random timestamp within 42-bit range + timestamp := r.Int63n(max42BitTimestamp) + // Random nonce (can be any large number, will be masked to 11 bits) + nonce := r.Int63() + expectedNonce := nonce & 0x7FF + + id := getIdFromParams(timestamp, nonce) + + // Decode + extractedTimestamp := id >> 11 + extractedNonce := id & 0x7FF + + if extractedTimestamp != timestamp { + t.Errorf("Iteration %d: Timestamp mismatch: got %d, want %d", i, extractedTimestamp, timestamp) + } + if extractedNonce != expectedNonce { + t.Errorf("Iteration %d: Nonce mismatch: got %d, want %d (original: %d)", i, extractedNonce, expectedNonce, nonce) + } + + // Verify within 53-bit range + max53Bit := int64(0x1FFFFFFFFFFFFF) + if id > max53Bit { + t.Errorf("Iteration %d: ID %d exceeds 53-bit limit", i, id) + } + } +} + +func TestGetIdFromParams_53BitLimit(t *testing.T) { + // Test edge case: maximum 42-bit timestamp + max42BitTimestamp := int64(1<<42 - 1) + maxNonce := int64(0x7FF) + + id := getIdFromParams(max42BitTimestamp, maxNonce) + + // Should be exactly 2^53 - 1 + expectedMax := int64(0x1FFFFFFFFFFFFF) + if id != expectedMax { + t.Errorf("Max ID mismatch: got %d (0x%X), want %d (0x%X)", id, id, expectedMax, expectedMax) + } + + // Test that timestamps beyond 42 bits get truncated by the final mask + beyond42BitTimestamp := int64(1 << 43) + id2 := getIdFromParams(beyond42BitTimestamp, 0) + + // Should be truncated to fit within 53 bits + if id2 > expectedMax { + t.Errorf("ID with large timestamp exceeds 53 bits: %d", id2) + } +} + +func TestGetIdFromParams_Monotonicity(t *testing.T) { + // IDs should increase when timestamp increases (with same nonce) + timestamp1 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixMilli() + timestamp2 := time.Date(2024, 1, 1, 0, 0, 1, 0, time.UTC).UnixMilli() // 1 second later + + id1 := getIdFromParams(timestamp1, 0) + id2 := getIdFromParams(timestamp2, 0) + + if id2 <= id1 { + t.Errorf("ID should increase with timestamp: id1=%d, id2=%d", id1, id2) + } + + // IDs should increase when nonce increases (with same timestamp) + id3 := getIdFromParams(timestamp1, 1) + id4 := getIdFromParams(timestamp1, 2) + + if id4 <= id3 { + t.Errorf("ID should increase with nonce: id3=%d, id4=%d", id3, id4) + } +} diff --git a/internal/v3/handler/handler.go b/internal/v3/handler/handler.go index ab8d41a9..3cd627c7 100644 --- a/internal/v3/handler/handler.go +++ b/internal/v3/handler/handler.go @@ -16,15 +16,15 @@ import ( "time" - "github.com/google/uuid" "github.com/labstack/echo/v4" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/sirupsen/logrus" - + "github.com/ton-connect/bridge/internal/analytics" "github.com/ton-connect/bridge/internal/config" handler_common "github.com/ton-connect/bridge/internal/handler" "github.com/ton-connect/bridge/internal/models" + "github.com/ton-connect/bridge/internal/ntp" "github.com/ton-connect/bridge/internal/utils" storagev3 "github.com/ton-connect/bridge/internal/v3/storage" ) @@ -72,17 +72,21 @@ type handler struct { eventIDGen *EventIDGenerator heartbeatInterval time.Duration realIP *utils.RealIPExtractor + eventCollector analytics.EventCollector + eventBuilder analytics.EventBuilder } -func NewHandler(s storagev3.Storage, heartbeatInterval time.Duration, extractor *utils.RealIPExtractor) *handler { +func NewHandler(s storagev3.Storage, heartbeatInterval time.Duration, extractor *utils.RealIPExtractor, timeProvider ntp.TimeProvider, collector analytics.EventCollector, builder analytics.EventBuilder) *handler { // TODO support extractor in v3 h := handler{ Mux: sync.RWMutex{}, Connections: make(map[string]*stream), storage: s, - eventIDGen: NewEventIDGenerator(), + eventIDGen: NewEventIDGenerator(timeProvider), realIP: extractor, heartbeatInterval: heartbeatInterval, + eventCollector: collector, + eventBuilder: builder, } return &h } @@ -107,6 +111,13 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { c.Response().Flush() params := c.QueryParams() + traceIdParam, ok := params["trace_id"] + traceIdValue := "" + if ok && len(traceIdParam) > 0 { + traceIdValue = traceIdParam[0] + } + traceId := handler_common.ParseOrGenerateTraceID(traceIdValue, ok && len(traceIdParam) > 0) + heartbeatType := "legacy" if heartbeatParam, exists := params["heartbeat"]; exists && len(heartbeatParam) > 0 { heartbeatType = heartbeatParam[0] @@ -117,6 +128,7 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { badRequestMetric.Inc() errorMsg := "invalid heartbeat type. Supported: legacy and message" log.Error(errorMsg) + h.logEventRegistrationValidationFailure("", traceId, "events/heartbeat") return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) } @@ -129,6 +141,7 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { badRequestMetric.Inc() errorMsg := "Last-Event-ID should be int" log.Error(errorMsg) + h.logEventRegistrationValidationFailure("", traceId, "events/last-event-id-header") return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) } } @@ -139,6 +152,7 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { badRequestMetric.Inc() errorMsg := "last_event_id should be int" log.Error(errorMsg) + h.logEventRegistrationValidationFailure("", traceId, "events/last-event-id-query") return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) } } @@ -147,11 +161,14 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { badRequestMetric.Inc() errorMsg := "param \"client_id\" not present" log.Error(errorMsg) + h.logEventRegistrationValidationFailure("", traceId, "events/missing-client-id") return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) } + clientIds := strings.Split(clientId[0], ",") clientIdsPerConnectionMetric.Observe(float64(len(clientIds))) - session := h.CreateSession(clientIds, lastEventId) + + session := h.CreateSession(clientIds, lastEventId, traceId) // Track connection for verification if len(clientIds) > 0 { @@ -177,7 +194,7 @@ func (h *handler) EventRegistrationHandler(c echo.Context) error { go func() { <-notify session.Close() - h.removeConnection(session) + h.removeConnection(session, traceId) log.Infof("connection: %v closed with error %v", session.ClientIds, ctx.Err()) }() ticker := time.NewTicker(h.heartbeatInterval) @@ -200,6 +217,7 @@ loop: fromId := "unknown" toId := msg.To + traceID := "" hash := sha256.Sum256(msg.Message) messageHash := hex.EncodeToString(hash[:]) @@ -207,6 +225,7 @@ loop: var bridgeMsg models.BridgeMessage if err := json.Unmarshal(msg.Message, &bridgeMsg); err == nil { fromId = bridgeMsg.From + traceID = bridgeMsg.TraceId contentHash := sha256.Sum256([]byte(bridgeMsg.Message)) messageHash = hex.EncodeToString(contentHash[:]) } @@ -216,9 +235,17 @@ loop: "from": fromId, "to": toId, "event_id": msg.EventId, - "trace_id": bridgeMsg.TraceId, + "trace_id": traceID, }).Debug("message sent") + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeMessageSentEvent( + msg.To, + traceID, + msg.EventId, + messageHash, + )) + } deliveredMessagesMetric.Inc() storagev3.ExpiredCache.Mark(msg.EventId) case <-ticker.C: @@ -239,20 +266,29 @@ func (h *handler) SendMessageHandler(c echo.Context) error { log := logrus.WithContext(ctx).WithField("prefix", "SendMessageHandler") params := c.QueryParams() - clientId, ok := params["client_id"] + + traceIdParam, ok := params["trace_id"] + traceIdValue := "" + if ok && len(traceIdParam) > 0 { + traceIdValue = traceIdParam[0] + } + traceId := handler_common.ParseOrGenerateTraceID(traceIdValue, ok && len(traceIdParam) > 0) + + clientIdValues, ok := params["client_id"] if !ok { badRequestMetric.Inc() errorMsg := "param \"client_id\" not present" log.Error(errorMsg) - return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) + return h.failValidation(c, errorMsg, "", traceId, "", "") } + clientID := clientIdValues[0] toId, ok := params["to"] if !ok { badRequestMetric.Inc() errorMsg := "param \"to\" not present" log.Error(errorMsg) - return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) + return h.failValidation(c, errorMsg, clientID, traceId, "", "") } ttlParam, ok := params["ttl"] @@ -260,26 +296,27 @@ func (h *handler) SendMessageHandler(c echo.Context) error { badRequestMetric.Inc() errorMsg := "param \"ttl\" not present" log.Error(errorMsg) - return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) + return h.failValidation(c, errorMsg, clientID, traceId, "", "") } ttl, err := strconv.ParseInt(ttlParam[0], 10, 32) if err != nil { badRequestMetric.Inc() log.Error(err) - return c.JSON(utils.HttpResError(err.Error(), http.StatusBadRequest)) + return h.failValidation(c, err.Error(), clientID, traceId, "", "") } if ttl > 300 { // TODO: config MaxTTL value badRequestMetric.Inc() errorMsg := "param \"ttl\" too high" log.Error(errorMsg) - return c.JSON(utils.HttpResError(errorMsg, http.StatusBadRequest)) + return h.failValidation(c, errorMsg, clientID, traceId, "", "") } message, err := io.ReadAll(c.Request().Body) if err != nil { badRequestMetric.Inc() log.Error(err) - return c.JSON(utils.HttpResError(err.Error(), http.StatusBadRequest)) + return h.failValidation(c, err.Error(), clientID, traceId, "", "") } + if config.Config.CopyToURL != "" { go func() { u, err := url.Parse(config.Config.CopyToURL) @@ -294,44 +331,24 @@ func (h *handler) SendMessageHandler(c echo.Context) error { http.DefaultClient.Do(req) //nolint:errcheck// TODO review golangci-lint issue }() } - topic, ok := params["topic"] + topicParam, ok := params["topic"] + topic := "" if ok { + topic = topicParam[0] go func(clientID, topic, message string) { handler_common.SendWebhook(clientID, handler_common.WebhookData{Topic: topic, Hash: message}) - }(clientId[0], topic[0], string(message)) - } - - traceIdParam, ok := params["trace_id"] - traceId := "unknown" - if ok { - uuids, err := uuid.Parse(traceIdParam[0]) - if err != nil { - log.WithFields(logrus.Fields{ - "error": err, - "invalid_trace_id": traceIdParam[0], - }).Warn("generating a new trace_id") - } else { - traceId = uuids.String() - } - } - if traceId == "unknown" { - uuids, err := uuid.NewV7() - if err != nil { - log.Error(err) - } else { - traceId = uuids.String() - } + }(clientID, topic, string(message)) } mes, err := json.Marshal(models.BridgeMessage{ - From: clientId[0], + From: clientID, Message: string(message), TraceId: traceId, }) if err != nil { badRequestMetric.Inc() log.Error(err) - return c.JSON(utils.HttpResError(err.Error(), http.StatusBadRequest)) + return h.failValidation(c, err.Error(), clientID, traceId, topic, "") } sseMessage := models.SseMessage{ @@ -367,9 +384,19 @@ func (h *handler) SendMessageHandler(c echo.Context) error { "from": fromId, "to": toId[0], "event_id": sseMessage.EventId, - "trace_id": bridgeMsg.TraceId, + "trace_id": traceId, }).Debug("message received") + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeMessageReceivedEvent( + clientID, + traceId, + topic, + sseMessage.EventId, + messageHash, + )) + } + transferedMessagesNumMetric.Inc() return c.JSON(http.StatusOK, utils.HttpResOk()) } @@ -385,17 +412,44 @@ func (h *handler) ConnectVerifyHandler(c echo.Context) error { paramsStore, err := handler_common.NewParamsStorage(c, config.Config.MaxBodySize) if err != nil { badRequestMetric.Inc() + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeVerifyValidationFailedEvent( + "", + "", + http.StatusBadRequest, + err.Error(), + )) + } return c.JSON(utils.HttpResError(err.Error(), http.StatusBadRequest)) } + traceIdParam, ok := paramsStore.Get("trace_id") + traceId := handler_common.ParseOrGenerateTraceID(traceIdParam, ok) + clientId, ok := paramsStore.Get("client_id") if !ok { badRequestMetric.Inc() + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeVerifyValidationFailedEvent( + "", + traceId, + http.StatusBadRequest, + "param \"client_id\" not present", + )) + } return c.JSON(utils.HttpResError("param \"client_id\" not present", http.StatusBadRequest)) } urlParam, ok := paramsStore.Get("url") if !ok { badRequestMetric.Inc() + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeVerifyValidationFailedEvent( + clientId, + traceId, + http.StatusBadRequest, + "param \"url\" not present", + )) + } return c.JSON(utils.HttpResError("param \"url\" not present", http.StatusBadRequest)) } qtype, ok := paramsStore.Get("type") @@ -412,16 +466,35 @@ func (h *handler) ConnectVerifyHandler(c echo.Context) error { } status, err := h.storage.VerifyConnection(ctx, conn) if err != nil { + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeVerifyValidationFailedEvent( + clientId, + traceId, + http.StatusInternalServerError, + err.Error(), + )) + } return c.JSON(utils.HttpResError(err.Error(), http.StatusInternalServerError)) } + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeVerifyEvent(clientId, traceId, status)) + } return c.JSON(http.StatusOK, verifyResponse{Status: status}) default: badRequestMetric.Inc() + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeVerifyValidationFailedEvent( + clientId, + traceId, + http.StatusBadRequest, + "param \"type\" must be: connect", + )) + } return c.JSON(utils.HttpResError("param \"type\" must be: connect", http.StatusBadRequest)) } } -func (h *handler) removeConnection(ses *Session) { +func (h *handler) removeConnection(ses *Session, traceID string) { log := logrus.WithField("prefix", "removeConnection") log.Infof("remove session: %v", ses.ClientIds) for _, id := range ses.ClientIds { @@ -448,10 +521,13 @@ func (h *handler) removeConnection(ses *Session) { h.Mux.Unlock() } activeSubscriptionsMetric.Dec() + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeEventsClientUnsubscribedEvent(id, traceID)) + } } } -func (h *handler) CreateSession(clientIds []string, lastEventId int64) *Session { +func (h *handler) CreateSession(clientIds []string, lastEventId int64, traceID string) *Session { log := logrus.WithField("prefix", "CreateSession") log.Infof("make new session with ids: %v", clientIds) session := NewSession(h.storage, clientIds, lastEventId) @@ -474,6 +550,40 @@ func (h *handler) CreateSession(clientIds []string, lastEventId int64) *Session } activeSubscriptionsMetric.Inc() + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeEventsClientSubscribedEvent(id, traceID)) + } } return session } + +func (h *handler) logEventRegistrationValidationFailure(clientID, traceID, requestType string) { + if h.eventCollector == nil { + return + } + h.eventCollector.TryAdd(h.eventBuilder.NewBridgeMessageValidationFailedEvent( + clientID, + traceID, + requestType, + "", + )) +} + +func (h *handler) failValidation( + c echo.Context, + msg string, + clientID string, + traceID string, + topic string, + messageHash string, +) error { + if h.eventCollector != nil { + _ = h.eventCollector.TryAdd(h.eventBuilder.NewBridgeMessageValidationFailedEvent( + clientID, + traceID, + topic, + messageHash, + )) + } + return c.JSON(utils.HttpResError(msg, http.StatusBadRequest)) +} diff --git a/internal/v3/storage/mem.go b/internal/v3/storage/mem.go index 35aabedc..17c5a36b 100644 --- a/internal/v3/storage/mem.go +++ b/internal/v3/storage/mem.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/sirupsen/logrus" + "github.com/ton-connect/bridge/internal/analytics" "github.com/ton-connect/bridge/internal/models" ) @@ -20,10 +21,12 @@ var expiredMessagesMetric = promauto.NewCounter(prometheus.CounterOpts{ }) type MemStorage struct { - db map[string][]message - subscribers map[string][]chan<- models.SseMessage - connections map[string][]memConnection // clientID -> connections - lock sync.Mutex + db map[string][]message + subscribers map[string][]chan<- models.SseMessage + connections map[string][]memConnection // clientID -> connections + lock sync.Mutex + analytics analytics.EventCollector + eventBuilder analytics.EventBuilder } type message struct { @@ -42,22 +45,41 @@ func (m message) IsExpired(now time.Time) bool { return m.expireAt.Before(now) } -func NewMemStorage() *MemStorage { +func NewMemStorage(collector analytics.EventCollector, builder analytics.EventBuilder) *MemStorage { s := MemStorage{ - db: map[string][]message{}, - subscribers: make(map[string][]chan<- models.SseMessage), - connections: make(map[string][]memConnection), + db: map[string][]message{}, + subscribers: make(map[string][]chan<- models.SseMessage), + connections: make(map[string][]memConnection), + analytics: collector, + eventBuilder: builder, } go s.watcher() return &s } -func removeExpiredMessages(ms []message, now time.Time, clientID string) []message { - log := logrus.WithField("prefix", "removeExpiredMessages") +func removeExpiredMessages(ms []message, now time.Time) ([]message, []message) { results := make([]message, 0) + expired := make([]message, 0) for _, m := range ms { if m.IsExpired(now) { if !ExpiredCache.IsMarked(m.EventId) { + expired = append(expired, m) + } + } else { + results = append(results, m) + } + } + return results, expired +} + +func (s *MemStorage) watcher() { + for { + s.lock.Lock() + for key, msgs := range s.db { + actual, expired := removeExpiredMessages(msgs, time.Now()) + s.db[key] = actual + + for _, m := range expired { var bridgeMsg models.BridgeMessage fromID := "unknown" hash := sha256.Sum256(m.Message) @@ -68,28 +90,22 @@ func removeExpiredMessages(ms []message, now time.Time, clientID string) []messa contentHash := sha256.Sum256([]byte(bridgeMsg.Message)) messageHash = hex.EncodeToString(contentHash[:]) } - expiredMessagesMetric.Inc() - log.WithFields(map[string]interface{}{ + logrus.WithFields(map[string]interface{}{ "hash": messageHash, "from": fromID, - "to": clientID, + "to": key, "event_id": m.EventId, "trace_id": bridgeMsg.TraceId, }).Debug("message expired") - } - } else { - results = append(results, m) - } - } - return results -} -func (s *MemStorage) watcher() { - for { - s.lock.Lock() - for key, ms := range s.db { - s.db[key] = removeExpiredMessages(ms, time.Now(), key) + _ = s.analytics.TryAdd(s.eventBuilder.NewBridgeMessageExpiredEvent( + key, + bridgeMsg.TraceId, + m.EventId, + messageHash, + )) + } } s.lock.Unlock() time.Sleep(time.Second) diff --git a/internal/v3/storage/mem_test.go b/internal/v3/storage/mem_test.go index 1816fae2..1af31521 100644 --- a/internal/v3/storage/mem_test.go +++ b/internal/v3/storage/mem_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/ton-connect/bridge/internal/analytics" + "github.com/ton-connect/bridge/internal/config" "github.com/ton-connect/bridge/internal/models" ) @@ -55,7 +57,7 @@ func Test_removeExpiredMessages(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := removeExpiredMessages(tt.ms, tt.now, "test-key"); !reflect.DeepEqual(got, tt.want) { + if got, _ := removeExpiredMessages(tt.ms, tt.now); !reflect.DeepEqual(got, tt.want) { t.Errorf("removeExpiredMessages() = %v, want %v", got, tt.want) } }) @@ -101,7 +103,12 @@ func TestMemStorage_watcher(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := &MemStorage{db: tt.db} + builder := analytics.NewEventBuilder("http://test", "test", "bridge", "1.0.0", "-239") + s := &MemStorage{ + db: tt.db, + analytics: analytics.NewCollector(10, nil, 0), + eventBuilder: builder, + } go s.watcher() time.Sleep(500 * time.Millisecond) s.lock.Lock() @@ -115,7 +122,8 @@ func TestMemStorage_watcher(t *testing.T) { } func TestMemStorage_PubSub(t *testing.T) { - s := NewMemStorage() + builder := analytics.NewEventBuilder(config.Config.TonAnalyticsBridgeURL, "bridge", "bridge", config.Config.TonAnalyticsBridgeVersion, config.Config.TonAnalyticsNetworkId) + s := NewMemStorage(analytics.NewCollector(10, nil, 0), builder) // Create channels to receive messages ch1 := make(chan models.SseMessage, 10) @@ -196,7 +204,8 @@ func TestMemStorage_PubSub(t *testing.T) { } func TestMemStorage_LastEventId(t *testing.T) { - s := NewMemStorage() + builder := analytics.NewEventBuilder(config.Config.TonAnalyticsBridgeURL, "bridge", "bridge", config.Config.TonAnalyticsBridgeVersion, config.Config.TonAnalyticsNetworkId) + s := NewMemStorage(analytics.NewCollector(10, nil, 0), builder) // Store some messages first _ = s.Pub(context.Background(), models.SseMessage{EventId: 1, To: "1"}, 60) diff --git a/internal/v3/storage/storage.go b/internal/v3/storage/storage.go index 10b7faea..0a246207 100644 --- a/internal/v3/storage/storage.go +++ b/internal/v3/storage/storage.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/ton-connect/bridge/internal/analytics" "github.com/ton-connect/bridge/internal/config" "github.com/ton-connect/bridge/internal/models" common_storage "github.com/ton-connect/bridge/internal/storage" @@ -35,14 +36,14 @@ type Storage interface { HealthCheck() error } -func NewStorage(storageType string, uri string) (Storage, error) { +func NewStorage(storageType string, uri string, collector analytics.EventCollector, builder analytics.EventBuilder) (Storage, error) { switch storageType { case "valkey", "redis": return NewValkeyStorage(uri) case "postgres": return nil, fmt.Errorf("postgres storage does not support pub-sub functionality yet") case "memory": - return NewMemStorage(), nil + return NewMemStorage(collector, builder), nil default: return nil, fmt.Errorf("unsupported storage type: %s", storageType) } diff --git a/test/gointegration/analytics_mock.go b/test/gointegration/analytics_mock.go new file mode 100644 index 00000000..e905be51 --- /dev/null +++ b/test/gointegration/analytics_mock.go @@ -0,0 +1,109 @@ +package bridge_test + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "sync" + + "github.com/sirupsen/logrus" +) + +// AnalyticsMock is a mock analytics server for testing +type AnalyticsMock struct { + Server *httptest.Server + mu sync.RWMutex + receivedEvents []map[string]interface{} + totalEvents int +} + +// NewAnalyticsMock creates a new mock analytics server +func NewAnalyticsMock() *AnalyticsMock { + mock := &AnalyticsMock{ + receivedEvents: make([]map[string]interface{}, 0), + } + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + if r.URL.Path != "/events" { + w.WriteHeader(http.StatusNotFound) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + logrus.Errorf("analytics mock: failed to read body: %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + var events []map[string]interface{} + if err := json.Unmarshal(body, &events); err != nil { + logrus.Errorf("analytics mock: failed to unmarshal events: %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + mock.mu.Lock() + mock.receivedEvents = append(mock.receivedEvents, events...) + mock.totalEvents += len(events) + mock.mu.Unlock() + + logrus.Infof("analytics mock: received batch of %d events (total: %d)", len(events), mock.totalEvents) + + // Return 202 Accepted like the real analytics server + w.WriteHeader(http.StatusAccepted) + }) + + mock.Server = httptest.NewServer(handler) + return mock +} + +// Close shuts down the mock server +func (m *AnalyticsMock) Close() { + m.Server.Close() +} + +// GetEvents returns all received events +func (m *AnalyticsMock) GetEvents() []map[string]interface{} { + m.mu.RLock() + defer m.mu.RUnlock() + + events := make([]map[string]interface{}, len(m.receivedEvents)) + copy(events, m.receivedEvents) + return events +} + +// GetEventCount returns the total number of events received +func (m *AnalyticsMock) GetEventCount() int { + m.mu.RLock() + defer m.mu.RUnlock() + return m.totalEvents +} + +// GetEventsByName returns events filtered by event_name +func (m *AnalyticsMock) GetEventsByName(eventName string) []map[string]interface{} { + m.mu.RLock() + defer m.mu.RUnlock() + + filtered := make([]map[string]interface{}, 0) + for _, event := range m.receivedEvents { + if name, ok := event["event_name"].(string); ok && name == eventName { + filtered = append(filtered, event) + } + } + return filtered +} + +// Reset clears all received events +func (m *AnalyticsMock) Reset() { + m.mu.Lock() + defer m.mu.Unlock() + m.receivedEvents = make([]map[string]interface{}, 0) + m.totalEvents = 0 +} diff --git a/test/gointegration/bridge_analytics_test.go b/test/gointegration/bridge_analytics_test.go new file mode 100644 index 00000000..806f50b2 --- /dev/null +++ b/test/gointegration/bridge_analytics_test.go @@ -0,0 +1,191 @@ +package bridge_test + +import ( + "context" + "os" + "testing" + "time" +) + +// TestBridgeAnalytics_EventsSentToMockServer verifies that analytics events +// are sent to the configured analytics endpoint during bridge operations +func TestBridgeAnalytics_EventsSentToMockServer(t *testing.T) { + // Check if analytics is enabled in test environment + analyticsEnabled := os.Getenv("TON_ANALYTICS_ENABLED") + if analyticsEnabled != "true" { + t.Skip("Analytics not enabled, set TON_ANALYTICS_ENABLED=true") + } + + // Create mock analytics server + mockServer := NewAnalyticsMock() + defer mockServer.Close() + + t.Logf("Mock analytics server running at: %s", mockServer.Server.URL) + t.Logf("Note: To use this mock, set TON_ANALYTICS_URL=%s/events", mockServer.Server.URL) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Create a session and connect + session := randomSessionID(t) + gw, err := OpenBridge(ctx, OpenOpts{ + BridgeURL: BRIDGE_URL, + SessionID: session, + OriginURL: "https://example.com", + }) + if err != nil { + t.Fatalf("open bridge: %v", err) + } + defer func() { _ = gw.Close() }() + + if err := gw.WaitReady(ctx); err != nil { + t.Fatalf("gateway not ready: %v", err) + } + + // Perform verify operation (wait a bit for connection to register) + time.Sleep(500 * time.Millisecond) + _, _, err = callVerifyEndpoint(t, BRIDGE_URL, session, "https://example.com", "connect") + if err != nil { + t.Logf("verify returned error (this may be expected): %v", err) + } + + // Close the connection + _ = gw.Close() + + // Wait a bit for analytics to be flushed + time.Sleep(2 * time.Second) + + // Check that we received analytics events + eventCount := mockServer.GetEventCount() + t.Logf("Mock server received %d analytics events", eventCount) + + if eventCount == 0 { + t.Log("No events received. This is expected if TON_ANALYTICS_URL is not set to point to the mock server.") + t.Log("To properly test analytics, rebuild bridge with TON_ANALYTICS_URL pointing to the mock server.") + } + + // Log event types received + allEvents := mockServer.GetEvents() + eventTypes := make(map[string]int) + for _, event := range allEvents { + if eventName, ok := event["event_name"].(string); ok { + eventTypes[eventName]++ + } + } + + t.Log("Events received by type:") + for eventType, count := range eventTypes { + t.Logf(" %s: %d", eventType, count) + } + + // Check for specific expected events + subscribedEvents := mockServer.GetEventsByName("bridge-events-client-subscribed") + if len(subscribedEvents) > 0 { + t.Logf("Found %d 'bridge-events-client-subscribed' events", len(subscribedEvents)) + // Verify event structure + event := subscribedEvents[0] + if clientID, ok := event["client_id"].(string); ok { + t.Logf(" client_id: %s", clientID) + } + if subsystem, ok := event["subsystem"].(string); ok { + t.Logf(" subsystem: %s", subsystem) + } + } + + verifyEvents := mockServer.GetEventsByName("bridge-verify") + if len(verifyEvents) > 0 { + t.Logf("Found %d 'bridge-verify' events", len(verifyEvents)) + } + + unsubscribedEvents := mockServer.GetEventsByName("bridge-events-client-unsubscribed") + if len(unsubscribedEvents) > 0 { + t.Logf("Found %d 'bridge-events-client-unsubscribed' events", len(unsubscribedEvents)) + } +} + +// TestBridgeAnalytics_MessageLifecycle tests that message lifecycle events are tracked +func TestBridgeAnalytics_MessageLifecycle(t *testing.T) { + analyticsEnabled := os.Getenv("TON_ANALYTICS_ENABLED") + if analyticsEnabled != "true" { + t.Skip("Analytics not enabled, set TON_ANALYTICS_ENABLED=true") + } + + mockServer := NewAnalyticsMock() + defer mockServer.Close() + + t.Logf("Mock analytics server running at: %s", mockServer.Server.URL) + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + // Create sender and receiver + senderSession := randomSessionID(t) + sender, err := OpenBridge(ctx, OpenOpts{ + BridgeURL: BRIDGE_URL, + SessionID: senderSession, + }) + if err != nil { + t.Fatalf("open sender: %v", err) + } + defer func() { _ = sender.Close() }() + + receiverSession := randomSessionID(t) + receiver, err := OpenBridge(ctx, OpenOpts{ + BridgeURL: BRIDGE_URL, + SessionID: receiverSession, + }) + if err != nil { + t.Fatalf("open receiver: %v", err) + } + defer func() { _ = receiver.Close() }() + + if err := sender.WaitReady(ctx); err != nil { + t.Fatalf("sender not ready: %v", err) + } + if err := receiver.WaitReady(ctx); err != nil { + t.Fatalf("receiver not ready: %v", err) + } + + // Send a message + if err := sender.Send(ctx, []byte("test message"), senderSession, receiverSession, nil); err != nil { + t.Fatalf("send message: %v", err) + } + + // Receive the message + ev, err := receiver.WaitMessage(ctx) + if err != nil { + t.Fatalf("wait message: %v", err) + } + t.Logf("Received message with ID: %s", ev.ID) + + // Close connections + _ = sender.Close() + _ = receiver.Close() + + // Wait for analytics flush + time.Sleep(3 * time.Second) + + eventCount := mockServer.GetEventCount() + t.Logf("Mock server received %d total analytics events", eventCount) + + // Log all event types + allEvents := mockServer.GetEvents() + eventTypes := make(map[string]int) + for _, event := range allEvents { + if eventName, ok := event["event_name"].(string); ok { + eventTypes[eventName]++ + } + } + + t.Log("Events received by type:") + for eventType, count := range eventTypes { + t.Logf(" %s: %d", eventType, count) + } + + // Note: Actual event validation would require the bridge to be configured + // with TON_ANALYTICS_URL pointing to this mock server + if eventCount == 0 { + t.Log("Note: To test analytics properly, rebuild bridge container with:") + t.Logf(" TON_ANALYTICS_URL=%s/events", mockServer.Server.URL) + } +} diff --git a/test/gointegration/bridge_gateway_test.go b/test/gointegration/bridge_gateway_test.go index 4a680d2d..3465e6f4 100644 --- a/test/gointegration/bridge_gateway_test.go +++ b/test/gointegration/bridge_gateway_test.go @@ -545,7 +545,7 @@ func TestBridge_ReceiveMessageAgainAfterReconnectWithValidLastEventID(t *testing } func TestBridge_NoDeliveryAfterReconnectWithFutureLastEventID(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 2*testSSETimeout) + ctx, cancel := context.WithTimeout(context.Background(), 5*testSSETimeout) defer cancel() senderSession := randomSessionID(t) diff --git a/test/gointegration/bridge_provider_test.go b/test/gointegration/bridge_provider_test.go index 0962b01e..13504b3c 100644 --- a/test/gointegration/bridge_provider_test.go +++ b/test/gointegration/bridge_provider_test.go @@ -455,7 +455,7 @@ func TestBridgeProvider_ReceiveAfterReconnectWithLastEventID(t *testing.T) { if first.Method != "sendTransaction" || first.ID != "1" || first.Params[0] != "abc" { t.Fatalf("payload mismatch #1: %+v", first) } - case <-time.After(3 * time.Second): + case <-time.After(10 * time.Second): t.Fatal("timeout waiting event #1") } diff --git a/tonmetrics/analytics.go b/tonmetrics/analytics.go index 028ef99d..ed0fa824 100644 --- a/tonmetrics/analytics.go +++ b/tonmetrics/analytics.go @@ -2,6 +2,7 @@ package tonmetrics import ( "bytes" + "context" "encoding/json" "fmt" "net/http" @@ -11,102 +12,106 @@ import ( "github.com/ton-connect/bridge/internal/config" ) -const ( - analyticsURL = "https://analytics.ton.org/events/" -) - // AnalyticsClient defines the interface for analytics clients type AnalyticsClient interface { - SendEvent(event interface{}) + SendBatch(ctx context.Context, events []interface{}) error } // TonMetricsClient handles sending analytics events type TonMetricsClient struct { - client *http.Client + client *http.Client + analyticsURL string + bridgeURL string + environment string + subsystem string + version string + networkId string } // NewAnalyticsClient creates a new analytics client func NewAnalyticsClient() AnalyticsClient { - if !config.Config.TFAnalyticsEnabled { - return &NoopMetricsClient{} + configuredAnalyticsURL := config.Config.TonAnalyticsURL + if !config.Config.TONAnalyticsEnabled { + return NewNoopMetricsClient(configuredAnalyticsURL) + } + if config.Config.TonAnalyticsNetworkId != "-239" && config.Config.TonAnalyticsNetworkId != "-3" { + logrus.Fatalf("invalid NETWORK_ID '%s'. Allowed values: -239 (mainnet) and -3 (testnet)", config.Config.TonAnalyticsNetworkId) } return &TonMetricsClient{ - client: http.DefaultClient, + client: http.DefaultClient, + analyticsURL: configuredAnalyticsURL, + bridgeURL: config.Config.TonAnalyticsBridgeURL, + subsystem: "bridge", + environment: "bridge", + version: config.Config.TonAnalyticsBridgeVersion, + networkId: config.Config.TonAnalyticsNetworkId, } } -// sendEvent sends an event to the analytics endpoint -func (a *TonMetricsClient) SendEvent(event interface{}) { - log := logrus.WithField("prefix", "analytics") - analyticsData, err := json.Marshal(event) +// SendBatch sends a batch of events to the analytics endpoint in a single HTTP request. +func (a *TonMetricsClient) SendBatch(ctx context.Context, events []interface{}) error { + return a.send(ctx, events, a.analyticsURL, "analytics") +} + +func (a *TonMetricsClient) send(ctx context.Context, events []interface{}, endpoint string, prefix string) error { + if len(events) == 0 { + return nil + } + + log := logrus.WithField("prefix", prefix) + + log.Debugf("preparing to send analytics batch of %d events to %s", len(events), endpoint) + + analyticsData, err := json.Marshal(events) if err != nil { - log.Errorf("failed to marshal analytics data: %v", err) + return fmt.Errorf("failed to marshal analytics batch: %w", err) } - req, err := http.NewRequest(http.MethodPost, analyticsURL, bytes.NewReader(analyticsData)) + log.Debugf("marshaled analytics data size: %d bytes", len(analyticsData)) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(analyticsData)) if err != nil { - log.Errorf("failed to create analytics request: %v", err) + return fmt.Errorf("failed to create analytics request: %w", err) } req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Client-Timestamp", fmt.Sprintf("%d", time.Now().Unix())) + + log.Debugf("sending analytics batch request to %s", endpoint) - _, err = a.client.Do(req) + resp, err := a.client.Do(req) if err != nil { - log.Errorf("failed to send analytics request: %v", err) + return fmt.Errorf("failed to send analytics batch: %w", err) + } + defer func() { + if closeErr := resp.Body.Close(); closeErr != nil { + log.Errorf("failed to close response body: %v", closeErr) + } + }() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("analytics batch request to %s returned status %d", endpoint, resp.StatusCode) } - // log.Debugf("analytics request sent successfully: %s", string(analyticsData)) + log.Debugf("analytics batch of %d events sent successfully with status %d", len(events), resp.StatusCode) + return nil } -// CreateBridgeRequestReceivedEvent creates a BridgeClientMessageReceivedEvent with common fields populated -func CreateBridgeRequestReceivedEvent(bridgeURL, clientID, traceID, environment, version, networkId string, eventID int64) BridgeClientMessageReceivedEvent { - timestamp := int(time.Now().Unix()) - eventName := BridgeClientMessageReceivedEventEventNameBridgeClientMessageReceived - subsystem := BridgeClientMessageReceivedEventSubsystemBridge - clientEnv := BridgeClientMessageReceivedEventClientEnvironment(environment) - eventIDStr := fmt.Sprintf("%d", eventID) - - return BridgeClientMessageReceivedEvent{ - BridgeUrl: &bridgeURL, - ClientEnvironment: &clientEnv, - ClientId: &clientID, - ClientTimestamp: ×tamp, - EventId: &eventIDStr, - EventName: &eventName, - NetworkId: &networkId, - Subsystem: &subsystem, - TraceId: &traceID, - Version: &version, - } +// NoopMetricsClient forwards analytics to a mock endpoint when analytics are disabled. +type NoopMetricsClient struct { + client *http.Client + mockURL string } -// CreateBridgeRequestSentEvent creates a BridgeRequestSentEvent with common fields populated -func CreateBridgeRequestSentEvent(bridgeURL, clientID, traceID, requestType, environment, version, networkId string, eventID int64) BridgeRequestSentEvent { - timestamp := int(time.Now().Unix()) - eventName := BridgeRequestSentEventEventNameBridgeClientMessageSent - subsystem := BridgeRequestSentEventSubsystemBridge - clientEnv := BridgeRequestSentEventClientEnvironment(environment) - eventIDStr := fmt.Sprintf("%d", eventID) - - return BridgeRequestSentEvent{ - BridgeUrl: &bridgeURL, - ClientEnvironment: &clientEnv, - ClientId: &clientID, - ClientTimestamp: ×tamp, - EventId: &eventIDStr, - EventName: &eventName, - NetworkId: &networkId, - RequestType: &requestType, - Subsystem: &subsystem, - TraceId: &traceID, - Version: &version, +// NewNoopMetricsClient builds a mock metrics client to help integration tests capture analytics payloads. +func NewNoopMetricsClient(mockURL string) *NoopMetricsClient { + return &NoopMetricsClient{ + client: http.DefaultClient, + mockURL: mockURL, } } -// NoopMetricsClient does nothing when analytics are disabled -type NoopMetricsClient struct{} - -// SendEvent does nothing for NoopMetricsClient -func (n *NoopMetricsClient) SendEvent(event interface{}) { - // No-op +// SendBatch forwards analytics to the configured mock endpoint to aid testing. +func (n *NoopMetricsClient) SendBatch(ctx context.Context, events []interface{}) error { + return nil } diff --git a/tonmetrics/bridge_events.gen.go b/tonmetrics/bridge_events.gen.go index aede8f2f..a33ae1a7 100644 --- a/tonmetrics/bridge_events.gen.go +++ b/tonmetrics/bridge_events.gen.go @@ -179,8 +179,8 @@ const ( // Defines values for BridgeMessageReceivedEventEventName. const ( - BridgeMessageReceivedEventEventNameEmpty BridgeMessageReceivedEventEventName = "" - BridgeMessageReceivedEventEventNameWalletConnectRequestReceived BridgeMessageReceivedEventEventName = "wallet-connect-request-received" + BridgeMessageReceivedEventEventNameBridgeMessageReceived BridgeMessageReceivedEventEventName = "bridge-message-received" + BridgeMessageReceivedEventEventNameEmpty BridgeMessageReceivedEventEventName = "" ) // Defines values for BridgeMessageReceivedEventSubsystem. @@ -263,8 +263,8 @@ const ( // Defines values for BridgeVerifyEventEventName. const ( - BridgeEventsClientUnsubscribed BridgeVerifyEventEventName = "bridge-events-client-unsubscribed" - Empty BridgeVerifyEventEventName = "" + BridgeVerifyEventEventNameBridgeVerify BridgeVerifyEventEventName = "bridge-verify" + BridgeVerifyEventEventNameEmpty BridgeVerifyEventEventName = "" ) // Defines values for BridgeVerifyEventSubsystem. @@ -276,6 +276,37 @@ const ( BridgeVerifyEventSubsystemWalletSdk BridgeVerifyEventSubsystem = "wallet-sdk" ) +// Defines values for BridgeVerifyEventVerifyType. +const ( + BridgeVerifyEventVerifyTypeConnect BridgeVerifyEventVerifyType = "connect" +) + +// Defines values for BridgeVerifyValidationFailedEventClientEnvironment. +const ( + BridgeVerifyValidationFailedEventClientEnvironmentBridge BridgeVerifyValidationFailedEventClientEnvironment = "bridge" + BridgeVerifyValidationFailedEventClientEnvironmentEmpty BridgeVerifyValidationFailedEventClientEnvironment = "" +) + +// Defines values for BridgeVerifyValidationFailedEventEventName. +const ( + BridgeVerifyValidationFailed BridgeVerifyValidationFailedEventEventName = "bridge-verify-validation-failed" + Empty BridgeVerifyValidationFailedEventEventName = "" +) + +// Defines values for BridgeVerifyValidationFailedEventSubsystem. +const ( + Bridge BridgeVerifyValidationFailedEventSubsystem = "bridge" + Dapp BridgeVerifyValidationFailedEventSubsystem = "dapp" + DappSdk BridgeVerifyValidationFailedEventSubsystem = "dapp-sdk" + Wallet BridgeVerifyValidationFailedEventSubsystem = "wallet" + WalletSdk BridgeVerifyValidationFailedEventSubsystem = "wallet-sdk" +) + +// Defines values for BridgeVerifyValidationFailedEventVerifyType. +const ( + BridgeVerifyValidationFailedEventVerifyTypeConnect BridgeVerifyValidationFailedEventVerifyType = "connect" +) + // BridgeClientConnectErrorEvent The event is fired on SSE connection failure or unexpected disconnect. type BridgeClientConnectErrorEvent struct { // BridgeUrl Bridge URL. @@ -679,6 +710,9 @@ type BridgeMessageReceivedEvent struct { // ClientTimestamp The timestamp of the event on the client side, in Unix time (stored as an integer). ClientTimestamp *int `json:"client_timestamp,omitempty"` + // EncryptedMessageHash Bridge encrypted message hash. + EncryptedMessageHash *string `json:"encrypted_message_hash,omitempty"` + // EventId Unique random event UUID generated by the sender. Used for deduplication on the backend side. EventId *string `json:"event_id,omitempty"` EventName *BridgeMessageReceivedEventEventName `json:"event_name,omitempty"` @@ -907,7 +941,8 @@ type BridgeVerifyEvent struct { UserId *string `json:"user_id,omitempty"` // VerificationResult Status of verification. - VerificationResult *string `json:"verification_result,omitempty"` + VerificationResult *string `json:"verification_result,omitempty"` + VerifyType *BridgeVerifyEventVerifyType `json:"verify_type,omitempty"` // Version The version of the sending subsystem. Version *string `json:"version,omitempty"` @@ -922,6 +957,62 @@ type BridgeVerifyEventEventName string // BridgeVerifyEventSubsystem The subsystem used to collect the event (possible values: dapp, bridge, wallet). type BridgeVerifyEventSubsystem string +// BridgeVerifyEventVerifyType defines model for BridgeVerifyEvent.VerifyType. +type BridgeVerifyEventVerifyType string + +// BridgeVerifyValidationFailedEvent When the client sends a verification request to bridge events. +type BridgeVerifyValidationFailedEvent struct { + // BridgeUrl Bridge URL. + BridgeUrl *string `json:"bridge_url,omitempty"` + + // ClientEnvironment The client environment. + ClientEnvironment *BridgeVerifyValidationFailedEventClientEnvironment `json:"client_environment,omitempty"` + + // ClientId A unique session ID. + ClientId *string `json:"client_id,omitempty"` + + // ClientTimestamp The timestamp of the event on the client side, in Unix time (stored as an integer). + ClientTimestamp *int `json:"client_timestamp,omitempty"` + + // ErrorCode Error code. + ErrorCode *int `json:"error_code,omitempty"` + + // ErrorMessage Error message. + ErrorMessage *string `json:"error_message,omitempty"` + + // EventId Unique random event UUID generated by the sender. Used for deduplication on the backend side. + EventId *string `json:"event_id,omitempty"` + EventName *BridgeVerifyValidationFailedEventEventName `json:"event_name,omitempty"` + + // NetworkId Network id (-239 for the mainnet and -3 for the testnet). Other values should be rejected. + NetworkId *string `json:"network_id,omitempty"` + + // Subsystem The subsystem used to collect the event (possible values: dapp, bridge, wallet). + Subsystem *BridgeVerifyValidationFailedEventSubsystem `json:"subsystem,omitempty"` + + // TraceId ID to aggregate multiple events into one trace. UUIDv7 must be used (first 48 bits must be unix_ts_ms as in the specification). trace_id older than 24h won't be accepted. + TraceId *string `json:"trace_id,omitempty"` + + // UserId A unique identifier for the user (refer to subsystem session details for more information). May be omitted, in this case it will be generated on the backend side and generated. UUID must be used. + UserId *string `json:"user_id,omitempty"` + VerifyType *BridgeVerifyValidationFailedEventVerifyType `json:"verify_type,omitempty"` + + // Version The version of the sending subsystem. + Version *string `json:"version,omitempty"` +} + +// BridgeVerifyValidationFailedEventClientEnvironment The client environment. +type BridgeVerifyValidationFailedEventClientEnvironment string + +// BridgeVerifyValidationFailedEventEventName defines model for BridgeVerifyValidationFailedEvent.EventName. +type BridgeVerifyValidationFailedEventEventName string + +// BridgeVerifyValidationFailedEventSubsystem The subsystem used to collect the event (possible values: dapp, bridge, wallet). +type BridgeVerifyValidationFailedEventSubsystem string + +// BridgeVerifyValidationFailedEventVerifyType defines model for BridgeVerifyValidationFailedEvent.VerifyType. +type BridgeVerifyValidationFailedEventVerifyType string + // PostDummyBridgeClientConnectErrorEventJSONRequestBody defines body for PostDummyBridgeClientConnectErrorEvent for application/json ContentType. type PostDummyBridgeClientConnectErrorEventJSONRequestBody = BridgeClientConnectErrorEvent @@ -960,3 +1051,6 @@ type PostDummyBridgeRequestSentEventJSONRequestBody = BridgeRequestSentEvent // PostDummyBridgeVerifyEventJSONRequestBody defines body for PostDummyBridgeVerifyEvent for application/json ContentType. type PostDummyBridgeVerifyEventJSONRequestBody = BridgeVerifyEvent + +// PostDummyBridgeVerifyValidationFailedEventJSONRequestBody defines body for PostDummyBridgeVerifyValidationFailedEvent for application/json ContentType. +type PostDummyBridgeVerifyValidationFailedEventJSONRequestBody = BridgeVerifyValidationFailedEvent diff --git a/tonmetrics/swagger-tonconnect-bridge.json b/tonmetrics/swagger-tonconnect-bridge.json index 443e2d08..eea51dc0 100644 --- a/tonmetrics/swagger-tonconnect-bridge.json +++ b/tonmetrics/swagger-tonconnect-bridge.json @@ -667,6 +667,12 @@ "type": "integer", "example": 0 }, + "encrypted_message_hash": { + "description": "Bridge encrypted message hash.", + "type": "string", + "format": "base64", + "example": "ZXhhbXBsZQ==" + }, "event_id": { "description": "Unique random event UUID generated by the sender. Used for deduplication on the backend side.", "type": "string", @@ -676,7 +682,7 @@ "type": "string", "enum": [ "", - "wallet-connect-request-received" + "bridge-message-received" ] }, "message_id": { @@ -1021,7 +1027,7 @@ "type": "string", "enum": [ "", - "bridge-events-client-unsubscribed" + "bridge-verify" ] }, "network_id": { @@ -1055,6 +1061,96 @@ "description": "Status of verification.", "type": "string" }, + "verify_type": { + "type": "string", + "enum": [ + "connect" + ] + }, + "version": { + "description": "The version of the sending subsystem.", + "type": "string" + } + } + }, + "BridgeVerifyValidationFailedEvent": { + "description": "When the client sends a verification request to bridge events.", + "type": "object", + "properties": { + "bridge_url": { + "description": "Bridge URL.", + "type": "string" + }, + "client_environment": { + "description": "The client environment.", + "type": "string", + "enum": [ + "", + "bridge" + ] + }, + "client_id": { + "description": "A unique session ID.", + "type": "string" + }, + "client_timestamp": { + "description": "The timestamp of the event on the client side, in Unix time (stored as an integer).", + "type": "integer", + "example": 0 + }, + "error_code": { + "description": "Error code.", + "type": "integer" + }, + "error_message": { + "description": "Error message.", + "type": "string" + }, + "event_id": { + "description": "Unique random event UUID generated by the sender. Used for deduplication on the backend side.", + "type": "string", + "example": "8d5e90bd-d6f8-4ab0-bff8-0ee2f26b44c3" + }, + "event_name": { + "type": "string", + "enum": [ + "", + "bridge-verify-validation-failed" + ] + }, + "network_id": { + "description": "Network id (-239 for the mainnet and -3 for the testnet). Other values should be rejected.", + "type": "string", + "example": "-3" + }, + "subsystem": { + "description": "The subsystem used to collect the event (possible values: dapp, bridge, wallet).", + "type": "string", + "enum": [ + "bridge", + "dapp", + "dapp-sdk", + "wallet", + "wallet-sdk" + ], + "example": "dapp" + }, + "trace_id": { + "description": "ID to aggregate multiple events into one trace. UUIDv7 must be used (first 48 bits must be unix_ts_ms as in the specification). trace_id older than 24h won't be accepted.", + "type": "string", + "example": "00000000-0000-0000-0000-000000000000" + }, + "user_id": { + "description": "A unique identifier for the user (refer to subsystem session details for more information). May be omitted, in this case it will be generated on the backend side and generated. UUID must be used.", + "type": "string", + "example": "8d5e90bd-d6f8-4ab0-bff8-0ee2f26b44c3" + }, + "verify_type": { + "type": "string", + "enum": [ + "connect" + ] + }, "version": { "description": "The version of the sending subsystem.", "type": "string" @@ -1297,6 +1393,24 @@ } } } + }, + "/dummy/BridgeVerifyValidationFailedEvent": { + "post": { + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/BridgeVerifyValidationFailedEvent" + } + } + } + }, + "responses": { + "204": { + "description": "stub" + } + } + } } } } diff --git a/tonmetrics/swagger-tonconnect.json b/tonmetrics/swagger-tonconnect.json index e869bfec..657df462 100644 --- a/tonmetrics/swagger-tonconnect.json +++ b/tonmetrics/swagger-tonconnect.json @@ -402,6 +402,34 @@ "responses": {} } }, + "/events/bridge-verify-validation-failed": { + "post": { + "consumes": [ + "application/json" + ], + "parameters": [ + { + "type": "integer", + "description": "Unix timestamp on the client at the time of sending", + "name": "X-Client-Timestamp", + "in": "header" + }, + { + "description": "events", + "name": "events", + "in": "body", + "required": true, + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/BridgeVerifyValidationFailedEvent" + } + } + } + ], + "responses": {} + } + }, "/events/connection-completed": { "post": { "consumes": [ @@ -1877,6 +1905,12 @@ "type": "integer", "example": 0 }, + "encrypted_message_hash": { + "description": "Bridge encrypted message hash.", + "type": "string", + "format": "base64", + "example": "ZXhhbXBsZQ==" + }, "event_id": { "description": "Unique random event UUID generated by the sender. Used for deduplication on the backend side.", "type": "string", @@ -1886,7 +1920,7 @@ "type": "string", "enum": [ "", - "wallet-connect-request-received" + "bridge-message-received" ] }, "message_id": { @@ -2231,7 +2265,7 @@ "type": "string", "enum": [ "", - "bridge-events-client-unsubscribed" + "bridge-verify" ] }, "network_id": { @@ -2265,6 +2299,96 @@ "description": "Status of verification.", "type": "string" }, + "verify_type": { + "type": "string", + "enum": [ + "connect" + ] + }, + "version": { + "description": "The version of the sending subsystem.", + "type": "string" + } + } + }, + "BridgeVerifyValidationFailedEvent": { + "description": "When the client sends a verification request to bridge events.", + "type": "object", + "properties": { + "bridge_url": { + "description": "Bridge URL.", + "type": "string" + }, + "client_environment": { + "description": "The client environment.", + "type": "string", + "enum": [ + "", + "bridge" + ] + }, + "client_id": { + "description": "A unique session ID.", + "type": "string" + }, + "client_timestamp": { + "description": "The timestamp of the event on the client side, in Unix time (stored as an integer).", + "type": "integer", + "example": 0 + }, + "error_code": { + "description": "Error code.", + "type": "integer" + }, + "error_message": { + "description": "Error message.", + "type": "string" + }, + "event_id": { + "description": "Unique random event UUID generated by the sender. Used for deduplication on the backend side.", + "type": "string", + "example": "8d5e90bd-d6f8-4ab0-bff8-0ee2f26b44c3" + }, + "event_name": { + "type": "string", + "enum": [ + "", + "bridge-verify-validation-failed" + ] + }, + "network_id": { + "description": "Network id (-239 for the mainnet and -3 for the testnet). Other values should be rejected.", + "type": "string", + "example": "-3" + }, + "subsystem": { + "description": "The subsystem used to collect the event (possible values: dapp, bridge, wallet).", + "type": "string", + "enum": [ + "bridge", + "dapp", + "dapp-sdk", + "wallet", + "wallet-sdk" + ], + "example": "dapp" + }, + "trace_id": { + "description": "ID to aggregate multiple events into one trace. UUIDv7 must be used (first 48 bits must be unix_ts_ms as in the specification). trace_id older than 24h won't be accepted.", + "type": "string", + "example": "00000000-0000-0000-0000-000000000000" + }, + "user_id": { + "description": "A unique identifier for the user (refer to subsystem session details for more information). May be omitted, in this case it will be generated on the backend side and generated. UUID must be used.", + "type": "string", + "example": "8d5e90bd-d6f8-4ab0-bff8-0ee2f26b44c3" + }, + "verify_type": { + "type": "string", + "enum": [ + "connect" + ] + }, "version": { "description": "The version of the sending subsystem.", "type": "string" @@ -3787,6 +3911,12 @@ ], "example": "ok" }, + "verify_type": { + "type": "string", + "enum": [ + "connect" + ] + }, "version": { "type": "string" },