diff --git a/engine/db/db.go b/engine/db/db.go index 146e86a..0c2ebe1 100644 --- a/engine/db/db.go +++ b/engine/db/db.go @@ -47,6 +47,43 @@ func Connect(connectURL string) { if err != nil { log.Fatalln("Failed to auto migrate:", err) } + + // Create materialized views + createCumulativeScoresView() +} + +// createCumulativeScoresView creates the materialized view for cumulative scores. +func createCumulativeScoresView() { + // Create the materialized view if it doesn't exist + // If it does exist, CREATE won't refresh it, so we do that separately + err := db.Exec(` + CREATE MATERIALIZED VIEW IF NOT EXISTS cumulative_scores AS + SELECT DISTINCT + round_id, + team_id, + SUM(CASE WHEN result = '1' THEN points ELSE 0 END) + OVER(PARTITION BY team_id ORDER BY round_id) as cumulative_points + FROM service_check_schemas + ORDER BY team_id, round_id + `).Error + if err != nil { + log.Fatalln("Failed to create cumulative_scores materialized view:", err) + } + + // Unique index required to enable REFRESH CONCURRENTLY + err = db.Exec(` + CREATE UNIQUE INDEX IF NOT EXISTS idx_cumulative_scores_round_team + ON cumulative_scores (round_id, team_id) + `).Error + if err != nil { + log.Fatalln("Failed to create index on cumulative_scores:", err) + } + + // Ensure view is populated/fresh on startup in case there was existing data + err = db.Exec("REFRESH MATERIALIZED VIEW cumulative_scores").Error + if err != nil { + log.Fatalln("Failed to refresh cumulative_scores materialized view:", err) + } } func AddTeams(conf *config.ConfigSettings) error { @@ -115,5 +152,10 @@ func ResetScores() error { return err } + // Refresh the materialized view to clear it + if err := db.Exec("REFRESH MATERIALIZED VIEW cumulative_scores").Error; err != nil { + return err + } + return nil } diff --git a/engine/db/rounds.go b/engine/db/rounds.go index 87a4794..6c3d0d0 100644 --- a/engine/db/rounds.go +++ b/engine/db/rounds.go @@ -50,3 +50,8 @@ func GetLastRound() (RoundSchema, error) { } return round, nil } + +func RefreshScoresMaterializedView() error { + // Use concurrent refresh to avoid blocking reads + return db.Exec("REFRESH MATERIALIZED VIEW CONCURRENTLY cumulative_scores").Error +} diff --git a/engine/db/servicechecks.go b/engine/db/servicechecks.go index 7131789..48dcb0b 100644 --- a/engine/db/servicechecks.go +++ b/engine/db/servicechecks.go @@ -55,11 +55,10 @@ func GetServiceCheckSumByRound() ([]map[uint]int, error) { // creates array with size of num rounds result := make([]map[uint]int, last.ID) + // Query from materialized view instead of running window function each time rows, err := db.Raw(` - SELECT DISTINCT round_id, team_id, - SUM(CASE WHEN result = '1' THEN points ELSE 0 END) - OVER(PARTITION BY team_id ORDER BY round_id) - FROM service_check_schemas + SELECT round_id, team_id, cumulative_points + FROM cumulative_scores ORDER BY team_id, round_id `).Rows() if err != nil { diff --git a/engine/engine.go b/engine/engine.go index 1576d00..21817bc 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -9,6 +9,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "time" "unicode/utf8" @@ -31,6 +32,9 @@ type ScoringEngine struct { CurrentRoundStartTime time.Time RedisClient *redis.Client + // Concurrency control for materialized view refresh + Refreshing atomic.Bool + // Config update handling configPath string } @@ -573,4 +577,17 @@ func (se *ScoringEngine) processCollectedResults(results []checks.Result) { } slog.Debug("Successfully processed results for round", "round", se.CurrentRound, "total", len(dbResults)) + + // Refresh materialized view asynchronously, but avoid concurrent refreshes + currentRound := se.CurrentRound + if se.Refreshing.CompareAndSwap(false, true) { + go func(round uint) { + defer se.Refreshing.Store(false) + if err := db.RefreshScoresMaterializedView(); err != nil { + slog.Error("failed to refresh materialized view", "round", round, "error", err) + } + }(currentRound) + } else { + slog.Debug("refresh already in progress, skipping refresh spawn", "round", currentRound) + } } diff --git a/tests/integration/materialized_view_test.go b/tests/integration/materialized_view_test.go new file mode 100644 index 0000000..d6133f3 --- /dev/null +++ b/tests/integration/materialized_view_test.go @@ -0,0 +1,69 @@ +package integration + +import ( + "quotient/engine/db" + "quotient/tests/testutil" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestMaterializedViewLifecycle(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + // Start PostgreSQL connection + pgContainer := testutil.StartPostgres(t) + + // Initialize database connection + // This calls createCumulativeScoresView() internally, which now includes the initial REFRESH + db.Connect(pgContainer.ConnectionString()) + + // Data cleanup + err := db.ResetScores() + require.NoError(t, err, "ResetScores should succeed") + + t.Run("refresh with zero rows", func(t *testing.T) { + // The view should operate correctly even with no data + err := db.RefreshScoresMaterializedView() + require.NoError(t, err, "RefreshScoresMaterializedView should succeed with 0 rows") + }) + + t.Run("refresh with data", func(t *testing.T) { + // Add a team + team := db.TeamSchema{ + Name: "ViewTestTeam", + Active: true, + Identifier: "vt1", + } + teamCreated, err := db.CreateTeam(team) + require.NoError(t, err) + + // Create a round with a result + check := db.ServiceCheckSchema{ + TeamID: teamCreated.ID, + RoundID: 1, + ServiceName: "test-service", + Points: 10, + Result: true, + } + + round := db.RoundSchema{ + ID: 1, + StartTime: time.Now(), + Checks: []db.ServiceCheckSchema{check}, + } + + _, err = db.CreateRound(round) + require.NoError(t, err, "should save round to database") + + // Refresh should succeed with data + err = db.RefreshScoresMaterializedView() + require.NoError(t, err, "RefreshScoresMaterializedView should succeed with data") + + // Optional: We could verify data via db.GetServiceCheckSumByRound() if we wanted to be thorough + // but the main point here is that the REFRESH command doesn't throw an error. + }) +}