From 6f9d0a5ddbc11775f60a83c8aa7a039ff002fcfd Mon Sep 17 00:00:00 2001 From: dbaseqp Date: Sun, 25 Jan 2026 22:13:53 -0800 Subject: [PATCH 1/6] init --- IMPLEMENTATION_SUMMARY.md | 74 ++++++++++++++++++++++++++++++++++++++ engine/db/db.go | 29 +++++++++++++++ engine/db/rounds.go | 4 +++ engine/db/servicechecks.go | 7 ++-- engine/engine.go | 7 ++++ 5 files changed, 117 insertions(+), 4 deletions(-) create mode 100644 IMPLEMENTATION_SUMMARY.md diff --git a/IMPLEMENTATION_SUMMARY.md b/IMPLEMENTATION_SUMMARY.md new file mode 100644 index 00000000..9297e0df --- /dev/null +++ b/IMPLEMENTATION_SUMMARY.md @@ -0,0 +1,74 @@ +# PostgreSQL Materialized Views Implementation Summary + +## Overview +Implemented PostgreSQL materialized views to optimize graph data queries, specifically the cumulative scores calculation that previously ran expensive window functions on every API call. + +## Changes Made + +### 1. Materialized View Creation (`engine/db/db.go`) +- Added creation of `cumulative_scores` materialized view in the `Connect()` function +- The view pre-computes cumulative points per team per round using window functions +- Created unique index on `(round_id, team_id)` to enable `REFRESH CONCURRENTLY` +- Updated `ResetScores()` to refresh the materialized view when scores are reset + +**SQL Created:** +```sql +CREATE MATERIALIZED VIEW IF NOT EXISTS cumulative_scores AS +SELECT DISTINCT + round_id, + team_id, + SUM(CASE WHEN result = '1' THEN points ELSE 0 END) + OVER(PARTITION BY team_id ORDER BY round_id) as cumulative_points +FROM service_check_schemas +ORDER BY team_id, round_id; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_cumulative_scores_round_team +ON cumulative_scores (round_id, team_id); +``` + +### 2. Refresh Function (`engine/db/rounds.go`) +- Added `RefreshScoresMaterializedView()` function +- Uses `REFRESH MATERIALIZED VIEW CONCURRENTLY` to avoid blocking reads during refresh + +### 3. Automatic Refresh Trigger (`engine/engine.go`) +- Modified `processCollectedResults()` to refresh the materialized view after processing each round +- Refresh runs asynchronously in a goroutine to avoid blocking the next round +- Logs errors if refresh fails without disrupting the engine + +### 4. Optimized Query (`engine/db/servicechecks.go`) +- Updated `GetServiceCheckSumByRound()` to query from `cumulative_scores` view +- Replaced expensive window function with simple SELECT from materialized view +- SLA penalty calculation remains unchanged (still queried from `sla_schemas`) + +## Benefits + +1. **Performance**: Graph API (`GetScoreStatus`) now queries pre-computed results instead of running window functions +2. **Scalability**: Query time is constant regardless of number of rounds +3. **Non-blocking**: CONCURRENT refresh allows reads during updates +4. **Automatic**: View refreshes after each round without manual intervention + +## Technical Notes + +### Unique Index Requirement +The unique index on `(round_id, team_id)` is required for CONCURRENT refresh. Without it, PostgreSQL would lock the view during refresh, blocking all reads. + +### SLA Penalties +SLA penalties are still calculated separately and applied to the cumulative scores in the application layer. This maintains the existing business logic where SLA penalties affect all subsequent rounds. + +### Initial State +The materialized view is created empty during database initialization. It will be populated as rounds are processed. If the database already contains round data, a manual `REFRESH MATERIALIZED VIEW cumulative_scores` should be run once. + +### Error Handling +- View creation errors are fatal (application won't start) +- Refresh errors are logged but don't stop the engine +- This ensures the system remains operational even if materialized view refresh fails + +## Future Considerations + +1. **SLA Penalties View**: Currently SLA penalties are queried separately. Could be combined into the materialized view for further optimization. + +2. **Service Status**: `GetServiceStatus` only queries the last round and is already fast. No materialization needed. + +3. **Manual Refresh**: Consider adding an admin API endpoint to manually trigger refresh if needed. + +4. **Monitoring**: Add metrics to track refresh time and success rate. diff --git a/engine/db/db.go b/engine/db/db.go index 146e86a7..e1db3520 100644 --- a/engine/db/db.go +++ b/engine/db/db.go @@ -47,6 +47,30 @@ func Connect(connectURL string) { if err != nil { log.Fatalln("Failed to auto migrate:", err) } + + // Create materialized view for cumulative scores + err = db.Exec(` + CREATE MATERIALIZED VIEW IF NOT EXISTS cumulative_scores AS + SELECT DISTINCT + round_id, + team_id, + SUM(CASE WHEN result = '1' THEN points ELSE 0 END) + OVER(PARTITION BY team_id ORDER BY round_id) as cumulative_points + FROM service_check_schemas + ORDER BY team_id, round_id + `).Error + if err != nil { + log.Fatalln("Failed to create materialized view:", err) + } + + // Create unique index to enable CONCURRENT refresh + err = db.Exec(` + CREATE UNIQUE INDEX IF NOT EXISTS idx_cumulative_scores_round_team + ON cumulative_scores (round_id, team_id) + `).Error + if err != nil { + log.Fatalln("Failed to create unique index on materialized view:", err) + } } func AddTeams(conf *config.ConfigSettings) error { @@ -115,5 +139,10 @@ func ResetScores() error { return err } + // Refresh the materialized view to clear it + if err := db.Exec("REFRESH MATERIALIZED VIEW cumulative_scores").Error; err != nil { + return err + } + return nil } diff --git a/engine/db/rounds.go b/engine/db/rounds.go index 87a4794f..250f761b 100644 --- a/engine/db/rounds.go +++ b/engine/db/rounds.go @@ -50,3 +50,7 @@ func GetLastRound() (RoundSchema, error) { } return round, nil } + +func RefreshScoresMaterializedView() error { + return db.Exec("REFRESH MATERIALIZED VIEW CONCURRENTLY cumulative_scores").Error +} diff --git a/engine/db/servicechecks.go b/engine/db/servicechecks.go index 7131789f..48dcb0b9 100644 --- a/engine/db/servicechecks.go +++ b/engine/db/servicechecks.go @@ -55,11 +55,10 @@ func GetServiceCheckSumByRound() ([]map[uint]int, error) { // creates array with size of num rounds result := make([]map[uint]int, last.ID) + // Query from materialized view instead of running window function each time rows, err := db.Raw(` - SELECT DISTINCT round_id, team_id, - SUM(CASE WHEN result = '1' THEN points ELSE 0 END) - OVER(PARTITION BY team_id ORDER BY round_id) - FROM service_check_schemas + SELECT round_id, team_id, cumulative_points + FROM cumulative_scores ORDER BY team_id, round_id `).Rows() if err != nil { diff --git a/engine/engine.go b/engine/engine.go index 1576d00f..63a1bc01 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -573,4 +573,11 @@ func (se *ScoringEngine) processCollectedResults(results []checks.Result) { } slog.Debug("Successfully processed results for round", "round", se.CurrentRound, "total", len(dbResults)) + + // Refresh materialized view asynchronously + go func() { + if err := db.RefreshScoresMaterializedView(); err != nil { + slog.Error("failed to refresh materialized view", "round", se.CurrentRound, "error", err) + } + }() } From f0bd4571d3953ea3ce2634e068c937c44cf72d2e Mon Sep 17 00:00:00 2001 From: dbaseqp Date: Sun, 25 Jan 2026 22:18:23 -0800 Subject: [PATCH 2/6] only compute after round --- engine/db/db.go | 3 ++- engine/db/rounds.go | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/engine/db/db.go b/engine/db/db.go index e1db3520..f7d9c5b9 100644 --- a/engine/db/db.go +++ b/engine/db/db.go @@ -48,7 +48,7 @@ func Connect(connectURL string) { log.Fatalln("Failed to auto migrate:", err) } - // Create materialized view for cumulative scores + // Create materialized view for cumulative scores (WITH NO DATA to avoid computation at startup) err = db.Exec(` CREATE MATERIALIZED VIEW IF NOT EXISTS cumulative_scores AS SELECT DISTINCT @@ -58,6 +58,7 @@ func Connect(connectURL string) { OVER(PARTITION BY team_id ORDER BY round_id) as cumulative_points FROM service_check_schemas ORDER BY team_id, round_id + WITH NO DATA `).Error if err != nil { log.Fatalln("Failed to create materialized view:", err) diff --git a/engine/db/rounds.go b/engine/db/rounds.go index 250f761b..8096040a 100644 --- a/engine/db/rounds.go +++ b/engine/db/rounds.go @@ -52,5 +52,19 @@ func GetLastRound() (RoundSchema, error) { } func RefreshScoresMaterializedView() error { + // Check if the view has data - CONCURRENTLY only works on populated views + var hasData bool + err := db.Raw("SELECT EXISTS(SELECT 1 FROM cumulative_scores LIMIT 1)").Scan(&hasData).Error + if err != nil { + // View is likely unpopulated (WITH NO DATA), use blocking refresh + return db.Exec("REFRESH MATERIALIZED VIEW cumulative_scores").Error + } + + if !hasData { + // First refresh - must use blocking refresh + return db.Exec("REFRESH MATERIALIZED VIEW cumulative_scores").Error + } + + // Subsequent refreshes - use concurrent refresh to avoid blocking reads return db.Exec("REFRESH MATERIALIZED VIEW CONCURRENTLY cumulative_scores").Error } From 12f5cadf6b7548c63df8d22a4c61918bfa4d227f Mon Sep 17 00:00:00 2001 From: dbaseqp Date: Sun, 25 Jan 2026 23:41:51 -0800 Subject: [PATCH 3/6] refactor --- engine/db/db.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/engine/db/db.go b/engine/db/db.go index f7d9c5b9..37230b00 100644 --- a/engine/db/db.go +++ b/engine/db/db.go @@ -48,8 +48,14 @@ func Connect(connectURL string) { log.Fatalln("Failed to auto migrate:", err) } - // Create materialized view for cumulative scores (WITH NO DATA to avoid computation at startup) - err = db.Exec(` + // Create materialized views + createCumulativeScoresView() +} + +// createCumulativeScoresView creates the materialized view for cumulative scores. +// Uses WITH NO DATA to avoid expensive computation at startup; first refresh populates it. +func createCumulativeScoresView() { + err := db.Exec(` CREATE MATERIALIZED VIEW IF NOT EXISTS cumulative_scores AS SELECT DISTINCT round_id, @@ -61,16 +67,16 @@ func Connect(connectURL string) { WITH NO DATA `).Error if err != nil { - log.Fatalln("Failed to create materialized view:", err) + log.Fatalln("Failed to create cumulative_scores materialized view:", err) } - // Create unique index to enable CONCURRENT refresh + // Unique index required to enable REFRESH CONCURRENTLY err = db.Exec(` CREATE UNIQUE INDEX IF NOT EXISTS idx_cumulative_scores_round_team ON cumulative_scores (round_id, team_id) `).Error if err != nil { - log.Fatalln("Failed to create unique index on materialized view:", err) + log.Fatalln("Failed to create index on cumulative_scores:", err) } } From 361851c2932a49fd6b8da21b0c27baf777ddc2bb Mon Sep 17 00:00:00 2001 From: dbaseqp Date: Sun, 25 Jan 2026 23:42:35 -0800 Subject: [PATCH 4/6] remove md --- IMPLEMENTATION_SUMMARY.md | 74 --------------------------------------- 1 file changed, 74 deletions(-) delete mode 100644 IMPLEMENTATION_SUMMARY.md diff --git a/IMPLEMENTATION_SUMMARY.md b/IMPLEMENTATION_SUMMARY.md deleted file mode 100644 index 9297e0df..00000000 --- a/IMPLEMENTATION_SUMMARY.md +++ /dev/null @@ -1,74 +0,0 @@ -# PostgreSQL Materialized Views Implementation Summary - -## Overview -Implemented PostgreSQL materialized views to optimize graph data queries, specifically the cumulative scores calculation that previously ran expensive window functions on every API call. - -## Changes Made - -### 1. Materialized View Creation (`engine/db/db.go`) -- Added creation of `cumulative_scores` materialized view in the `Connect()` function -- The view pre-computes cumulative points per team per round using window functions -- Created unique index on `(round_id, team_id)` to enable `REFRESH CONCURRENTLY` -- Updated `ResetScores()` to refresh the materialized view when scores are reset - -**SQL Created:** -```sql -CREATE MATERIALIZED VIEW IF NOT EXISTS cumulative_scores AS -SELECT DISTINCT - round_id, - team_id, - SUM(CASE WHEN result = '1' THEN points ELSE 0 END) - OVER(PARTITION BY team_id ORDER BY round_id) as cumulative_points -FROM service_check_schemas -ORDER BY team_id, round_id; - -CREATE UNIQUE INDEX IF NOT EXISTS idx_cumulative_scores_round_team -ON cumulative_scores (round_id, team_id); -``` - -### 2. Refresh Function (`engine/db/rounds.go`) -- Added `RefreshScoresMaterializedView()` function -- Uses `REFRESH MATERIALIZED VIEW CONCURRENTLY` to avoid blocking reads during refresh - -### 3. Automatic Refresh Trigger (`engine/engine.go`) -- Modified `processCollectedResults()` to refresh the materialized view after processing each round -- Refresh runs asynchronously in a goroutine to avoid blocking the next round -- Logs errors if refresh fails without disrupting the engine - -### 4. Optimized Query (`engine/db/servicechecks.go`) -- Updated `GetServiceCheckSumByRound()` to query from `cumulative_scores` view -- Replaced expensive window function with simple SELECT from materialized view -- SLA penalty calculation remains unchanged (still queried from `sla_schemas`) - -## Benefits - -1. **Performance**: Graph API (`GetScoreStatus`) now queries pre-computed results instead of running window functions -2. **Scalability**: Query time is constant regardless of number of rounds -3. **Non-blocking**: CONCURRENT refresh allows reads during updates -4. **Automatic**: View refreshes after each round without manual intervention - -## Technical Notes - -### Unique Index Requirement -The unique index on `(round_id, team_id)` is required for CONCURRENT refresh. Without it, PostgreSQL would lock the view during refresh, blocking all reads. - -### SLA Penalties -SLA penalties are still calculated separately and applied to the cumulative scores in the application layer. This maintains the existing business logic where SLA penalties affect all subsequent rounds. - -### Initial State -The materialized view is created empty during database initialization. It will be populated as rounds are processed. If the database already contains round data, a manual `REFRESH MATERIALIZED VIEW cumulative_scores` should be run once. - -### Error Handling -- View creation errors are fatal (application won't start) -- Refresh errors are logged but don't stop the engine -- This ensures the system remains operational even if materialized view refresh fails - -## Future Considerations - -1. **SLA Penalties View**: Currently SLA penalties are queried separately. Could be combined into the materialized view for further optimization. - -2. **Service Status**: `GetServiceStatus` only queries the last round and is already fast. No materialization needed. - -3. **Manual Refresh**: Consider adding an admin API endpoint to manually trigger refresh if needed. - -4. **Monitoring**: Add metrics to track refresh time and success rate. From a7398d5cd422de07faca23e4c7da8639a7246591 Mon Sep 17 00:00:00 2001 From: dbaseqp Date: Thu, 29 Jan 2026 22:38:16 -0800 Subject: [PATCH 5/6] more optimized --- engine/db/db.go | 10 ++- engine/db/rounds.go | 15 +---- tests/integration/materialized_view_test.go | 69 +++++++++++++++++++++ 3 files changed, 78 insertions(+), 16 deletions(-) create mode 100644 tests/integration/materialized_view_test.go diff --git a/engine/db/db.go b/engine/db/db.go index 37230b00..0c2ebe1f 100644 --- a/engine/db/db.go +++ b/engine/db/db.go @@ -53,8 +53,9 @@ func Connect(connectURL string) { } // createCumulativeScoresView creates the materialized view for cumulative scores. -// Uses WITH NO DATA to avoid expensive computation at startup; first refresh populates it. func createCumulativeScoresView() { + // Create the materialized view if it doesn't exist + // If it does exist, CREATE won't refresh it, so we do that separately err := db.Exec(` CREATE MATERIALIZED VIEW IF NOT EXISTS cumulative_scores AS SELECT DISTINCT @@ -64,7 +65,6 @@ func createCumulativeScoresView() { OVER(PARTITION BY team_id ORDER BY round_id) as cumulative_points FROM service_check_schemas ORDER BY team_id, round_id - WITH NO DATA `).Error if err != nil { log.Fatalln("Failed to create cumulative_scores materialized view:", err) @@ -78,6 +78,12 @@ func createCumulativeScoresView() { if err != nil { log.Fatalln("Failed to create index on cumulative_scores:", err) } + + // Ensure view is populated/fresh on startup in case there was existing data + err = db.Exec("REFRESH MATERIALIZED VIEW cumulative_scores").Error + if err != nil { + log.Fatalln("Failed to refresh cumulative_scores materialized view:", err) + } } func AddTeams(conf *config.ConfigSettings) error { diff --git a/engine/db/rounds.go b/engine/db/rounds.go index 8096040a..6c3d0d0a 100644 --- a/engine/db/rounds.go +++ b/engine/db/rounds.go @@ -52,19 +52,6 @@ func GetLastRound() (RoundSchema, error) { } func RefreshScoresMaterializedView() error { - // Check if the view has data - CONCURRENTLY only works on populated views - var hasData bool - err := db.Raw("SELECT EXISTS(SELECT 1 FROM cumulative_scores LIMIT 1)").Scan(&hasData).Error - if err != nil { - // View is likely unpopulated (WITH NO DATA), use blocking refresh - return db.Exec("REFRESH MATERIALIZED VIEW cumulative_scores").Error - } - - if !hasData { - // First refresh - must use blocking refresh - return db.Exec("REFRESH MATERIALIZED VIEW cumulative_scores").Error - } - - // Subsequent refreshes - use concurrent refresh to avoid blocking reads + // Use concurrent refresh to avoid blocking reads return db.Exec("REFRESH MATERIALIZED VIEW CONCURRENTLY cumulative_scores").Error } diff --git a/tests/integration/materialized_view_test.go b/tests/integration/materialized_view_test.go new file mode 100644 index 00000000..d6133f3e --- /dev/null +++ b/tests/integration/materialized_view_test.go @@ -0,0 +1,69 @@ +package integration + +import ( + "quotient/engine/db" + "quotient/tests/testutil" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestMaterializedViewLifecycle(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + // Start PostgreSQL connection + pgContainer := testutil.StartPostgres(t) + + // Initialize database connection + // This calls createCumulativeScoresView() internally, which now includes the initial REFRESH + db.Connect(pgContainer.ConnectionString()) + + // Data cleanup + err := db.ResetScores() + require.NoError(t, err, "ResetScores should succeed") + + t.Run("refresh with zero rows", func(t *testing.T) { + // The view should operate correctly even with no data + err := db.RefreshScoresMaterializedView() + require.NoError(t, err, "RefreshScoresMaterializedView should succeed with 0 rows") + }) + + t.Run("refresh with data", func(t *testing.T) { + // Add a team + team := db.TeamSchema{ + Name: "ViewTestTeam", + Active: true, + Identifier: "vt1", + } + teamCreated, err := db.CreateTeam(team) + require.NoError(t, err) + + // Create a round with a result + check := db.ServiceCheckSchema{ + TeamID: teamCreated.ID, + RoundID: 1, + ServiceName: "test-service", + Points: 10, + Result: true, + } + + round := db.RoundSchema{ + ID: 1, + StartTime: time.Now(), + Checks: []db.ServiceCheckSchema{check}, + } + + _, err = db.CreateRound(round) + require.NoError(t, err, "should save round to database") + + // Refresh should succeed with data + err = db.RefreshScoresMaterializedView() + require.NoError(t, err, "RefreshScoresMaterializedView should succeed with data") + + // Optional: We could verify data via db.GetServiceCheckSumByRound() if we wanted to be thorough + // but the main point here is that the REFRESH command doesn't throw an error. + }) +} From 6a6fe35bd7bf4b4a88232a08bfdcfee8c577762a Mon Sep 17 00:00:00 2001 From: dbaseqp Date: Thu, 29 Jan 2026 23:09:37 -0800 Subject: [PATCH 6/6] avoid possibly spawning multiple refreshes --- engine/engine.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index 63a1bc01..21817bca 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -9,6 +9,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "time" "unicode/utf8" @@ -31,6 +32,9 @@ type ScoringEngine struct { CurrentRoundStartTime time.Time RedisClient *redis.Client + // Concurrency control for materialized view refresh + Refreshing atomic.Bool + // Config update handling configPath string } @@ -574,10 +578,16 @@ func (se *ScoringEngine) processCollectedResults(results []checks.Result) { slog.Debug("Successfully processed results for round", "round", se.CurrentRound, "total", len(dbResults)) - // Refresh materialized view asynchronously - go func() { - if err := db.RefreshScoresMaterializedView(); err != nil { - slog.Error("failed to refresh materialized view", "round", se.CurrentRound, "error", err) - } - }() + // Refresh materialized view asynchronously, but avoid concurrent refreshes + currentRound := se.CurrentRound + if se.Refreshing.CompareAndSwap(false, true) { + go func(round uint) { + defer se.Refreshing.Store(false) + if err := db.RefreshScoresMaterializedView(); err != nil { + slog.Error("failed to refresh materialized view", "round", round, "error", err) + } + }(currentRound) + } else { + slog.Debug("refresh already in progress, skipping refresh spawn", "round", currentRound) + } }