168 changes: 91 additions & 77 deletions rds/collector.go
@@ -68,19 +68,17 @@ type DbCollector interface {
}

type Collector struct {
sess *session.Session
region string
instance rds.DBInstance
ip *net.IPAddr

cloudWatchLogsApi *cloudwatchlogs.CloudWatchLogs

dbCollector DbCollector

logReader *LogReader
logParser *logparser.Parser

logger logger.Logger
sess *session.Session
region string
instance rds.DBInstance
ip *net.IPAddr
cloudWatchLogsApi *cloudwatchlogs.CloudWatchLogs
dbCollector DbCollector
logReader *LogReader
logParser *logparser.Parser
logger logger.Logger
metricsChan chan prometheus.Metric
done chan struct{}
}

func NewCollector(sess *session.Session, i *rds.DBInstance) (*Collector, error) {
@@ -171,71 +169,87 @@ func (c *Collector) Close() {
}

func (c *Collector) Collect(ch chan<- prometheus.Metric) {
i := c.instance

ch <- utils.Gauge(dStatus, 1, aws.StringValue(i.DBInstanceStatus))

ch <- utils.Gauge(dInfo, 1,
c.region,
aws.StringValue(i.AvailabilityZone),

aws.StringValue(i.Endpoint.Address),
c.ip.String(),
strconv.Itoa(int(aws.Int64Value(i.Endpoint.Port))),

aws.StringValue(i.Engine),
aws.StringValue(i.EngineVersion),

aws.StringValue(i.DBInstanceClass),
aws.StringValue(i.StorageType),

strconv.FormatBool(aws.BoolValue(i.MultiAZ)),

aws.StringValue(i.SecondaryAvailabilityZone),

utils.IdWithRegion(c.region, aws.StringValue(i.DBClusterIdentifier)),

utils.IdWithRegion(c.region, aws.StringValue(i.ReadReplicaSourceDBInstanceIdentifier)),
)

ch <- utils.Gauge(dAllocatedStorage, float64(aws.Int64Value(i.AllocatedStorage)))
ch <- utils.Gauge(dStorageAutoscalingThreshold, float64(aws.Int64Value(i.MaxAllocatedStorage)))
ch <- utils.Gauge(dStorageProvisionedIOPs, float64(aws.Int64Value(i.Iops)))
ch <- utils.Gauge(dBackupRetentionPeriod, float64(aws.Int64Value(i.BackupRetentionPeriod)))

for _, r := range i.ReadReplicaDBInstanceIdentifiers {
ch <- utils.Gauge(dReadReplicaInfo, float64(1), utils.IdWithRegion(c.region, aws.StringValue(r)))
}

wg := sync.WaitGroup{}

if aws.Int64Value(c.instance.MonitoringInterval) > 0 && c.instance.DbiResourceId != nil {
wg.Add(1)
go func() {
t := time.Now()
c.collectOsMetrics(ch)
c.logger.Info("os metrics collected in:", time.Since(t))
wg.Done()
}()
}

if c.dbCollector != nil {
wg.Add(1)
go func() {
t := time.Now()
c.dbCollector.Collect(ch)
c.logger.Info("db metrics collected in:", time.Since(t))
wg.Done()
}()
}

wg.Wait()
// Note: metricsChan and done are stored on the struct, which assumes
// Collect is not invoked concurrently for the same Collector.
c.metricsChan = make(chan prometheus.Metric, 1000)
c.done = make(chan struct{})

go func() {
for metric := range c.metricsChan {
ch <- metric
}
close(c.done)
}()

var wg sync.WaitGroup
wg.Add(3)

go func() {
defer wg.Done()
c.collectInstanceInfo()
}()

if aws.Int64Value(c.instance.MonitoringInterval) > 0 && c.instance.DbiResourceId != nil {
go func() {
defer wg.Done()
t := time.Now()
c.collectOsMetrics(c.metricsChan)
c.logger.Info("os metrics collected in:", time.Since(t))
}()
} else {
wg.Done()
}

if c.dbCollector != nil {
go func() {
defer wg.Done()
t := time.Now()
c.dbCollector.Collect(c.metricsChan)
c.logger.Info("db metrics collected in:", time.Since(t))
}()
} else {
wg.Done()
}

wg.Wait()

if c.logParser != nil {
for _, lc := range c.logParser.GetCounters() {
c.metricsChan <- utils.Counter(dLogMessages, float64(lc.Messages), lc.Level.String(), lc.Hash, lc.Sample)
}
}

close(c.metricsChan)
<-c.done
}

if c.logParser != nil {
for _, lc := range c.logParser.GetCounters() {
ch <- utils.Counter(dLogMessages, float64(lc.Messages), lc.Level.String(), lc.Hash, lc.Sample)
}
}
func (c *Collector) collectInstanceInfo() {
i := c.instance

c.metricsChan <- utils.Gauge(dStatus, 1, aws.StringValue(i.DBInstanceStatus))

c.metricsChan <- utils.Gauge(dInfo, 1,
c.region,
aws.StringValue(i.AvailabilityZone),
aws.StringValue(i.Endpoint.Address),
c.ip.String(),
strconv.Itoa(int(aws.Int64Value(i.Endpoint.Port))),
aws.StringValue(i.Engine),
aws.StringValue(i.EngineVersion),
aws.StringValue(i.DBInstanceClass),
aws.StringValue(i.StorageType),
strconv.FormatBool(aws.BoolValue(i.MultiAZ)),
aws.StringValue(i.SecondaryAvailabilityZone),
utils.IdWithRegion(c.region, aws.StringValue(i.DBClusterIdentifier)),
utils.IdWithRegion(c.region, aws.StringValue(i.ReadReplicaSourceDBInstanceIdentifier)),
)

c.metricsChan <- utils.Gauge(dAllocatedStorage, float64(aws.Int64Value(i.AllocatedStorage)))
c.metricsChan <- utils.Gauge(dStorageAutoscalingThreshold, float64(aws.Int64Value(i.MaxAllocatedStorage)))
c.metricsChan <- utils.Gauge(dStorageProvisionedIOPs, float64(aws.Int64Value(i.Iops)))
c.metricsChan <- utils.Gauge(dBackupRetentionPeriod, float64(aws.Int64Value(i.BackupRetentionPeriod)))

for _, r := range i.ReadReplicaDBInstanceIdentifiers {
c.metricsChan <- utils.Gauge(dReadReplicaInfo, float64(1), utils.IdWithRegion(c.region, aws.StringValue(r)))
}
}

func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
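Reviewer note: the rewritten Collect above fans several concurrent producers into one buffered channel, forwards that channel into Prometheus' ch from a single goroutine, and uses a done channel to wait until every buffered metric has been delivered. A minimal, self-contained sketch of the same pattern (names here are illustrative, not from this PR):

package main

import (
	"fmt"
	"sync"
)

// fanIn mirrors the pattern in Collect: producers write to one buffered
// channel, a single forwarder drains it into the consumer, and a done
// channel signals when forwarding has finished.
func fanIn(consume func(int)) {
	metrics := make(chan int, 10) // buffered so producers rarely block
	done := make(chan struct{})

	go func() { // forwarder: the only reader of metrics
		for m := range metrics {
			consume(m)
		}
		close(done) // every buffered value has been delivered
	}()

	var wg sync.WaitGroup
	for p := 0; p < 3; p++ {
		p := p
		wg.Add(1)
		go func() {
			defer wg.Done()
			metrics <- p // each producer emits into the shared channel
		}()
	}

	wg.Wait()      // all producers are done
	close(metrics) // ends the forwarder's range loop
	<-done         // block until the consumer has seen everything
}

func main() {
	fanIn(func(m int) { fmt.Println("metric:", m) })
}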
188 changes: 105 additions & 83 deletions rds/discovery.go
@@ -10,101 +10,123 @@ import (
"github.com/coroot/logger"
"github.com/prometheus/client_golang/prometheus"
"time"
"sync"
)

type Discoverer struct {
reg prometheus.Registerer

awsSession *session.Session

instances map[string]*Collector

logger logger.Logger
reg prometheus.Registerer
awsSession *session.Session
instances sync.Map
logger logger.Logger
workerPool *utils.WorkerPool
}

func NewDiscoverer(reg prometheus.Registerer, awsSession *session.Session) *Discoverer {
d := &Discoverer{
reg: reg,
awsSession: awsSession,
instances: map[string]*Collector{},
logger: logger.NewKlog(""),
}
return d
return &Discoverer{
reg: reg,
awsSession: awsSession,
instances: sync.Map{},
logger: logger.NewKlog(""),
workerPool: utils.NewWorkerPool(10), // Adjust the number of workers as needed
}
}

func (d *Discoverer) Run() {
api := rds.New(d.awsSession)

if err := d.refresh(api); err != nil {
d.logger.Warning(err)
}

ticker := time.Tick(*flags.DiscoveryInterval)
for range ticker {
if err := d.refresh(api); err != nil {
d.logger.Warning(err)
}
}
api := rds.New(d.awsSession)

if err := d.refresh(api); err != nil {
d.logger.Warning(err)
}

ticker := time.NewTicker(*flags.DiscoveryInterval)
defer ticker.Stop()

for range ticker.C {
if err := d.refresh(api); err != nil {
d.logger.Warning(err)
}
}
}

func (d *Discoverer) refresh(api rdsiface.RDSAPI) error {
t := time.Now()
defer func() {
d.logger.Info("instances refreshed in:", time.Since(t))
}()

output, err := api.DescribeDBInstances(nil)
if err != nil {
return err
}

actualInstances := map[string]bool{}
for _, dbInstance := range output.DBInstances {
if dbInstance.Endpoint == nil {
continue
}
id := aws.StringValue(dbInstance.DBInstanceIdentifier)
input := &rds.ListTagsForResourceInput{ResourceName: dbInstance.DBInstanceArn}
tags := map[string]string{}
o, err := api.ListTagsForResource(input)
if err != nil {
d.logger.Error(err)
} else {
for _, t := range o.TagList {
tags[aws.StringValue(t.Key)] = aws.StringValue(t.Value)
}
}
if utils.Filtered(*flags.RdsFilters, tags) {
d.logger.Infof("RDS instance %s (tags: %s) was skipped according to the tag-based filters: %s", id, tags, *flags.RdsFilters)
continue
}
actualInstances[id] = true
i, ok := d.instances[id]
if !ok {
d.logger.Info("new DB instance found:", id)
i, err = NewCollector(d.awsSession, dbInstance)
if err != nil {
d.logger.Warning("failed to init RDS collector:", err)
continue
}
if err := d.wrappedReg(id).Register(i); err != nil {
d.logger.Warning(err)
continue
}
d.instances[id] = i
}
i.update(dbInstance)
}

for id, i := range d.instances {
if !actualInstances[id] {
d.logger.Info("instance no longer exists:", id)
d.wrappedReg(id).Unregister(i)
i.Close()
delete(d.instances, id)
}
}
return nil
t := time.Now()
defer func() {
d.logger.Info("instances refreshed in:", time.Since(t))
}()

output, err := api.DescribeDBInstances(nil)
if err != nil {
return err
}

actualInstances := make(map[string]bool)
var mu sync.Mutex // guards actualInstances, which is written from multiple workers
var wg sync.WaitGroup
for _, dbInstance := range output.DBInstances {
dbInstance := dbInstance // capture the loop variable for the closure
wg.Add(1)
d.workerPool.Submit(func() {
defer wg.Done()
id := aws.StringValue(dbInstance.DBInstanceIdentifier)
mu.Lock()
actualInstances[id] = true
mu.Unlock()
d.processInstance(api, dbInstance)
})
}
wg.Wait()

d.instances.Range(func(key, value interface{}) bool {
id := key.(string)
if _, exists := actualInstances[id]; !exists {
d.logger.Info("instance no longer exists:", id)
d.wrappedReg(id).Unregister(value.(*Collector))
value.(*Collector).Close()
d.instances.Delete(id)
}
return true
})

return nil
}

func (d *Discoverer) processInstance(api rdsiface.RDSAPI, dbInstance *rds.DBInstance) {
if dbInstance.Endpoint == nil {
return
}
id := aws.StringValue(dbInstance.DBInstanceIdentifier)
tags := d.getTags(api, dbInstance.DBInstanceArn)
if utils.Filtered(*flags.RdsFilters, tags) {
d.logger.Infof("RDS instance %s (tags: %s) was skipped according to the tag-based filters: %s", id, tags, *flags.RdsFilters)
return
}

value, ok := d.instances.Load(id)
if !ok {
d.logger.Info("new DB instance found:", id)
collector, err := NewCollector(d.awsSession, dbInstance)
if err != nil {
d.logger.Warning("failed to init RDS collector:", err)
return
}
if err := d.wrappedReg(id).Register(collector); err != nil {
d.logger.Warning(err)
return
}
d.instances.Store(id, collector)
} else {
value.(*Collector).update(dbInstance)
}
}

func (d *Discoverer) getTags(api rdsiface.RDSAPI, arn *string) map[string]string {
input := &rds.ListTagsForResourceInput{ResourceName: arn}
tags := map[string]string{}
o, err := api.ListTagsForResource(input)
if err != nil {
d.logger.Error(err)
} else {
for _, t := range o.TagList {
tags[aws.StringValue(t.Key)] = aws.StringValue(t.Value)
}
}
return tags
}

func (d *Discoverer) wrappedReg(instanceId string) prometheus.Registerer {
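Reviewer note: utils.WorkerPool is referenced above (NewWorkerPool(10), Submit) but its implementation is not part of this diff. A minimal pool consistent with that usage could look like the sketch below; the real utils.WorkerPool may differ, so treat this as an assumption:

package utils

// WorkerPool runs submitted tasks on a fixed number of goroutines.
// Hypothetical sketch matching the Submit(func()) usage in the diff,
// not necessarily the implementation shipped in this repository.
type WorkerPool struct {
	tasks chan func()
}

func NewWorkerPool(workers int) *WorkerPool {
	p := &WorkerPool{tasks: make(chan func(), workers)}
	for i := 0; i < workers; i++ {
		go func() {
			for task := range p.tasks {
				task() // run tasks until the channel is closed
			}
		}()
	}
	return p
}

// Submit blocks when all workers are busy and the buffer is full,
// which naturally throttles refresh when there are many instances.
func (p *WorkerPool) Submit(task func()) {
	p.tasks <- task
}

// Close stops the workers once all submitted tasks have been picked up.
func (p *WorkerPool) Close() {
	close(p.tasks)
}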