From 02f6a1859eb895042587d6a715170098fed37156 Mon Sep 17 00:00:00 2001 From: Ravi Date: Thu, 13 Nov 2025 12:50:56 +0530 Subject: [PATCH 1/2] feat: configure-decimal-precision/scal for unbounded numeric --- flow/cmd/validate_mirror.go | 40 ++++ flow/connectors/bigquery/qrep_avro_sync.go | 2 +- flow/connectors/clickhouse/avro_sync.go | 14 ++ flow/connectors/clickhouse/cdc.go | 10 + flow/connectors/clickhouse/normalize.go | 20 ++ flow/connectors/clickhouse/normalize_query.go | 34 ++-- flow/connectors/clickhouse/normalize_test.go | 6 + .../snowflake/merge_stmt_generator.go | 2 +- flow/connectors/snowflake/snowflake.go | 4 +- flow/connectors/utils/stream.go | 4 +- flow/e2e/clickhouse_test.go | 70 +++++++ flow/internal/dynamicconf.go | 12 ++ flow/model/conversion_avro.go | 11 + flow/model/qvalue/avro_converter.go | 20 +- flow/model/qvalue/dwh.go | 10 +- flow/model/qvalue/kind.go | 14 +- flow/model/qvalue/kind_test.go | 189 ++++++++++++++++++ nexus/flow-rs/src/grpc.rs | 2 + protos/flow.proto | 12 ++ ui/app/mirrors/create/helpers/cdc.ts | 45 +++++ ui/app/mirrors/create/helpers/common.ts | 2 + 21 files changed, 495 insertions(+), 28 deletions(-) create mode 100644 flow/model/qvalue/kind_test.go diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index 8e612ba543..913f14fb44 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -65,6 +65,46 @@ func (h *FlowRequestHandler) validateCDCMirrorImpl( errors.New("invalid config: initial_snapshot_only is true but do_initial_snapshot is false")) } + // Validate ClickHouse numeric settings if provided + var chDefPrecision, chDefScale int32 + if connectionConfigs.ClickhouseNumericDefaultPrecision != nil { + chDefPrecision = *connectionConfigs.ClickhouseNumericDefaultPrecision + } + if connectionConfigs.ClickhouseNumericDefaultScale != nil { + chDefScale = *connectionConfigs.ClickhouseNumericDefaultScale + } + if chDefPrecision > 0 || chDefScale > 0 { + // Load destination peer to check if it's ClickHouse + dstPeer, err := connectors.LoadPeer(ctx, h.pool, connectionConfigs.DestinationName) + if err != nil { + return nil, NewFailedPreconditionApiError(fmt.Errorf("failed to load destination peer: %w", err)) + } + + // Only validate ClickHouse numeric settings for ClickHouse destinations + if dstPeer.Type == protos.DBType_CLICKHOUSE { + precision := chDefPrecision + scale := chDefScale + + // If scale is provided but precision is not, reject early with a clear error + if precision == 0 && scale > 0 { + return nil, NewInvalidArgumentApiError( + errors.New("clickhouse numeric precision must be set when providing a custom scale")) + } + + // Validate precision: 1-76 + if precision < 1 || precision > 76 { + return nil, NewInvalidArgumentApiError( + fmt.Errorf("clickhouse numeric precision must be between 1 and 76, got %d", precision)) + } + + // Validate scale: 0 to precision + if scale < 0 || scale > precision { + return nil, NewInvalidArgumentApiError( + fmt.Errorf("clickhouse numeric scale must be between 0 and %d (precision), got %d", precision, scale)) + } + } + } + for _, tm := range connectionConfigs.TableMappings { for _, col := range tm.Columns { if !CustomColumnTypeRegex.MatchString(col.DestinationType) { diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index e2f7ff7ca5..13aaf5e406 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -252,7 +252,7 @@ func DefineAvroSchema(dstTableName string, func GetAvroType(bqField *bigquery.FieldSchema) (avro.Schema, error) { avroNumericPrecision, avroNumericScale := qvalue.DetermineNumericSettingForDWH( - int16(bqField.Precision), int16(bqField.Scale), protos.DBType_BIGQUERY) + int16(bqField.Precision), int16(bqField.Scale), protos.DBType_BIGQUERY, 0, 0) considerRepeated := func(typ avro.Type, repeated bool) avro.Schema { if repeated { diff --git a/flow/connectors/clickhouse/avro_sync.go b/flow/connectors/clickhouse/avro_sync.go index ce7c0b9b97..3bc4fe4d1a 100644 --- a/flow/connectors/clickhouse/avro_sync.go +++ b/flow/connectors/clickhouse/avro_sync.go @@ -366,6 +366,20 @@ func (s *ClickHouseAvroSyncMethod) getAvroSchema( schema types.QRecordSchema, avroNameMap map[string]string, ) (*model.QRecordAvroSchemaDefinition, error) { + // Inject ClickHouse numeric default overrides from dynamic env (if set) for Avro schema generation + if s.ClickHouseConnector != nil { + copied := make(map[string]string, len(env)+2) + for k, v := range env { + copied[k] = v + } + if p, err := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, env); err == nil && p > 0 { + copied["PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_PRECISION"] = fmt.Sprintf("%d", p) + } + if sc, err := internal.PeerDBClickHouseNumericDefaultScale(ctx, env); err == nil { + copied["PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_SCALE"] = fmt.Sprintf("%d", sc) + } + env = copied + } avroSchema, err := model.GetAvroSchemaDefinition(ctx, env, dstTableName, schema, protos.DBType_CLICKHOUSE, avroNameMap) if err != nil { return nil, fmt.Errorf("failed to define Avro schema: %w", err) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 9fcd42ee30..26ab8420d3 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -170,6 +170,15 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas( return nil } + // Fetch optional ClickHouse numeric default overrides from env + var chDefPrecision, chDefScale int32 + if p, err := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, env); err == nil { + chDefPrecision = p + } + if s, err := internal.PeerDBClickHouseNumericDefaultScale(ctx, env); err == nil { + chDefScale = s + } + onCluster := c.onCluster() for _, schemaDelta := range schemaDeltas { if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 { @@ -189,6 +198,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas( qvKind := types.QValueKind(addedColumn.Type) clickHouseColType, err := qvalue.ToDWHColumnType( ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled, + chDefPrecision, chDefScale, ) if err != nil { return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 59df573aa9..65f64e3726 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -202,8 +202,17 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable( if clickHouseType == "" { var err error + // Fetch optional ClickHouse numeric default overrides from env + var chDefPrecision, chDefScale int32 + if p, errP := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, config.Env); errP == nil { + chDefPrecision = p + } + if s, errS := internal.PeerDBClickHouseNumericDefaultScale(ctx, config.Env); errS == nil { + chDefScale = s + } clickHouseType, err = qvalue.ToDWHColumnType( ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column, tableSchema.NullableEnabled || columnNullableEnabled, + chDefPrecision, chDefScale, ) if err != nil { return nil, fmt.Errorf("error while converting column type to ClickHouse type: %w", err) @@ -560,6 +569,15 @@ func (c *ClickHouseConnector) NormalizeRecords( continue } + // Fetch optional ClickHouse numeric default overrides from env for query generation + var chDefPrecision, chDefScale int32 + if p, errP := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, req.Env); errP == nil { + chDefPrecision = p + } + if s, errS := internal.PeerDBClickHouseNumericDefaultScale(ctx, req.Env); errS == nil { + chDefScale = s + } + queryGenerator := NewNormalizeQueryGenerator( tbl, req.TableNameSchemaMapping, @@ -574,6 +592,8 @@ func (c *ClickHouseConnector) NormalizeRecords( c.Config.Cluster != "", req.SoftDeleteColName, req.Version, + chDefPrecision, + chDefScale, ) query, err := queryGenerator.BuildQuery(ctx) if err != nil { diff --git a/flow/connectors/clickhouse/normalize_query.go b/flow/connectors/clickhouse/normalize_query.go index 44f95dafec..a0607cae79 100644 --- a/flow/connectors/clickhouse/normalize_query.go +++ b/flow/connectors/clickhouse/normalize_query.go @@ -30,6 +30,9 @@ type NormalizeQueryGenerator struct { sourceSchemaAsDestinationColumn bool cluster bool version uint32 + // Override defaults for unbounded Postgres NUMERIC -> ClickHouse Decimal + clickhouseNumericDefaultPrecision int32 + clickhouseNumericDefaultScale int32 } // NewTableNormalizeQuery constructs a TableNormalizeQuery with required fields. @@ -47,25 +50,29 @@ func NewNormalizeQueryGenerator( cluster bool, configuredSoftDeleteColName string, version uint32, + clickhouseNumericDefaultPrecision int32, + clickhouseNumericDefaultScale int32, ) *NormalizeQueryGenerator { isDeletedColumn := isDeletedColName if configuredSoftDeleteColName != "" { isDeletedColumn = configuredSoftDeleteColName } return &NormalizeQueryGenerator{ - TableName: tableName, - tableNameSchemaMapping: tableNameSchemaMapping, - tableMappings: tableMappings, - endBatchID: endBatchID, - lastNormBatchID: lastNormBatchID, - enablePrimaryUpdate: enablePrimaryUpdate, - sourceSchemaAsDestinationColumn: sourceSchemaAsDestinationColumn, - env: env, - rawTableName: rawTableName, - chVersion: chVersion, - cluster: cluster, - isDeletedColName: isDeletedColumn, - version: version, + TableName: tableName, + tableNameSchemaMapping: tableNameSchemaMapping, + tableMappings: tableMappings, + endBatchID: endBatchID, + lastNormBatchID: lastNormBatchID, + enablePrimaryUpdate: enablePrimaryUpdate, + sourceSchemaAsDestinationColumn: sourceSchemaAsDestinationColumn, + env: env, + rawTableName: rawTableName, + chVersion: chVersion, + cluster: cluster, + isDeletedColName: isDeletedColumn, + version: version, + clickhouseNumericDefaultPrecision: clickhouseNumericDefaultPrecision, + clickhouseNumericDefaultScale: clickhouseNumericDefaultScale, } } @@ -124,6 +131,7 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error var err error clickHouseType, err = qvalue.ToDWHColumnType( ctx, colType, t.env, protos.DBType_CLICKHOUSE, t.chVersion, column, schema.NullableEnabled || columnNullableEnabled, + t.clickhouseNumericDefaultPrecision, t.clickhouseNumericDefaultScale, ) if err != nil { return "", fmt.Errorf("error while converting column type to clickhouse type: %w", err) diff --git a/flow/connectors/clickhouse/normalize_test.go b/flow/connectors/clickhouse/normalize_test.go index 326b55e07a..9fe1263ffb 100644 --- a/flow/connectors/clickhouse/normalize_test.go +++ b/flow/connectors/clickhouse/normalize_test.go @@ -258,6 +258,8 @@ func TestBuildQuery_Basic(t *testing.T) { false, "", shared.InternalVersion_Latest, + 0, + 0, ) query, err := g.BuildQuery(ctx) @@ -312,6 +314,8 @@ func TestBuildQuery_WithPrimaryUpdate(t *testing.T) { false, "", shared.InternalVersion_Latest, + 0, + 0, ) query, err := g.BuildQuery(ctx) @@ -363,6 +367,8 @@ func TestBuildQuery_WithSourceSchemaAsDestinationColumn(t *testing.T) { true, "", shared.InternalVersion_Latest, + 0, + 0, ) query, err := g.BuildQuery(ctx) diff --git a/flow/connectors/snowflake/merge_stmt_generator.go b/flow/connectors/snowflake/merge_stmt_generator.go index 486e9a3e45..e379afd306 100644 --- a/flow/connectors/snowflake/merge_stmt_generator.go +++ b/flow/connectors/snowflake/merge_stmt_generator.go @@ -36,7 +36,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(ctx context.Context, env map[stri for _, column := range columns { genericColumnType := column.Type qvKind := types.QValueKind(genericColumnType) - sfType, err := qvalue.ToDWHColumnType(ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, column, normalizedTableSchema.NullableEnabled) + sfType, err := qvalue.ToDWHColumnType(ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, column, normalizedTableSchema.NullableEnabled, 0, 0) if err != nil { return "", fmt.Errorf("failed to convert column type %s to snowflake type: %w", genericColumnType, err) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index f7ebf975df..41acae0b7d 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -365,7 +365,7 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas( for _, addedColumn := range schemaDelta.AddedColumns { qvKind := types.QValueKind(addedColumn.Type) sfColtype, err := qvalue.ToDWHColumnType( - ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, addedColumn, schemaDelta.NullableEnabled, + ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, addedColumn, schemaDelta.NullableEnabled, 0, 0, ) if err != nil { return fmt.Errorf("failed to convert column type %s to snowflake type: %w", @@ -663,7 +663,7 @@ func generateCreateTableSQLForNormalizedTable( normalizedColName := SnowflakeIdentifierNormalize(column.Name) qvKind := types.QValueKind(genericColumnType) sfColType, err := qvalue.ToDWHColumnType( - ctx, qvKind, config.Env, protos.DBType_SNOWFLAKE, nil, column, tableSchema.NullableEnabled, + ctx, qvKind, config.Env, protos.DBType_SNOWFLAKE, nil, column, tableSchema.NullableEnabled, 0, 0, ) if err != nil { slog.WarnContext(ctx, fmt.Sprintf("failed to convert column type %s to snowflake type", genericColumnType), diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go index d4b230ab83..0ddce8b391 100644 --- a/flow/connectors/utils/stream.go +++ b/flow/connectors/utils/stream.go @@ -185,7 +185,7 @@ func truncateNumerics( switch numeric := val.(type) { case types.QValueNumeric: destType := qvalue.GetNumericDestinationType( - numeric.Precision, numeric.Scale, targetDWH, unboundedNumericAsString, + numeric.Precision, numeric.Scale, targetDWH, unboundedNumericAsString, 0, 0, ) if destType.IsString { newVal = val @@ -204,7 +204,7 @@ func truncateNumerics( } case types.QValueArrayNumeric: destType := qvalue.GetNumericDestinationType( - numeric.Precision, numeric.Scale, targetDWH, unboundedNumericAsString, + numeric.Precision, numeric.Scale, targetDWH, unboundedNumericAsString, 0, 0, ) if destType.IsString { newVal = val diff --git a/flow/e2e/clickhouse_test.go b/flow/e2e/clickhouse_test.go index 08a3241623..21094e51bb 100644 --- a/flow/e2e/clickhouse_test.go +++ b/flow/e2e/clickhouse_test.go @@ -203,6 +203,76 @@ func (s ClickHouseSuite) Test_Addition_Removal() { RequireEnvCanceled(s.t, env) } +// Verifies that unbounded Postgres NUMERIC maps to ClickHouse Decimal(precision, scale) +// when overrides are provided via env dynamic config. +func (s ClickHouseSuite) Test_ClickHouse_Numeric_Overrides_Unbounded() { + tc := NewTemporalClient(s.t) + + srcTable := s.attachSchemaSuffix("test_numeric_override_src") + dstTable := "test_numeric_override_dst" + + // Create source table with unbounded NUMERIC (no typmod) and not null to avoid Nullable in CH + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + val NUMERIC NOT NULL + ); + `, srcTable))) + + // Insert a couple of rows + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(` + INSERT INTO %s (val) VALUES (123.456), (789.01234); + `, srcTable))) + + // Generate flow with ClickHouse destination and set numeric overrides via env + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("numeric_override_flow"), + TableNameMapping: map[string]string{srcTable: dstTable}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + if flowConnConfig.Env == nil { + flowConnConfig.Env = map[string]string{} + } + flowConnConfig.Env["PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_PRECISION"] = "60" + flowConnConfig.Env["PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_SCALE"] = "10" + + env := ExecutePeerflow(s.t, tc, flowConnConfig) + + // Connect to ClickHouse and verify the destination column type + ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig()) + require.NoError(s.t, err) + defer ch.Close() + + expectedType := "Decimal(60, 10)" + dbName := s.Peer().GetClickhouseConfig().Database + + // Wait until the table is created and type is materialized to expected Decimal + EnvWaitFor(s.t, env, 3*time.Minute, "wait for ClickHouse table with overridden numeric type", func() bool { + var colType string + // system.columns stores type without Nullable() wrapper in separate nullable flag for newer versions, + // but to be safe, we accept exact Decimal(60, 10) or Nullable(Decimal(60, 10)). + // We query the exact type string ClickHouse exposes. + rows, qerr := ch.Query(s.t.Context(), + `SELECT type FROM system.columns WHERE database = ? AND table = ? AND name = 'val' LIMIT 1`, dbName, dstTable) + if qerr != nil { + return false + } + defer rows.Close() + if !rows.Next() { + return false + } + if scanErr := rows.Scan(&colType); scanErr != nil { + return false + } + return colType == expectedType || colType == ("Nullable("+expectedType+")") + }) + + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + func (s ClickHouseSuite) Test_NullableMirrorSetting() { srcTableName := "test_nullable_mirror" srcFullName := s.attachSchemaSuffix(srcTableName) diff --git a/flow/internal/dynamicconf.go b/flow/internal/dynamicconf.go index d97abd0f43..c7d0d50ea8 100644 --- a/flow/internal/dynamicconf.go +++ b/flow/internal/dynamicconf.go @@ -617,6 +617,18 @@ func PeerDBEnableClickHouseJSON(ctx context.Context, env map[string]string) (boo return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_ENABLE_JSON") } +// Mirror-scoped override for unbounded NUMERIC -> ClickHouse Decimal default precision. +// Prefer value from env map when present; falls back to dynamic settings if configured. +func PeerDBClickHouseNumericDefaultPrecision(ctx context.Context, env map[string]string) (int32, error) { + return dynamicConfSigned[int32](ctx, env, "PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_PRECISION") +} + +// Mirror-scoped override for unbounded NUMERIC -> ClickHouse Decimal default scale. +// Prefer value from env map when present; falls back to dynamic settings if configured. +func PeerDBClickHouseNumericDefaultScale(ctx context.Context, env map[string]string) (int32, error) { + return dynamicConfSigned[int32](ctx, env, "PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_SCALE") +} + func PeerDBSnowflakeMergeParallelism(ctx context.Context, env map[string]string) (int64, error) { return dynamicConfSigned[int64](ctx, env, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM") } diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index dc55c0364c..5b3de7b61c 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -56,6 +56,15 @@ func (qac *QRecordAvroConverter) Convert( numericTruncator SnapshotTableNumericTruncator, format internal.BinaryFormat, ) (map[string]any, error) { + var chDefaultPrecision, chDefaultScale int32 + if qac.TargetDWH == protos.DBType_CLICKHOUSE { + if p, err := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, env); err == nil { + chDefaultPrecision = p + } + if s, err := internal.PeerDBClickHouseNumericDefaultScale(ctx, env); err == nil { + chDefaultScale = s + } + } m := make(map[string]any, len(qrecord)) for idx, val := range qrecord { if typeConversion, ok := typeConversions[qac.Schema.Fields[idx].Name]; ok { @@ -66,6 +75,7 @@ func (qac *QRecordAvroConverter) Convert( &qac.Schema.Fields[idx], qac.TargetDWH, qac.logger, qac.UnboundedNumericAsString, numericTruncator.Get(idx), format, + chDefaultPrecision, chDefaultScale, ) if err != nil { return nil, fmt.Errorf("failed to convert QValue to Avro-compatible value: %w", err) @@ -93,6 +103,7 @@ type QRecordAvroSchemaDefinition struct { Fields []types.QField } +// Extended: numeric overrides can be threaded later if needed; currently schema-level override happens at conversion time. func GetAvroSchemaDefinition( ctx context.Context, env map[string]string, diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 2851d0a7bd..9c3980097d 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -165,7 +165,16 @@ func getAvroNumericSchema( if err != nil { return nil, err } - destinationType := GetNumericDestinationType(precision, scale, targetDWH, asString) + var chDefaultPrecision, chDefaultScale int32 + if targetDWH == protos.DBType_CLICKHOUSE { + if p, err := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, env); err == nil { + chDefaultPrecision = p + } + if s, err := internal.PeerDBClickHouseNumericDefaultScale(ctx, env); err == nil { + chDefaultScale = s + } + } + destinationType := GetNumericDestinationType(precision, scale, targetDWH, asString, chDefaultPrecision, chDefaultScale) if destinationType.IsString { return avro.NewPrimitiveSchema(avro.String, nil), nil } @@ -180,6 +189,8 @@ type QValueAvroConverter struct { TargetDWH protos.DBType UnboundedNumericAsString bool binaryFormat internal.BinaryFormat + chDefaultPrecision int32 + chDefaultScale int32 } func QValueToAvro( @@ -187,6 +198,7 @@ func QValueToAvro( value types.QValue, field *types.QField, targetDWH protos.DBType, logger log.Logger, unboundedNumericAsString bool, stat *NumericStat, binaryFormat internal.BinaryFormat, + chDefaultPrecision, chDefaultScale int32, ) (any, error) { if value.Value() == nil { return nil, nil @@ -199,6 +211,8 @@ func QValueToAvro( TargetDWH: targetDWH, UnboundedNumericAsString: unboundedNumericAsString, binaryFormat: binaryFormat, + chDefaultPrecision: chDefaultPrecision, + chDefaultScale: chDefaultScale, } switch v := value.(type) { @@ -361,7 +375,7 @@ func (c *QValueAvroConverter) processNullableUnion( } func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) any { - destType := GetNumericDestinationType(c.Precision, c.Scale, c.TargetDWH, c.UnboundedNumericAsString) + destType := GetNumericDestinationType(c.Precision, c.Scale, c.TargetDWH, c.UnboundedNumericAsString, c.chDefaultPrecision, c.chDefaultScale) if destType.IsString { return c.processNullableUnion(num.String()) } @@ -424,7 +438,7 @@ func (c *QValueAvroConverter) processUInt256(num *big.Int) any { } func (c *QValueAvroConverter) processArrayNumeric(arrayNum []decimal.Decimal) any { - destType := GetNumericDestinationType(c.Precision, c.Scale, c.TargetDWH, c.UnboundedNumericAsString) + destType := GetNumericDestinationType(c.Precision, c.Scale, c.TargetDWH, c.UnboundedNumericAsString, c.chDefaultPrecision, c.chDefaultScale) if destType.IsString { transformedNumArr := make([]string, 0, len(arrayNum)) for _, num := range arrayNum { diff --git a/flow/model/qvalue/dwh.go b/flow/model/qvalue/dwh.go index 3aa6a9f066..e06c79068b 100644 --- a/flow/model/qvalue/dwh.go +++ b/flow/model/qvalue/dwh.go @@ -7,10 +7,18 @@ import ( "github.com/PeerDB-io/peerdb/flow/shared/datatypes" ) -func DetermineNumericSettingForDWH(precision int16, scale int16, dwh protos.DBType) (int16, int16) { +func DetermineNumericSettingForDWH(precision int16, scale int16, dwh protos.DBType, chDefaultPrecision, chDefaultScale int32) (int16, int16) { var warehouseNumeric datatypes.WarehouseNumericCompatibility switch dwh { case protos.DBType_CLICKHOUSE: + // If user provided overrides and typmod is unbounded (precision and scale are 0), use the overrides + if precision == 0 && scale == 0 && (chDefaultPrecision > 0 || chDefaultScale >= 0) { + // Convert from int32 to int16 for comparison and return + // If chDefaultPrecision was set but chDefaultScale wasn't, still use the defaults as fallback + if chDefaultPrecision > 0 { + return int16(chDefaultPrecision), int16(chDefaultScale) + } + } warehouseNumeric = datatypes.ClickHouseNumericCompatibility{} case protos.DBType_SNOWFLAKE: warehouseNumeric = datatypes.SnowflakeNumericCompatibility{} diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 814694de7f..adeed19873 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -19,6 +19,7 @@ type NumericDestinationType struct { func GetNumericDestinationType( precision, scale int16, targetDWH protos.DBType, unboundedNumericAsString bool, + chDefaultPrecision, chDefaultScale int32, ) NumericDestinationType { if targetDWH == protos.DBType_CLICKHOUSE { if precision == 0 && scale == 0 && unboundedNumericAsString { @@ -28,7 +29,8 @@ func GetNumericDestinationType( return NumericDestinationType{IsString: true} } } - destPrecision, destScale := DetermineNumericSettingForDWH(precision, scale, targetDWH) + // For ClickHouse, apply overrides only when precision is unbounded (0,0) and user provided a precision > 0. + destPrecision, destScale := DetermineNumericSettingForDWH(precision, scale, targetDWH, chDefaultPrecision, chDefaultScale) return NumericDestinationType{ IsString: false, Precision: destPrecision, @@ -36,19 +38,20 @@ func GetNumericDestinationType( } } -func getClickHouseTypeForNumericColumn(ctx context.Context, env map[string]string, typeModifier int32) (string, error) { +func getClickHouseTypeForNumericColumn(ctx context.Context, env map[string]string, typeModifier int32, chDefaultPrecision, chDefaultScale int32) (string, error) { precision, scale := datatypes.ParseNumericTypmod(typeModifier) asString, err := internal.PeerDBEnableClickHouseNumericAsString(ctx, env) if err != nil { return "", err } - destinationType := GetNumericDestinationType(precision, scale, protos.DBType_CLICKHOUSE, asString) + destinationType := GetNumericDestinationType(precision, scale, protos.DBType_CLICKHOUSE, asString, chDefaultPrecision, chDefaultScale) if destinationType.IsString { return "String", nil } return fmt.Sprintf("Decimal(%d, %d)", destinationType.Precision, destinationType.Scale), nil } +// Extended to accept optional ClickHouse default precision/scale overrides. func ToDWHColumnType( ctx context.Context, kind types.QValueKind, @@ -57,6 +60,7 @@ func ToDWHColumnType( dwhVersion *chproto.Version, column *protos.FieldDescription, nullableEnabled bool, + chDefaultPrecision, chDefaultScale int32, ) (string, error) { var colType string switch dwhType { @@ -75,13 +79,13 @@ func ToDWHColumnType( case protos.DBType_CLICKHOUSE: if kind == types.QValueKindNumeric { var err error - colType, err = getClickHouseTypeForNumericColumn(ctx, env, column.TypeModifier) + colType, err = getClickHouseTypeForNumericColumn(ctx, env, column.TypeModifier, chDefaultPrecision, chDefaultScale) if err != nil { return "", err } } else if kind == types.QValueKindArrayNumeric { var err error - colType, err = getClickHouseTypeForNumericColumn(ctx, env, column.TypeModifier) + colType, err = getClickHouseTypeForNumericColumn(ctx, env, column.TypeModifier, chDefaultPrecision, chDefaultScale) if err != nil { return "", err } diff --git a/flow/model/qvalue/kind_test.go b/flow/model/qvalue/kind_test.go new file mode 100644 index 0000000000..251999e4af --- /dev/null +++ b/flow/model/qvalue/kind_test.go @@ -0,0 +1,189 @@ +package qvalue + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/PeerDB-io/peerdb/flow/generated/protos" +) + +func TestGetNumericDestinationTypeWithClickHouseOverrides(t *testing.T) { + tests := []struct { + name string + precision int16 + scale int16 + chDefaultPrecision int32 + chDefaultScale int32 + expectedPrecision int16 + expectedScale int16 + expectedIsString bool + }{ + { + name: "unbounded numeric with override", + precision: 0, + scale: 0, + chDefaultPrecision: 60, + chDefaultScale: 10, + expectedPrecision: 60, + expectedScale: 10, + expectedIsString: false, + }, + { + name: "unbounded numeric without override", + precision: 0, + scale: 0, + chDefaultPrecision: 0, + chDefaultScale: 0, + expectedPrecision: 76, + expectedScale: 38, + expectedIsString: false, + }, + { + name: "bounded numeric ignores override", + precision: 20, + scale: 5, + chDefaultPrecision: 60, + chDefaultScale: 10, + expectedPrecision: 20, + expectedScale: 5, + expectedIsString: false, + }, + { + name: "unbounded numeric with high precision override", + precision: 0, + scale: 0, + chDefaultPrecision: 76, + chDefaultScale: 0, + expectedPrecision: 76, + expectedScale: 0, + expectedIsString: false, + }, + { + name: "unbounded numeric with valid custom scale", + precision: 0, + scale: 0, + chDefaultPrecision: 30, + chDefaultScale: 10, + expectedPrecision: 30, + expectedScale: 10, + expectedIsString: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := GetNumericDestinationType( + tt.precision, + tt.scale, + protos.DBType_CLICKHOUSE, + false, // unboundedNumericAsString + tt.chDefaultPrecision, + tt.chDefaultScale, + ) + + require.Equal(t, tt.expectedIsString, result.IsString) + if !tt.expectedIsString { + require.Equal(t, tt.expectedPrecision, result.Precision) + require.Equal(t, tt.expectedScale, result.Scale) + } + }) + } +} + +func TestDetermineNumericSettingForDWHClickHouse(t *testing.T) { + tests := []struct { + name string + precision int16 + scale int16 + chDefaultPrecision int32 + chDefaultScale int32 + expectedPrecision int16 + expectedScale int16 + }{ + { + name: "unbounded with override - uses override", + precision: 0, + scale: 0, + chDefaultPrecision: 50, + chDefaultScale: 15, + expectedPrecision: 50, + expectedScale: 15, + }, + { + name: "unbounded without override - uses defaults", + precision: 0, + scale: 0, + chDefaultPrecision: 0, + chDefaultScale: 0, + expectedPrecision: 76, + expectedScale: 38, + }, + { + name: "bounded numeric ignores override", + precision: 18, + scale: 2, + chDefaultPrecision: 60, + chDefaultScale: 10, + expectedPrecision: 18, + expectedScale: 2, + }, + { + name: "extreme override values", + precision: 0, + scale: 0, + chDefaultPrecision: 76, + chDefaultScale: 76, + expectedPrecision: 76, + expectedScale: 76, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, s := DetermineNumericSettingForDWH( + tt.precision, + tt.scale, + protos.DBType_CLICKHOUSE, + tt.chDefaultPrecision, + tt.chDefaultScale, + ) + + require.Equal(t, tt.expectedPrecision, p, "precision mismatch") + require.Equal(t, tt.expectedScale, s, "scale mismatch") + }) + } +} + +func TestDetermineNumericSettingForDWHNonClickHouse(t *testing.T) { + // Non-ClickHouse destinations should ignore the override parameters + tests := []struct { + name string + dwh protos.DBType + expectedPrecision int16 + expectedScale int16 + }{ + { + name: "Snowflake with override", + dwh: protos.DBType_SNOWFLAKE, + expectedPrecision: 38, + expectedScale: 20, + }, + { + name: "BigQuery with override", + dwh: protos.DBType_BIGQUERY, + expectedPrecision: 38, + expectedScale: 20, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Pass override values but they should be ignored for non-ClickHouse + p, s := DetermineNumericSettingForDWH(0, 0, tt.dwh, 60, 10) + + require.Equal(t, tt.expectedPrecision, p) + require.Equal(t, tt.expectedScale, s) + }) + } +} diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index ac611c55ba..911d8bcf5e 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -147,6 +147,8 @@ impl FlowGrpcClient { idle_timeout_seconds: job.sync_interval.unwrap_or_default(), env: Default::default(), version: 0, // filled in by server + clickhouse_numeric_default_precision: None, + clickhouse_numeric_default_scale: None, }; if job.disable_peerdb_columns { diff --git a/protos/flow.proto b/protos/flow.proto index ee94edfadb..bdc00f18fe 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -91,6 +91,12 @@ message FlowConnectionConfigs { map env = 24; uint32 version = 25; + + // ClickHouse-specific numeric settings for unbounded Postgres NUMERIC columns + // If set, these override the default precision (76) and scale (38) for unbounded NUMERIC + // Only applies when destination is ClickHouse; ignored for other destinations + optional int32 clickhouse_numeric_default_precision = 27; + optional int32 clickhouse_numeric_default_scale = 28; } // FlowConnectionConfigsCore is used internally in the codebase, it is safe to remove (mark reserved) fields from it @@ -137,6 +143,12 @@ message FlowConnectionConfigsCore { map env = 24; uint32 version = 25; + + // ClickHouse-specific numeric settings for unbounded Postgres NUMERIC columns + // If set, these override the default precision (76) and scale (38) for unbounded NUMERIC + // Only applies when destination is ClickHouse; ignored for other destinations + optional int32 clickhouse_numeric_default_precision = 27; + optional int32 clickhouse_numeric_default_scale = 28; } message RenameTableOption { diff --git a/ui/app/mirrors/create/helpers/cdc.ts b/ui/app/mirrors/create/helpers/cdc.ts index 3918d53cbb..ce7a51b83d 100644 --- a/ui/app/mirrors/create/helpers/cdc.ts +++ b/ui/app/mirrors/create/helpers/cdc.ts @@ -272,4 +272,49 @@ export const cdcSettings: MirrorSetting[] = [ tips: '{"string":"string"} JSON mapping to override global settings for mirror', advanced: AdvancedSettingType.ALL, }, + { + label: 'ClickHouse Decimal Precision (Unbounded)', + stateHandler: (value, setter) => + setter( + (curr: CDCConfig): CDCConfig => ({ + ...curr, + clickhouseNumericDefaultPrecision: (value as number) || 0, + }) + ), + type: 'number', + default: 76, + tips: 'For unbounded Postgres NUMERIC columns (no precision specified), this sets the default decimal precision in ClickHouse. Range: 1-76 (default 76).', + advanced: AdvancedSettingType.ALL, + checked: (config: CDCConfig) => { + // Only show for ClickHouse destinations + return ( + config.destinationName?.toLowerCase().includes('clickhouse') || + config.destination_name?.toLowerCase().includes('clickhouse') || + false + ); + }, + }, + { + label: 'ClickHouse Decimal Scale (Unbounded)', + stateHandler: (value, setter) => + setter( + (curr: CDCConfig): CDCConfig => ({ + ...curr, + clickhouseNumericDefaultScale: (value as number) || 0, + }) + ), + type: 'number', + default: 38, + tips: 'For unbounded Postgres NUMERIC columns, this sets the default decimal scale (digits after decimal point) in ClickHouse. Range: 0 to Precision (default 38).', + advanced: AdvancedSettingType.ALL, + checked: (config: CDCConfig) => { + // Only show for ClickHouse destinations + return ( + config.destinationName?.toLowerCase().includes('clickhouse') || + config.destination_name?.toLowerCase().includes('clickhouse') || + false + ); + }, + }, ]; + diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index 6b0887d2cf..c8a96bba92 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -49,6 +49,8 @@ export const blankCDCSetting: CDCConfig = { env: {}, envString: '', version: 0, + clickhouseNumericDefaultPrecision: 0, + clickhouseNumericDefaultScale: 0, }; export const cdcSourceDefaults: { [index: string]: Partial } = { From b8b8329262ec5b1b298e57f48acb50c11260c669 Mon Sep 17 00:00:00 2001 From: Ravi Date: Thu, 13 Nov 2025 16:47:25 +0530 Subject: [PATCH 2/2] refactor: remove comments --- flow/cmd/validate_mirror.go | 6 ---- flow/connectors/clickhouse/normalize.go | 4 --- flow/connectors/clickhouse/normalize_query.go | 30 +++++++++---------- flow/e2e/clickhouse_test.go | 10 ------- flow/internal/dynamicconf.go | 4 --- flow/model/qvalue/dwh.go | 1 - flow/model/qvalue/kind.go | 2 -- 7 files changed, 14 insertions(+), 43 deletions(-) diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index 913f14fb44..098b3e1441 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -65,7 +65,6 @@ func (h *FlowRequestHandler) validateCDCMirrorImpl( errors.New("invalid config: initial_snapshot_only is true but do_initial_snapshot is false")) } - // Validate ClickHouse numeric settings if provided var chDefPrecision, chDefScale int32 if connectionConfigs.ClickhouseNumericDefaultPrecision != nil { chDefPrecision = *connectionConfigs.ClickhouseNumericDefaultPrecision @@ -74,30 +73,25 @@ func (h *FlowRequestHandler) validateCDCMirrorImpl( chDefScale = *connectionConfigs.ClickhouseNumericDefaultScale } if chDefPrecision > 0 || chDefScale > 0 { - // Load destination peer to check if it's ClickHouse dstPeer, err := connectors.LoadPeer(ctx, h.pool, connectionConfigs.DestinationName) if err != nil { return nil, NewFailedPreconditionApiError(fmt.Errorf("failed to load destination peer: %w", err)) } - // Only validate ClickHouse numeric settings for ClickHouse destinations if dstPeer.Type == protos.DBType_CLICKHOUSE { precision := chDefPrecision scale := chDefScale - // If scale is provided but precision is not, reject early with a clear error if precision == 0 && scale > 0 { return nil, NewInvalidArgumentApiError( errors.New("clickhouse numeric precision must be set when providing a custom scale")) } - // Validate precision: 1-76 if precision < 1 || precision > 76 { return nil, NewInvalidArgumentApiError( fmt.Errorf("clickhouse numeric precision must be between 1 and 76, got %d", precision)) } - // Validate scale: 0 to precision if scale < 0 || scale > precision { return nil, NewInvalidArgumentApiError( fmt.Errorf("clickhouse numeric scale must be between 0 and %d (precision), got %d", precision, scale)) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 65f64e3726..fbf4ecf6fc 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -202,7 +202,6 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable( if clickHouseType == "" { var err error - // Fetch optional ClickHouse numeric default overrides from env var chDefPrecision, chDefScale int32 if p, errP := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, config.Env); errP == nil { chDefPrecision = p @@ -224,18 +223,15 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable( fmt.Fprintf(builder, "%s %s, ", peerdb_clickhouse.QuoteIdentifier(dstColName), clickHouseType) } - // synced at column will be added to all normalized tables if config.SyncedAtColName != "" { colName := strings.ToLower(config.SyncedAtColName) fmt.Fprintf(builder, "%s DateTime64(9) DEFAULT now64(), ", peerdb_clickhouse.QuoteIdentifier(colName)) } - // add _peerdb_source_schema_name column if sourceSchemaAsDestinationColumn { fmt.Fprintf(builder, "%s %s, ", peerdb_clickhouse.QuoteIdentifier(sourceSchemaColName), sourceSchemaColType) } - // add sign and version columns fmt.Fprintf(builder, "%s %s, %s %s)", peerdb_clickhouse.QuoteIdentifier(isDeletedColumn), isDeletedColType, peerdb_clickhouse.QuoteIdentifier(versionColName), versionColType) diff --git a/flow/connectors/clickhouse/normalize_query.go b/flow/connectors/clickhouse/normalize_query.go index a0607cae79..3bd7fb65f6 100644 --- a/flow/connectors/clickhouse/normalize_query.go +++ b/flow/connectors/clickhouse/normalize_query.go @@ -16,26 +16,24 @@ import ( ) type NormalizeQueryGenerator struct { - env map[string]string - tableNameSchemaMapping map[string]*protos.TableSchema - chVersion *chproto.Version - Query string - TableName string - rawTableName string - isDeletedColName string - tableMappings []*protos.TableMapping - lastNormBatchID int64 - endBatchID int64 - enablePrimaryUpdate bool - sourceSchemaAsDestinationColumn bool - cluster bool - version uint32 - // Override defaults for unbounded Postgres NUMERIC -> ClickHouse Decimal + env map[string]string + tableNameSchemaMapping map[string]*protos.TableSchema + chVersion *chproto.Version + Query string + TableName string + rawTableName string + isDeletedColName string + tableMappings []*protos.TableMapping + lastNormBatchID int64 + endBatchID int64 + enablePrimaryUpdate bool + sourceSchemaAsDestinationColumn bool + cluster bool + version uint32 clickhouseNumericDefaultPrecision int32 clickhouseNumericDefaultScale int32 } -// NewTableNormalizeQuery constructs a TableNormalizeQuery with required fields. func NewNormalizeQueryGenerator( tableName string, tableNameSchemaMapping map[string]*protos.TableSchema, diff --git a/flow/e2e/clickhouse_test.go b/flow/e2e/clickhouse_test.go index 21094e51bb..2cfeea34f0 100644 --- a/flow/e2e/clickhouse_test.go +++ b/flow/e2e/clickhouse_test.go @@ -203,15 +203,12 @@ func (s ClickHouseSuite) Test_Addition_Removal() { RequireEnvCanceled(s.t, env) } -// Verifies that unbounded Postgres NUMERIC maps to ClickHouse Decimal(precision, scale) -// when overrides are provided via env dynamic config. func (s ClickHouseSuite) Test_ClickHouse_Numeric_Overrides_Unbounded() { tc := NewTemporalClient(s.t) srcTable := s.attachSchemaSuffix("test_numeric_override_src") dstTable := "test_numeric_override_dst" - // Create source table with unbounded NUMERIC (no typmod) and not null to avoid Nullable in CH require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( id SERIAL PRIMARY KEY, @@ -219,12 +216,10 @@ func (s ClickHouseSuite) Test_ClickHouse_Numeric_Overrides_Unbounded() { ); `, srcTable))) - // Insert a couple of rows require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(` INSERT INTO %s (val) VALUES (123.456), (789.01234); `, srcTable))) - // Generate flow with ClickHouse destination and set numeric overrides via env connectionGen := FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("numeric_override_flow"), TableNameMapping: map[string]string{srcTable: dstTable}, @@ -240,7 +235,6 @@ func (s ClickHouseSuite) Test_ClickHouse_Numeric_Overrides_Unbounded() { env := ExecutePeerflow(s.t, tc, flowConnConfig) - // Connect to ClickHouse and verify the destination column type ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig()) require.NoError(s.t, err) defer ch.Close() @@ -248,12 +242,8 @@ func (s ClickHouseSuite) Test_ClickHouse_Numeric_Overrides_Unbounded() { expectedType := "Decimal(60, 10)" dbName := s.Peer().GetClickhouseConfig().Database - // Wait until the table is created and type is materialized to expected Decimal EnvWaitFor(s.t, env, 3*time.Minute, "wait for ClickHouse table with overridden numeric type", func() bool { var colType string - // system.columns stores type without Nullable() wrapper in separate nullable flag for newer versions, - // but to be safe, we accept exact Decimal(60, 10) or Nullable(Decimal(60, 10)). - // We query the exact type string ClickHouse exposes. rows, qerr := ch.Query(s.t.Context(), `SELECT type FROM system.columns WHERE database = ? AND table = ? AND name = 'val' LIMIT 1`, dbName, dstTable) if qerr != nil { diff --git a/flow/internal/dynamicconf.go b/flow/internal/dynamicconf.go index c7d0d50ea8..63d1224f8f 100644 --- a/flow/internal/dynamicconf.go +++ b/flow/internal/dynamicconf.go @@ -617,14 +617,10 @@ func PeerDBEnableClickHouseJSON(ctx context.Context, env map[string]string) (boo return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_ENABLE_JSON") } -// Mirror-scoped override for unbounded NUMERIC -> ClickHouse Decimal default precision. -// Prefer value from env map when present; falls back to dynamic settings if configured. func PeerDBClickHouseNumericDefaultPrecision(ctx context.Context, env map[string]string) (int32, error) { return dynamicConfSigned[int32](ctx, env, "PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_PRECISION") } -// Mirror-scoped override for unbounded NUMERIC -> ClickHouse Decimal default scale. -// Prefer value from env map when present; falls back to dynamic settings if configured. func PeerDBClickHouseNumericDefaultScale(ctx context.Context, env map[string]string) (int32, error) { return dynamicConfSigned[int32](ctx, env, "PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_SCALE") } diff --git a/flow/model/qvalue/dwh.go b/flow/model/qvalue/dwh.go index e06c79068b..a98b0ff0db 100644 --- a/flow/model/qvalue/dwh.go +++ b/flow/model/qvalue/dwh.go @@ -14,7 +14,6 @@ func DetermineNumericSettingForDWH(precision int16, scale int16, dwh protos.DBTy // If user provided overrides and typmod is unbounded (precision and scale are 0), use the overrides if precision == 0 && scale == 0 && (chDefaultPrecision > 0 || chDefaultScale >= 0) { // Convert from int32 to int16 for comparison and return - // If chDefaultPrecision was set but chDefaultScale wasn't, still use the defaults as fallback if chDefaultPrecision > 0 { return int16(chDefaultPrecision), int16(chDefaultScale) } diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index adeed19873..e90b65df57 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -29,7 +29,6 @@ func GetNumericDestinationType( return NumericDestinationType{IsString: true} } } - // For ClickHouse, apply overrides only when precision is unbounded (0,0) and user provided a precision > 0. destPrecision, destScale := DetermineNumericSettingForDWH(precision, scale, targetDWH, chDefaultPrecision, chDefaultScale) return NumericDestinationType{ IsString: false, @@ -51,7 +50,6 @@ func getClickHouseTypeForNumericColumn(ctx context.Context, env map[string]strin return fmt.Sprintf("Decimal(%d, %d)", destinationType.Precision, destinationType.Scale), nil } -// Extended to accept optional ClickHouse default precision/scale overrides. func ToDWHColumnType( ctx context.Context, kind types.QValueKind,