Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/skywire-cli/commands/route/calc.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,10 @@ func newMemoryStoreFromEntries(entries []*transport.Entry) *memoryStore {
return &memoryStore{entries: entries, byEdge: byEdge}
}

func (s *memoryStore) GetTransportsByEdgeNoLatency(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) {
return s.GetTransportsByEdge(ctx, pk)
}

func (s *memoryStore) GetTransportsByEdge(_ context.Context, pk cipher.PubKey) ([]*transport.Entry, error) {
if tps, ok := s.byEdge[pk]; ok {
return tps, nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/route-finder/store/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (m *mockStore) DeregisterTransport(context.Context, uuid.UUID) error {
func (m *mockStore) GetTransportByID(context.Context, uuid.UUID) (*transport.Entry, error) {
return nil, nil
}
func (m *mockStore) GetTransportsByEdgeNoLatency(ctx context.Context, edgePK cipher.PubKey) ([]*transport.Entry, error) {
return m.GetTransportsByEdge(ctx, edgePK)
}

func (m *mockStore) GetTransportsByEdge(_ context.Context, edgePK cipher.PubKey) ([]*transport.Entry, error) {
trs, ok := m.transports[edgePK]
if !ok {
Expand Down
8 changes: 7 additions & 1 deletion pkg/transport-discovery/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,13 @@ func (api *API) mirrorEdges(ctx context.Context, edges map[cipher.PubKey]struct{
}
seq := uint64(time.Now().UnixNano()) //nolint:gosec
for edge := range edges {
entries, err := api.store.GetTransportsByEdge(ctx, edge)
// DHT consumers don't read the Latency field — skip the
// per-call MGET on lat:<id> + decode by using the
// no-latency variant. Production pprof showed mirrorEdges
// as 97.7% of GetTransportsByEdge alloc traffic, with
// hydrateDurableLatency contributing 10.5% of total
// alloc_objects.
entries, err := api.store.GetTransportsByEdgeNoLatency(ctx, edge)
if err != nil || len(entries) == 0 {
api.dhtMirror.Delete(edge)
continue
Expand Down
6 changes: 5 additions & 1 deletion pkg/transport-discovery/store/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ func (s *memoryStore) GetTransportByID(_ context.Context, id uuid.UUID) (*transp
return v, nil
}

func (s *memoryStore) GetTransportsByEdge(_ context.Context, pk cipher.PubKey) ([]*transport.Entry, error) {
func (s *memoryStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) {
return s.GetTransportsByEdgeNoLatency(ctx, pk)
}

func (s *memoryStore) GetTransportsByEdgeNoLatency(_ context.Context, pk cipher.PubKey) ([]*transport.Entry, error) {
if s.err != nil {
return nil, s.err
}
Expand Down
59 changes: 47 additions & 12 deletions pkg/transport-discovery/store/redis_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -214,7 +213,41 @@ func (s *redisStore) GetTransportByID(ctx context.Context, id uuid.UUID) (*trans
return entry, nil
}

// GetTransportsByEdge returns every transport involving pk, with the
// durable latency overlay applied so HTTP responses include current
// min/max/avg from the lat:<id> keys.
//
// Callers that don't need latency (DHT mirroring, transport-count
// stats) should use GetTransportsByEdgeNoLatency to skip the per-call
// MGET on the latency keyspace and the JSON decode that follows.
// Production pprof traced 97.7% of GetTransportsByEdge invocations
// to mirrorEdges, where the DHT consumer doesn't read the latency
// field at all — that's the hot path the no-latency variant is for.
func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) {
entries, err := s.getTransportsByEdge(ctx, pk)
if err != nil {
return nil, err
}
s.hydrateDurableLatency(ctx, entries)
s.edgeCache.Put(pk, entries)
return entries, nil
}

// GetTransportsByEdgeNoLatency is the no-overlay sibling of
// GetTransportsByEdge. The returned entries' Latency field reflects
// what was in the tp:<id> blob (typically 0 — see #2418), not the
// durable lat:<id> store.
func (s *redisStore) GetTransportsByEdgeNoLatency(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) {
if entries, ok := s.edgeCache.Get(pk); ok {
return entries, nil
}
return s.getTransportsByEdge(ctx, pk)
}

// getTransportsByEdge does the MGET + decode without any latency
// hydration or cache write. Shared by both public methods so the
// fetch logic stays in one place.
func (s *redisStore) getTransportsByEdge(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) {
if entries, ok := s.edgeCache.Get(pk); ok {
return entries, nil
}
Expand All @@ -235,8 +268,8 @@ func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey)
idStr string
id uuid.UUID
}
var mappings []idMapping
var keys []string
mappings := make([]idMapping, 0, len(ids))
keys := make([]string, 0, len(ids))
for _, idStr := range ids {
id, err := uuid.Parse(idStr)
if err != nil {
Expand All @@ -256,7 +289,7 @@ func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey)
return nil, err
}

var entries []*transport.Entry
entries := make([]*transport.Entry, 0, len(vals))
var staleIDs []interface{}
for i, val := range vals {
raw, ok := val.(string)
Expand Down Expand Up @@ -288,9 +321,6 @@ func (s *redisStore) GetTransportsByEdge(ctx context.Context, pk cipher.PubKey)
if len(entries) == 0 {
return nil, ErrTransportNotFound
}

s.hydrateDurableLatency(ctx, entries)
s.edgeCache.Put(pk, entries)
return entries, nil
}

Expand Down Expand Up @@ -472,8 +502,13 @@ func (s *redisStore) scanAllTransports(ctx context.Context, selfTransports, with
// Maintained on Register/Deregister; replaces the pre-existing
// SCAN of the tp:* keyspace used by GetNumberOfTransports,
// scanAllTransports, and getP2PTransportCounts.
//
// Hot-path key builders use string concatenation rather than
// fmt.Sprintf. Sprintf was 9% of TPD's total alloc_objects in
// production pprof; concat compiles to a single buffer write and
// avoids the format-state machinery.
func (s *redisStore) allTpsIndexKey() string {
return fmt.Sprintf("%s:tp:_index", serviceName)
return serviceName + ":tp:_index"
}

// allTransportKeysFromIndex reads the transport-id index set and returns
Expand All @@ -486,7 +521,7 @@ func (s *redisStore) allTransportKeysFromIndex(ctx context.Context) (keys, ids [
}
keys = make([]string, len(ids))
for i, id := range ids {
keys[i] = fmt.Sprintf("%s:tp:%s", serviceName, id)
keys[i] = serviceName + ":tp:" + id
}
return keys, ids, nil
}
Expand All @@ -505,15 +540,15 @@ func (s *redisStore) maybeReapStaleTransports(stale []interface{}) {
}

func (s *redisStore) transportKey(id uuid.UUID) string {
return fmt.Sprintf("%s:tp:%s", serviceName, id.String())
return serviceName + ":tp:" + id.String()
}

func (s *redisStore) latencyKey(id uuid.UUID) string {
return fmt.Sprintf("%s:lat:%s", serviceName, id.String())
return serviceName + ":lat:" + id.String()
}

func (s *redisStore) edgeKey(pk cipher.PubKey) string {
return fmt.Sprintf("%s:edge:%s", serviceName, pk.Hex())
return serviceName + ":edge:" + pk.Hex()
}

// dataToEntry converts TransportData to Entry with full QoS metrics.
Expand Down
5 changes: 5 additions & 0 deletions pkg/transport-discovery/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ type TransportStore interface {
DeregisterTransport(context.Context, uuid.UUID) error
GetTransportByID(context.Context, uuid.UUID) (*transport.Entry, error)
GetTransportsByEdge(context.Context, cipher.PubKey) ([]*transport.Entry, error)
// GetTransportsByEdgeNoLatency is the cheap variant for callers
// (DHT mirror, transport-count stats) that don't need the
// durable latency overlay. Skips the per-call MGET on lat:<id>
// keys and the JSON decode that follows.
GetTransportsByEdgeNoLatency(context.Context, cipher.PubKey) ([]*transport.Entry, error)
GetNumberOfTransports(context.Context) (map[types.Type]int, error)
GetAllTransports(context.Context, bool) ([]*transport.Entry, error)
// Bandwidth ingest (called by the CXO aggregator with the
Expand Down
4 changes: 4 additions & 0 deletions pkg/visor/rpcgrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,10 @@ func newCalcMemStore(entries []*transport.Entry) *calcMemStore {
return &calcMemStore{byEdge: byEdge}
}

func (s *calcMemStore) GetTransportsByEdgeNoLatency(ctx context.Context, pk cipher.PubKey) ([]*transport.Entry, error) {
return s.GetTransportsByEdge(ctx, pk)
}

func (s *calcMemStore) GetTransportsByEdge(_ context.Context, pk cipher.PubKey) ([]*transport.Entry, error) {
if tps, ok := s.byEdge[pk]; ok {
return tps, nil
Expand Down
Loading