diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 6f5416fd2b85..29ad8db31ba4 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -12,7 +12,6 @@ changefeed.aggregator.flush_jitter float 0.1 jitter aggregator flushes as a frac
changefeed.backfill.concurrent_scan_requests integer 0 number of concurrent scan requests per node issued during a backfill application
changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request application
changefeed.batch_reduction_retry.enabled (alias: changefeed.batch_reduction_retry_enabled) boolean false if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes application
-changefeed.default_range_distribution_strategy enumeration default configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1] application
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer application
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 1d1025aef11b..b1c02dc9f305 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -17,7 +17,6 @@
changefeed.backfill.concurrent_scan_requests
| integer | 0 | number of concurrent scan requests per node issued during a backfill | Basic/Standard/Advanced/Self-Hosted |
changefeed.backfill.scan_request_size
| integer | 524288 | the maximum number of bytes returned by each scan request | Basic/Standard/Advanced/Self-Hosted |
changefeed.batch_reduction_retry.enabled (alias: changefeed.batch_reduction_retry_enabled)
| boolean | false | if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes | Basic/Standard/Advanced/Self-Hosted |
-changefeed.default_range_distribution_strategy
| enumeration | default | configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1] | Basic/Standard/Advanced/Self-Hosted |
changefeed.event_consumer_worker_queue_size
| integer | 16 | if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer | Basic/Standard/Advanced/Self-Hosted |
changefeed.event_consumer_workers
| integer | 0 | the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled | Basic/Standard/Advanced/Self-Hosted |
changefeed.fast_gzip.enabled
| boolean | true | use fast gzip implementation | Basic/Standard/Advanced/Self-Hosted |
diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go
index bf2634ba80da..f55510258790 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist.go
@@ -42,7 +42,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
- "github.com/cockroachdb/cockroach/pkg/util/metamorphic"
"github.com/cockroachdb/errors"
)
@@ -345,37 +344,6 @@ func startDistChangefeed(
// The bin packing choice gives preference to leaseholder replicas if possible.
var replicaOracleChoice = replicaoracle.BinPackingChoice
-type rangeDistributionType int
-
-const (
- // defaultDistribution employs no load balancing on the changefeed
- // side. We defer to distsql to select nodes and distribute work.
- defaultDistribution rangeDistributionType = 0
- // balancedSimpleDistribution defers to distsql for selecting the
- // set of nodes to distribute work to. However, changefeeds will try to
- // distribute work evenly across this set of nodes.
- balancedSimpleDistribution rangeDistributionType = 1
- // TODO(jayant): add balancedFullDistribution which takes
- // full control of node selection and distribution.
-)
-
-// RangeDistributionStrategy is used to determine how the changefeed balances
-// ranges between nodes.
-// TODO: deprecate this setting in favor of a changefeed option.
-var RangeDistributionStrategy = settings.RegisterEnumSetting(
- settings.ApplicationLevel,
- "changefeed.default_range_distribution_strategy",
- "configures how work is distributed among nodes for a given changefeed. "+
- "for the most balanced distribution, use `balanced_simple`. changing this setting "+
- "will not override locality restrictions",
- metamorphic.ConstantWithTestChoice("default_range_distribution_strategy",
- "default", "balanced_simple"),
- map[rangeDistributionType]string{
- defaultDistribution: "default",
- balancedSimpleDistribution: "balanced_simple",
- },
- settings.WithPublic)
-
var useBulkOracle = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"changefeed.random_replica_selection.enabled",
@@ -410,7 +378,10 @@ func makePlan(
}
}
- rangeDistribution := RangeDistributionStrategy.Get(sv)
+ rangeDistributionStrat, err := changefeedbase.MakeStatementOptions(details.Opts).GetRangeDistributionStrategy()
+ if err != nil {
+ return nil, nil, err
+ }
evalCtx := execCtx.ExtendedEvalContext()
oracle := replicaoracle.NewOracle(replicaOracleChoice, dsp.ReplicaOracleConfig(locFilter))
if useBulkOracle.Get(&evalCtx.Settings.SV) {
@@ -427,8 +398,8 @@ func makePlan(
log.Changefeed.Infof(ctx, "spans returned by DistSQL: %v", spanPartitions)
}
switch {
- case distMode == sql.LocalDistribution || rangeDistribution == defaultDistribution:
- case rangeDistribution == balancedSimpleDistribution:
+ case distMode == sql.LocalDistribution || rangeDistributionStrat == changefeedbase.RangeDistributionStrategyDefault:
+ case rangeDistributionStrat == changefeedbase.RangeDistributionStrategyBalancedSimple:
log.Changefeed.Infof(ctx, "rebalancing ranges using balanced simple distribution")
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
@@ -442,8 +413,8 @@ func makePlan(
log.Changefeed.Infof(ctx, "spans after balanced simple distribution rebalancing: %v", spanPartitions)
}
default:
- return nil, nil, errors.AssertionFailedf("unsupported dist strategy %d and dist mode %d",
- rangeDistribution, distMode)
+ return nil, nil, errors.AssertionFailedf("unsupported dist strategy %s and dist mode %d",
+ rangeDistributionStrat, distMode)
}
if haveKnobs && maybeCfKnobs.FilterDrainingNodes != nil && len(drainingNodes) > 0 {
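
With this hunk, makePlan resolves the strategy from the changefeed's own statement options rather than the removed cluster setting, so each changefeed can pick its distribution independently. The standalone sketch below models that resolution path under stated assumptions: the plain options map and helper are hypothetical stand-ins for changefeedbase.MakeStatementOptions and GetRangeDistributionStrategy, not the actual implementation.

package main

import "fmt"

// RangeDistributionStrategy mirrors the string enum this PR adds to changefeedbase.
type RangeDistributionStrategy string

const (
	// strategyDefault defers node selection and work distribution entirely to distsql.
	strategyDefault RangeDistributionStrategy = "default"
	// strategyBalancedSimple keeps distsql's node set but spreads ranges evenly across it.
	strategyBalancedSimple RangeDistributionStrategy = "balanced_simple"
)

// getRangeDistributionStrategy is a simplified stand-in for
// StatementOptions.GetRangeDistributionStrategy: an unset or empty option
// falls back to the default strategy, and unrecognized values are an error.
func getRangeDistributionStrategy(opts map[string]string) (RangeDistributionStrategy, error) {
	v := opts["range_distribution_strategy"]
	if v == "" {
		return strategyDefault, nil
	}
	switch s := RangeDistributionStrategy(v); s {
	case strategyDefault, strategyBalancedSimple:
		return s, nil
	default:
		return "", fmt.Errorf("unknown range_distribution_strategy %q", v)
	}
}

func main() {
	unset := map[string]string{}
	set := map[string]string{"range_distribution_strategy": "balanced_simple"}
	for _, opts := range []map[string]string{unset, set} {
		strategy, err := getRangeDistributionStrategy(opts)
		fmt.Println(strategy, err) // "default <nil>", then "balanced_simple <nil>"
	}
}
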
diff --git a/pkg/ccl/changefeedccl/changefeed_dist_test.go b/pkg/ccl/changefeedccl/changefeed_dist_test.go
index b32d1b34e606..007c7aa0e22b 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist_test.go
@@ -500,7 +500,6 @@ func TestChangefeedWithNoDistributionStrategy(t *testing.T) {
tester := newRangeDistributionTester(t, noLocality)
defer tester.cleanup()
- serverutils.SetClusterSetting(t, tester.tc, "changefeed.default_range_distribution_strategy", "default")
serverutils.SetClusterSetting(t, tester.tc, "changefeed.random_replica_selection.enabled", false)
tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no'")
partitions := tester.getPartitions()
@@ -527,10 +526,9 @@ func TestChangefeedWithSimpleDistributionStrategy(t *testing.T) {
// Check that we roughly assign (64 ranges / 6 nodes) ranges to each node.
tester := newRangeDistributionTester(t, noLocality)
defer tester.cleanup()
- tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple'")
// We need to disable the bulk oracle in order to ensure the leaseholder is selected.
tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.random_replica_selection.enabled = false")
- tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no'")
+ tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', range_distribution_strategy='balanced_simple'")
partitions := tester.getPartitions()
counts := tester.countRangesPerNode(partitions)
upper := int(math.Ceil((1 + rebalanceThreshold.Get(&tester.lastNode.ClusterSettings().SV)) * 64 / 6))
@@ -548,30 +546,56 @@ func TestChangefeedWithNoDistributionStrategyAndConstrainedLocality(t *testing.T
skip.UnderShort(t)
skip.UnderDuress(t)
- // The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
-	// to the same node which stores it. However, some of these nodes don't pass the filter. The replicas assigned
-	// to these nodes are distributed arbitrarily to any nodes which pass the filter.
- tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
- if i%2 == 1 {
- return []roachpb.Tier{{Key: "y", Value: "1"}}
+ t.Run("default specified", func(t *testing.T) {
+ // The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
+		// to the same node which stores it. However, some of these nodes don't pass the filter. The replicas assigned
+		// to these nodes are distributed arbitrarily to any nodes which pass the filter.
+ tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
+ if i%2 == 1 {
+ return []roachpb.Tier{{Key: "y", Value: "1"}}
+ }
+ return []roachpb.Tier{}
+ })
+ defer tester.cleanup()
+ tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1', range_distribution_strategy='default'")
+ partitions := tester.getPartitions()
+ counts := tester.countRangesPerNode(partitions)
+
+ totalRanges := 0
+ for i, count := range counts {
+ if i%2 == 1 {
+ totalRanges += count
+ } else {
+ require.Equal(t, count, 0)
+ }
}
- return []roachpb.Tier{}
+ require.Equal(t, totalRanges, 64)
})
- defer tester.cleanup()
- tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'default'")
- tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
- partitions := tester.getPartitions()
- counts := tester.countRangesPerNode(partitions)
-
- totalRanges := 0
- for i, count := range counts {
- if i%2 == 1 {
- totalRanges += count
- } else {
- require.Equal(t, count, 0)
+ t.Run("no distribution strategy specified", func(t *testing.T) {
+ // The replica oracle selects the leaseholder replica for each range. Then, distsql assigns the replica
+		// to the same node which stores it. However, some of these nodes don't pass the filter. The replicas assigned
+		// to these nodes are distributed arbitrarily to any nodes which pass the filter.
+ tester := newRangeDistributionTester(t, func(i int) []roachpb.Tier {
+ if i%2 == 1 {
+ return []roachpb.Tier{{Key: "y", Value: "1"}}
+ }
+ return []roachpb.Tier{}
+ })
+ defer tester.cleanup()
+ tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
+ partitions := tester.getPartitions()
+ counts := tester.countRangesPerNode(partitions)
+
+ totalRanges := 0
+ for i, count := range counts {
+ if i%2 == 1 {
+ totalRanges += count
+ } else {
+ require.Equal(t, count, 0)
+ }
}
- }
- require.Equal(t, totalRanges, 64)
+ require.Equal(t, totalRanges, 64)
+ })
}
func TestChangefeedWithSimpleDistributionStrategyAndConstrainedLocality(t *testing.T) {
@@ -593,8 +617,7 @@ func TestChangefeedWithSimpleDistributionStrategyAndConstrainedLocality(t *testi
return []roachpb.Tier{}
})
defer tester.cleanup()
- tester.sqlDB.Exec(t, "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = 'balanced_simple'")
- tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1'")
+ tester.sqlDB.Exec(t, "CREATE CHANGEFEED FOR x INTO 'null://' WITH initial_scan='no', execution_locality='y=1', range_distribution_strategy='balanced_simple'")
partitions := tester.getPartitions()
counts := tester.countRangesPerNode(partitions)
diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go
index 325b7b4db732..abac9c12d205 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/options.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -75,6 +75,19 @@ const (
EnrichedPropertySchema EnrichedProperty = `schema`
)
+// RangeDistributionStrategy configures how the changefeed balances ranges between nodes.
+type RangeDistributionStrategy string
+
+const (
+ // RangeDistributionStrategyDefault employs no load balancing on the changefeed
+ // side. We defer to distsql to select nodes and distribute work.
+ RangeDistributionStrategyDefault RangeDistributionStrategy = `default`
+ // RangeDistributionStrategyBalancedSimple defers to distsql for selecting the
+ // set of nodes to distribute work to. However, changefeeds will try to
+ // distribute work evenly across this set of nodes.
+ RangeDistributionStrategyBalancedSimple RangeDistributionStrategy = `balanced_simple`
+)
+
// Constants for the initial scan types
const (
InitialScan InitialScanType = iota
@@ -162,6 +175,8 @@ const (
OptEnrichedProperties = `enriched_properties`
+ OptRangeDistributionStrategy = `range_distribution_strategy`
+
OptEnvelopeKeyOnly EnvelopeType = `key_only`
OptEnvelopeRow EnvelopeType = `row`
OptEnvelopeDeprecatedRow EnvelopeType = `deprecated_row`
@@ -412,6 +427,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
OptIgnoreDisableChangefeedReplication: flagOption,
OptEncodeJSONValueNullAsObject: flagOption,
OptEnrichedProperties: csv(string(EnrichedPropertySource), string(EnrichedPropertySchema)),
+ OptRangeDistributionStrategy: enum("default", "balanced_simple"),
OptHeadersJSONColumnName: stringOption,
OptExtraHeaders: jsonOption,
}
@@ -428,6 +444,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptExpirePTSAfter,
OptExecutionLocality, OptLaggingRangesThreshold, OptLaggingRangesPollingInterval,
OptIgnoreDisableChangefeedReplication, OptEncodeJSONValueNullAsObject, OptEnrichedProperties,
+ OptRangeDistributionStrategy,
)
// SQLValidOptions is options exclusive to SQL sink
@@ -805,6 +822,19 @@ func (s StatementOptions) IsInitialScanSpecified() bool {
return true
}
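+// GetRangeDistributionStrategy returns the changefeed's range distribution
+// strategy, defaulting to RangeDistributionStrategyDefault when the option is unset.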
+func (s StatementOptions) GetRangeDistributionStrategy() (RangeDistributionStrategy, error) {
+ v, err := s.getEnumValue(OptRangeDistributionStrategy)
+ if err != nil {
+ return "", err
+ }
+ if v == `` {
+ return RangeDistributionStrategyDefault, nil
+ }
+ return RangeDistributionStrategy(v), nil
+}
+
// ShouldUseFullStatementTimeName returns true if references to the table should be in db.schema.table
// format (e.g. in Kafka topics).
func (s StatementOptions) ShouldUseFullStatementTimeName() bool {
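
Because OptRangeDistributionStrategy is registered above as enum("default", "balanced_simple"), any other value is rejected when statement options are validated, while an absent option resolves to RangeDistributionStrategyDefault in GetRangeDistributionStrategy. A minimal sketch of that enum check, assuming a simplified stand-in for the real OptionPermittedValues machinery:

package main

import (
	"fmt"
	"strings"
)

// enumValues is a hypothetical model of the enum("default", "balanced_simple")
// entry in ChangefeedOptionExpectValues; the real OptionPermittedValues type
// carries more metadata than a bare list of strings.
type enumValues []string

// validate accepts a value only if it matches one of the permitted enum values.
func (e enumValues) validate(opt, val string) error {
	for _, allowed := range e {
		if val == allowed {
			return nil
		}
	}
	return fmt.Errorf("option %q must be one of [%s], got %q",
		opt, strings.Join(e, ", "), val)
}

func main() {
	permitted := enumValues{"default", "balanced_simple"}
	// Accepted: a registered enum value.
	fmt.Println(permitted.validate("range_distribution_strategy", "balanced_simple"))
	// Rejected at statement time: not in the registered set.
	fmt.Println(permitted.validate("range_distribution_strategy", "balanced_full"))
}
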
diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index af34526cd700..5daa6a207344 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -490,32 +490,11 @@ func (f *featureFlag) enabled(r entropy) featureState {
return featureDisabled
}
-type enumFeatureFlag struct {
- state string
- v *featureState
-}
-
-// enabled returns a valid string if the returned featureState is featureEnabled.
-func (f *enumFeatureFlag) enabled(r entropy, choose func(entropy) string) (string, featureState) {
- if f.v != nil {
- return f.state, *f.v
- }
-
- if r.Bool() {
- f.v = &featureEnabled
- f.state = choose(r)
- return f.state, featureEnabled
- }
- f.v = &featureDisabled
- return f.state, featureDisabled
-}
-
// cdcFeatureFlags describes various cdc feature flags.
// zero value cdcFeatureFlags uses metamorphic settings for features.
type cdcFeatureFlags struct {
- RangeFeedScheduler featureFlag
- SchemaLockTables featureFlag
- DistributionStrategy enumFeatureFlag
+ RangeFeedScheduler featureFlag
+ SchemaLockTables featureFlag
}
func makeDefaultFeatureFlags() cdcFeatureFlags {
@@ -4425,11 +4404,6 @@ func (cfc *changefeedCreator) Args(args ...interface{}) *changefeedCreator {
return cfc
}
-func chooseDistributionStrategy(r entropy) string {
- vals := changefeedccl.RangeDistributionStrategy.GetAvailableValues()
- return vals[r.Intn(len(vals))]
-}
-
// applySettings applies various settings to the cluster -- once per the
// lifetime of changefeedCreator
func (cfc *changefeedCreator) applySettings() error {
@@ -4451,16 +4425,6 @@ func (cfc *changefeedCreator) applySettings() error {
}
}
- rangeDistribution, rangeDistributionEnabled := cfc.flags.DistributionStrategy.enabled(cfc.rng,
- chooseDistributionStrategy)
- if rangeDistributionEnabled == featureEnabled {
- cfc.logger.Printf("Setting changefeed.default_range_distribution_strategy to %s", rangeDistribution)
- if _, err := cfc.db.Exec(fmt.Sprintf(
- "SET CLUSTER SETTING changefeed.default_range_distribution_strategy = '%s'", rangeDistribution)); err != nil {
- return err
- }
- }
-
return nil
}
diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go
index 6de0e0c9bff3..a0777ce8a76b 100644
--- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go
+++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go
@@ -410,14 +410,6 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(
ff.RangeFeedScheduler.v = &featureUnset
}
- distributionStrategySupported, err := cmvt.distributionStrategySupported(r, h)
- if err != nil {
- return err
- }
- if !distributionStrategySupported {
- ff.DistributionStrategy.v = &featureUnset
- }
-
jobID, err := newChangefeedCreator(db, systemDB, l, r, fmt.Sprintf("%s.%s", targetDB, targetTable),
cmvt.kafka.manager.sinkURL(ctx), ff).
With(options).
@@ -476,7 +468,6 @@ func (cmvt *cdcMixedVersionTester) muxRangeFeedSupported(
}
const v232CV = "23.2"
-const v241CV = "24.1"
func (cmvt *cdcMixedVersionTester) rangefeedSchedulerSupported(
r *rand.Rand, h *mixedversion.Helper,
@@ -485,12 +476,6 @@ func (cmvt *cdcMixedVersionTester) rangefeedSchedulerSupported(
return h.ClusterVersionAtLeast(r, v232CV)
}
-func (cmvt *cdcMixedVersionTester) distributionStrategySupported(
- r *rand.Rand, h *mixedversion.Helper,
-) (bool, error) {
- return h.ClusterVersionAtLeast(r, v241CV)
-}
-
// canMixedVersionUseDeletedClusterSetting returns whether a
// mixed-version cluster can use a deleted (system) cluster
// setting. If it returns true, it will also return the subset of