Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
2e20643
sync time between bridges (#170)
Nov 26, 2025
ed9efd0
send events
Nov 14, 2025
dabf1c2
review message sent-received
Nov 17, 2025
cdf8b2b
expired messages
Nov 17, 2025
c213ff1
fix unit tests
Nov 17, 2025
3150a28
fixes after review
Nov 17, 2025
9e3442d
refactor CreateBridgeMessageValidationFailedEvent
Nov 17, 2025
3a8340a
get rid of bridge-client-message-decode-error
Nov 17, 2025
3d07152
get rid of CreateBridgeClientConnectStartedEvent, CreateBridgeConnect…
Nov 17, 2025
21d859f
restore some stuff
Nov 17, 2025
ead3504
event collector and event sender
Nov 18, 2025
e05ab32
fix topic TODOs
Nov 20, 2025
bfa57e9
minor typo
Nov 20, 2025
3e713fb
remove duplicating code
Nov 20, 2025
52a8ce0
generate new types
Nov 20, 2025
03945a0
fix all TODO send missing analytics event
Nov 20, 2025
b03b89e
analytics.AnalyticCollector -> analytics.EventCollector
Nov 20, 2025
336b431
SendBatch + minor refactoring
Nov 20, 2025
8c55571
Refactor analytics integration across handlers and storage
Nov 20, 2025
811809f
minor fixes
Nov 20, 2025
12bce12
fix todos
Nov 20, 2025
1b92821
add debug logs
Nov 21, 2025
846e336
revert me
Nov 21, 2025
8746d62
rename values in config
Nov 21, 2025
c317c48
fix
Nov 21, 2025
4cf7472
add TON_ANALYTICS vars
Nov 21, 2025
d716599
mock analytics server
Nov 21, 2025
80237a9
fix lint
Nov 21, 2025
26cfbd9
add ring_buffer_test.go
Nov 25, 2025
714beb9
docker/analytics-mock-server.go -> cmd/analytics-mock/main.go
Nov 25, 2025
3aa0f63
fixes after review
Nov 25, 2025
f22dc7e
simplify ring buffer
Nov 26, 2025
e28e2fb
update docs
Nov 26, 2025
ed0b0b4
simplify ring collector
Nov 26, 2025
774e923
move sender to separate file, use atomic
Nov 26, 2025
a9a5747
even simpler
Nov 26, 2025
fca79eb
even simpler
Nov 26, 2025
54364b6
fixes after review
Nov 28, 2025
243d97c
refactor flush part
Nov 28, 2025
ac4fe7f
pass trace_id everywhere
Nov 28, 2025
bb659ad
fix panic
Nov 28, 2025
a1f1b6c
use eventCh
Dec 1, 2025
3ed4481
final
Dec 1, 2025
2a515a8
feat: Add flush notification channel for metrics collector
TrueCarry Dec 2, 2025
6dac359
fix: Change random id to 2^53 - 1 to allow safe JS integer conversions
TrueCarry Dec 2, 2025
df0c97e
Merge branch 'ton-analytics-send-events' of github.com:ton-connect/br…
TrueCarry Dec 2, 2025
566e3b6
tests: Increase timeouts for the bridge tests
TrueCarry Dec 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions cmd/analytics-mock/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package main

import (
"encoding/json"
"io"
"log"
"net/http"
"sync"
)

type AnalyticsMock struct {
mu sync.RWMutex
receivedEvents []map[string]interface{}
totalEvents int
}

func NewAnalyticsMock() *AnalyticsMock {
return &AnalyticsMock{
receivedEvents: make([]map[string]interface{}, 0),
}
}

func (m *AnalyticsMock) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/health" {
w.WriteHeader(http.StatusOK)
if _, err := w.Write([]byte("OK")); err != nil {
log.Printf("Failed to write health response: %v", err)
}
return
}

if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

if r.URL.Path != "/events" {
w.WriteHeader(http.StatusNotFound)
return
}

body, err := io.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read body: %v", err)
w.WriteHeader(http.StatusBadRequest)
return
}

var events []map[string]interface{}
if err := json.Unmarshal(body, &events); err != nil {
log.Printf("Failed to unmarshal events: %v", err)
w.WriteHeader(http.StatusBadRequest)
return
}

m.mu.Lock()
m.receivedEvents = append(m.receivedEvents, events...)
m.totalEvents += len(events)
total := m.totalEvents
m.mu.Unlock()

log.Printf("✅ Received batch of %d events (total: %d)", len(events), total)
for _, event := range events {
if eventName, ok := event["event_name"].(string); ok {
log.Printf(" - %s", eventName)
}
}

// Return 202 Accepted like the real analytics server
w.WriteHeader(http.StatusAccepted)
}

func (m *AnalyticsMock) GetStats(w http.ResponseWriter, r *http.Request) {
m.mu.RLock()
defer m.mu.RUnlock()

eventTypes := make(map[string]int)
for _, event := range m.receivedEvents {
if eventName, ok := event["event_name"].(string); ok {
eventTypes[eventName]++
}
}

stats := map[string]interface{}{
"total_events": m.totalEvents,
"event_types": eventTypes,
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(stats); err != nil {
log.Printf("Failed to encode stats: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}

func main() {
mock := NewAnalyticsMock()

http.HandleFunc("/events", mock.ServeHTTP)
http.HandleFunc("/health", mock.ServeHTTP)
http.HandleFunc("/stats", mock.GetStats)

port := ":9090"
log.Printf("🚀 Analytics Mock Server starting on %s", port)
log.Printf("📊 Endpoints:")
log.Printf(" POST /events - Receive analytics events")
log.Printf(" GET /health - Health check")
log.Printf(" GET /stats - Get statistics")
log.Printf("")
if err := http.ListenAndServe(port, nil); err != nil {
log.Fatal(err)
}
}
20 changes: 18 additions & 2 deletions cmd/bridge/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"net/http"
"net/http/pprof"
Expand All @@ -12,12 +13,14 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/ton-connect/bridge/internal"
"github.com/ton-connect/bridge/internal/analytics"
"github.com/ton-connect/bridge/internal/app"
"github.com/ton-connect/bridge/internal/config"
bridge_middleware "github.com/ton-connect/bridge/internal/middleware"
"github.com/ton-connect/bridge/internal/utils"
handlerv1 "github.com/ton-connect/bridge/internal/v1/handler"
"github.com/ton-connect/bridge/internal/v1/storage"
"github.com/ton-connect/bridge/tonmetrics"
"golang.org/x/exp/slices"
"golang.org/x/time/rate"
)
Expand All @@ -32,7 +35,20 @@ func main() {
app.SetBridgeInfo("bridgev1", "postgres")
}

dbConn, err := storage.NewStorage(config.Config.PostgresURI)
tonAnalytics := tonmetrics.NewAnalyticsClient()

collector := analytics.NewCollector(200, tonAnalytics, 500*time.Millisecond)
go collector.Run(context.Background())

analyticsBuilder := analytics.NewEventBuilder(
config.Config.TonAnalyticsBridgeURL,
"bridge",
"bridge",
config.Config.TonAnalyticsBridgeVersion,
config.Config.TonAnalyticsNetworkId,
)

dbConn, err := storage.NewStorage(config.Config.PostgresURI, collector, analyticsBuilder)
if err != nil {
log.Fatalf("db connection %v", err)
}
Expand Down Expand Up @@ -93,7 +109,7 @@ func main() {
e.Use(corsConfig)
}

h := handlerv1.NewHandler(dbConn, time.Duration(config.Config.HeartbeatInterval)*time.Second, extractor)
h := handlerv1.NewHandler(dbConn, time.Duration(config.Config.HeartbeatInterval)*time.Second, extractor, collector, analyticsBuilder)

e.GET("/bridge/events", h.EventRegistrationHandler)
e.POST("/bridge/message", h.SendMessageHandler)
Expand Down
40 changes: 38 additions & 2 deletions cmd/bridge3/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"net/http"
"net/http/pprof"
Expand All @@ -12,12 +13,15 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/ton-connect/bridge/internal"
"github.com/ton-connect/bridge/internal/analytics"
"github.com/ton-connect/bridge/internal/app"
"github.com/ton-connect/bridge/internal/config"
bridge_middleware "github.com/ton-connect/bridge/internal/middleware"
"github.com/ton-connect/bridge/internal/ntp"
"github.com/ton-connect/bridge/internal/utils"
handlerv3 "github.com/ton-connect/bridge/internal/v3/handler"
storagev3 "github.com/ton-connect/bridge/internal/v3/storage"
"github.com/ton-connect/bridge/tonmetrics"
"golang.org/x/exp/slices"
"golang.org/x/time/rate"
)
Expand All @@ -27,6 +31,27 @@ func main() {
config.LoadConfig()
app.InitMetrics()

var timeProvider ntp.TimeProvider
if config.Config.NTPEnabled {
ntpClient := ntp.NewClient(ntp.Options{
Servers: config.Config.NTPServers,
SyncInterval: time.Duration(config.Config.NTPSyncInterval) * time.Second,
QueryTimeout: time.Duration(config.Config.NTPQueryTimeout) * time.Second,
})
ctx := context.Background()
ntpClient.Start(ctx)
defer ntpClient.Stop()
timeProvider = ntpClient
log.WithFields(log.Fields{
"servers": config.Config.NTPServers,
"sync_interval": config.Config.NTPSyncInterval,
}).Info("NTP synchronization enabled")
} else {
timeProvider = ntp.NewLocalTimeProvider()
log.Info("NTP synchronization disabled, using local time")
}
tonAnalytics := tonmetrics.NewAnalyticsClient()

dbURI := ""
store := "memory"
if config.Config.Storage != "" {
Expand All @@ -45,7 +70,18 @@ func main() {
// No URI needed for memory storage
}

dbConn, err := storagev3.NewStorage(store, dbURI)
collector := analytics.NewCollector(200, tonAnalytics, 500*time.Millisecond)
go collector.Run(context.Background())

analyticsBuilder := analytics.NewEventBuilder(
config.Config.TonAnalyticsBridgeURL,
"bridge",
"bridge",
config.Config.TonAnalyticsBridgeVersion,
config.Config.TonAnalyticsNetworkId,
)

dbConn, err := storagev3.NewStorage(store, dbURI, collector, analyticsBuilder)

if err != nil {
log.Fatalf("failed to create storage: %v", err)
Expand Down Expand Up @@ -116,7 +152,7 @@ func main() {
e.Use(corsConfig)
}

h := handlerv3.NewHandler(dbConn, time.Duration(config.Config.HeartbeatInterval)*time.Second, extractor)
h := handlerv3.NewHandler(dbConn, time.Duration(config.Config.HeartbeatInterval)*time.Second, extractor, timeProvider, collector, analyticsBuilder)

e.GET("/bridge/events", h.EventRegistrationHandler)
e.POST("/bridge/message", h.SendMessageHandler)
Expand Down
16 changes: 16 additions & 0 deletions docker/Dockerfile.analytics-mock
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM golang:1.24-alpine AS builder

WORKDIR /app

# Copy the standalone mock server
COPY cmd/analytics-mock/main.go ./main.go

# Build the server
RUN go build -o analytics-mock main.go

FROM alpine:latest
RUN apk --no-cache add ca-certificates curl
WORKDIR /root/
COPY --from=builder /app/analytics-mock .
EXPOSE 9090
CMD ["./analytics-mock"]
24 changes: 24 additions & 0 deletions docker/docker-compose.cluster-valkey.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,23 @@ services:
"
restart: "no"

analytics-mock:
build:
context: ..
dockerfile: docker/Dockerfile.analytics-mock
container_name: analytics-mock
ports:
- "9090:9090"
networks:
bridge_network:
ipv4_address: 172.20.0.25
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9090/health"]
interval: 5s
timeout: 3s
retries: 3
restart: unless-stopped

bridge:
build:
context: ..
Expand All @@ -93,6 +110,11 @@ services:
HEARTBEAT_INTERVAL: 10
RPS_LIMIT: 1000
CONNECTIONS_LIMIT: 200
TON_ANALYTICS_ENABLED: "true"
TON_ANALYTICS_URL: "http://analytics-mock:9090/events"
TON_ANALYTICS_NETWORK_ID: "-239"
TON_ANALYTICS_BRIDGE_VERSION: "test-1.0.0"
TON_ANALYTICS_BRIDGE_URL: "http://bridge:8081"
ports:
- "8081:8081" # bridge
- "9103:9103" # pprof and metrics
Expand All @@ -105,6 +127,8 @@ services:
condition: service_healthy
valkey-cluster-init:
condition: service_completed_successfully
analytics-mock:
condition: service_healthy
networks:
bridge_network:
ipv4_address: 172.20.0.30
Expand Down
39 changes: 39 additions & 0 deletions docker/docker-compose.dnsmasq.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,23 @@ services:
"
restart: "no"

analytics-mock:
build:
context: ..
dockerfile: docker/Dockerfile.analytics-mock
container_name: analytics-mock
ports:
- "9090:9090"
networks:
bridge_network:
ipv4_address: 172.20.0.25
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9090/health"]
interval: 5s
timeout: 3s
retries: 3
restart: unless-stopped

bridge1:
build:
context: ..
Expand All @@ -151,11 +168,18 @@ services:
HEARTBEAT_INTERVAL: 10
RPS_LIMIT: 1000
CONNECTIONS_LIMIT: 200
TON_ANALYTICS_ENABLED: "true"
TON_ANALYTICS_URL: "http://analytics-mock:9090/events"
TON_ANALYTICS_NETWORK_ID: "-239"
TON_ANALYTICS_BRIDGE_VERSION: "test-1.0.0"
TON_ANALYTICS_BRIDGE_URL: "http://bridge-instance1:8080"
ports:
- "9103:9103" # pprof and metrics
depends_on:
valkey-cluster-init:
condition: service_completed_successfully
analytics-mock:
condition: service_healthy
dnsmasq:
condition: service_started
networks:
Expand All @@ -178,9 +202,17 @@ services:
HEARTBEAT_INTERVAL: 10
RPS_LIMIT: 1000
CONNECTIONS_LIMIT: 200
# Analytics configuration
TON_ANALYTICS_ENABLED: "true"
TON_ANALYTICS_URL: "http://analytics-mock:9090/events"
TON_ANALYTICS_NETWORK_ID: "-239"
TON_ANALYTICS_BRIDGE_VERSION: "test-1.0.0"
TON_ANALYTICS_BRIDGE_URL: "http://bridge-instance2:8080"
depends_on:
valkey-cluster-init:
condition: service_completed_successfully
analytics-mock:
condition: service_healthy
dnsmasq:
condition: service_started
networks:
Expand All @@ -202,9 +234,16 @@ services:
HEARTBEAT_INTERVAL: 10
RPS_LIMIT: 1000
CONNECTIONS_LIMIT: 200
TON_ANALYTICS_ENABLED: "true"
TON_ANALYTICS_URL: "http://analytics-mock:9090/events"
TON_ANALYTICS_NETWORK_ID: "-239"
TON_ANALYTICS_BRIDGE_VERSION: "test-1.0.0"
TON_ANALYTICS_BRIDGE_URL: "http://bridge-instance2:8080"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Copy-paste error in bridge3 analytics URL configuration

The bridge3 service has TON_ANALYTICS_BRIDGE_URL set to "http://bridge-instance2:8080" which is copied from the bridge2 service configuration. This causes bridge3 to report itself as bridge-instance2 in analytics events, making it impossible to distinguish which bridge instance generated the events. The URL should reference bridge3's own endpoint.

Fix in Cursor Fix in Web

depends_on:
valkey-cluster-init:
condition: service_completed_successfully
analytics-mock:
condition: service_healthy
dnsmasq:
condition: service_started
networks:
Expand Down
Loading