Skip to content

Commit 0030bfd

Browse files
committed
Load all open incidents from DB on daemon startup
1 parent 24b6dd0 commit 0030bfd

File tree

4 files changed

+204
-43
lines changed

4 files changed

+204
-43
lines changed

cmd/icinga-notifications-daemon/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"flag"
66
"fmt"
77
"github.com/icinga/icinga-notifications/internal/config"
8+
"github.com/icinga/icinga-notifications/internal/incident"
89
"github.com/icinga/icinga-notifications/internal/listener"
910
"github.com/icinga/icingadb/pkg/logging"
1011
"github.com/icinga/icingadb/pkg/utils"
@@ -62,6 +63,11 @@ func main() {
6263

6364
go runtimeConfig.PeriodicUpdates(context.TODO(), 1*time.Second)
6465

66+
err = incident.LoadOpenIncidents(context.TODO(), db, logs.GetChildLogger("incident"), runtimeConfig, conf)
67+
if err != nil {
68+
logger.Fatalw("Can't load incidents from database", zap.Error(err))
69+
}
70+
6571
if err := listener.NewListener(db, conf, runtimeConfig, logs).Run(); err != nil {
6672
panic(err)
6773
}

internal/incident/incident.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,71 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
595595
return nil
596596
}
597597

598+
// ReloadRecipients reloads the current incident recipients from the database.
599+
// Returns error on database failure.
600+
func (i *Incident) ReloadRecipients(ctx context.Context) error {
601+
contact := &ContactRow{}
602+
var contacts []*ContactRow
603+
err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID())
604+
if err != nil {
605+
i.logger.Errorw(
606+
"failed to reload incident recipients", zap.String("object", i.ObjectDisplayName()),
607+
zap.String("incident", i.String()), zap.Error(err),
608+
)
609+
610+
return errors.New("failed to reload incident recipients")
611+
}
612+
613+
recipients := make(map[recipient.Key]*RecipientState)
614+
for _, contact := range contacts {
615+
recipients[contact.Key] = &RecipientState{Role: contact.Role}
616+
}
617+
618+
i.Recipients = recipients
619+
620+
return nil
621+
}
622+
623+
// LoadSourceSeverities loads all non-OK source severities from database.
624+
// Returns error on database failure.
625+
func (i *Incident) LoadSourceSeverities(ctx context.Context) error {
626+
sourceSeverity := &SourceSeverity{IncidentID: i.ID()}
627+
var sources []SourceSeverity
628+
err := i.db.SelectContext(
629+
ctx, &sources,
630+
i.db.Rebind(i.db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
631+
i.ID(), event.SeverityOK,
632+
)
633+
if err != nil {
634+
i.logger.Errorw("Failed to load incident source severities from database", zap.Error(err))
635+
636+
return errors.New("failed to load incident source severities")
637+
}
638+
639+
for _, source := range sources {
640+
i.SeverityBySource[source.SourceID] = source.Severity
641+
}
642+
643+
return nil
644+
}
645+
646+
func (i *Incident) LoadEscalationsState(ctx context.Context) error {
647+
state := &EscalationState{}
648+
var states []*EscalationState
649+
err := i.db.SelectContext(ctx, &states, i.db.Rebind(i.db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), i.ID())
650+
if err != nil {
651+
i.logger.Errorw("Failed to load incident rule escalation states", zap.Error(err))
652+
653+
return errors.New("failed to load incident rule escalation states")
654+
}
655+
656+
for _, state := range states {
657+
i.EscalationState[state.RuleEscalationID] = state
658+
}
659+
660+
return nil
661+
}
662+
598663
type EscalationState struct {
599664
IncidentID int64 `db:"incident_id"`
600665
RuleEscalationID int64 `db:"rule_escalation_id"`

internal/incident/incidents.go

Lines changed: 84 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/icinga/icinga-notifications/internal/recipient"
1111
"github.com/icinga/icingadb/pkg/icingadb"
1212
"github.com/icinga/icingadb/pkg/logging"
13+
"github.com/icinga/icingadb/pkg/types"
1314
"go.uber.org/zap"
1415
"golang.org/x/sync/errgroup"
1516
"sync"
@@ -20,6 +21,86 @@ var (
2021
currentIncidentsMu sync.Mutex
2122
)
2223

24+
func LoadOpenIncidents(
25+
ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, configFile *config.ConfigFile,
26+
) error {
27+
var incidentRows []*IncidentRow
28+
err := db.SelectContext(ctx, &incidentRows, db.BuildSelectStmt(&IncidentRow{}, &IncidentRow{})+` WHERE "recovered_at" IS NULL`)
29+
if err != nil {
30+
logger.Errorw("failed to load active incidents from database", zap.Error(err))
31+
32+
return errors.New("failed to fetch open incidents")
33+
}
34+
35+
incidents := make(map[*object.Object]*Incident)
36+
g, childCtx := errgroup.WithContext(ctx)
37+
for _, incidentRow := range incidentRows {
38+
incident := &Incident{
39+
db: db,
40+
runtimeConfig: runtimeConfig,
41+
StartedAt: incidentRow.StartedAt.Time(),
42+
incidentRowID: incidentRow.ID,
43+
configFile: configFile,
44+
SeverityBySource: map[int64]event.Severity{},
45+
EscalationState: map[escalationID]*EscalationState{},
46+
Rules: map[ruleID]struct{}{},
47+
Recipients: map[recipient.Key]*RecipientState{},
48+
}
49+
50+
obj, err := object.LoadFromDB(ctx, db, incidentRow.ObjectID)
51+
if err != nil {
52+
return err
53+
}
54+
55+
incident.Object = obj
56+
incident.logger = logger.With(zap.String("object", obj.DisplayName()), zap.String("incident", incident.String()))
57+
58+
g.Go(func() error {
59+
return incident.LoadSourceSeverities(childCtx)
60+
})
61+
g.Go(func() error {
62+
return incident.LoadEscalationsState(childCtx)
63+
})
64+
g.Go(func() error {
65+
err := incident.ReloadRecipients(childCtx)
66+
if err != nil {
67+
return err
68+
}
69+
70+
tx, err := db.BeginTxx(ctx, nil)
71+
if err != nil {
72+
return err
73+
}
74+
defer func() { _ = tx.Rollback() }()
75+
76+
_, err = incident.evaluateRules(childCtx, tx, 0, types.Int{})
77+
if err != nil {
78+
return err
79+
}
80+
81+
incident.evaluateEscalations()
82+
if err = tx.Commit(); err != nil {
83+
return err
84+
}
85+
86+
return childCtx.Err()
87+
})
88+
89+
incidents[obj] = incident
90+
}
91+
92+
if err = g.Wait(); err != nil {
93+
return err
94+
}
95+
96+
currentIncidentsMu.Lock()
97+
defer currentIncidentsMu.Unlock()
98+
99+
currentIncidents = incidents
100+
101+
return nil
102+
}
103+
23104
func GetCurrent(
24105
ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig,
25106
configFile *config.ConfigFile, create bool,
@@ -55,41 +136,10 @@ func GetCurrent(
55136

56137
g, childCtx := errgroup.WithContext(ctx)
57138
g.Go(func() error {
58-
sourceSeverity := &SourceSeverity{IncidentID: ir.ID}
59-
var sources []SourceSeverity
60-
err := db.SelectContext(
61-
childCtx, &sources,
62-
db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
63-
ir.ID, event.SeverityOK,
64-
)
65-
if err != nil {
66-
incident.logger.Errorw("Failed to load incident source severities from database", zap.Error(err))
67-
68-
return errors.New("failed to load incident source severities")
69-
}
70-
71-
for _, source := range sources {
72-
incident.SeverityBySource[source.SourceID] = source.Severity
73-
}
74-
75-
return childCtx.Err()
139+
return incident.LoadSourceSeverities(childCtx)
76140
})
77-
78141
g.Go(func() error {
79-
state := &EscalationState{}
80-
var states []*EscalationState
81-
err = db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
82-
if err != nil {
83-
incident.logger.Errorw("Failed to load incident rule escalation states", zap.Error(err))
84-
85-
return errors.New("failed to load incident rule escalation states")
86-
}
87-
88-
for _, state := range states {
89-
incident.EscalationState[state.RuleEscalationID] = state
90-
}
91-
92-
return childCtx.Err()
142+
return incident.LoadEscalationsState(childCtx)
93143
})
94144

95145
if err := g.Wait(); err != nil {
@@ -113,21 +163,12 @@ func GetCurrent(
113163
currentIncident.Lock()
114164
defer currentIncident.Unlock()
115165

116-
contact := &ContactRow{}
117-
var contacts []*ContactRow
118-
err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID())
166+
err := currentIncident.ReloadRecipients(ctx)
119167
if err != nil {
120168
currentIncident.logger.Errorw("Failed to reload incident recipients", zap.Error(err))
121169

122170
return nil, false, errors.New("failed to load incident recipients")
123171
}
124-
125-
recipients := make(map[recipient.Key]*RecipientState)
126-
for _, contact := range contacts {
127-
recipients[contact.Key] = &RecipientState{Role: contact.Role}
128-
}
129-
130-
currentIncident.Recipients = recipients
131172
}
132173

133174
return currentIncident, created, nil

internal/object/db_types.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package object
22

33
import (
4+
"context"
5+
"fmt"
6+
"github.com/icinga/icingadb/pkg/icingadb"
47
"github.com/icinga/icingadb/pkg/types"
58
)
69

@@ -27,3 +30,49 @@ type ObjectRow struct {
2730
// TableName returns the database table name used for ObjectRow, "object".
func (d *ObjectRow) TableName() string {
2831
return "object"
2932
}
33+
34+
func LoadFromDB(ctx context.Context, db *icingadb.DB, id types.Binary) (*Object, error) {
35+
objectRow := &ObjectRow{ID: id}
36+
err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(objectRow, objectRow)+` WHERE "id" = ?`), objectRow.ID).StructScan(objectRow)
37+
if err != nil {
38+
return nil, fmt.Errorf("failed to fetch object: %w", err)
39+
}
40+
41+
tags := map[string]string{"host": objectRow.Host}
42+
if objectRow.Service.Valid {
43+
tags["service"] = objectRow.Service.String
44+
}
45+
46+
metadata := make(map[int64]*SourceMetadata)
47+
var sourceMetas []*SourceMetadata
48+
err = db.SelectContext(ctx, &sourceMetas, db.Rebind(db.BuildSelectStmt(&SourceMetadata{}, &SourceMetadata{})+` WHERE "object_id" = ?`), id)
49+
if err != nil {
50+
return nil, fmt.Errorf("failed to fetch source object: %w", err)
51+
}
52+
53+
var extraTags []*ExtraTagRow
54+
err = db.SelectContext(
55+
ctx, &extraTags,
56+
db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{})+` WHERE "object_id" = ?`), id,
57+
)
58+
if err != nil {
59+
return nil, fmt.Errorf("failed to fetch object extra tags: %w", err)
60+
}
61+
62+
for _, sourceMeta := range sourceMetas {
63+
sourceMeta.ExtraTags = map[string]string{}
64+
metadata[sourceMeta.SourceId] = sourceMeta
65+
}
66+
67+
for _, extraTag := range extraTags {
68+
sourceMeta, ok := metadata[extraTag.SourceId]
69+
if ok {
70+
sourceMeta.ExtraTags[extraTag.Tag] = extraTag.Value
71+
}
72+
}
73+
74+
obj := &Object{db: db, ID: id, Tags: tags, Metadata: metadata}
75+
obj.UpdateCache()
76+
77+
return obj, nil
78+
}

0 commit comments

Comments
 (0)