Skip to content

Commit 27a33b3

Browse files
craig[bot]KeithCh
andcommitted
Merge #149984
149984: changefeedccl: create db-level changefeed job r=andyyang890,asg0451 a=KeithCh Support the creation of database-level changefeed jobs e.g. CREATE CHANGEFEED FOR DATABASE d; We do this by modifying AllTargets to expand database-level changefeed target spec to fetch all tables in the database. Resolves: #147371 Epic: CRDB-1421 Release note: None Co-authored-by: Keith Chow <keith.chow@cockroachlabs.com>
2 parents cd43ab3 + 7671553 commit 27a33b3

10 files changed

+478
-143
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func alterChangefeedPlanHook(
202202
return err
203203
}
204204

205-
jobRecord, err := createChangefeedJobRecord(
205+
jobRecord, targets, err := createChangefeedJobRecord(
206206
ctx,
207207
p,
208208
annotatedStmt,
@@ -251,7 +251,7 @@ func alterChangefeedPlanHook(
251251

252252
telemetry.Count(telemetryPath)
253253

254-
logAlterChangefeedTelemetry(ctx, j, jobPayload.Description)
254+
logAlterChangefeedTelemetry(ctx, j, jobPayload.Description, targets.Size)
255255

256256
select {
257257
case <-ctx.Done():
@@ -447,7 +447,10 @@ func generateAndValidateNewTargets(
447447
return nil, nil, hlc.Timestamp{}, nil, err
448448
}
449449

450-
prevTargets := AllTargets(prevDetails)
450+
prevTargets, err := AllTargets(ctx, prevDetails, p.ExecCfg())
451+
if err != nil {
452+
return nil, nil, hlc.Timestamp{}, nil, err
453+
}
451454
noLongerExist := make(map[string]descpb.ID)
452455
if err := prevTargets.EachTarget(func(targetSpec changefeedbase.Target) error {
453456
k := targetKey{TableID: targetSpec.DescID, FamilyName: tree.Name(targetSpec.FamilyName)}

pkg/ccl/changefeedccl/changefeed.go

Lines changed: 83 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,15 @@ import (
1212
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1313
"github.com/cockroachdb/cockroach/pkg/cloud"
1414
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
15+
"github.com/cockroachdb/cockroach/pkg/sql"
16+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
17+
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
18+
"github.com/cockroachdb/cockroach/pkg/sql/isql"
1519
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
1620
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1721
"github.com/cockroachdb/cockroach/pkg/util/log"
1822
"github.com/cockroachdb/cockroach/pkg/util/tracing"
23+
"github.com/cockroachdb/errors"
1924
"github.com/gogo/protobuf/jsonpb"
2025
)
2126

@@ -37,33 +42,59 @@ type ChangefeedConfig struct {
3742

3843
// makeChangefeedConfigFromJobDetails creates a ChangefeedConfig struct from any
3944
// version of the ChangefeedDetails protobuf.
40-
func makeChangefeedConfigFromJobDetails(d jobspb.ChangefeedDetails) ChangefeedConfig {
45+
func makeChangefeedConfigFromJobDetails(
46+
ctx context.Context, d jobspb.ChangefeedDetails, execCfg *sql.ExecutorConfig,
47+
) (ChangefeedConfig, error) {
48+
targets, err := AllTargets(ctx, d, execCfg)
49+
if err != nil {
50+
return ChangefeedConfig{}, err
51+
}
4152
return ChangefeedConfig{
4253
SinkURI: d.SinkURI,
4354
Opts: changefeedbase.MakeStatementOptions(d.Opts),
4455
ScanTime: d.StatementTime,
4556
EndTime: d.EndTime,
46-
Targets: AllTargets(d),
47-
}
57+
Targets: targets,
58+
}, nil
4859
}
4960

5061
// AllTargets gets all the targets listed in a ChangefeedDetails,
5162
// from the statement time name map in old protos
5263
// or the TargetSpecifications in new ones.
53-
func AllTargets(cd jobspb.ChangefeedDetails) (targets changefeedbase.Targets) {
64+
func AllTargets(
65+
ctx context.Context, cd jobspb.ChangefeedDetails, execCfg *sql.ExecutorConfig,
66+
) (changefeedbase.Targets, error) {
67+
targets := changefeedbase.Targets{}
68+
var err error
69+
5470
// TODO: Use a version gate for this once we have CDC version gates
5571
if len(cd.TargetSpecifications) > 0 {
5672
for _, ts := range cd.TargetSpecifications {
5773
if ts.DescID > 0 {
58-
if ts.StatementTimeName == "" {
59-
ts.StatementTimeName = cd.Tables[ts.DescID].StatementTimeName
74+
switch ts.Type {
75+
case jobspb.ChangefeedTargetSpecification_DATABASE:
76+
if len(cd.TargetSpecifications) > 1 {
77+
return changefeedbase.Targets{}, errors.AssertionFailedf("database-level changefeed is not supported with multiple targets")
78+
}
79+
targets, err = getTargetsFromDatabaseSpec(ctx, ts, execCfg)
80+
if err != nil {
81+
return changefeedbase.Targets{}, err
82+
}
83+
case jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
84+
jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY,
85+
jobspb.ChangefeedTargetSpecification_EACH_FAMILY:
86+
if ts.StatementTimeName == "" {
87+
ts.StatementTimeName = cd.Tables[ts.DescID].StatementTimeName
88+
}
89+
targets.Add(changefeedbase.Target{
90+
Type: ts.Type,
91+
DescID: ts.DescID,
92+
FamilyName: ts.FamilyName,
93+
StatementTimeName: changefeedbase.StatementTimeName(ts.StatementTimeName),
94+
})
95+
default:
96+
return changefeedbase.Targets{}, errors.AssertionFailedf("unsupported target type: %s", ts.Type)
6097
}
61-
targets.Add(changefeedbase.Target{
62-
Type: ts.Type,
63-
DescID: ts.DescID,
64-
FamilyName: ts.FamilyName,
65-
StatementTimeName: changefeedbase.StatementTimeName(ts.StatementTimeName),
66-
})
6798
}
6899
}
69100
} else {
@@ -75,7 +106,46 @@ func AllTargets(cd jobspb.ChangefeedDetails) (targets changefeedbase.Targets) {
75106
})
76107
}
77108
}
78-
return
109+
return targets, err
110+
}
111+
112+
func getTargetsFromDatabaseSpec(
113+
ctx context.Context, ts jobspb.ChangefeedTargetSpecification, execCfg *sql.ExecutorConfig,
114+
) (targets changefeedbase.Targets, err error) {
115+
err = sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descs *descs.Collection) error {
116+
databaseDescriptor, err := descs.ByIDWithLeased(txn.KV()).Get().Database(ctx, ts.DescID)
117+
if err != nil {
118+
return err
119+
}
120+
tables, err := descs.GetAllTablesInDatabase(ctx, txn.KV(), databaseDescriptor)
121+
if err != nil {
122+
return err
123+
}
124+
for _, desc := range tables.OrderedDescriptors() {
125+
tableDesc, ok := desc.(catalog.TableDescriptor)
126+
if !ok {
127+
return errors.AssertionFailedf("expected table descriptor, got %T", desc)
128+
}
129+
// Skip virtual tables
130+
if !tableDesc.IsPhysicalTable() {
131+
continue
132+
}
133+
var tableType jobspb.ChangefeedTargetSpecification_TargetType
134+
if len(tableDesc.GetFamilies()) == 1 {
135+
tableType = jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY
136+
} else {
137+
tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY
138+
}
139+
140+
targets.Add(changefeedbase.Target{
141+
Type: tableType,
142+
DescID: desc.GetID(),
143+
StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()),
144+
})
145+
}
146+
return nil
147+
})
148+
return targets, err
79149
}
80150

81151
const (

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func distChangefeedFlow(
8282
localState *cachedState,
8383
resultsCh chan<- tree.Datums,
8484
onTracingEvent func(ctx context.Context, meta *execinfrapb.TracingAggregatorEvents),
85+
targets changefeedbase.Targets,
8586
) error {
8687
opts := changefeedbase.MakeStatementOptions(details.Opts)
8788
progress := localState.progress
@@ -135,7 +136,7 @@ func distChangefeedFlow(
135136
}
136137
}
137138
return startDistChangefeed(
138-
ctx, execCtx, jobID, schemaTS, details, description, initialHighWater, localState, resultsCh, onTracingEvent)
139+
ctx, execCtx, jobID, schemaTS, details, description, initialHighWater, localState, resultsCh, onTracingEvent, targets)
139140
}
140141

141142
func fetchTableDescriptors(
@@ -235,9 +236,10 @@ func startDistChangefeed(
235236
localState *cachedState,
236237
resultsCh chan<- tree.Datums,
237238
onTracingEvent func(ctx context.Context, meta *execinfrapb.TracingAggregatorEvents),
239+
targets changefeedbase.Targets,
238240
) error {
239241
execCfg := execCtx.ExecCfg()
240-
tableDescs, err := fetchTableDescriptors(ctx, execCfg, AllTargets(details), schemaTS)
242+
tableDescs, err := fetchTableDescriptors(ctx, execCfg, targets, schemaTS)
241243
if err != nil {
242244
return err
243245
}

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ type changeAggregator struct {
121121
sliMetricsID int64
122122
closeTelemetryRecorder func()
123123
knobs TestingKnobs
124+
125+
targets changefeedbase.Targets
124126
}
125127

126128
type timestampLowerBoundOracle interface {
@@ -286,7 +288,7 @@ func (ca *changeAggregator) MustBeStreaming() bool {
286288
// wrapMetricsRecorderWithTelemetry wraps the supplied metricsRecorder
287289
// so it periodically emits metrics to telemetry.
288290
func (ca *changeAggregator) wrapMetricsRecorderWithTelemetry(
289-
ctx context.Context, recorder metricsRecorder,
291+
ctx context.Context, recorder metricsRecorder, targets changefeedbase.Targets,
290292
) (metricsRecorder, error) {
291293
details := ca.spec.Feed
292294
jobID := ca.spec.JobID
@@ -307,7 +309,7 @@ func (ca *changeAggregator) wrapMetricsRecorderWithTelemetry(
307309
description = job.Payload().Description
308310
}
309311

310-
recorderWithTelemetry, err := wrapMetricsRecorderWithTelemetry(ctx, details, description, jobID, ca.FlowCtx.Cfg.Settings, recorder, ca.knobs)
312+
recorderWithTelemetry, err := wrapMetricsRecorderWithTelemetry(ctx, details, description, jobID, ca.FlowCtx.Cfg.Settings, recorder, ca.knobs, targets)
311313
if err != nil {
312314
return ca.sliMetrics, err
313315
}
@@ -350,8 +352,15 @@ func (ca *changeAggregator) Start(ctx context.Context) {
350352
return
351353
}
352354

353-
feed := makeChangefeedConfigFromJobDetails(ca.spec.Feed)
354-
355+
feed, err := makeChangefeedConfigFromJobDetails(ctx, ca.spec.Feed, ca.FlowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig))
356+
if err != nil {
357+
if log.V(2) {
358+
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error making changefeed config: %v", err)
359+
}
360+
ca.MoveToDraining(err)
361+
ca.cancel()
362+
return
363+
}
355364
opts := feed.Opts
356365

357366
timestampOracle := &changeAggregatorLowerBoundOracle{
@@ -378,20 +387,29 @@ func (ca *changeAggregator) Start(ctx context.Context) {
378387
return
379388
}
380389
ca.sliMetricsID = ca.sliMetrics.claimId()
390+
ca.targets, err = AllTargets(ctx, ca.spec.Feed, ca.FlowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig))
391+
if err != nil {
392+
if log.V(2) {
393+
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error getting targets: %v", err)
394+
}
395+
ca.MoveToDraining(err)
396+
ca.cancel()
397+
return
398+
}
381399

382400
recorder := metricsRecorder(ca.sliMetrics)
383-
recorder, err = ca.wrapMetricsRecorderWithTelemetry(ctx, recorder)
401+
recorder, err = ca.wrapMetricsRecorderWithTelemetry(ctx, recorder, ca.targets)
402+
384403
if err != nil {
385404
if log.V(2) {
386405
log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
387406
}
388407
ca.MoveToDraining(err)
389408
ca.cancel()
390-
return
391409
}
392410

393411
ca.sink, err = getEventSink(ctx, ca.FlowCtx.Cfg, ca.spec.Feed, timestampOracle,
394-
ca.spec.User(), ca.spec.JobID, recorder)
412+
ca.spec.User(), ca.spec.JobID, recorder, ca.targets)
395413
if err != nil {
396414
err = changefeedbase.MarkRetryableError(err)
397415
if log.V(2) {
@@ -535,7 +553,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
535553
if schemaChange.Policy == changefeedbase.OptSchemaChangePolicyIgnore || initialScanOnly {
536554
sf = schemafeed.DoNothingSchemaFeed
537555
} else {
538-
sf = schemafeed.New(ctx, cfg, schemaChange.EventClass, AllTargets(ca.spec.Feed),
556+
sf = schemafeed.New(ctx, cfg, schemaChange.EventClass, ca.targets,
539557
initialHighWater, &ca.metrics.SchemaFeedMetrics, config.Opts.GetCanHandle())
540558
}
541559

@@ -552,7 +570,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
552570
Clock: cfg.DB.KV().Clock(),
553571
Spans: spans,
554572
SpanLevelCheckpoint: ca.spec.SpanLevelCheckpoint,
555-
Targets: AllTargets(ca.spec.Feed),
573+
Targets: ca.targets,
556574
Metrics: &ca.metrics.KVFeedMetrics,
557575
MM: memMon,
558576
InitialHighWater: initialHighWater,
@@ -1073,6 +1091,8 @@ type changeFrontier struct {
10731091

10741092
usageWg sync.WaitGroup
10751093
usageWgCancel context.CancelFunc
1094+
1095+
targets changefeedbase.Targets
10761096
}
10771097

10781098
const (
@@ -1295,8 +1315,13 @@ func newChangeFrontierProcessor(
12951315
if err != nil {
12961316
return nil, err
12971317
}
1318+
targets, err := AllTargets(ctx, spec.Feed, flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig))
1319+
if err != nil {
1320+
return nil, err
1321+
}
1322+
cf.targets = targets
12981323
if cf.encoder, err = getEncoder(
1299-
ctx, encodingOpts, AllTargets(spec.Feed), spec.Feed.Select != "",
1324+
ctx, encodingOpts, targets, spec.Feed.Select != "",
13001325
makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMetrics,
13011326
sourceProvider,
13021327
); err != nil {
@@ -1349,9 +1374,8 @@ func (cf *changeFrontier) Start(ctx context.Context) {
13491374
return
13501375
}
13511376
cf.sliMetrics = sli
1352-
13531377
cf.sink, err = getResolvedTimestampSink(ctx, cf.FlowCtx.Cfg, cf.spec.Feed, nilOracle,
1354-
cf.spec.User(), cf.spec.JobID, sli)
1378+
cf.spec.User(), cf.spec.JobID, sli, cf.targets)
13551379
if err != nil {
13561380
err = changefeedbase.MarkRetryableError(err)
13571381
if log.V(2) {
@@ -1924,10 +1948,9 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19241948
if highWater.Less(cf.highWaterAtStart) {
19251949
highWater = cf.highWaterAtStart
19261950
}
1927-
19281951
if progress.ProtectedTimestampRecord == uuid.Nil {
19291952
ptr := createProtectedTimestampRecord(
1930-
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater,
1953+
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, cf.targets, highWater,
19311954
)
19321955
progress.ProtectedTimestampRecord = ptr.ID.GetUUID()
19331956
return true, pts.Protect(ctx, ptr)
@@ -1954,7 +1977,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(
19541977
// If we've identified more tables that need to be protected since this
19551978
// changefeed was created, it will be missing here. If so, we "migrate" it
19561979
// to include all the appropriate targets.
1957-
if targets := AllTargets(cf.spec.Feed); !makeTargetToProtect(targets).Equal(rec.Target) {
1980+
if !makeTargetToProtect(cf.targets).Equal(rec.Target) {
19581981
if preservePTSTargets := cf.knobs.PreservePTSTargets != nil && cf.knobs.PreservePTSTargets(); preservePTSTargets {
19591982
return false, nil
19601983
}
@@ -1988,7 +2011,7 @@ func (cf *changeFrontier) remakePTSRecord(
19882011
) error {
19892012
prevRecordId := progress.ProtectedTimestampRecord
19902013
ptr := createProtectedTimestampRecord(
1991-
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), resolved,
2014+
ctx, cf.FlowCtx.Codec(), cf.spec.JobID, cf.targets, resolved,
19922015
)
19932016
if err := pts.Protect(ctx, ptr); err != nil {
19942017
return err

0 commit comments

Comments
 (0)