34 changes: 34 additions & 0 deletions flow/cmd/validate_mirror.go
@@ -65,6 +65,40 @@ func (h *FlowRequestHandler) validateCDCMirrorImpl(
errors.New("invalid config: initial_snapshot_only is true but do_initial_snapshot is false"))
}

var chDefPrecision, chDefScale int32
if connectionConfigs.ClickhouseNumericDefaultPrecision != nil {
chDefPrecision = *connectionConfigs.ClickhouseNumericDefaultPrecision
}
if connectionConfigs.ClickhouseNumericDefaultScale != nil {
chDefScale = *connectionConfigs.ClickhouseNumericDefaultScale
}
if chDefPrecision > 0 || chDefScale > 0 {
dstPeer, err := connectors.LoadPeer(ctx, h.pool, connectionConfigs.DestinationName)
if err != nil {
return nil, NewFailedPreconditionApiError(fmt.Errorf("failed to load destination peer: %w", err))
}

if dstPeer.Type == protos.DBType_CLICKHOUSE {
precision := chDefPrecision
scale := chDefScale

if precision == 0 && scale > 0 {
return nil, NewInvalidArgumentApiError(
errors.New("clickhouse numeric precision must be set when providing a custom scale"))
}

if precision < 1 || precision > 76 {
return nil, NewInvalidArgumentApiError(
fmt.Errorf("clickhouse numeric precision must be between 1 and 76, got %d", precision))
}

if scale < 0 || scale > precision {
return nil, NewInvalidArgumentApiError(
fmt.Errorf("clickhouse numeric scale must be between 0 and %d (precision), got %d", precision, scale))
}
}
}

for _, tm := range connectionConfigs.TableMappings {
for _, col := range tm.Columns {
if !CustomColumnTypeRegex.MatchString(col.DestinationType) {
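A minimal standalone sketch of the validation rules added above, runnable in isolation (package layout, function name, and shortened messages are illustrative, not part of this PR; in the handler these checks only run once either override is set):

package main

import (
	"errors"
	"fmt"
)

// validateClickHouseNumericDefaults mirrors the checks in validateCDCMirrorImpl:
// a custom scale requires an explicit precision, precision must fit ClickHouse's
// Decimal bounds (1..76), and scale may not exceed precision.
func validateClickHouseNumericDefaults(precision, scale int32) error {
	if precision == 0 && scale > 0 {
		return errors.New("precision must be set when providing a custom scale")
	}
	if precision < 1 || precision > 76 {
		return fmt.Errorf("precision must be between 1 and 76, got %d", precision)
	}
	if scale < 0 || scale > precision {
		return fmt.Errorf("scale must be between 0 and %d (precision), got %d", precision, scale)
	}
	return nil
}

func main() {
	fmt.Println(validateClickHouseNumericDefaults(60, 10)) // <nil>
	fmt.Println(validateClickHouseNumericDefaults(0, 5))   // precision must be set
	fmt.Println(validateClickHouseNumericDefaults(76, 77)) // scale exceeds precision
}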
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
@@ -252,7 +252,7 @@ func DefineAvroSchema(dstTableName string,

func GetAvroType(bqField *bigquery.FieldSchema) (avro.Schema, error) {
avroNumericPrecision, avroNumericScale := qvalue.DetermineNumericSettingForDWH(
int16(bqField.Precision), int16(bqField.Scale), protos.DBType_BIGQUERY)
int16(bqField.Precision), int16(bqField.Scale), protos.DBType_BIGQUERY, 0, 0)

considerRepeated := func(typ avro.Type, repeated bool) avro.Schema {
if repeated {
14 changes: 14 additions & 0 deletions flow/connectors/clickhouse/avro_sync.go
@@ -366,6 +366,20 @@
schema types.QRecordSchema,
avroNameMap map[string]string,
) (*model.QRecordAvroSchemaDefinition, error) {
// Inject ClickHouse numeric default overrides from dynamic env (if set) for Avro schema generation
if s.ClickHouseConnector != nil {
copied := make(map[string]string, len(env)+2)
for k, v := range env {
copied[k] = v
}
if p, err := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, env); err == nil && p > 0 {
copied["PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_PRECISION"] = fmt.Sprintf("%d", p)

[GitHub Actions / lint] Check failure on line 376 in flow/connectors/clickhouse/avro_sync.go: integer-format: fmt.Sprintf can be replaced with faster strconv.Itoa (perfsprint)
}
if sc, err := internal.PeerDBClickHouseNumericDefaultScale(ctx, env); err == nil {
copied["PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_SCALE"] = fmt.Sprintf("%d", sc)

[GitHub Actions / lint] Check failure on line 379 in flow/connectors/clickhouse/avro_sync.go: integer-format: fmt.Sprintf can be replaced with faster strconv.Itoa (perfsprint)
}
env = copied
}
avroSchema, err := model.GetAvroSchemaDefinition(ctx, env, dstTableName, schema, protos.DBType_CLICKHOUSE, avroNameMap)
if err != nil {
return nil, fmt.Errorf("failed to define Avro schema: %w", err)
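A possible rework of the override-injection block above that would clear the two perfsprint failures: strconv.Itoa instead of fmt.Sprintf (a sketch assuming "strconv" is added to the file's imports; behavior is otherwise unchanged):

copied := make(map[string]string, len(env)+2)
for k, v := range env {
	copied[k] = v
}
if p, err := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, env); err == nil && p > 0 {
	copied["PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_PRECISION"] = strconv.Itoa(int(p))
}
if sc, err := internal.PeerDBClickHouseNumericDefaultScale(ctx, env); err == nil {
	copied["PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_SCALE"] = strconv.Itoa(int(sc))
}
env = copied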
10 changes: 10 additions & 0 deletions flow/connectors/clickhouse/cdc.go
@@ -170,6 +170,15 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
return nil
}

// Fetch optional ClickHouse numeric default overrides from env
var chDefPrecision, chDefScale int32
if p, err := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, env); err == nil {
chDefPrecision = p
}
if s, err := internal.PeerDBClickHouseNumericDefaultScale(ctx, env); err == nil {
chDefScale = s
}

onCluster := c.onCluster()
for _, schemaDelta := range schemaDeltas {
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
@@ -189,6 +198,7 @@
qvKind := types.QValueKind(addedColumn.Type)
clickHouseColType, err := qvalue.ToDWHColumnType(
ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled,
chDefPrecision, chDefScale,
)
if err != nil {
return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err)
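The fetch-with-zero-fallback pattern above reappears twice in normalize.go below; a small helper could keep that behavior in one place (a hypothetical refactor, not part of this PR; it reuses the internal lookups already called in this diff):

// clickHouseNumericDefaults returns the configured default precision and
// scale, leaving either value at zero when its lookup errors out.
func clickHouseNumericDefaults(ctx context.Context, env map[string]string) (int32, int32) {
	var precision, scale int32
	if p, err := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, env); err == nil {
		precision = p
	}
	if s, err := internal.PeerDBClickHouseNumericDefaultScale(ctx, env); err == nil {
		scale = s
	}
	return precision, scale
}

Call sites would then collapse to chDefPrecision, chDefScale := clickHouseNumericDefaults(ctx, env).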
22 changes: 19 additions & 3 deletions flow/connectors/clickhouse/normalize.go
@@ -202,8 +202,16 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(

if clickHouseType == "" {
var err error
var chDefPrecision, chDefScale int32
if p, errP := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, config.Env); errP == nil {
chDefPrecision = p
}
if s, errS := internal.PeerDBClickHouseNumericDefaultScale(ctx, config.Env); errS == nil {
chDefScale = s
}
clickHouseType, err = qvalue.ToDWHColumnType(
ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column, tableSchema.NullableEnabled || columnNullableEnabled,
chDefPrecision, chDefScale,
)
if err != nil {
return nil, fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
@@ -215,18 +223,15 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
fmt.Fprintf(builder, "%s %s, ", peerdb_clickhouse.QuoteIdentifier(dstColName), clickHouseType)
}

// synced at column will be added to all normalized tables
if config.SyncedAtColName != "" {
colName := strings.ToLower(config.SyncedAtColName)
fmt.Fprintf(builder, "%s DateTime64(9) DEFAULT now64(), ", peerdb_clickhouse.QuoteIdentifier(colName))
}

// add _peerdb_source_schema_name column
if sourceSchemaAsDestinationColumn {
fmt.Fprintf(builder, "%s %s, ", peerdb_clickhouse.QuoteIdentifier(sourceSchemaColName), sourceSchemaColType)
}

// add sign and version columns
fmt.Fprintf(builder, "%s %s, %s %s)",
peerdb_clickhouse.QuoteIdentifier(isDeletedColumn), isDeletedColType,
peerdb_clickhouse.QuoteIdentifier(versionColName), versionColType)
@@ -560,6 +565,15 @@ func (c *ClickHouseConnector) NormalizeRecords(
continue
}

// Fetch optional ClickHouse numeric default overrides from env for query generation
var chDefPrecision, chDefScale int32
if p, errP := internal.PeerDBClickHouseNumericDefaultPrecision(ctx, req.Env); errP == nil {
chDefPrecision = p
}
if s, errS := internal.PeerDBClickHouseNumericDefaultScale(ctx, req.Env); errS == nil {
chDefScale = s
}

queryGenerator := NewNormalizeQueryGenerator(
tbl,
req.TableNameSchemaMapping,
@@ -574,6 +588,8 @@
c.Config.Cluster != "",
req.SoftDeleteColName,
req.Version,
chDefPrecision,
chDefScale,
)
query, err := queryGenerator.BuildQuery(ctx)
if err != nil {
62 changes: 34 additions & 28 deletions flow/connectors/clickhouse/normalize_query.go
@@ -16,23 +16,24 @@ import (
)

type NormalizeQueryGenerator struct {
env map[string]string
tableNameSchemaMapping map[string]*protos.TableSchema
chVersion *chproto.Version
Query string
TableName string
rawTableName string
isDeletedColName string
tableMappings []*protos.TableMapping
lastNormBatchID int64
endBatchID int64
enablePrimaryUpdate bool
sourceSchemaAsDestinationColumn bool
cluster bool
version uint32
env map[string]string
tableNameSchemaMapping map[string]*protos.TableSchema
chVersion *chproto.Version
Query string
TableName string
rawTableName string
isDeletedColName string
tableMappings []*protos.TableMapping
lastNormBatchID int64
endBatchID int64
enablePrimaryUpdate bool
sourceSchemaAsDestinationColumn bool
cluster bool
version uint32
clickhouseNumericDefaultPrecision int32
clickhouseNumericDefaultScale int32
}

// NewNormalizeQueryGenerator constructs a NormalizeQueryGenerator with required fields.
func NewNormalizeQueryGenerator(
tableName string,
tableNameSchemaMapping map[string]*protos.TableSchema,
@@ -47,25 +48,29 @@ func NewNormalizeQueryGenerator(
cluster bool,
configuredSoftDeleteColName string,
version uint32,
clickhouseNumericDefaultPrecision int32,
clickhouseNumericDefaultScale int32,
) *NormalizeQueryGenerator {
isDeletedColumn := isDeletedColName
if configuredSoftDeleteColName != "" {
isDeletedColumn = configuredSoftDeleteColName
}
return &NormalizeQueryGenerator{
TableName: tableName,
tableNameSchemaMapping: tableNameSchemaMapping,
tableMappings: tableMappings,
endBatchID: endBatchID,
lastNormBatchID: lastNormBatchID,
enablePrimaryUpdate: enablePrimaryUpdate,
sourceSchemaAsDestinationColumn: sourceSchemaAsDestinationColumn,
env: env,
rawTableName: rawTableName,
chVersion: chVersion,
cluster: cluster,
isDeletedColName: isDeletedColumn,
version: version,
TableName: tableName,
tableNameSchemaMapping: tableNameSchemaMapping,
tableMappings: tableMappings,
endBatchID: endBatchID,
lastNormBatchID: lastNormBatchID,
enablePrimaryUpdate: enablePrimaryUpdate,
sourceSchemaAsDestinationColumn: sourceSchemaAsDestinationColumn,
env: env,
rawTableName: rawTableName,
chVersion: chVersion,
cluster: cluster,
isDeletedColName: isDeletedColumn,
version: version,
clickhouseNumericDefaultPrecision: clickhouseNumericDefaultPrecision,
clickhouseNumericDefaultScale: clickhouseNumericDefaultScale,
}
}

@@ -124,6 +129,7 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error)
var err error
clickHouseType, err = qvalue.ToDWHColumnType(
ctx, colType, t.env, protos.DBType_CLICKHOUSE, t.chVersion, column, schema.NullableEnabled || columnNullableEnabled,
t.clickhouseNumericDefaultPrecision, t.clickhouseNumericDefaultScale,
)
if err != nil {
return "", fmt.Errorf("error while converting column type to clickhouse type: %w", err)
6 changes: 6 additions & 0 deletions flow/connectors/clickhouse/normalize_test.go
@@ -258,6 +258,8 @@ func TestBuildQuery_Basic(t *testing.T) {
false,
"",
shared.InternalVersion_Latest,
0,
0,
)

query, err := g.BuildQuery(ctx)
@@ -312,6 +314,8 @@ func TestBuildQuery_WithPrimaryUpdate(t *testing.T) {
false,
"",
shared.InternalVersion_Latest,
0,
0,
)

query, err := g.BuildQuery(ctx)
@@ -363,6 +367,8 @@ func TestBuildQuery_WithSourceSchemaAsDestinationColumn(t *testing.T) {
true,
"",
shared.InternalVersion_Latest,
0,
0,
)

query, err := g.BuildQuery(ctx)
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/merge_stmt_generator.go
@@ -36,7 +36,7 @@
for _, column := range columns {
genericColumnType := column.Type
qvKind := types.QValueKind(genericColumnType)
sfType, err := qvalue.ToDWHColumnType(ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, column, normalizedTableSchema.NullableEnabled)
sfType, err := qvalue.ToDWHColumnType(ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, column, normalizedTableSchema.NullableEnabled, 0, 0)

[GitHub Actions / lint] Check failure on line 39 in flow/connectors/snowflake/merge_stmt_generator.go: The line is 146 characters long, which exceeds the maximum of 144 characters. (lll)
if err != nil {
return "", fmt.Errorf("failed to convert column type %s to snowflake type: %w", genericColumnType, err)
}
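One way to clear the lll failure above: reflow the widened call across lines, matching how snowflake.go formats the same call below (identical arguments, just wrapped):

sfType, err := qvalue.ToDWHColumnType(
	ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, column,
	normalizedTableSchema.NullableEnabled, 0, 0,
)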
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/snowflake.go
@@ -365,7 +365,7 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas(
for _, addedColumn := range schemaDelta.AddedColumns {
qvKind := types.QValueKind(addedColumn.Type)
sfColtype, err := qvalue.ToDWHColumnType(
ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, addedColumn, schemaDelta.NullableEnabled,
ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, addedColumn, schemaDelta.NullableEnabled, 0, 0,
)
if err != nil {
return fmt.Errorf("failed to convert column type %s to snowflake type: %w",
@@ -663,7 +663,7 @@ func generateCreateTableSQLForNormalizedTable(
normalizedColName := SnowflakeIdentifierNormalize(column.Name)
qvKind := types.QValueKind(genericColumnType)
sfColType, err := qvalue.ToDWHColumnType(
ctx, qvKind, config.Env, protos.DBType_SNOWFLAKE, nil, column, tableSchema.NullableEnabled,
ctx, qvKind, config.Env, protos.DBType_SNOWFLAKE, nil, column, tableSchema.NullableEnabled, 0, 0,
)
if err != nil {
slog.WarnContext(ctx, fmt.Sprintf("failed to convert column type %s to snowflake type", genericColumnType),
4 changes: 2 additions & 2 deletions flow/connectors/utils/stream.go
@@ -185,7 +185,7 @@ func truncateNumerics(
switch numeric := val.(type) {
case types.QValueNumeric:
destType := qvalue.GetNumericDestinationType(
numeric.Precision, numeric.Scale, targetDWH, unboundedNumericAsString,
numeric.Precision, numeric.Scale, targetDWH, unboundedNumericAsString, 0, 0,
)
if destType.IsString {
newVal = val
@@ -204,7 +204,7 @@
}
case types.QValueArrayNumeric:
destType := qvalue.GetNumericDestinationType(
numeric.Precision, numeric.Scale, targetDWH, unboundedNumericAsString,
numeric.Precision, numeric.Scale, targetDWH, unboundedNumericAsString, 0, 0,
)
if destType.IsString {
newVal = val
60 changes: 60 additions & 0 deletions flow/e2e/clickhouse_test.go
@@ -203,6 +203,66 @@ func (s ClickHouseSuite) Test_Addition_Removal() {
RequireEnvCanceled(s.t, env)
}

func (s ClickHouseSuite) Test_ClickHouse_Numeric_Overrides_Unbounded() {
tc := NewTemporalClient(s.t)

srcTable := s.attachSchemaSuffix("test_numeric_override_src")
dstTable := "test_numeric_override_dst"

require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
val NUMERIC NOT NULL
);
`, srcTable)))

require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`
INSERT INTO %s (val) VALUES (123.456), (789.01234);
`, srcTable)))

connectionGen := FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("numeric_override_flow"),
TableNameMapping: map[string]string{srcTable: dstTable},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true
if flowConnConfig.Env == nil {
flowConnConfig.Env = map[string]string{}
}
flowConnConfig.Env["PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_PRECISION"] = "60"
flowConnConfig.Env["PEERDB_CLICKHOUSE_NUMERIC_DEFAULT_SCALE"] = "10"

env := ExecutePeerflow(s.t, tc, flowConnConfig)

ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig())
require.NoError(s.t, err)
defer ch.Close()

expectedType := "Decimal(60, 10)"
dbName := s.Peer().GetClickhouseConfig().Database

EnvWaitFor(s.t, env, 3*time.Minute, "wait for ClickHouse table with overridden numeric type", func() bool {
var colType string
rows, qerr := ch.Query(s.t.Context(),
`SELECT type FROM system.columns WHERE database = ? AND table = ? AND name = 'val' LIMIT 1`, dbName, dstTable)
if qerr != nil {
return false
}
defer rows.Close()
if !rows.Next() {
return false
}
if scanErr := rows.Scan(&colType); scanErr != nil {
return false
}
return colType == expectedType || colType == ("Nullable("+expectedType+")")
})

env.Cancel(s.t.Context())
RequireEnvCanceled(s.t, env)
}

func (s ClickHouseSuite) Test_NullableMirrorSetting() {
srcTableName := "test_nullable_mirror"
srcFullName := s.attachSchemaSuffix(srcTableName)
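The type the test waits for follows directly from the overrides: with precision 60 and scale 10, an unbounded NUMERIC should surface in ClickHouse as Decimal(60, 10), possibly wrapped in Nullable. A sketch of that expectation as a helper (illustrative only; assumes "fmt" is imported, and the real mapping lives in qvalue's numeric handling, not shown in this diff):

// expectedClickHouseDecimal builds the column type string the test asserts.
func expectedClickHouseDecimal(precision, scale int32, nullable bool) string {
	t := fmt.Sprintf("Decimal(%d, %d)", precision, scale)
	if nullable {
		return "Nullable(" + t + ")"
	}
	return t
}

// expectedClickHouseDecimal(60, 10, false) == "Decimal(60, 10)"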