Skip to content

Commit 3b2be88

Browse files
committed
Load all open incidents from DB on daemon startup
1 parent f44c4b6 commit 3b2be88

File tree

4 files changed

+210
-49
lines changed

4 files changed

+210
-49
lines changed

cmd/icinga-notifications-daemon/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"flag"
66
"fmt"
77
"github.com/icinga/icinga-notifications/internal/config"
8+
"github.com/icinga/icinga-notifications/internal/incident"
89
"github.com/icinga/icinga-notifications/internal/listener"
910
"github.com/icinga/icingadb/pkg/logging"
1011
"github.com/icinga/icingadb/pkg/utils"
@@ -62,6 +63,11 @@ func main() {
6263

6364
go runtimeConfig.PeriodicUpdates(context.TODO(), 1*time.Second)
6465

66+
err = incident.LoadOpenIncidents(context.TODO(), db, logs.GetChildLogger("incident"), runtimeConfig, conf)
67+
if err != nil {
68+
logger.Fatalw("Can't load incidents from database", zap.Error(err))
69+
}
70+
6571
if err := listener.NewListener(db, conf, runtimeConfig, logs).Run(); err != nil {
6672
panic(err)
6773
}

internal/incident/incident.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,77 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
648648
return nil
649649
}
650650

651+
// ReloadRecipients reloads the current incident recipients from the database.
652+
// Returns error on database failure.
653+
func (i *Incident) ReloadRecipients(ctx context.Context) error {
654+
contact := &ContactRow{}
655+
var contacts []*ContactRow
656+
err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID())
657+
if err != nil {
658+
i.logger.Errorw(
659+
"failed to reload incident recipients", zap.String("object", i.ObjectDisplayName()),
660+
zap.String("incident", i.String()), zap.Error(err),
661+
)
662+
663+
return errors.New("failed to reload incident recipients")
664+
}
665+
666+
recipients := make(map[recipient.Key]*RecipientState)
667+
for _, contact := range contacts {
668+
recipients[contact.Key] = &RecipientState{Role: contact.Role}
669+
}
670+
671+
i.Recipients = recipients
672+
673+
return nil
674+
}
675+
676+
// LoadSourceSeverities loads all non-OK source severities from database.
677+
// Returns error on database failure.
678+
func (i *Incident) LoadSourceSeverities(ctx context.Context) error {
679+
sourceSeverity := &SourceSeverity{IncidentID: i.ID()}
680+
var sources []SourceSeverity
681+
err := i.db.SelectContext(
682+
ctx, &sources,
683+
i.db.Rebind(i.db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
684+
i.ID(), event.SeverityOK,
685+
)
686+
if err != nil {
687+
i.logger.Errorw(
688+
"failed to load incident source severities from database", zap.String("object", i.ObjectDisplayName()),
689+
zap.String("incident", i.String()), zap.Error(err),
690+
)
691+
692+
return errors.New("failed to load incident source severities")
693+
}
694+
695+
for _, source := range sources {
696+
i.SeverityBySource[source.SourceID] = source.Severity
697+
}
698+
699+
return nil
700+
}
701+
702+
func (i *Incident) LoadEscalationsState(ctx context.Context) error {
703+
state := &EscalationState{}
704+
var states []*EscalationState
705+
err := i.db.SelectContext(ctx, &states, i.db.Rebind(i.db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), i.ID())
706+
if err != nil {
707+
i.logger.Errorw(
708+
"failed to load incident rule escalation states", zap.String("object", i.ObjectDisplayName()),
709+
zap.String("incident", i.String()), zap.Error(err),
710+
)
711+
712+
return errors.New("failed to load incident rule escalation states")
713+
}
714+
715+
for _, state := range states {
716+
i.EscalationState[state.RuleEscalationID] = state
717+
}
718+
719+
return nil
720+
}
721+
651722
type EscalationState struct {
652723
IncidentID int64 `db:"incident_id"`
653724
RuleEscalationID int64 `db:"rule_escalation_id"`

internal/incident/incidents.go

Lines changed: 84 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/icinga/icinga-notifications/internal/recipient"
1111
"github.com/icinga/icingadb/pkg/icingadb"
1212
"github.com/icinga/icingadb/pkg/logging"
13+
"github.com/icinga/icingadb/pkg/types"
1314
"go.uber.org/zap"
1415
"golang.org/x/sync/errgroup"
1516
"sync"
@@ -20,6 +21,86 @@ var (
2021
currentIncidentsMu sync.Mutex
2122
)
2223

24+
func LoadOpenIncidents(
25+
ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, configFile *config.ConfigFile,
26+
) error {
27+
var incidentRows []*IncidentRow
28+
err := db.SelectContext(ctx, &incidentRows, db.BuildSelectStmt(&IncidentRow{}, &IncidentRow{})+` WHERE "recovered_at" IS NULL`)
29+
if err != nil {
30+
logger.Errorw("failed to load active incidents from database", zap.Error(err))
31+
32+
return errors.New("failed to fetch open incidents")
33+
}
34+
35+
incidents := make(map[*object.Object]*Incident)
36+
g, childCtx := errgroup.WithContext(ctx)
37+
for _, incidentRow := range incidentRows {
38+
incident := &Incident{
39+
db: db,
40+
logger: logger,
41+
runtimeConfig: runtimeConfig,
42+
StartedAt: incidentRow.StartedAt.Time(),
43+
incidentRowID: incidentRow.ID,
44+
configFile: configFile,
45+
SeverityBySource: map[int64]event.Severity{},
46+
EscalationState: map[escalationID]*EscalationState{},
47+
Rules: map[ruleID]struct{}{},
48+
Recipients: map[recipient.Key]*RecipientState{},
49+
}
50+
51+
obj, err := object.LoadFromDB(ctx, db, incidentRow.ObjectID)
52+
if err != nil {
53+
return err
54+
}
55+
56+
incident.Object = obj
57+
58+
g.Go(func() error {
59+
return incident.LoadSourceSeverities(childCtx)
60+
})
61+
g.Go(func() error {
62+
return incident.LoadEscalationsState(childCtx)
63+
})
64+
g.Go(func() error {
65+
err := incident.ReloadRecipients(childCtx)
66+
if err != nil {
67+
return err
68+
}
69+
70+
tx, err := db.BeginTxx(ctx, nil)
71+
if err != nil {
72+
return err
73+
}
74+
defer func() { _ = tx.Rollback() }()
75+
76+
_, err = incident.evaluateRules(childCtx, tx, 0, types.Int{})
77+
if err != nil {
78+
return err
79+
}
80+
81+
incident.evaluateEscalations()
82+
if err = tx.Commit(); err != nil {
83+
return err
84+
}
85+
86+
return childCtx.Err()
87+
})
88+
89+
incidents[obj] = incident
90+
}
91+
92+
if err = g.Wait(); err != nil {
93+
return err
94+
}
95+
96+
currentIncidentsMu.Lock()
97+
defer currentIncidentsMu.Unlock()
98+
99+
currentIncidents = incidents
100+
101+
return nil
102+
}
103+
23104
func GetCurrent(
24105
ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig,
25106
configFile *config.ConfigFile, create bool,
@@ -54,47 +135,10 @@ func GetCurrent(
54135

55136
g, childCtx := errgroup.WithContext(ctx)
56137
g.Go(func() error {
57-
sourceSeverity := &SourceSeverity{IncidentID: ir.ID}
58-
var sources []SourceSeverity
59-
err := db.SelectContext(
60-
childCtx, &sources,
61-
db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
62-
ir.ID, event.SeverityOK,
63-
)
64-
if err != nil {
65-
logger.Errorw(
66-
"failed to load incident source severities from database", zap.String("object", obj.DisplayName()),
67-
zap.String("incident", incident.String()), zap.Error(err),
68-
)
69-
70-
return errors.New("failed to load incident source severities")
71-
}
72-
73-
for _, source := range sources {
74-
incident.SeverityBySource[source.SourceID] = source.Severity
75-
}
76-
77-
return childCtx.Err()
138+
return incident.LoadSourceSeverities(childCtx)
78139
})
79-
80140
g.Go(func() error {
81-
state := &EscalationState{}
82-
var states []*EscalationState
83-
err = db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
84-
if err != nil {
85-
logger.Errorw(
86-
"failed to load incident rule escalation states", zap.String("object", obj.DisplayName()),
87-
zap.String("incident", incident.String()), zap.Error(err),
88-
)
89-
90-
return errors.New("failed to load incident rule escalation states")
91-
}
92-
93-
for _, state := range states {
94-
incident.EscalationState[state.RuleEscalationID] = state
95-
}
96-
97-
return childCtx.Err()
141+
return incident.LoadEscalationsState(childCtx)
98142
})
99143

100144
if err := g.Wait(); err != nil {
@@ -118,9 +162,7 @@ func GetCurrent(
118162
currentIncident.Lock()
119163
defer currentIncident.Unlock()
120164

121-
contact := &ContactRow{}
122-
var contacts []*ContactRow
123-
err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID())
165+
err := currentIncident.ReloadRecipients(ctx)
124166
if err != nil {
125167
logger.Errorw(
126168
"failed to reload incident recipients", zap.String("object", obj.DisplayName()),
@@ -129,13 +171,6 @@ func GetCurrent(
129171

130172
return nil, false, errors.New("failed to load incident recipients")
131173
}
132-
133-
recipients := make(map[recipient.Key]*RecipientState)
134-
for _, contact := range contacts {
135-
recipients[contact.Key] = &RecipientState{Role: contact.Role}
136-
}
137-
138-
currentIncident.Recipients = recipients
139174
}
140175

141176
return currentIncident, created, nil

internal/object/db_types.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package object
22

33
import (
4+
"context"
5+
"fmt"
6+
"github.com/icinga/icingadb/pkg/icingadb"
47
"github.com/icinga/icingadb/pkg/types"
58
)
69

@@ -27,3 +30,49 @@ type ObjectRow struct {
2730
// TableName returns "object", presumably the name of the database table this
// row type is stored in (NOTE(review): confirm against the icingadb table
// name lookup).
func (d *ObjectRow) TableName() string {
2831
return "object"
2932
}
33+
34+
func LoadFromDB(ctx context.Context, db *icingadb.DB, id types.Binary) (*Object, error) {
35+
objectRow := &ObjectRow{ID: id}
36+
err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(objectRow, objectRow)+` WHERE "id" = ?`), objectRow.ID).StructScan(objectRow)
37+
if err != nil {
38+
return nil, fmt.Errorf("failed to fetch object: %w", err)
39+
}
40+
41+
tags := map[string]string{"host": objectRow.Host}
42+
if objectRow.Service.Valid {
43+
tags["service"] = objectRow.Service.String
44+
}
45+
46+
metadata := make(map[int64]*SourceMetadata)
47+
var sourceMetas []*SourceMetadata
48+
err = db.SelectContext(ctx, &sourceMetas, db.Rebind(db.BuildSelectStmt(&SourceMetadata{}, &SourceMetadata{})+` WHERE "object_id" = ?`), id)
49+
if err != nil {
50+
return nil, fmt.Errorf("failed to fetch source object: %w", err)
51+
}
52+
53+
var extraTags []*ExtraTagRow
54+
err = db.SelectContext(
55+
ctx, &extraTags,
56+
db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{})+` WHERE "object_id" = ?`), id,
57+
)
58+
if err != nil {
59+
return nil, fmt.Errorf("failed to fetch object extra tags: %w", err)
60+
}
61+
62+
for _, sourceMeta := range sourceMetas {
63+
sourceMeta.ExtraTags = map[string]string{}
64+
metadata[sourceMeta.SourceId] = sourceMeta
65+
}
66+
67+
for _, extraTag := range extraTags {
68+
sourceMeta, ok := metadata[extraTag.SourceId]
69+
if ok {
70+
sourceMeta.ExtraTags[extraTag.Tag] = extraTag.Value
71+
}
72+
}
73+
74+
obj := &Object{db: db, ID: id, Tags: tags, Metadata: metadata}
75+
obj.UpdateCache()
76+
77+
return obj, nil
78+
}

0 commit comments

Comments
 (0)