diff --git a/cmd/skywire-cli/commands/route/calc.go b/cmd/skywire-cli/commands/route/calc.go index 6728a15abb..d4653a2d99 100644 --- a/cmd/skywire-cli/commands/route/calc.go +++ b/cmd/skywire-cli/commands/route/calc.go @@ -520,6 +520,10 @@ func newMemoryStoreFromEntries(entries []*transport.Entry) *memoryStore { return &memoryStore{entries: entries, byEdge: byEdge} } +func (s *memoryStore) GetTransportsByEdgeNoLatency(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { + return s.GetTransportsByEdge(ctx, pk) +} + func (s *memoryStore) GetTransportsByEdge(_ context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { if tps, ok := s.byEdge[pk]; ok { return tps, nil diff --git a/pkg/route-finder/store/graph_test.go b/pkg/route-finder/store/graph_test.go index 2ab5bdc0d4..bcc98b34bc 100644 --- a/pkg/route-finder/store/graph_test.go +++ b/pkg/route-finder/store/graph_test.go @@ -41,6 +41,10 @@ func (m *mockStore) DeregisterTransport(context.Context, uuid.UUID) error { func (m *mockStore) GetTransportByID(context.Context, uuid.UUID) (*transport.Entry, error) { return nil, nil } +func (m *mockStore) GetTransportsByEdgeNoLatency(ctx context.Context, edgePK cipher.PubKey) ([]*transport.Entry, error) { + return m.GetTransportsByEdge(ctx, edgePK) +} + func (m *mockStore) GetTransportsByEdge(_ context.Context, edgePK cipher.PubKey) ([]*transport.Entry, error) { trs, ok := m.transports[edgePK] if !ok { diff --git a/pkg/transport-discovery/api/api.go b/pkg/transport-discovery/api/api.go index 67b67c911a..94d19b2e04 100644 --- a/pkg/transport-discovery/api/api.go +++ b/pkg/transport-discovery/api/api.go @@ -295,7 +295,13 @@ func (api *API) mirrorEdges(ctx context.Context, edges map[cipher.PubKey]struct{ } seq := uint64(time.Now().UnixNano()) //nolint:gosec for edge := range edges { - entries, err := api.store.GetTransportsByEdge(ctx, edge) + // DHT consumers don't read the Latency field — skip the + // per-call MGET on lat: + decode by using the + // no-latency variant. Production pprof showed mirrorEdges + // as 97.7% of GetTransportsByEdge alloc traffic, with + // hydrateDurableLatency contributing 10.5% of total + // alloc_objects. + entries, err := api.store.GetTransportsByEdgeNoLatency(ctx, edge) if err != nil || len(entries) == 0 { api.dhtMirror.Delete(edge) continue diff --git a/pkg/transport-discovery/store/memory_store.go b/pkg/transport-discovery/store/memory_store.go index 5540ebcb1e..a3aeb4072c 100644 --- a/pkg/transport-discovery/store/memory_store.go +++ b/pkg/transport-discovery/store/memory_store.go @@ -92,7 +92,11 @@ func (s *memoryStore) GetTransportByID(_ context.Context, id uuid.UUID) (*transp return v, nil } -func (s *memoryStore) GetTransportsByEdge(_ context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { +func (s *memoryStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { + return s.GetTransportsByEdgeNoLatency(ctx, pk) +} + +func (s *memoryStore) GetTransportsByEdgeNoLatency(_ context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { if s.err != nil { return nil, s.err } diff --git a/pkg/transport-discovery/store/redis_transport.go b/pkg/transport-discovery/store/redis_transport.go index 5f44a18288..b2938cdf74 100644 --- a/pkg/transport-discovery/store/redis_transport.go +++ b/pkg/transport-discovery/store/redis_transport.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "time" "github.com/go-redis/redis/v8" @@ -214,7 +213,41 @@ func (s *redisStore) GetTransportByID(ctx context.Context, id uuid.UUID) (*trans return entry, nil } +// GetTransportsByEdge returns every transport involving pk, with the +// durable latency overlay applied so HTTP responses include current +// min/max/avg from the lat: keys. +// +// Callers that don't need latency (DHT mirroring, transport-count +// stats) should use GetTransportsByEdgeNoLatency to skip the per-call +// MGET on the latency keyspace and the JSON decode that follows. +// Production pprof traced 97.7% of GetTransportsByEdge invocations +// to mirrorEdges, where the DHT consumer doesn't read the latency +// field at all — that's the hot path the no-latency variant is for. func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { + entries, err := s.getTransportsByEdge(ctx, pk) + if err != nil { + return nil, err + } + s.hydrateDurableLatency(ctx, entries) + s.edgeCache.Put(pk, entries) + return entries, nil +} + +// GetTransportsByEdgeNoLatency is the no-overlay sibling of +// GetTransportsByEdge. The returned entries' Latency field reflects +// what was in the tp: blob (typically 0 — see #2418), not the +// durable lat: store. +func (s *redisStore) GetTransportsByEdgeNoLatency(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { + if entries, ok := s.edgeCache.Get(pk); ok { + return entries, nil + } + return s.getTransportsByEdge(ctx, pk) +} + +// getTransportsByEdge does the MGET + decode without any latency +// hydration or cache write. Shared by both public methods so the +// fetch logic stays in one place. +func (s *redisStore) getTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { if entries, ok := s.edgeCache.Get(pk); ok { return entries, nil } @@ -235,8 +268,8 @@ func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) idStr string id uuid.UUID } - var mappings []idMapping - var keys []string + mappings := make([]idMapping, 0, len(ids)) + keys := make([]string, 0, len(ids)) for _, idStr := range ids { id, err := uuid.Parse(idStr) if err != nil { @@ -256,7 +289,7 @@ func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) return nil, err } - var entries []*transport.Entry + entries := make([]*transport.Entry, 0, len(vals)) var staleIDs []interface{} for i, val := range vals { raw, ok := val.(string) @@ -288,9 +321,6 @@ func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) if len(entries) == 0 { return nil, ErrTransportNotFound } - - s.hydrateDurableLatency(ctx, entries) - s.edgeCache.Put(pk, entries) return entries, nil } @@ -472,8 +502,13 @@ func (s *redisStore) scanAllTransports(ctx context.Context, selfTransports, with // Maintained on Register/Deregister; replaces the pre-existing // SCAN of the tp:* keyspace used by GetNumberOfTransports, // scanAllTransports, and getP2PTransportCounts. +// +// Hot-path key builders use string concatenation rather than +// fmt.Sprintf. Sprintf was 9% of TPD's total alloc_objects in +// production pprof; concat compiles to a single buffer write and +// avoids the format-state machinery. func (s *redisStore) allTpsIndexKey() string { - return fmt.Sprintf("%s:tp:_index", serviceName) + return serviceName + ":tp:_index" } // allTransportKeysFromIndex reads the transport-id index set and returns @@ -486,7 +521,7 @@ func (s *redisStore) allTransportKeysFromIndex(ctx context.Context) (keys, ids [ } keys = make([]string, len(ids)) for i, id := range ids { - keys[i] = fmt.Sprintf("%s:tp:%s", serviceName, id) + keys[i] = serviceName + ":tp:" + id } return keys, ids, nil } @@ -505,15 +540,15 @@ func (s *redisStore) maybeReapStaleTransports(stale []interface{}) { } func (s *redisStore) transportKey(id uuid.UUID) string { - return fmt.Sprintf("%s:tp:%s", serviceName, id.String()) + return serviceName + ":tp:" + id.String() } func (s *redisStore) latencyKey(id uuid.UUID) string { - return fmt.Sprintf("%s:lat:%s", serviceName, id.String()) + return serviceName + ":lat:" + id.String() } func (s *redisStore) edgeKey(pk cipher.PubKey) string { - return fmt.Sprintf("%s:edge:%s", serviceName, pk.Hex()) + return serviceName + ":edge:" + pk.Hex() } // dataToEntry converts TransportData to Entry with full QoS metrics. diff --git a/pkg/transport-discovery/store/store.go b/pkg/transport-discovery/store/store.go index 1cb0037532..efc41f7462 100644 --- a/pkg/transport-discovery/store/store.go +++ b/pkg/transport-discovery/store/store.go @@ -177,6 +177,11 @@ type TransportStore interface { DeregisterTransport(context.Context, uuid.UUID) error GetTransportByID(context.Context, uuid.UUID) (*transport.Entry, error) GetTransportsByEdge(context.Context, cipher.PubKey) ([]*transport.Entry, error) + // GetTransportsByEdgeNoLatency is the cheap variant for callers + // (DHT mirror, transport-count stats) that don't need the + // durable latency overlay. Skips the per-call MGET on lat: + // keys and the JSON decode that follows. + GetTransportsByEdgeNoLatency(context.Context, cipher.PubKey) ([]*transport.Entry, error) GetNumberOfTransports(context.Context) (map[types.Type]int, error) GetAllTransports(context.Context, bool) ([]*transport.Entry, error) // Bandwidth ingest (called by the CXO aggregator with the diff --git a/pkg/visor/rpcgrpc/server.go b/pkg/visor/rpcgrpc/server.go index 2f8b733e58..68b172c6a6 100644 --- a/pkg/visor/rpcgrpc/server.go +++ b/pkg/visor/rpcgrpc/server.go @@ -845,6 +845,10 @@ func newCalcMemStore(entries []*transport.Entry) *calcMemStore { return &calcMemStore{byEdge: byEdge} } +func (s *calcMemStore) GetTransportsByEdgeNoLatency(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { + return s.GetTransportsByEdge(ctx, pk) +} + func (s *calcMemStore) GetTransportsByEdge(_ context.Context, pk cipher.PubKey) ([]*transport.Entry, error) { if tps, ok := s.byEdge[pk]; ok { return tps, nil