diff --git a/pkg/skyenv/skyenv.go b/pkg/skyenv/skyenv.go index de5ec50909..b77e8ad0f9 100644 --- a/pkg/skyenv/skyenv.go +++ b/pkg/skyenv/skyenv.go @@ -219,12 +219,13 @@ const ( // Routing constants - // TpLogStore is where tp logs are stored + // TpLogStore is the legacy on-disk transport-log directory name. + // Retained because cmd/skywire-cli/commands/config/gen.go still + // emits it as the default LogStore.Location for backward-compatible + // config files; the runtime no longer writes anything there + // (historical bandwidth lives in the bbolt stats store). TpLogStore = "transport_logs" - // LatencyLogStore is where transport latency logs are stored - LatencyLogStore = "latency_logs" - // LocalPath where the visor writes files to LocalPath = "./local" diff --git a/pkg/transport/log.go b/pkg/transport/log.go index 83d29a6573..f7f2cc2789 100644 --- a/pkg/transport/log.go +++ b/pkg/transport/log.go @@ -1,37 +1,25 @@ // Package transport pkg/transport/log.go +// +// Per-transport bandwidth counters that the manager exposes to the rest +// of the visor. The store is in-memory only — historical (per-day) +// bandwidth is served from pkg/visor/stats's bbolt-backed Tracker, not +// from this store. See pkg/visor/stats/tracker.go for the daily rollups +// fed into the visor's CXO publisher and the /stats HTTP endpoints. package transport import ( - "bufio" "bytes" - "context" - "encoding/csv" "encoding/gob" "errors" "fmt" - "os" - "path/filepath" - "strings" "sync" "sync/atomic" - "time" - "github.com/gocarina/gocsv" "github.com/google/uuid" "github.com/skycoin/skywire/pkg/logging" ) -const dateFormat string = "2006-01-02" - -// CsvEntry represents a logging entry for csv for a given Transport. -type CsvEntry struct { - TpID uuid.UUID `csv:"tp_id"` - // atomic requires 64-bit alignment for struct field access - LogEntry - TimeStamp int64 `csv:"time_stamp"` // TimeStamp should be time.RFC3339Nano formatted -} - // LogEntry represents a logging entry for a given Transport. // The entry is updated every time a packet is received or sent. type LogEntry struct { @@ -137,7 +125,12 @@ func (le *LogEntry) GobDecode(b []byte) error { return nil } -// LogStore stores transport log entries. +// LogStore stores transport log entries. The only implementation is +// the in-memory store — historical persistence is the stats Tracker's +// job (pkg/visor/stats), not this store's. The store exists so that a +// transport that closes and re-opens within the same visor session +// preserves its cumulative byte counters across the gap; nothing in +// this package persists across visor restarts. type LogStore interface { Entry(id uuid.UUID) (*LogEntry, error) Record(id uuid.UUID, entry *LogEntry) error @@ -175,509 +168,3 @@ func (tls *inMemoryTransportLogStore) Record(id uuid.UUID, entry *LogEntry) erro tls.mu.Unlock() return nil } - -type fileTransportLogStore struct { - dir string - log *logging.Logger - mu sync.Mutex - fileName string -} - -// FileTransportLogStore implements file TransportLogStore. -func FileTransportLogStore(ctx context.Context, dir string, rInterval time.Duration, log *logging.Logger) (LogStore, error) { - if err := os.MkdirAll(dir, 0750); err != nil { - return nil, err - } - - fLogStore := &fileTransportLogStore{ - dir: dir, - log: log, - } - - go func() { - ticker := time.NewTicker(time.Hour * 5) - defer ticker.Stop() - fLogStore.cleanLogs(rInterval) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - fLogStore.cleanLogs(rInterval) - } - } - }() - - return fLogStore, nil -} - -func (tls *fileTransportLogStore) Entry(tpID uuid.UUID) (*LogEntry, error) { - tls.mu.Lock() - defer tls.mu.Unlock() - - entries, err := tls.readFromCSV(tls.todayFileName()) - if err != nil { - return nil, err - } - for _, entry := range entries { - if entry.TpID == tpID { - return &entry.LogEntry, nil - } - } - return nil, nil -} - -func (tls *fileTransportLogStore) Record(tpID uuid.UUID, lEntry *LogEntry) error { - tls.mu.Lock() - defer tls.mu.Unlock() - - cEntry := &CsvEntry{ - TpID: tpID, - LogEntry: *lEntry, - TimeStamp: time.Now().UTC().Unix(), - } - - return tls.writeToCSV(cEntry) -} - -func (tls *fileTransportLogStore) writeToCSV(cEntry *CsvEntry) error { - - today := tls.todayFileName() - // we check if the date of the file has changed or not - // if it is then it means it's a new day so we need to reset the LogEntry - // so that we can start the count again for the new day and file - if tls.fileName != "" && tls.fileName != tls.todayFileName() { - // before we reset we need to save the current data so we save it in the previous days file - // note: the timestamp of this entry will likely be of the current day so if a log file has - // a timestamp of next day then it is an indicator that it's an inter-day transport log - today = tls.fileName - } - - filePath := filepath.Join(tls.dir, today) - - //nolint:gosec - f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0600) - if err != nil { - return err - } - - readClients := []*CsvEntry{} - writeClients := []*CsvEntry{} - - if err := gocsv.UnmarshalFile(f, &readClients); err != nil && !errors.Is(err, gocsv.ErrEmptyCSVFile) { - // Close the file before attempting recovery - f.Close() //nolint:errcheck,gosec - - // Attempt to recover from corrupted CSV - recovered, recoverErr := tls.recoverCSV(filePath) - if recoverErr != nil { - return fmt.Errorf("CSV parse error and recovery failed: %w (original: %v)", recoverErr, err) - } - readClients = recovered - - // Repair the file by rewriting it without corrupted lines - if repairErr := tls.repairCSVFile(filePath, readClients); repairErr != nil { - tls.log.WithError(repairErr).Warn("Failed to repair CSV file") - } - - // Reopen the repaired file - f, err = os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0600) //nolint:gosec - if err != nil { - return err - } - } - - defer func() { - if err := f.Close(); err != nil { - tls.log.WithError(err).Errorln("Failed to close csv file") - } - }() - - var update bool - for _, client := range readClients { - // update if readClients contains the cEntry - if client.TpID == cEntry.TpID { - writeClients = append(writeClients, cEntry) - update = true - continue - } - writeClients = append(writeClients, client) - } - - // write when the readClients are does not contain cEntry - if !update { - writeClients = append(writeClients, cEntry) - } - - if _, err := f.Seek(0, 0); err != nil { // Go to the start of the file - return err - } - - err = gocsv.MarshalFile(&writeClients, f) // Use this to save the CSV back to the file - if err != nil { - return err - } - - // we reset the entry after it is saved - if tls.fileName != "" && tls.fileName != tls.todayFileName() { - cEntry.LogEntry.Reset() - } - - tls.fileName = tls.todayFileName() - - return nil -} - -func (tls *fileTransportLogStore) readFromCSV(fileName string) ([]*CsvEntry, error) { - f, err := os.OpenFile(filepath.Join(tls.dir, fmt.Sprint(fileName)), os.O_RDWR|os.O_CREATE, 0600) - if err != nil { - return nil, err - } - - defer func() { - if err := f.Close(); err != nil { - tls.log.WithError(err).Errorln("Failed to close csv file") - } - }() - - readClients := []*CsvEntry{} - - if err := gocsv.UnmarshalFile(f, &readClients); err != nil && !errors.Is(err, gocsv.ErrEmptyCSVFile) { // Load clients from file - return nil, err - } - return readClients, nil -} - -// recoverCSV attempts to recover valid entries from a corrupted CSV file. -// It reads line-by-line, skipping any malformed lines, and returns valid entries. -func (tls *fileTransportLogStore) recoverCSV(filePath string) ([]*CsvEntry, error) { - f, err := os.Open(filePath) //nolint:gosec // filePath is from internal config - if err != nil { - return nil, err - } - defer f.Close() //nolint:errcheck - - var validLines []string - scanner := bufio.NewScanner(f) - lineNum := 0 - skippedLines := 0 - - for scanner.Scan() { - lineNum++ - line := scanner.Text() - - // Keep the header line - if lineNum == 1 { - validLines = append(validLines, line) - continue - } - - // Validate line has correct number of fields (4: tp_id, recv, sent, time_stamp) - reader := csv.NewReader(strings.NewReader(line)) - fields, err := reader.Read() - if err != nil || len(fields) != 4 { - tls.log.Debugf("Skipping corrupted CSV line %d: %q", lineNum, line) - skippedLines++ - continue - } - - validLines = append(validLines, line) - } - - if err := scanner.Err(); err != nil { - return nil, err - } - - if skippedLines > 0 { - tls.log.Infof("Recovered CSV: skipped %d corrupted line(s)", skippedLines) - } - - // Parse the valid lines - if len(validLines) <= 1 { - return []*CsvEntry{}, nil - } - - csvData := strings.Join(validLines, "\n") - var entries []*CsvEntry - if err := gocsv.UnmarshalString(csvData, &entries); err != nil { - return nil, fmt.Errorf("failed to parse recovered CSV: %w", err) - } - - return entries, nil -} - -// repairCSVFile repairs a corrupted CSV file by removing invalid lines. -func (tls *fileTransportLogStore) repairCSVFile(filePath string, entries []*CsvEntry) error { - // Write repaired content to a temp file - tmpPath := filePath + ".tmp" - tmpFile, err := os.Create(tmpPath) //nolint:gosec // tmpPath is derived from internal config - if err != nil { - return err - } - - if err := gocsv.MarshalFile(&entries, tmpFile); err != nil { - tmpFile.Close() //nolint:errcheck,gosec - os.Remove(tmpPath) //nolint:errcheck,gosec - return err - } - tmpFile.Close() //nolint:errcheck,gosec - - // Replace original with repaired file - return os.Rename(tmpPath, filePath) -} - -// CleanLogs cleans the logs that are older than the given log rotation interval -func (tls *fileTransportLogStore) cleanLogs(rInterval time.Duration) { - - files, err := os.ReadDir(tls.dir) - if err != nil { - tls.log.Warn(err) - } - - for _, file := range files { - if !file.IsDir() { - interval := time.Now().UTC().Add(-rInterval) - date, err := time.Parse(dateFormat, strings.ReplaceAll(file.Name(), ".csv", "")) - if err != nil { - tls.log.Warn(err) - } - if date.Before(interval) { - err = os.Remove(tls.dir + "/" + file.Name()) - if err != nil { - tls.log.Warn(err) - } - tls.log.Debugf("transport log file cleaned: %v", file.Name()) - } - } - } -} - -func (tls *fileTransportLogStore) todayFileName() string { - return fmt.Sprintf("%s.csv", time.Now().UTC().Format(dateFormat)) -} - -// LatencyCsvEntry represents a logging entry for csv for transport latency. -type LatencyCsvEntry struct { - TpID uuid.UUID `csv:"tp_id"` - MinMs float64 `csv:"min_ms"` - MaxMs float64 `csv:"max_ms"` - AvgMs float64 `csv:"avg_ms"` - TimeStamp int64 `csv:"time_stamp"` -} - -// LatencyLogStore stores transport latency log entries. -type LatencyLogStore interface { - Entry(id uuid.UUID) (*LatencyCsvEntry, error) - Record(id uuid.UUID, min, max, avg float64) error -} - -type inMemoryLatencyLogStore struct { - entries map[uuid.UUID]*LatencyCsvEntry - mu sync.Mutex -} - -// InMemoryLatencyLogStore implements in-memory LatencyLogStore. -func InMemoryLatencyLogStore() LatencyLogStore { - return &inMemoryLatencyLogStore{ - entries: make(map[uuid.UUID]*LatencyCsvEntry), - } -} - -func (ls *inMemoryLatencyLogStore) Entry(id uuid.UUID) (*LatencyCsvEntry, error) { - ls.mu.Lock() - entry, ok := ls.entries[id] - ls.mu.Unlock() - if !ok { - return nil, errors.New("latency log entry not found") - } - return entry, nil -} - -func (ls *inMemoryLatencyLogStore) Record(id uuid.UUID, min, max, avg float64) error { - ls.mu.Lock() - if ls.entries == nil { - ls.entries = make(map[uuid.UUID]*LatencyCsvEntry) - } - ls.entries[id] = &LatencyCsvEntry{ - TpID: id, - MinMs: min, - MaxMs: max, - AvgMs: avg, - TimeStamp: time.Now().UTC().Unix(), - } - ls.mu.Unlock() - return nil -} - -type fileLatencyLogStore struct { - dir string - log *logging.Logger - mu sync.Mutex - fileName string -} - -// FileLatencyLogStore implements file LatencyLogStore. -func FileLatencyLogStore(ctx context.Context, dir string, rInterval time.Duration, log *logging.Logger) (LatencyLogStore, error) { - if err := os.MkdirAll(dir, 0750); err != nil { - return nil, err - } - - fLogStore := &fileLatencyLogStore{ - dir: dir, - log: log, - } - - go func() { - ticker := time.NewTicker(time.Hour * 5) - defer ticker.Stop() - fLogStore.cleanLogs(rInterval) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - fLogStore.cleanLogs(rInterval) - } - } - }() - - return fLogStore, nil -} - -func (ls *fileLatencyLogStore) Entry(tpID uuid.UUID) (*LatencyCsvEntry, error) { - ls.mu.Lock() - defer ls.mu.Unlock() - - entries, err := ls.readFromCSV(ls.todayFileName()) - if err != nil { - return nil, err - } - for _, entry := range entries { - if entry.TpID == tpID { - return entry, nil - } - } - return nil, nil -} - -func (ls *fileLatencyLogStore) Record(tpID uuid.UUID, min, max, avg float64) error { - ls.mu.Lock() - defer ls.mu.Unlock() - - entry := &LatencyCsvEntry{ - TpID: tpID, - MinMs: min, - MaxMs: max, - AvgMs: avg, - TimeStamp: time.Now().UTC().Unix(), - } - - return ls.writeToCSV(entry) -} - -func (ls *fileLatencyLogStore) writeToCSV(entry *LatencyCsvEntry) error { - today := ls.todayFileName() - if ls.fileName != "" && ls.fileName != ls.todayFileName() { - today = ls.fileName - } - - filePath := filepath.Join(ls.dir, today) - - //nolint:gosec - f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0600) - if err != nil { - return err - } - - defer func() { - if err := f.Close(); err != nil { - ls.log.WithError(err).Errorln("Failed to close latency csv file") - } - }() - - readEntries := []*LatencyCsvEntry{} - writeEntries := []*LatencyCsvEntry{} - - if err := gocsv.UnmarshalFile(f, &readEntries); err != nil && !errors.Is(err, gocsv.ErrEmptyCSVFile) { - ls.log.WithError(err).Warn("Failed to parse latency CSV, starting fresh") - readEntries = []*LatencyCsvEntry{} - } - - var update bool - for _, existing := range readEntries { - if existing.TpID == entry.TpID { - writeEntries = append(writeEntries, entry) - update = true - continue - } - writeEntries = append(writeEntries, existing) - } - - if !update { - writeEntries = append(writeEntries, entry) - } - - if _, err := f.Seek(0, 0); err != nil { - return err - } - - if err := f.Truncate(0); err != nil { - return err - } - - if err := gocsv.MarshalFile(&writeEntries, f); err != nil { - return err - } - - ls.fileName = ls.todayFileName() - return nil -} - -func (ls *fileLatencyLogStore) readFromCSV(fileName string) ([]*LatencyCsvEntry, error) { - f, err := os.OpenFile(filepath.Join(ls.dir, fileName), os.O_RDWR|os.O_CREATE, 0600) //nolint:gosec - if err != nil { - return nil, err - } - - defer func() { - if err := f.Close(); err != nil { - ls.log.WithError(err).Errorln("Failed to close latency csv file") - } - }() - - entries := []*LatencyCsvEntry{} - if err := gocsv.UnmarshalFile(f, &entries); err != nil && !errors.Is(err, gocsv.ErrEmptyCSVFile) { - return nil, err - } - return entries, nil -} - -func (ls *fileLatencyLogStore) cleanLogs(rInterval time.Duration) { - files, err := os.ReadDir(ls.dir) - if err != nil { - ls.log.Warn(err) - return - } - - for _, file := range files { - if !file.IsDir() { - interval := time.Now().UTC().Add(-rInterval) - date, err := time.Parse(dateFormat, strings.ReplaceAll(file.Name(), ".csv", "")) - if err != nil { - ls.log.Warn(err) - continue - } - if date.Before(interval) { - err = os.Remove(filepath.Join(ls.dir, file.Name())) - if err != nil { - ls.log.Warn(err) - } - ls.log.Debugf("latency log file cleaned: %v", file.Name()) - } - } - } -} - -func (ls *fileLatencyLogStore) todayFileName() string { - return fmt.Sprintf("%s.csv", time.Now().UTC().Format(dateFormat)) -} diff --git a/pkg/transport/log_test.go b/pkg/transport/log_test.go index ac28f36972..e9116d200b 100644 --- a/pkg/transport/log_test.go +++ b/pkg/transport/log_test.go @@ -2,18 +2,14 @@ package transport_test import ( - "context" "encoding/json" "fmt" - "os" "testing" - "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/skycoin/skywire/pkg/logging" "github.com/skycoin/skywire/pkg/transport" ) @@ -44,26 +40,6 @@ func TestInMemoryTransportLogStore(t *testing.T) { testTransportLogStore(t, transport.InMemoryTransportLogStore()) } -func TestFileTransportLogStore(t *testing.T) { - dir, err := os.MkdirTemp("", "log_store") - require.NoError(t, err) - defer func() { - require.NoError(t, os.RemoveAll(dir)) - }() - - log := logging.MustGetLogger("transport") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ls, err := transport.FileTransportLogStore(ctx, dir, time.Hour*24*7, log) - require.NoError(t, err) - testTransportLogStore(t, ls) - - // Cancel context and wait briefly for background goroutine to exit - cancel() - time.Sleep(100 * time.Millisecond) -} - func TestLogEntry_MarshalJSON(t *testing.T) { entry := transport.NewLogEntry() entry.AddSent(10) diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 500439149a..d450a65590 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -38,7 +38,6 @@ type ManagerConfig struct { SecKey cipher.SecKey DiscoveryClient DiscoveryClient LogStore LogStore - LatencyLogStore LatencyLogStore PersistentTransportsCache []PersistentTransports PTpsCacheMu sync.RWMutex Version string // Visor version for reporting to TPD diff --git a/pkg/visor/api_transport.go b/pkg/visor/api_transport.go index e5d1cbdf42..652b908f7c 100644 --- a/pkg/visor/api_transport.go +++ b/pkg/visor/api_transport.go @@ -5,11 +5,8 @@ import ( "context" "errors" "fmt" - "os" - "path/filepath" "time" - "github.com/gocarina/gocsv" "github.com/google/uuid" "github.com/skycoin/skywire/pkg/app/appnet" @@ -233,79 +230,50 @@ func (v *Visor) DiscoverTransportByID(id uuid.UUID) (*transport.Entry, error) { } // GetTransportLogs implements API. -// Returns transport log entries from the last N days. +// Returns one entry per (transport, day) for the last N UTC days, drawn +// from the bbolt-backed stats store. The on-disk CSV transport log was +// retired — see pkg/transport/log.go and pkg/visor/stats. Each entry's +// RecvBytes/SentBytes is the within-day delta (matches the historical +// CSV semantics, where the per-day file accumulated bytes from a +// zero-anchored start-of-day baseline). Timestamp is the start of the +// UTC day, expressed as a Unix epoch second. func (v *Visor) GetTransportLogs(days int) ([]TransportLogEntry, error) { if days <= 0 { return nil, nil } - - logDir := v.conf.Transport.LogStore.Location - if logDir == "" { - return nil, fmt.Errorf("transport log store location not configured") - } - - var allEntries []TransportLogEntry - now := time.Now().UTC() - - // Read log files for the specified number of days - for i := 0; i < days; i++ { - date := now.AddDate(0, 0, -i) - filename := filepath.Join(logDir, fmt.Sprintf("%s.csv", date.Format("2006-01-02"))) - - entries, err := readTransportLogFile(filename) - if err != nil { - // Skip files that don't exist or can't be read - continue - } - allEntries = append(allEntries, entries...) + if v.statsTracker == nil { + return nil, errors.New("stats store not available (stats subsystem disabled or not yet initialized)") } - return allEntries, nil -} - -// csvEntry matches the transport.CsvEntry struct format for gocsv parsing. -type csvEntry struct { - TpID uuid.UUID `csv:"tp_id"` - RecvBytes *uint64 `csv:"recv"` - SentBytes *uint64 `csv:"sent"` - TimeStamp int64 `csv:"time_stamp"` -} - -// readTransportLogFile reads transport log entries from a CSV file. -func readTransportLogFile(filename string) ([]TransportLogEntry, error) { - f, err := os.Open(filename) //nolint:gosec // filename is from internal config + records, err := v.statsTracker.Store().AllTransportRecords() if err != nil { - return nil, err + return nil, fmt.Errorf("read stats store: %w", err) } - defer f.Close() //nolint:errcheck - var csvEntries []*csvEntry - if err := gocsv.UnmarshalFile(f, &csvEntries); err != nil { - // Handle empty file case - if errors.Is(err, gocsv.ErrEmptyCSVFile) { - return nil, nil - } - return nil, err - } + // Keep entries whose date is within the last `days` UTC days + // (inclusive of today). Lexical comparison works because the + // daily-rollup date format is YYYY-MM-DD. + cutoff := time.Now().UTC().AddDate(0, 0, -(days - 1)).Format("2006-01-02") - var entries []TransportLogEntry - for _, ce := range csvEntries { - var recv, sent uint64 - if ce.RecvBytes != nil { - recv = *ce.RecvBytes - } - if ce.SentBytes != nil { - sent = *ce.SentBytes + var out []TransportLogEntry + for _, rec := range records { + for _, d := range rec.Daily { + if d.Date < cutoff { + continue + } + ts, perr := time.Parse("2006-01-02", d.Date) + if perr != nil { + continue + } + out = append(out, TransportLogEntry{ + TpID: rec.ID, + RecvBytes: d.RecvBytes, + SentBytes: d.SentBytes, + Timestamp: ts.Unix(), + }) } - entries = append(entries, TransportLogEntry{ - TpID: ce.TpID, - RecvBytes: recv, - SentBytes: sent, - Timestamp: ce.TimeStamp, - }) } - - return entries, nil + return out, nil } // SetPersistentTransports sets min_hops routing config of visor diff --git a/pkg/visor/init_transport.go b/pkg/visor/init_transport.go index d5305c2601..d4512597e1 100644 --- a/pkg/visor/init_transport.go +++ b/pkg/visor/init_transport.go @@ -5,7 +5,6 @@ import ( "context" "errors" "fmt" - "path/filepath" "sync" "time" @@ -22,7 +21,6 @@ import ( "github.com/skycoin/skywire/pkg/logging" "github.com/skycoin/skywire/pkg/netutil" "github.com/skycoin/skywire/pkg/servicedisc" - "github.com/skycoin/skywire/pkg/skyenv" "github.com/skycoin/skywire/pkg/transport" "github.com/skycoin/skywire/pkg/transport/network" "github.com/skycoin/skywire/pkg/transport/network/addrresolver" @@ -298,29 +296,19 @@ func initTransport(ctx context.Context, v *Visor, log *logging.Logger) error { return err } - var logS transport.LogStore - if v.conf.Transport.LogStore.Type == visorconfig.MemoryLogStore { - logS = transport.InMemoryTransportLogStore() - } else if v.conf.Transport.LogStore.Type == visorconfig.FileLogStore { - logS, err = transport.FileTransportLogStore(ctx, v.conf.Transport.LogStore.Location, time.Duration(v.conf.Transport.LogStore.RotationInterval), log) - if err != nil { - return err - } - } else { - return fmt.Errorf("invalid store type: %v", v.conf.Transport.LogStore.Type) - } - - // Initialize latency log store (uses same type as transport log store) - var latencyLogS transport.LatencyLogStore - latencyLogDir := filepath.Join(filepath.Dir(v.conf.Transport.LogStore.Location), skyenv.LatencyLogStore) - if v.conf.Transport.LogStore.Type == visorconfig.MemoryLogStore { - latencyLogS = transport.InMemoryLatencyLogStore() - } else if v.conf.Transport.LogStore.Type == visorconfig.FileLogStore { - latencyLogS, err = transport.FileLatencyLogStore(ctx, latencyLogDir, time.Duration(v.conf.Transport.LogStore.RotationInterval), log) - if err != nil { - return err - } + // The on-disk CSV log store has been retired — historical + // bandwidth/latency now lives in the bbolt-backed stats store + // (pkg/visor/stats), reachable via /stats/transports/history and + // the GetTransportLogs RPC. This in-memory store only exists so + // a transport that closes and re-opens within the same visor + // session preserves its cumulative byte counters across the gap. + // LogStore.Type / LogStore.Location in config are no-ops; warn + // if a config still requests file-mode so operators notice. + if v.conf.Transport.LogStore.Type == visorconfig.FileLogStore { + log.Warn("transport.log_store.type=\"file\" is deprecated; the on-disk CSV log store has been retired. " + + "Historical bandwidth is now served from the bbolt stats store; this setting is now treated as \"memory\".") } + logS := transport.InMemoryTransportLogStore() pTps, err := v.conf.GetPersistentTransports() if err != nil { @@ -337,7 +325,6 @@ func initTransport(ctx context.Context, v *Visor, log *logging.Logger) error { SecKey: v.conf.SK, DiscoveryClient: tpdC, LogStore: logS, - LatencyLogStore: latencyLogS, PersistentTransportsCache: pTps, Version: buildinfo.Version(), ARTransportLimit: arLimit, diff --git a/pkg/visor/logserver/api.go b/pkg/visor/logserver/api.go index ce6e0a1f85..8f94ccc059 100644 --- a/pkg/visor/logserver/api.go +++ b/pkg/visor/logserver/api.go @@ -101,7 +101,13 @@ type API struct { } // New creates a new API. -func New(log *logging.Logger, tpLogPath, localPath, _ string, whitelistedPKs []cipher.PubKey, survey *visorconfig.Survey, printLog bool) *API { +// +// The unused string parameter (formerly the per-day transport-log CSV +// directory) is preserved for call-site compatibility — historical +// bandwidth is now served from the bbolt stats store via +// /stats/transports/history, and the /transport_logs/:file route has +// been removed along with the on-disk CSV log store. +func New(log *logging.Logger, _, localPath, _ string, whitelistedPKs []cipher.PubKey, survey *visorconfig.Survey, printLog bool) *API { api := &API{ logger: log, startedAt: time.Now(), @@ -172,18 +178,6 @@ func New(log *logging.Logger, tpLogPath, localPath, _ string, whitelistedPKs []c c.JSON(http.StatusOK, api.cxoFeedsLister.ListCXOFeeds()) }) - // Transport log files (auth'd) - authRoute.GET("/transport_logs/:file", func(c *gin.Context) { - if filepath.Ext(c.Param("file")) == ".csv" { - fpath := filepath.Join(tpLogPath, c.Param("file")) - if _, err := os.Stat(fpath); err == nil { - c.File(fpath) - return - } - } - c.Writer.WriteHeader(http.StatusNotFound) - }) - // Serve visor log file (auth'd) — written when visor runs with -s/--save-log authRoute.GET("/visor.log", func(c *gin.Context) { logFile := filepath.Join(localPath, "visor.log") @@ -301,14 +295,6 @@ func New(log *logging.Logger, tpLogPath, localPath, _ string, whitelistedPKs []c } } } - // List transport log files - if entries, err := os.ReadDir(tpLogPath); err == nil { - for _, e := range entries { - if !e.IsDir() && filepath.Ext(e.Name()) == ".csv" { - links = append(links, fmt.Sprintf(`/transport_logs/%s`, e.Name(), e.Name())) - } - } - } } // Add forwarded ports visible on the landing page. // Use the request Host to construct proper URLs that work