From 88f9e17815927fe3208c300c5700cd337734ec4e Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 26 Jan 2025 00:12:19 +0100 Subject: [PATCH 1/2] add prometheus metrics --- clients/consensus/pool.go | 50 ++++++++++- clients/execution/pool.go | 41 ++++++++- cmd/dora-explorer/main.go | 13 +++ go.mod | 3 +- go.sum | 2 + indexer/beacon/debug.go | 18 ++-- indexer/beacon/epochcache.go | 4 +- indexer/beacon/epochvotes.go | 4 +- indexer/beacon/indexer.go | 2 + indexer/beacon/metrics.go | 170 +++++++++++++++++++++++++++++++++++ metrics/metrics.go | 73 +++++++++++++++ types/config.go | 7 ++ 12 files changed, 368 insertions(+), 19 deletions(-) create mode 100644 indexer/beacon/metrics.go create mode 100644 metrics/metrics.go diff --git a/clients/consensus/pool.go b/clients/consensus/pool.go index a68a1e0e..ee588efd 100644 --- a/clients/consensus/pool.go +++ b/clients/consensus/pool.go @@ -5,7 +5,10 @@ import ( "time" v1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/ethpandaops/dora/metrics" "github.com/ethpandaops/ethwallclock" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/sirupsen/logrus" "golang.org/x/exp/rand" ) @@ -19,12 +22,16 @@ type Pool struct { } func NewPool(ctx context.Context, logger logrus.FieldLogger) *Pool { - return &Pool{ + pool := &Pool{ ctx: ctx, logger: logger, clients: make([]*Client, 0), chainState: newChainState(), } + + pool.registerMetrics() + + return pool } func (pool *Pool) SubscribeFinalizedEvent(capacity int) *Subscription[*v1.Finality] { @@ -97,3 +104,44 @@ func (pool *Pool) AwaitReadyEndpoint(ctx context.Context, clientType ClientType) } } } + +func (pool *Pool) registerMetrics() { + clientCountGauge := promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_pool_clients", + Help: "Number of consensus clients", + }) + onlineCountGauge := promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_pool_clients_online", + Help: "Number of consensus clients online", + }) + syncingCountGauge := promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_pool_clients_syncing", + Help: "Number of consensus clients syncing", + }) + optimisticCountGauge := promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_pool_clients_optimistic", + Help: "Number of consensus clients optimistic", + }) + + metrics.AddPreCollectFn(func() { + online := 0 + syncing := 0 + optimistic := 0 + for _, client := range pool.clients { + if client.isOnline { + online++ + } + if client.isSyncing { + syncing++ + } + if client.isOptimistic { + optimistic++ + } + } + + clientCountGauge.Set(float64(len(pool.clients))) + onlineCountGauge.Set(float64(online)) + syncingCountGauge.Set(float64(syncing)) + optimisticCountGauge.Set(float64(optimistic)) + }) +} diff --git a/clients/execution/pool.go b/clients/execution/pool.go index 5ee66008..41437a5e 100644 --- a/clients/execution/pool.go +++ b/clients/execution/pool.go @@ -5,6 +5,9 @@ import ( "math/rand/v2" "time" + "github.com/ethpandaops/dora/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/sirupsen/logrus" ) @@ -17,12 +20,16 @@ type Pool struct { } func NewPool(ctx context.Context, logger logrus.FieldLogger) *Pool { - return &Pool{ + pool := &Pool{ ctx: ctx, logger: logger, clients: make([]*Client, 0), chainState: newChainState(), } + + pool.registerMetrics() + + return pool } func (pool *Pool) GetChainState() *ChainState { @@ -92,3 +99,35 @@ func (pool *Pool) AwaitReadyEndpoint(ctx context.Context, clientType ClientType) } } } + +func (pool *Pool) registerMetrics() { + clientCountGauge := promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_el_pool_clients", + Help: "Number of execution clients", + }) + onlineCountGauge := promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_el_pool_clients_online", + Help: "Number of execution clients online", + }) + syncingCountGauge := promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_el_pool_clients_syncing", + Help: "Number of execution clients syncing", + }) + + metrics.AddPreCollectFn(func() { + online := 0 + syncing := 0 + for _, client := range pool.clients { + if client.isOnline { + online++ + } + if client.isSyncing { + syncing++ + } + } + + clientCountGauge.Set(float64(len(pool.clients))) + onlineCountGauge.Set(float64(online)) + syncingCountGauge.Set(float64(syncing)) + }) +} diff --git a/cmd/dora-explorer/main.go b/cmd/dora-explorer/main.go index d5d771e7..b7001249 100644 --- a/cmd/dora-explorer/main.go +++ b/cmd/dora-explorer/main.go @@ -15,6 +15,7 @@ import ( "github.com/ethpandaops/dora/db" "github.com/ethpandaops/dora/handlers" + "github.com/ethpandaops/dora/metrics" "github.com/ethpandaops/dora/services" "github.com/ethpandaops/dora/static" "github.com/ethpandaops/dora/types" @@ -66,6 +67,13 @@ func main() { } } + if cfg.Metrics.Enabled && !cfg.Metrics.Public { + err = metrics.StartMetricsServer(logger.WithField("module", "metrics"), cfg.Metrics.Host, cfg.Metrics.Port) + if err != nil { + logger.Fatalf("error starting metrics server: %v", err) + } + } + err = services.GlobalBeaconService.StartService() if err != nil { logger.Fatalf("error starting beacon service: %v", err) @@ -176,6 +184,11 @@ func startFrontend(webserver *http.Server) { // add pprof handler router.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux) router.HandleFunc("/debug/cache", handlers.DebugCache).Methods("GET") + router.Handle("/debug/metrics", metrics.GetMetricsHandler()) + } + + if utils.Config.Metrics.Enabled && utils.Config.Metrics.Public { + router.Handle("/metrics", metrics.GetMetricsHandler()) } if utils.Config.Frontend.Debug { diff --git a/go.mod b/go.mod index 52eedc4f..ca4686c8 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( require ( github.com/ipfs/go-cid v0.4.1 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/mr-tron/base58 v1.2.0 // indirect @@ -116,7 +117,7 @@ require ( github.com/mmcloughlin/addchain v0.4.0 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect github.com/pkg/errors v0.9.1 - github.com/prometheus/client_golang v1.20.0 // indirect + github.com/prometheus/client_golang v1.20.5 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect diff --git a/go.sum b/go.sum index b54ab8ba..ad0841ae 100644 --- a/go.sum +++ b/go.sum @@ -357,6 +357,8 @@ github.com/pressly/goose/v3 v3.24.1 h1:bZmxRco2uy5uu5Ng1MMVEfYsFlrMJI+e/VMXHQ3C4 github.com/pressly/goose/v3 v3.24.1/go.mod h1:rEWreU9uVtt0DHCyLzF9gRcWiiTF/V+528DV+4DORug= github.com/prometheus/client_golang v1.20.0 h1:jBzTZ7B099Rg24tny+qngoynol8LtVYlA2bqx3vEloI= github.com/prometheus/client_golang v1.20.0/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= +github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= diff --git a/indexer/beacon/debug.go b/indexer/beacon/debug.go index 4fa749d8..4191dd7c 100644 --- a/indexer/beacon/debug.go +++ b/indexer/beacon/debug.go @@ -19,15 +19,13 @@ type CacheDebugStats struct { BlockSize uint64 } EpochCache struct { - StatsMap CacheDebugMapSize - StateMap CacheDebugMapSize - StatsFull uint64 - StatsPrecalc uint64 - StatsPruned uint64 - StateLoaded uint64 - VotesCacheLen uint64 - VotesCacheHit uint64 - VotesCacheMiss uint64 + StatsMap CacheDebugMapSize + StateMap CacheDebugMapSize + StatsFull uint64 + StatsPrecalc uint64 + StatsPruned uint64 + StateLoaded uint64 + VotesCacheLen uint64 } ForkCache struct { ForkMap CacheDebugMapSize @@ -126,8 +124,6 @@ func (indexer *Indexer) getEpochCacheDebugStats(cacheStats *CacheDebugStats) { } cacheStats.EpochCache.VotesCacheLen = uint64(indexer.epochCache.votesCache.Len()) - cacheStats.EpochCache.VotesCacheHit = indexer.epochCache.votesCacheHit - cacheStats.EpochCache.VotesCacheMiss = indexer.epochCache.votesCacheMiss } func (indexer *Indexer) getForkCacheDebugStats(cacheStats *CacheDebugStats) { diff --git a/indexer/beacon/epochcache.go b/indexer/beacon/epochcache.go index edbdf5cd..9bc6c422 100644 --- a/indexer/beacon/epochcache.go +++ b/indexer/beacon/epochcache.go @@ -41,9 +41,7 @@ type epochCache struct { syncCache []phase0.ValidatorIndex // global sync committee cache for reuse if matching precomputeLock sync.Mutex // mutex to prevent concurrent precomputing of epoch stats - votesCache *lru.Cache[epochVotesKey, *EpochVotes] // cache for epoch vote aggregations - votesCacheHit uint64 - votesCacheMiss uint64 + votesCache *lru.Cache[epochVotesKey, *EpochVotes] // cache for epoch vote aggregations } // newEpochCache creates & returns a new instance of epochCache. diff --git a/indexer/beacon/epochvotes.go b/indexer/beacon/epochvotes.go index 39d3f4b0..71897cb1 100644 --- a/indexer/beacon/epochvotes.go +++ b/indexer/beacon/epochvotes.go @@ -69,12 +69,12 @@ func (indexer *Indexer) aggregateEpochVotes(epoch phase0.Epoch, chainState *cons votesKey := getEpochVotesKey(epoch, targetRoot, blocks[len(blocks)-1].Root, uint8(len(blocks)), votesWithValues, votesWithPrecalc) if cachedVotes, isOk := indexer.epochCache.votesCache.Get(votesKey); isOk { - indexer.epochCache.votesCacheHit++ + indexer.metrics.epochCacheVotesCacheHit.Inc() return cachedVotes } votes := indexer.aggregateEpochVotesAndActivity(epoch, chainState, blocks, epochStats) - indexer.epochCache.votesCacheMiss++ + indexer.metrics.epochCacheVotesCacheMiss.Inc() return votes } diff --git a/indexer/beacon/indexer.go b/indexer/beacon/indexer.go index a66bf632..5826cf76 100644 --- a/indexer/beacon/indexer.go +++ b/indexer/beacon/indexer.go @@ -29,6 +29,7 @@ type Indexer struct { consensusPool *consensus.Pool dynSsz *dynssz.DynSsz synchronizer *synchronizer + metrics *beaconMetrics // configuration disableSync bool @@ -100,6 +101,7 @@ func NewIndexer(logger logrus.FieldLogger, consensusPool *consensus.Pool) *Index backfillCompleteChan: make(chan bool), } + indexer.metrics = indexer.registerMetrics() indexer.blockCache = newBlockCache(indexer) indexer.epochCache = newEpochCache(indexer) indexer.forkCache = newForkCache(indexer) diff --git a/indexer/beacon/metrics.go b/indexer/beacon/metrics.go new file mode 100644 index 00000000..bb64c6bb --- /dev/null +++ b/indexer/beacon/metrics.go @@ -0,0 +1,170 @@ +package beacon + +import ( + "github.com/ethpandaops/dora/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type beaconMetrics struct { + indexer *Indexer + blockCacheSlotMapSize prometheus.Gauge + blockCacheRootMapSize prometheus.Gauge + blockCacheParentMapSize prometheus.Gauge + blockCacheExecBlockMapSize prometheus.Gauge + blockCacheBlockHeaderCount prometheus.Gauge + blockCacheBlockBodyCount prometheus.Gauge + blockCacheBlockIndexCount prometheus.Gauge + + epochCacheStatsMapSize prometheus.Gauge + epochCacheStateMapSize prometheus.Gauge + epochCacheStatsFullCount prometheus.Gauge + epochCacheStatsPrecalcCount prometheus.Gauge + epochCacheStatsPrunedCount prometheus.Gauge + epochCacheStateLoadedCount prometheus.Gauge + epochCacheVotesCacheLen prometheus.Gauge + epochCacheVotesCacheHit prometheus.Counter + epochCacheVotesCacheMiss prometheus.Counter +} + +func (indexer *Indexer) registerMetrics() *beaconMetrics { + beaconMetrics := &beaconMetrics{ + indexer: indexer, + blockCacheSlotMapSize: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_block_cache_slot_map_size", + Help: "Number of entries in the block cache slot map", + }), + blockCacheRootMapSize: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_block_cache_root_map_size", + Help: "Number of entries in the block cache root map", + }), + blockCacheParentMapSize: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_block_cache_parent_map_size", + Help: "Number of entries in the block cache parent map", + }), + blockCacheExecBlockMapSize: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_block_cache_exec_block_map_size", + Help: "Number of entries in the block cache exec block map", + }), + blockCacheBlockHeaderCount: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_block_cache_block_header_count", + Help: "Number of blocks with a block header", + }), + blockCacheBlockBodyCount: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_block_cache_block_body_count", + Help: "Number of blocks with a block body", + }), + blockCacheBlockIndexCount: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_block_cache_block_index_count", + Help: "Number of blocks with a block index", + }), + epochCacheStatsMapSize: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_epoch_cache_stats_map_size", + Help: "Number of entries in the epoch cache stats map", + }), + epochCacheStateMapSize: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_epoch_cache_state_map_size", + Help: "Number of entries in the epoch cache state map", + }), + epochCacheStatsFullCount: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_epoch_cache_stats_full_count", + Help: "Number of full epoch cache stats", + }), + epochCacheStatsPrecalcCount: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_epoch_cache_stats_precalc_count", + Help: "Number of precalculated epoch cache stats", + }), + epochCacheStatsPrunedCount: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_epoch_cache_stats_pruned_count", + Help: "Number of pruned epoch cache stats", + }), + epochCacheStateLoadedCount: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_epoch_cache_state_loaded_count", + Help: "Number of loaded epoch cache state", + }), + epochCacheVotesCacheLen: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_epoch_cache_votes_cache_len", + Help: "Number of entries in the epoch cache votes cache", + }), + epochCacheVotesCacheHit: promauto.NewCounter(prometheus.CounterOpts{ + Name: "dora_cl_indexer_epoch_cache_votes_cache_hit", + Help: "Number of hits in the epoch cache votes cache", + }), + epochCacheVotesCacheMiss: promauto.NewCounter(prometheus.CounterOpts{ + Name: "dora_cl_indexer_epoch_cache_votes_cache_miss", + Help: "Number of misses in the epoch cache votes cache", + }), + } + + metrics.AddPreCollectFn(func() { + beaconMetrics.updateBlockCacheMetrics() + beaconMetrics.updateEpochCacheMetrics() + }) + + return beaconMetrics +} + +func (metrics *beaconMetrics) updateBlockCacheMetrics() { + metrics.indexer.blockCache.cacheMutex.RLock() + defer metrics.indexer.blockCache.cacheMutex.RUnlock() + + metrics.blockCacheSlotMapSize.Set(float64(len(metrics.indexer.blockCache.slotMap))) + metrics.blockCacheRootMapSize.Set(float64(len(metrics.indexer.blockCache.rootMap))) + metrics.blockCacheParentMapSize.Set(float64(len(metrics.indexer.blockCache.parentMap))) + metrics.blockCacheExecBlockMapSize.Set(float64(len(metrics.indexer.blockCache.execBlockMap))) + + blockHeaderCount := 0 + blockBodyCount := 0 + blockIndexCount := 0 + for _, block := range metrics.indexer.blockCache.rootMap { + if block.header != nil { + blockHeaderCount++ + } + if block.block != nil { + blockBodyCount++ + } + if block.blockIndex != nil { + blockIndexCount++ + } + } + + metrics.blockCacheBlockHeaderCount.Set(float64(blockHeaderCount)) + metrics.blockCacheBlockBodyCount.Set(float64(blockBodyCount)) + metrics.blockCacheBlockIndexCount.Set(float64(blockIndexCount)) +} + +func (metrics *beaconMetrics) updateEpochCacheMetrics() { + metrics.indexer.epochCache.cacheMutex.RLock() + defer metrics.indexer.epochCache.cacheMutex.RUnlock() + + metrics.epochCacheStatsMapSize.Set(float64(len(metrics.indexer.epochCache.statsMap))) + metrics.epochCacheStateMapSize.Set(float64(len(metrics.indexer.epochCache.stateMap))) + + fullStatsCount := 0 + precalcStatsCount := 0 + prunedStatsCount := 0 + for _, stats := range metrics.indexer.epochCache.statsMap { + if stats.values != nil { + fullStatsCount++ + } + if stats.precalcValues != nil { + precalcStatsCount++ + } + if stats.prunedValues != nil { + prunedStatsCount++ + } + } + + stateLoadedCount := 0 + for _, state := range metrics.indexer.epochCache.stateMap { + if state.loadingStatus == 2 { + stateLoadedCount++ + } + } + + metrics.epochCacheStatsFullCount.Set(float64(fullStatsCount)) + metrics.epochCacheStatsPrecalcCount.Set(float64(precalcStatsCount)) + metrics.epochCacheStatsPrunedCount.Set(float64(prunedStatsCount)) + metrics.epochCacheStateLoadedCount.Set(float64(stateLoadedCount)) + metrics.epochCacheVotesCacheLen.Set(float64(metrics.indexer.epochCache.votesCache.Len())) +} diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 00000000..8f225c74 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,73 @@ +package metrics + +import ( + "net" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +type Metrics struct { + preCollectFns []func() +} + +type MetricsHandler struct { + handler http.Handler + lastCollectTime time.Time +} + +var metrics *Metrics = &Metrics{ + preCollectFns: []func(){}, +} + +func AddPreCollectFn(fn func()) { + metrics.preCollectFns = append(metrics.preCollectFns, fn) +} + +func StartMetricsServer(logger logrus.FieldLogger, host string, port string) error { + if host == "" { + host = "127.0.0.1" + } + if port == "" { + port = "9090" + } + + srv := &http.Server{ + Addr: host + ":" + port, + Handler: promhttp.Handler(), + } + + listener, err := net.Listen("tcp", srv.Addr) + if err != nil { + return err + } + + go func() { + logger.Infof("metrics server listening on %v", srv.Addr) + if err := srv.Serve(listener); err != nil { + logger.WithError(err).Fatal("Error serving metrics") + } + }() + + return nil +} + +func GetMetricsHandler() http.Handler { + return &MetricsHandler{ + handler: promhttp.Handler(), + lastCollectTime: time.Now(), + } +} + +func (mh *MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if time.Since(mh.lastCollectTime) > 1*time.Second { + for _, fn := range metrics.preCollectFns { + fn() + } + mh.lastCollectTime = time.Now() + } + + mh.handler.ServeHTTP(w, r) +} diff --git a/types/config.go b/types/config.go index ec9d86ab..b9761c37 100644 --- a/types/config.go +++ b/types/config.go @@ -17,6 +17,13 @@ type Config struct { Host string `yaml:"host" envconfig:"FRONTEND_SERVER_HOST"` } `yaml:"server"` + Metrics struct { + Enabled bool `yaml:"enabled" envconfig:"METRICS_ENABLED"` + Public bool `yaml:"public" envconfig:"METRICS_PUBLIC"` + Port string `yaml:"port" envconfig:"METRICS_PORT"` + Host string `yaml:"host" envconfig:"METRICS_HOST"` + } `yaml:"metrics"` + Chain struct { DisplayName string `yaml:"displayName" envconfig:"CHAIN_DISPLAY_NAME"` From f3e8f74ccc0ee58f234e10d5b2df4575997359cc Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 26 Jan 2025 01:13:45 +0100 Subject: [PATCH 2/2] add more metrics --- indexer/beacon/client.go | 4 + indexer/beacon/debug.go | 15 +-- indexer/beacon/epochstate.go | 4 + indexer/beacon/epochstats.go | 11 +- indexer/beacon/epochvotes.go | 2 + indexer/beacon/finalization.go | 9 +- indexer/beacon/forkcache.go | 30 ++--- indexer/beacon/metrics.go | 219 +++++++++++++++++++++++++++++++++ indexer/beacon/pruning.go | 5 + services/callratelimiter.go | 24 ++++ services/frontendcache.go | 34 +++++ 11 files changed, 326 insertions(+), 31 deletions(-) diff --git a/indexer/beacon/client.go b/indexer/beacon/client.go index c3733051..def4f18e 100644 --- a/indexer/beacon/client.go +++ b/indexer/beacon/client.go @@ -127,6 +127,10 @@ func (c *Client) emitBlockLogEntry(slot phase0.Slot, root phase0.Root, source st if isNew { c.logger.Infof("received block %v:%v [0x%x] %v %v fork: %v", chainState.EpochOfSlot(slot), slot, root[:], source, processingTimesStr, forkId) + + c.indexer.metrics.blockLoadDuration.Observe(float64(processingTimes[0].Milliseconds())) + c.indexer.metrics.blockProcessDuration.Observe(float64(processingTimes[1].Milliseconds())) + c.indexer.metrics.blockStoreDuration.Observe(float64(processingTimes[2].Milliseconds())) } else { c.logger.Debugf("received known block %v:%v [0x%x] %v %v fork: %v", chainState.EpochOfSlot(slot), slot, root[:], source, processingTimesStr, forkId) } diff --git a/indexer/beacon/debug.go b/indexer/beacon/debug.go index 4d7fc8ed..fd2bd6f1 100644 --- a/indexer/beacon/debug.go +++ b/indexer/beacon/debug.go @@ -28,13 +28,9 @@ type CacheDebugStats struct { VotesCacheLen uint64 } ForkCache struct { - ForkMap CacheDebugMapSize - ParentIdCacheLen uint64 - ParentIdCacheHit uint64 - ParentIdCacheMiss uint64 - ParentIdsCacheLen uint64 - ParentIdsCacheHit uint64 - ParentIdsCacheMiss uint64 + ForkMap CacheDebugMapSize + ParentIdCacheLen uint64 + ParentIdsCacheLen uint64 } ValidatorCache struct { Validators uint64 @@ -139,12 +135,7 @@ func (indexer *Indexer) getForkCacheDebugStats(cacheStats *CacheDebugStats) { } cacheStats.ForkCache.ParentIdCacheLen = uint64(indexer.forkCache.parentIdCache.Len()) - cacheStats.ForkCache.ParentIdCacheHit = indexer.forkCache.parentIdCacheHit - cacheStats.ForkCache.ParentIdCacheMiss = indexer.forkCache.parentIdCacheMiss - cacheStats.ForkCache.ParentIdsCacheLen = uint64(indexer.forkCache.parentIdsCache.Len()) - cacheStats.ForkCache.ParentIdsCacheHit = indexer.forkCache.parentIdsCacheHit - cacheStats.ForkCache.ParentIdsCacheMiss = indexer.forkCache.parentIdsCacheMiss } func (indexer *Indexer) getValidatorCacheDebugStats(cacheStats *CacheDebugStats) { diff --git a/indexer/beacon/epochstate.go b/indexer/beacon/epochstate.go index fde0d5fe..63c17c29 100644 --- a/indexer/beacon/epochstate.go +++ b/indexer/beacon/epochstate.go @@ -114,11 +114,15 @@ func (s *epochState) loadState(ctx context.Context, client *Client, cache *epoch s.stateRoot = blockHeader.Message.StateRoot + t1 := time.Now() resState, err := LoadBeaconState(ctx, client, blockHeader.Message.StateRoot) if err != nil { return nil, err } + client.indexer.metrics.epochStateLoadDuration.Observe(float64(time.Since(t1).Milliseconds())) + client.indexer.metrics.epochStateLoadCount.Inc() + err = s.processState(resState, cache) if err != nil { return nil, err diff --git a/indexer/beacon/epochstats.go b/indexer/beacon/epochstats.go index 77f27189..6b6e1d08 100644 --- a/indexer/beacon/epochstats.go +++ b/indexer/beacon/epochstats.go @@ -412,6 +412,9 @@ func (es *EpochStats) processState(indexer *Indexer, validatorSet []*phase0.Vali DutiesSSZ: packedSsz, } + t1dur := time.Since(t1) + t1 = time.Now() + err = db.RunDBTransaction(func(tx *sqlx.Tx) error { return db.InsertUnfinalizedDuty(dbDuty, tx) }) @@ -419,8 +422,14 @@ func (es *EpochStats) processState(indexer *Indexer, validatorSet []*phase0.Vali indexer.logger.WithError(err).Errorf("failed storing epoch %v stats (%v / %v) to unfinalized duties", es.epoch, es.dependentRoot.String(), es.dependentState.stateRoot.String()) } + t2dur := time.Since(t1) + es.isInDb = true + indexer.metrics.epochStatsProcessDuration.Observe(float64(t1dur.Milliseconds())) + indexer.metrics.epochStatsStoreDuration.Observe(float64(t2dur.Milliseconds())) + indexer.metrics.epochStatsPackedSize.Observe(float64(len(packedSsz))) + indexer.logger.Infof( "processed epoch %v stats (root: %v / state: %v, validators: %v/%v, %v ms), %v bytes", es.epoch, @@ -428,7 +437,7 @@ func (es *EpochStats) processState(indexer *Indexer, validatorSet []*phase0.Vali es.dependentState.stateRoot.String(), values.ActiveValidators, len(validatorSet), - time.Since(t1).Milliseconds(), + (t1dur + t2dur).Milliseconds(), len(packedSsz), ) diff --git a/indexer/beacon/epochvotes.go b/indexer/beacon/epochvotes.go index 71897cb1..9859ecd5 100644 --- a/indexer/beacon/epochvotes.go +++ b/indexer/beacon/epochvotes.go @@ -233,6 +233,8 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain indexer.logger.Debugf("aggregated epoch %v votes in %v (blocks: %v) [0x%x]", epoch, time.Since(t1), len(blocks), votesKey[:]) indexer.epochCache.votesCache.Add(votesKey, votes) + indexer.metrics.epochVoteAggregateDuration.Observe(float64(time.Since(t1).Milliseconds())) + return votes } diff --git a/indexer/beacon/finalization.go b/indexer/beacon/finalization.go index 9aae391c..0d29776d 100644 --- a/indexer/beacon/finalization.go +++ b/indexer/beacon/finalization.go @@ -400,8 +400,15 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R indexer.blockCache.removeBlock(block) } + t3dur := time.Since(t1) + + indexer.metrics.finalizationLoadDuration.Observe(float64(t1loading.Milliseconds())) + indexer.metrics.finalizationProcessDuration.Observe(float64(t1dur.Milliseconds())) + indexer.metrics.finalizationStoreDuration.Observe(float64(t2dur.Milliseconds())) + indexer.metrics.finalizationCleanDuration.Observe(float64(t3dur.Milliseconds())) + // log summary - indexer.logger.Infof("completed epoch %v finalization (process: %v ms, load: %v s, write: %v ms, clean: %v ms)", epoch, t1dur.Milliseconds(), t1loading.Seconds(), t2dur.Milliseconds(), time.Since(t1).Milliseconds()) + indexer.logger.Infof("completed epoch %v finalization (process: %v ms, load: %v s, write: %v ms, clean: %v ms)", epoch, t1dur.Milliseconds(), t1loading.Seconds(), t2dur.Milliseconds(), t3dur.Milliseconds()) indexer.logger.Infof("epoch %v blocks: %v canonical, %v orphaned", epoch, len(canonicalBlocks), len(orphanedBlocks)) if epochStatsValues != nil { indexer.logger.Infof("epoch %v stats: %v validators (%v ETH)", epoch, epochStatsValues.ActiveValidators, epochStatsValues.EffectiveBalance/EtherGweiFactor) diff --git a/indexer/beacon/forkcache.go b/indexer/beacon/forkcache.go index 195e8734..6e1b2bde 100644 --- a/indexer/beacon/forkcache.go +++ b/indexer/beacon/forkcache.go @@ -15,18 +15,14 @@ import ( // forkCache is a struct that represents the fork cache in the indexer. type forkCache struct { - indexer *Indexer - cacheMutex sync.RWMutex - forkMap map[ForkKey]*Fork - finalizedForkId ForkKey - lastForkId ForkKey - parentIdCache *lru.Cache[ForkKey, ForkKey] - parentIdCacheHit uint64 - parentIdCacheMiss uint64 - parentIdsCache *lru.Cache[ForkKey, []ForkKey] - parentIdsCacheHit uint64 - parentIdsCacheMiss uint64 - forkProcessLock sync.Mutex + indexer *Indexer + cacheMutex sync.RWMutex + forkMap map[ForkKey]*Fork + finalizedForkId ForkKey + lastForkId ForkKey + parentIdCache *lru.Cache[ForkKey, ForkKey] + parentIdsCache *lru.Cache[ForkKey, []ForkKey] + forkProcessLock sync.Mutex } // newForkCache creates a new instance of the forkCache struct. @@ -126,7 +122,7 @@ func (cache *forkCache) removeFork(forkId ForkKey) { func (cache *forkCache) getParentForkIds(forkId ForkKey) []ForkKey { parentForks, isCached := cache.parentIdsCache.Get(forkId) if isCached { - cache.parentIdsCacheHit++ + cache.indexer.metrics.forkCacheParentIdsCacheHit.Inc() return parentForks } @@ -135,25 +131,25 @@ func (cache *forkCache) getParentForkIds(forkId ForkKey) []ForkKey { for parentForkId > 1 { if cachedParent, isCached := cache.parentIdCache.Get(parentForkId); isCached { - cache.parentIdCacheHit++ + cache.indexer.metrics.forkCacheParentIdCacheHit.Inc() parentForkId = cachedParent } else if parentFork := cache.getForkById(parentForkId); parentFork != nil { parentForkId = parentFork.parentFork } else if dbFork := db.GetForkById(uint64(parentForkId)); dbFork != nil { cache.parentIdCache.Add(ForkKey(parentForkId), ForkKey(dbFork.ParentFork)) parentForkId = ForkKey(dbFork.ParentFork) - cache.parentIdCacheMiss++ + cache.indexer.metrics.forkCacheParentIdCacheMiss.Inc() } else { cache.parentIdCache.Add(ForkKey(parentForkId), ForkKey(0)) parentForkId = 0 - cache.parentIdCacheMiss++ + cache.indexer.metrics.forkCacheParentIdCacheMiss.Inc() } parentForks = append(parentForks, parentForkId) } cache.parentIdsCache.Add(forkId, parentForks) - cache.parentIdsCacheMiss++ + cache.indexer.metrics.forkCacheParentIdsCacheMiss.Inc() return parentForks } diff --git a/indexer/beacon/metrics.go b/indexer/beacon/metrics.go index bb64c6bb..f1068814 100644 --- a/indexer/beacon/metrics.go +++ b/indexer/beacon/metrics.go @@ -1,6 +1,7 @@ package beacon import ( + "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethpandaops/dora/metrics" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -25,6 +26,41 @@ type beaconMetrics struct { epochCacheVotesCacheLen prometheus.Gauge epochCacheVotesCacheHit prometheus.Counter epochCacheVotesCacheMiss prometheus.Counter + + forkCacheForkMapSize prometheus.Gauge + forkCacheParentIdCacheLen prometheus.Gauge + forkCacheParentIdCacheHit prometheus.Counter + forkCacheParentIdCacheMiss prometheus.Counter + forkCacheParentIdsCacheLen prometheus.Gauge + forkCacheParentIdsCacheHit prometheus.Counter + forkCacheParentIdsCacheMiss prometheus.Counter + + validatorCacheValidators prometheus.Gauge + validatorCacheValidatorDiffs prometheus.Gauge + validatorCacheValidatorData prometheus.Gauge + validatorCacheValidatorActivity prometheus.Gauge + validatorCachePubkeyMapSize prometheus.Gauge + + blockLoadDuration prometheus.Histogram + blockProcessDuration prometheus.Histogram + blockStoreDuration prometheus.Histogram + + epochStateLoadDuration prometheus.Histogram + epochStateLoadCount prometheus.Counter + epochStatsProcessDuration prometheus.Histogram + epochStatsStoreDuration prometheus.Histogram + epochStatsPackedSize prometheus.Histogram + epochVoteAggregateDuration prometheus.Histogram + + finalizationLoadDuration prometheus.Histogram + finalizationProcessDuration prometheus.Histogram + finalizationStoreDuration prometheus.Histogram + finalizationCleanDuration prometheus.Histogram + + pruningLoadDuration prometheus.Histogram + pruningProcessDuration prometheus.Histogram + pruningStoreDuration prometheus.Histogram + pruningCleanDuration prometheus.Histogram } func (indexer *Indexer) registerMetrics() *beaconMetrics { @@ -58,6 +94,7 @@ func (indexer *Indexer) registerMetrics() *beaconMetrics { Name: "dora_cl_indexer_block_cache_block_index_count", Help: "Number of blocks with a block index", }), + epochCacheStatsMapSize: promauto.NewGauge(prometheus.GaugeOpts{ Name: "dora_cl_indexer_epoch_cache_stats_map_size", Help: "Number of entries in the epoch cache stats map", @@ -94,11 +131,151 @@ func (indexer *Indexer) registerMetrics() *beaconMetrics { Name: "dora_cl_indexer_epoch_cache_votes_cache_miss", Help: "Number of misses in the epoch cache votes cache", }), + + forkCacheForkMapSize: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_fork_cache_fork_map_size", + Help: "Number of entries in the fork cache fork map", + }), + forkCacheParentIdCacheLen: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_fork_cache_parent_id_cache_len", + Help: "Number of entries in the fork cache parent id cache", + }), + forkCacheParentIdCacheHit: promauto.NewCounter(prometheus.CounterOpts{ + Name: "dora_cl_indexer_fork_cache_parent_id_cache_hit", + Help: "Number of hits in the fork cache parent id cache", + }), + forkCacheParentIdCacheMiss: promauto.NewCounter(prometheus.CounterOpts{ + Name: "dora_cl_indexer_fork_cache_parent_id_cache_miss", + Help: "Number of misses in the fork cache parent id cache", + }), + forkCacheParentIdsCacheLen: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_fork_cache_parent_ids_cache_len", + Help: "Number of entries in the fork cache parent ids cache", + }), + forkCacheParentIdsCacheHit: promauto.NewCounter(prometheus.CounterOpts{ + Name: "dora_cl_indexer_fork_cache_parent_ids_cache_hit", + Help: "Number of hits in the fork cache parent ids cache", + }), + forkCacheParentIdsCacheMiss: promauto.NewCounter(prometheus.CounterOpts{ + Name: "dora_cl_indexer_fork_cache_parent_ids_cache_miss", + Help: "Number of misses in the fork cache parent ids cache", + }), + validatorCacheValidators: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_validator_cache_validators", + Help: "Number of validators in the validator cache", + }), + + validatorCacheValidatorDiffs: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_validator_cache_validator_diffs", + Help: "Number of validator diffs in the validator cache", + }), + validatorCacheValidatorData: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_validator_cache_validator_data", + Help: "Number of validator data in the validator cache", + }), + validatorCacheValidatorActivity: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_validator_cache_validator_activity", + Help: "Number of validator activity entries in the validator cache", + }), + validatorCachePubkeyMapSize: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_cl_indexer_validator_cache_pubkey_map_size", + Help: "Number of entries in the validator cache pubkey map", + }), + + blockLoadDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_block_load_duration", + Help: "Block loading time from clients", + Buckets: []float64{0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 250, 500, 750, 1000, 2500, 5000, 10000}, + }), + blockProcessDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_block_process_duration", + Help: "Block processing time (fork detection, ...)", + Buckets: []float64{0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 250, 500, 1000}, + }), + blockStoreDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_block_store_duration", + Help: "Block storing to db delay", + Buckets: []float64{0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 250, 500, 1000}, + }), + + epochStateLoadDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_epoch_state_load_duration", + Help: "Epoch state loading time from clients", + Buckets: []float64{25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 10000, 20000, 30000, 60000, 90000, 120000}, + }), + epochStateLoadCount: promauto.NewCounter(prometheus.CounterOpts{ + Name: "dora_cl_indexer_epoch_state_load_count", + Help: "Number of epoch state loads from clients", + }), + epochStatsProcessDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_epoch_stats_process_duration", + Help: "Epoch stats processing time", + Buckets: []float64{0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 250, 500, 1000}, + }), + epochStatsStoreDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_epoch_stats_store_duration", + Help: "Epoch stats storing to db delay", + Buckets: []float64{0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 250, 500, 1000}, + }), + epochStatsPackedSize: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_epoch_stats_packed_size", + Help: "Size of the packed epoch stats in bytes", + Buckets: []float64{0, 1000, 2500, 5000, 7500, 10000, 25000, 50000, 75000, 100000, 250000, 500000, 750000, 1000000, 2500000, 5000000, 7500000, 10000000, 25000000, 50000000, 75000000, 100000000}, + }), + epochVoteAggregateDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_epoch_vote_aggregate_duration", + Help: "Epoch vote aggregation time", + Buckets: []float64{0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 250, 500, 1000}, + }), + + finalizationLoadDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_finalization_load_duration", + Help: "Finalization loading missing entities delay", + Buckets: []float64{0, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 10000, 20000, 30000, 60000, 90000, 120000}, + }), + finalizationProcessDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_finalization_process_duration", + Help: "Finalization processing time", + Buckets: []float64{0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 250, 500, 1000}, + }), + finalizationStoreDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_finalization_store_duration", + Help: "Finalization storing to db delay", + Buckets: []float64{0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 250, 500, 1000}, + }), + finalizationCleanDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_finalization_clean_duration", + Help: "Finalization cleaning time", + Buckets: []float64{0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 250, 500, 1000}, + }), + + pruningLoadDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_pruning_load_duration", + Help: "Pruning delay for loading missing entities from clients", + Buckets: []float64{0, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 10000, 20000, 30000, 60000, 90000, 120000}, + }), + pruningProcessDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_pruning_process_duration", + Help: "Pruning processing time", + Buckets: []float64{0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 250, 500, 1000}, + }), + pruningStoreDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_pruning_store_duration", + Help: "Pruning storing to db delay", + Buckets: []float64{0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 250, 500, 1000}, + }), + pruningCleanDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "dora_cl_indexer_pruning_clean_duration", + Help: "Pruning cleaning time", + Buckets: []float64{0, 1, 2, 3, 4, 5, 10, 20, 50, 100, 250, 500, 1000}, + }), } metrics.AddPreCollectFn(func() { beaconMetrics.updateBlockCacheMetrics() beaconMetrics.updateEpochCacheMetrics() + beaconMetrics.updateForkCacheMetrics() + beaconMetrics.updateValidatorCacheMetrics() }) return beaconMetrics @@ -168,3 +345,45 @@ func (metrics *beaconMetrics) updateEpochCacheMetrics() { metrics.epochCacheStateLoadedCount.Set(float64(stateLoadedCount)) metrics.epochCacheVotesCacheLen.Set(float64(metrics.indexer.epochCache.votesCache.Len())) } + +func (metrics *beaconMetrics) updateForkCacheMetrics() { + metrics.indexer.forkCache.cacheMutex.RLock() + defer metrics.indexer.forkCache.cacheMutex.RUnlock() + + metrics.forkCacheForkMapSize.Set(float64(len(metrics.indexer.forkCache.forkMap))) + metrics.forkCacheParentIdCacheLen.Set(float64(metrics.indexer.forkCache.parentIdCache.Len())) + metrics.forkCacheParentIdsCacheLen.Set(float64(metrics.indexer.forkCache.parentIdsCache.Len())) +} + +func (metrics *beaconMetrics) updateValidatorCacheMetrics() { + metrics.indexer.validatorCache.cacheMutex.RLock() + defer metrics.indexer.validatorCache.cacheMutex.RUnlock() + + metrics.validatorCacheValidators.Set(float64(len(metrics.indexer.validatorCache.valsetCache))) + + validatorsMap := map[*phase0.Validator]bool{} + validatorDiffs := 0 + for _, validator := range metrics.indexer.validatorCache.valsetCache { + refs := len(validator.validatorDiffs) + for _, diff := range validator.validatorDiffs { + validatorsMap[diff.validator] = true + } + + if validator.finalValidator != nil { + validatorsMap[validator.finalValidator] = true + refs++ + } + validatorDiffs += refs + } + + metrics.validatorCacheValidatorDiffs.Set(float64(validatorDiffs)) + metrics.validatorCacheValidatorData.Set(float64(len(validatorsMap))) + + activityCount := 0 + for _, recentActivity := range metrics.indexer.validatorCache.validatorActivityMap { + activityCount += len(recentActivity) + } + metrics.validatorCacheValidatorActivity.Set(float64(activityCount)) + + metrics.validatorCachePubkeyMapSize.Set(float64(len(metrics.indexer.validatorCache.pubkeyMap))) +} diff --git a/indexer/beacon/pruning.go b/indexer/beacon/pruning.go index 65033d7f..315d47f3 100644 --- a/indexer/beacon/pruning.go +++ b/indexer/beacon/pruning.go @@ -276,6 +276,11 @@ func (indexer *Indexer) processEpochPruning(pruneEpoch phase0.Epoch) (uint64, ui prunedEpochStates := indexer.epochCache.removeUnreferencedEpochStates() + indexer.metrics.pruningLoadDuration.Observe(float64(t1loading.Milliseconds())) + indexer.metrics.pruningProcessDuration.Observe(float64(t1dur.Milliseconds())) + indexer.metrics.pruningStoreDuration.Observe(float64(t2dur.Milliseconds())) + indexer.metrics.pruningCleanDuration.Observe(float64(time.Since(t2).Milliseconds())) + // run gc to clean up memory runtime.GC() diff --git a/services/callratelimiter.go b/services/callratelimiter.go index d30fbb99..40d822a5 100644 --- a/services/callratelimiter.go +++ b/services/callratelimiter.go @@ -8,6 +8,9 @@ import ( "sync" "time" + "github.com/ethpandaops/dora/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "golang.org/x/time/rate" ) @@ -18,6 +21,9 @@ type CallRateLimiter struct { mutex sync.Mutex visitors map[string]*callRateVisitor + + visitorsCount prometheus.Gauge + newVisitors prometheus.Counter } type callRateVisitor struct { @@ -39,9 +45,25 @@ func StartCallRateLimiter(proxyCount uint, rateLimit uint, burstLimit uint) erro burstLimit: burstLimit, visitors: map[string]*callRateVisitor{}, + + visitorsCount: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dora_call_rate_limiter_visitors_count", + Help: "Number of visitors in the call rate limiter", + }), + newVisitors: promauto.NewCounter(prometheus.CounterOpts{ + Name: "dora_call_rate_limiter_new_visitors_count", + Help: "Number of new visitors in the call rate limiter", + }), } go GlobalCallRateLimiter.cleanupVisitors() + metrics.AddPreCollectFn(func() { + GlobalCallRateLimiter.mutex.Lock() + defer GlobalCallRateLimiter.mutex.Unlock() + + GlobalCallRateLimiter.visitorsCount.Set(float64(len(GlobalCallRateLimiter.visitors))) + }) + return nil } @@ -87,6 +109,8 @@ func (crl *CallRateLimiter) getVisitor(r *http.Request) *callRateVisitor { lastSeen: time.Now(), } crl.visitors[ip] = visitor + + crl.newVisitors.Inc() } else { visitor.lastSeen = time.Now() } diff --git a/services/frontendcache.go b/services/frontendcache.go index 428823f9..8806655f 100644 --- a/services/frontendcache.go +++ b/services/frontendcache.go @@ -13,6 +13,9 @@ import ( "github.com/ethpandaops/dora/cache" "github.com/ethpandaops/dora/utils" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/sirupsen/logrus" "github.com/timandy/routine" ) @@ -25,6 +28,10 @@ type FrontendCacheService struct { processingDict map[string]*FrontendCacheProcessingPage callStackMutex sync.RWMutex callStackBuffer []byte + + pageCallCount *prometheus.CounterVec + pageCallDuration *prometheus.HistogramVec + pageCallCacheHit *prometheus.CounterVec } type FrontendCacheProcessingPage struct { @@ -72,12 +79,32 @@ func StartFrontendCache() error { tieredCache: tieredCache, processingDict: make(map[string]*FrontendCacheProcessingPage), callStackBuffer: make([]byte, 1024*1024*5), + + pageCallCount: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "dora_frontend_page_call_count", + Help: "Number of page calls", + }, []string{"page"}), + pageCallDuration: promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "dora_frontend_page_call_duration", + Help: "Processing time for page calls", + Buckets: []float64{0, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 10000, 20000, 30000, 60000, 120000}, + }, []string{"page"}), + pageCallCacheHit: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "dora_frontend_page_call_cache_hit", + Help: "Number of page calls that were served from cache", + }, []string{"page"}), } return nil } func (fc *FrontendCacheService) ProcessCachedPage(pageKey string, caching bool, returnValue interface{}, buildFn PageDataHandlerFn) (interface{}, error) { //fmt.Printf("page call %v (goid: %v)\n", pageKey, utils.Goid()) + pageType := pageKey + if strings.Contains(pageKey, ":") { + pageType = strings.Split(pageKey, ":")[0] + } + + fc.pageCallCount.WithLabelValues(pageType).Inc() fc.processingMutex.Lock() processingPage := fc.processingDict[pageKey] @@ -85,6 +112,8 @@ func (fc *FrontendCacheService) ProcessCachedPage(pageKey string, caching bool, fc.processingMutex.Unlock() logrus.Debugf("page already processing: %v", pageKey) + fc.pageCallCacheHit.WithLabelValues(pageType).Inc() + processingPage.modelMutex.RLock() defer processingPage.modelMutex.RUnlock() return processingPage.pageModel, processingPage.pageError @@ -98,10 +127,15 @@ func (fc *FrontendCacheService) ProcessCachedPage(pageKey string, caching bool, defer fc.completePageLoad(pageKey, processingPage) fc.processingMutex.Unlock() + startTime := time.Now() var returnError error returnValue, returnError = fc.processPageCall(pageKey, caching, returnValue, buildFn, processingPage) processingPage.pageModel = returnValue processingPage.pageError = returnError + + duration := time.Since(startTime) + fc.pageCallDuration.WithLabelValues(pageType).Observe(float64(duration.Milliseconds())) + return returnValue, returnError }