Skip to content

Commit 9a2e2e2

Browse files
authored
feat: add graceful shutdown and refactor validation pipeline (#21)
- Add signal handling (SIGTERM/SIGINT) with proper cleanup using parent/child stopper contexts
- Check for pending jobs on the source table before validation to prevent conflicts
- Refactor validation steps into a uniform pipeline with better error reporting

Fixes: #20
1 parent 5e69f0d commit 9a2e2e2

File tree

6 files changed

+166
-62
lines changed

6 files changed

+166
-62
lines changed

cmd/s3/s3.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
package s3
1616

1717
import (
18+
"log/slog"
19+
"os"
20+
"os/signal"
21+
"syscall"
22+
1823
"github.com/spf13/cobra"
1924

2025
"github.com/cockroachdb/field-eng-powertools/stopper"
@@ -29,7 +34,22 @@ func command(env *env.Env) *cobra.Command {
2934
Use: "s3",
3035
Short: "Performs a validation test for a s3 object store",
3136
RunE: func(cmd *cobra.Command, args []string) error {
32-
ctx := stopper.WithContext(cmd.Context())
37+
// Parent context for cleanup operations
38+
parentCtx := stopper.WithContext(cmd.Context())
39+
// Child context for validator operations that can be stopped
40+
ctx := stopper.WithContext(parentCtx)
41+
42+
// Set up signal handler
43+
sigChan := make(chan os.Signal, 1)
44+
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
45+
defer signal.Stop(sigChan)
46+
47+
go func() {
48+
sig := <-sigChan
49+
slog.Info("Received signal, stopping validator", slog.String("signal", sig.String()))
50+
ctx.Stop(0)
51+
}()
52+
3353
store, err := blob.S3FromEnv(ctx, env)
3454
if err != nil {
3555
return err
@@ -44,12 +64,16 @@ func command(env *env.Env) *cobra.Command {
4464
if err != nil {
4565
return err
4666
}
47-
defer validator.Clean(ctx)
67+
// Use parent context for cleanup so it can access the database
68+
defer validator.Clean(parentCtx)
69+
4870
report, err := validator.Validate(ctx)
4971
if err != nil {
5072
return err
5173
}
52-
format.Report(cmd.OutOrStdout(), report)
74+
if report != nil {
75+
format.Report(cmd.OutOrStdout(), report)
76+
}
5377
return nil
5478
},
5579
}

internal/db/kvtable.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,28 @@ func (t *KvTable) Fingerprint(ctx *stopper.Context, conn *pgxpool.Conn) (string,
122122
}
123123
return b.String(), rows.Err()
124124
}
125+
126+
const jobsStmt = `
127+
SELECT job_id
128+
FROM [SHOW JOBS]
129+
WHERE
130+
NOT status = ANY (@status)
131+
AND description LIKE @desc
132+
`
133+
134+
// pendingStatues lists the terminal job statuses that PendingJobs excludes;
// despite its name, any job whose status is NOT in this list counts as pending.
// "canceled" is included so that jobs an operator has already canceled do not
// keep blocking validation.
var pendingStatues = []string{"succeeded", "failed", "canceled"}
135+
136+
// PendingJobs returns the IDs of jobs whose description mentions this table and whose status is not one of the terminal statuses listed in pendingStatues.
137+
func (t *KvTable) PendingJobs(ctx *stopper.Context, conn *pgxpool.Conn) ([]int64, error) {
138+
slog.Debug("Checking for pending jobs", slog.String("table", t.String()))
139+
rows, err := conn.Query(ctx, jobsStmt, pgx.NamedArgs{
140+
"status": pendingStatues,
141+
"desc": fmt.Sprintf("%%%s%%", t.Name),
142+
})
143+
if err != nil {
144+
return nil, err
145+
}
146+
defer rows.Close()
147+
148+
return pgx.CollectRows(rows, pgx.RowTo[int64])
149+
}

internal/validate/backup.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ package validate
1717
import (
1818
"log/slog"
1919

20-
"github.com/jackc/pgx/v5/pgxpool"
21-
2220
"github.com/cockroachdb/errors"
2321
"github.com/cockroachdb/field-eng-powertools/stopper"
2422
"github.com/cockroachlabs-field/blobcheck/internal/db"
@@ -62,9 +60,13 @@ func (v *Validator) checkBackups(ctx *stopper.Context, extConn *db.ExternalConn)
6260
}
6361

6462
// performRestore restores the backup to a separate database.
65-
func (v *Validator) performRestore(
66-
ctx *stopper.Context, conn *pgxpool.Conn, extConn *db.ExternalConn,
67-
) error {
63+
func (v *Validator) performRestore(ctx *stopper.Context, extConn *db.ExternalConn) error {
64+
conn, err := v.acquireConn(ctx)
65+
if err != nil {
66+
return err
67+
}
68+
defer conn.Release()
69+
6870
slog.Info("restoring backup")
6971
if err := v.restoredTable.Restore(ctx, conn, extConn, &v.sourceTable); err != nil {
7072
return errors.Wrap(err, "failed to restore backup")
@@ -88,9 +90,12 @@ func (v *Validator) runFullBackup(ctx *stopper.Context, extConn *db.ExternalConn
8890
}
8991

9092
// runIncrementalBackup runs an incremental backup.
91-
func (v *Validator) runIncrementalBackup(
92-
ctx *stopper.Context, conn *pgxpool.Conn, extConn *db.ExternalConn,
93-
) error {
93+
func (v *Validator) runIncrementalBackup(ctx *stopper.Context, extConn *db.ExternalConn) error {
94+
conn, err := v.acquireConn(ctx)
95+
if err != nil {
96+
return err
97+
}
98+
defer conn.Release()
9499
slog.Info("starting incremental backup")
95100
if err := v.sourceTable.Backup(ctx, conn, extConn, true); err != nil {
96101
return errors.Wrap(err, "failed to create incremental backup")
@@ -99,7 +104,13 @@ func (v *Validator) runIncrementalBackup(
99104
}
100105

101106
// verifyIntegrity checks that the restored data matches the original.
102-
func (v *Validator) verifyIntegrity(ctx *stopper.Context, conn *pgxpool.Conn) error {
107+
func (v *Validator) verifyIntegrity(ctx *stopper.Context) error {
108+
conn, err := v.acquireConn(ctx)
109+
if err != nil {
110+
return err
111+
}
112+
defer conn.Release()
113+
103114
slog.Info("checking integrity")
104115
original, err := v.sourceTable.Fingerprint(ctx, conn)
105116
if err != nil {

internal/validate/db.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,24 @@ func (v *Validator) acquireConn(ctx *stopper.Context) (*pgxpool.Conn, error) {
3333
return conn, nil
3434
}
3535

36+
// captureInitialStats captures initial database statistics.
37+
func (v *Validator) captureInitialStats(
38+
ctx *stopper.Context, extConn *db.ExternalConn,
39+
) ([]*db.Stats, error) {
40+
conn, err := v.acquireConn(ctx)
41+
if err != nil {
42+
return nil, err
43+
}
44+
defer conn.Release()
45+
46+
slog.Info("capturing initial statistics")
47+
stats, err := extConn.Stats(ctx, conn)
48+
if err != nil {
49+
return nil, errors.Wrap(err, "failed to capture initial statistics")
50+
}
51+
return stats, nil
52+
}
53+
3654
// createSourceTable creates the source database and table.
3755
func createSourceTable(ctx *stopper.Context, conn *pgxpool.Conn) (db.KvTable, error) {
3856
source := db.Database{Name: "_blobcheck"}
@@ -66,15 +84,3 @@ func createRestoredTable(ctx *stopper.Context, conn *pgxpool.Conn) (db.KvTable,
6684
}
6785
return restoredTable, nil
6886
}
69-
70-
// captureInitialStats captures initial database statistics.
71-
func captureInitialStats(
72-
ctx *stopper.Context, extConn *db.ExternalConn, conn *pgxpool.Conn,
73-
) ([]*db.Stats, error) {
74-
slog.Info("capturing initial statistics")
75-
stats, err := extConn.Stats(ctx, conn)
76-
if err != nil {
77-
return nil, errors.Wrap(err, "failed to capture initial statistics")
78-
}
79-
return stats, nil
80-
}

internal/validate/validate.go

Lines changed: 71 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,16 @@ func New(ctx *stopper.Context, env *env.Env, blobStorage blob.Storage) (*Validat
7777
return nil, err
7878
}
7979

80+
// Check for pending jobs on the source table
81+
pendingJobs, err := sourceTable.PendingJobs(ctx, conn)
82+
if err != nil {
83+
return nil, errors.Wrap(err, "failed to check for pending jobs on source table")
84+
}
85+
if len(pendingJobs) > 0 {
86+
slog.Error("pending jobs found on source table. Please review and cancel them.", slog.Any("job_ids", pendingJobs))
87+
return nil, errors.New("pending jobs found on source table")
88+
}
89+
8090
restoredTable, err := createRestoredTable(ctx, conn)
8191
if err != nil {
8292
return nil, err
@@ -113,24 +123,34 @@ func preflight(ctx *stopper.Context, env *env.Env, blobStorage blob.Storage) err
113123

114124
// Clean removes all resources created by the validator.
115125
func (v *Validator) Clean(ctx *stopper.Context) error {
126+
slog.Debug("Starting cleanup of validator resources")
116127
conn, err := v.acquireConn(ctx)
117128
if err != nil {
118129
return err
119130
}
120131
defer conn.Release()
121132

122133
var e1, e2 error
134+
slog.Debug("Dropping source database", slog.String("database", v.sourceTable.Database.String()))
123135
if err := v.sourceTable.Database.Drop(ctx, conn); err != nil {
124-
slog.Error("drop source DB", "err", err)
125136
e1 = errors.Wrap(err, "failed to drop source database")
126137
}
138+
slog.Debug("Dropping restored database", slog.String("database", v.restoredTable.Database.String()))
127139
if err := v.restoredTable.Database.Drop(ctx, conn); err != nil {
128-
slog.Error("drop restored DB", "err", err)
129140
e2 = errors.Wrap(err, "failed to drop restored database")
130141
}
131142
return errors.Join(e1, e2)
132143
}
133144

145+
// validationStepFn is a function that performs a validation step.
146+
type validationStepFn func(ctx *stopper.Context, extConn *db.ExternalConn) error
147+
148+
// validationStep represents a step in the validation process.
149+
type validationStep struct {
150+
name string
151+
fn validationStepFn
152+
}
153+
134154
// Validate performs a backup/restore against a storage provider
135155
// to assess minimum compatibility at the functional level.
136156
// This does not imply that a storage provider passing the test is supported.
@@ -148,31 +168,55 @@ func (v *Validator) Validate(ctx *stopper.Context) (*Report, error) {
148168
}
149169
defer extConn.Drop(ctx, conn)
150170

151-
stats, err := captureInitialStats(ctx, extConn, conn)
152-
if err != nil {
153-
return nil, err
154-
}
155-
156-
if err := v.runWorkloadWithBackup(ctx, extConn); err != nil {
157-
return nil, err
158-
}
159-
160-
if err := v.runIncrementalBackup(ctx, conn, extConn); err != nil {
161-
return nil, err
162-
}
163-
164-
if err := v.checkBackups(ctx, extConn); err != nil {
165-
return nil, err
166-
}
167-
168-
if err := v.performRestore(ctx, conn, extConn); err != nil {
169-
return nil, err
170-
}
171-
172-
if err := v.verifyIntegrity(ctx, conn); err != nil {
173-
// If we fail to verify the integrity, just log the error, but
174-
// still provide a complete report
175-
slog.Error("failed to verify integrity", slog.Any("error", err))
171+
var stats []*db.Stats
172+
173+
// Define validation steps
174+
steps := []validationStep{
175+
{
176+
name: "capture initial stats",
177+
fn: func(ctx *stopper.Context, extConn *db.ExternalConn) error {
178+
var err error
179+
stats, err = v.captureInitialStats(ctx, extConn)
180+
return err
181+
},
182+
},
183+
{
184+
name: "workload with backup",
185+
fn: v.runWorkloadWithBackup,
186+
},
187+
{
188+
name: "incremental backup",
189+
fn: v.runIncrementalBackup,
190+
},
191+
{
192+
name: "check backups",
193+
fn: v.checkBackups,
194+
},
195+
{
196+
name: "restore",
197+
fn: v.performRestore,
198+
},
199+
{
200+
name: "verify integrity",
201+
fn: func(ctx *stopper.Context, extConn *db.ExternalConn) error {
202+
if err := v.verifyIntegrity(ctx); err != nil {
203+
// If we fail to verify the integrity, just log the error, but
204+
// still provide a complete report
205+
slog.Error("failed to verify integrity", slog.Any("error", err))
206+
}
207+
return nil
208+
},
209+
},
210+
}
211+
212+
// Execute steps
213+
for _, step := range steps {
214+
if ctx.IsStopping() {
215+
return nil, ctx.Err()
216+
}
217+
if err := step.fn(ctx, extConn); err != nil {
218+
return nil, errors.Wrapf(err, "failed during step: %s", step.name)
219+
}
176220
}
177221

178222
return &Report{

internal/validate/workload.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ func (v *Validator) runWorkloadWithBackup(ctx *stopper.Context, extConn *db.Exte
3333
if err := v.runWorkload(ctx, v.env.WorkloadDuration); err != nil {
3434
return errors.Wrap(err, "failed to run initial workload")
3535
}
36+
if ctx.IsStopping() {
37+
return nil
38+
}
3639
return v.runConcurrentWorkloadAndBackup(ctx, extConn)
3740
}
3841

@@ -45,8 +48,9 @@ func (v *Validator) runConcurrentWorkloadAndBackup(
4548
for w := range v.env.Workers {
4649
g.Add(1)
4750
ctx.Go(func(ctx *stopper.Context) error {
51+
slog.Info("starting", "worker", w)
4852
defer g.Done()
49-
return v.runWorkloadWorker(ctx, w)
53+
return v.runWorkload(ctx, v.env.WorkloadDuration)
5054
})
5155
}
5256

@@ -86,13 +90,3 @@ func (v *Validator) runWorkload(ctx *stopper.Context, duration time.Duration) er
8690
}
8791
return nil
8892
}
89-
90-
// runWorkloadWorker runs a single worker instance.
91-
func (v *Validator) runWorkloadWorker(ctx *stopper.Context, workerID int) error {
92-
slog.Info("starting", "worker", workerID)
93-
if err := v.runWorkload(ctx, v.env.WorkloadDuration); err != nil {
94-
slog.Error("worker failed", "worker", workerID, "error", err)
95-
return errors.Wrapf(err, "worker %d failed", workerID)
96-
}
97-
return nil
98-
}

0 commit comments

Comments
 (0)