From 14f7bf52e3d3279032b021db424ad4dad4aa09e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 17 Sep 2025 19:17:29 +0000 Subject: [PATCH 01/13] poc --- flow/connectors/clickhouse/cdc.go | 87 ++++++++++++++++++++++++++++--- flow/internal/dynamicconf.go | 12 +++++ 2 files changed, 93 insertions(+), 6 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 29f8371e50..c23a2d2838 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "strings" chproto "github.com/ClickHouse/ch-go/proto" "github.com/ClickHouse/clickhouse-go/v2" @@ -146,17 +147,91 @@ func (c *ClickHouseConnector) syncRecordsViaAvro( } func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) { - res, err := c.syncRecordsViaAvro(ctx, req, req.SyncBatchID) + enableStream, err := internal.PeerDBEnableClickHouseStream(ctx, req.Env) if err != nil { return nil, err } - if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpoint); err != nil { - c.logger.Error("failed to increment id", slog.Any("error", err)) - return nil, err - } + if enableStream { + tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings) + var numRecords int64 + Loop: + for { + select { + case record, ok := <-req.Records.GetRecords(): + if !ok { + c.logger.Info("flushing batches because no more records") + break Loop + } + numRecords += 1 + tableSchema := req.TableNameSchemaMapping[record.GetDestinationTableName()] + switch r := record.(type) { + case *model.InsertRecord[model.RecordItems]: + colNames := make([]string, 0, len(tableSchema.Columns)) + values := make([]string, 0, len(tableSchema.Columns)) + for _, col := range tableSchema.Columns { + colNames = append(colNames, peerdb_clickhouse.QuoteIdentifier(col.Name)) + val := r.Items.GetColumnValue(col.Name) + values = append(values, peerdb_clickhouse.QuoteLiteral(fmt.Sprint(val.Value()))) + } + c.exec(ctx, fmt.Sprintf("INSERT INTO %s(%s) VALUES (%s)", + peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), strings.Join(colNames, ","), strings.Join(values, ","))) + case *model.UpdateRecord[model.RecordItems]: + assignments := make([]string, 0, len(tableSchema.Columns)) + for _, col := range tableSchema.Columns { + assignments = append(assignments, fmt.Sprintf("%s=%s", + peerdb_clickhouse.QuoteIdentifier(col.Name), peerdb_clickhouse.QuoteLiteral(fmt.Sprint(r.NewItems.GetColumnValue(col.Name))), + )) + } + where := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) + for _, colName := range tableSchema.PrimaryKeyColumns { + where = append(where, fmt.Sprintf("%s=%s", + peerdb_clickhouse.QuoteIdentifier(colName), peerdb_clickhouse.QuoteLiteral(fmt.Sprint(r.OldItems.GetColumnValue(colName))), + )) + } + c.exec(ctx, fmt.Sprintf("UPDATE %s SET %s WHERE %s", + peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), + strings.Join(assignments, ","), + strings.Join(where, " AND "))) + case *model.DeleteRecord[model.RecordItems]: + where := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) + for _, colName := range tableSchema.PrimaryKeyColumns { + where = append(where, fmt.Sprintf("%s=%s", + peerdb_clickhouse.QuoteIdentifier(colName), peerdb_clickhouse.QuoteLiteral(fmt.Sprint(r.Items.GetColumnValue(colName))), + )) + } + c.exec(ctx, fmt.Sprintf("DELETE FROM %s WHERE %s", + 
peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), strings.Join(where, ","))) + } + case <-ctx.Done(): + return nil, ctx.Err() + } + } - return res, nil + lastCheckpoint := req.Records.GetLastCheckpoint() + if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint); err != nil { + return nil, err + } + return &model.SyncResponse{ + CurrentSyncBatchID: req.SyncBatchID, + LastSyncedCheckpoint: lastCheckpoint, + NumRecordsSynced: numRecords, + TableNameRowsMapping: tableNameRowsMapping, + TableSchemaDeltas: req.Records.SchemaDeltas, + }, nil + } else { + res, err := c.syncRecordsViaAvro(ctx, req, req.SyncBatchID) + if err != nil { + return nil, err + } + + if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpoint); err != nil { + c.logger.Error("failed to increment id", slog.Any("error", err)) + return nil, err + } + + return res, nil + } } func (c *ClickHouseConnector) ReplayTableSchemaDeltas( diff --git a/flow/internal/dynamicconf.go b/flow/internal/dynamicconf.go index fe3660b49d..94c627dcee 100644 --- a/flow/internal/dynamicconf.go +++ b/flow/internal/dynamicconf.go @@ -135,6 +135,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{ ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, TargetForSetting: protos.DynconfTarget_SNOWFLAKE, }, + { + Name: "PEERDB_ENABLE_CLICKHOUSE_STREAM", + Description: "Stream records as series of inserts, lightweight updates, & lightweight deletes", + DefaultValue: "false", + ValueType: protos.DynconfValueType_BOOL, + ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, + TargetForSetting: protos.DynconfTarget_CLICKHOUSE, + }, { Name: "PEERDB_CLICKHOUSE_BINARY_FORMAT", Description: "Binary field encoding on clickhouse destination; either raw, hex, or base64", @@ -572,6 +580,10 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) { return dynamicConfBool(ctx, env, "PEERDB_NULLABLE") } +func PeerDBEnableClickHouseStream(ctx context.Context, env map[string]string) (bool, error) { + return dynamicConfBool(ctx, env, "PEERDB_ENABLE_CLICKHOUSE_STREAM") +} + func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) { format, err := dynLookup(ctx, env, "PEERDB_CLICKHOUSE_BINARY_FORMAT") if err != nil { From 647e18d94253a26bca14f00722c2a1f13aa2b743 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 18 Sep 2025 22:25:11 +0000 Subject: [PATCH 02/13] progress --- flow/connectors/clickhouse/cdc.go | 133 ++++++++++++++++++++--- flow/connectors/clickhouse/clickhouse.go | 5 + flow/connectors/clickhouse/normalize.go | 21 +++- flow/model/record.go | 24 +++- 4 files changed, 163 insertions(+), 20 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index c23a2d2838..1a342294ba 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -4,7 +4,11 @@ import ( "context" "fmt" "log/slog" + "slices" + "strconv" "strings" + "sync/atomic" + "time" chproto "github.com/ClickHouse/ch-go/proto" "github.com/ClickHouse/clickhouse-go/v2" @@ -146,15 +150,98 @@ func (c *ClickHouseConnector) syncRecordsViaAvro( }, nil } +func formatSlice[T any](values []T, f func(T) string) string { + s := make([]string, 0, len(values)) + for _, value := range values { + s = append(s, f(value)) + } + return fmt.Sprintf("[%s]", strings.Join(s, ",")) +} + +func formatQValue(value types.QValue) string { + switch v := value.(type) { + case nil: + return "NULL" + case types.QValueNull: + 
return "NULL" + case types.QValueArrayBoolean: + return formatSlice(v.Val, func(val bool) string { + if val { + return "true" + } else { + return "false" + } + }) + case types.QValueArrayInt16: + return formatSlice(v.Val, func(val int16) string { + return strconv.FormatInt(int64(val), 10) + }) + case types.QValueArrayInt32: + return formatSlice(v.Val, func(val int32) string { + return strconv.FormatInt(int64(val), 10) + }) + case types.QValueArrayInt64: + return formatSlice(v.Val, func(val int64) string { + return strconv.FormatInt(val, 10) + }) + case types.QValueArrayString: + return formatSlice(v.Val, peerdb_clickhouse.QuoteLiteral) + case types.QValueArrayFloat32: + return formatSlice(v.Val, func(val float32) string { + return strconv.FormatFloat(float64(val), 'g', -1, 32) + }) + case types.QValueArrayFloat64: + return formatSlice(v.Val, func(val float64) string { + return strconv.FormatFloat(val, 'g', -1, 64) + }) + case types.QValueArrayEnum: + return formatSlice(v.Val, peerdb_clickhouse.QuoteLiteral) + default: + return peerdb_clickhouse.QuoteLiteral(fmt.Sprint(v.Value())) + } +} + func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) { enableStream, err := internal.PeerDBEnableClickHouseStream(ctx, req.Env) if err != nil { return nil, err - } - - if enableStream { - tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings) + } else if enableStream { var numRecords int64 + lastSeenLSN := atomic.Int64{} + tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings) + flushLoopDone := make(chan struct{}) + go func() { + flushTimeout, err := internal.PeerDBQueueFlushTimeoutSeconds(ctx, req.Env) + if err != nil { + c.logger.Warn("failed to get flush timeout, no periodic flushing", slog.Any("error", err)) + return + } + ticker := time.NewTicker(flushTimeout) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-flushLoopDone: + return + // flush loop doesn't block processing new messages + case <-ticker.C: + lastSeen := lastSeenLSN.Load() + if lastSeen > req.ConsumedOffset.Load() { + if err := c.SetLastOffset(ctx, req.FlowJobName, model.CdcCheckpoint{ID: lastSeen}); err != nil { + c.logger.Warn("SetLastOffset error", slog.Any("error", err)) + } else { + shared.AtomicInt64Max(req.ConsumedOffset, lastSeen) + c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeen)) + } + } + } + } + }() + defer close(flushLoopDone) + + // TODO sourceSchemaAsDestinationColumn Loop: for { select { @@ -172,37 +259,51 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe for _, col := range tableSchema.Columns { colNames = append(colNames, peerdb_clickhouse.QuoteIdentifier(col.Name)) val := r.Items.GetColumnValue(col.Name) - values = append(values, peerdb_clickhouse.QuoteLiteral(fmt.Sprint(val.Value()))) + values = append(values, formatQValue(val)) + } + if err := c.execWithLogging(ctx, fmt.Sprintf("INSERT INTO %s(%s,_peerdb_is_deleted,_peerdb_version) VALUES (%s,0,%d)", + peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), strings.Join(colNames, ","), strings.Join(values, ","), r.BaseRecord.CommitTimeNano)); err != nil { + return nil, err } - c.exec(ctx, fmt.Sprintf("INSERT INTO %s(%s) VALUES (%s)", - peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), strings.Join(colNames, ","), strings.Join(values, ","))) case *model.UpdateRecord[model.RecordItems]: assignments := make([]string, 0, len(tableSchema.Columns)) 
for _, col := range tableSchema.Columns { - assignments = append(assignments, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(col.Name), peerdb_clickhouse.QuoteLiteral(fmt.Sprint(r.NewItems.GetColumnValue(col.Name))), - )) + // TODO needs to match custom ordering key + if !slices.Contains(tableSchema.PrimaryKeyColumns, col.Name) { + assignments = append(assignments, fmt.Sprintf("%s=%s", + peerdb_clickhouse.QuoteIdentifier(col.Name), formatQValue(r.NewItems.GetColumnValue(col.Name)), + )) + } } where := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) for _, colName := range tableSchema.PrimaryKeyColumns { + item := r.OldItems.GetColumnValue(colName) + if item == nil { + item = r.NewItems.GetColumnValue(colName) + } where = append(where, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(colName), peerdb_clickhouse.QuoteLiteral(fmt.Sprint(r.OldItems.GetColumnValue(colName))), + peerdb_clickhouse.QuoteIdentifier(colName), formatQValue(item), )) } - c.exec(ctx, fmt.Sprintf("UPDATE %s SET %s WHERE %s", + if err := c.execWithLogging(ctx, fmt.Sprintf("UPDATE %s SET %s,_peerdb_is_deleted=0 WHERE %s", peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), strings.Join(assignments, ","), - strings.Join(where, " AND "))) + strings.Join(where, " AND "))); err != nil { + return nil, err + } case *model.DeleteRecord[model.RecordItems]: where := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) for _, colName := range tableSchema.PrimaryKeyColumns { where = append(where, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(colName), peerdb_clickhouse.QuoteLiteral(fmt.Sprint(r.Items.GetColumnValue(colName))), + peerdb_clickhouse.QuoteIdentifier(colName), formatQValue(r.Items.GetColumnValue(colName)), )) } - c.exec(ctx, fmt.Sprintf("DELETE FROM %s WHERE %s", - peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), strings.Join(where, ","))) + if err := c.execWithLogging(ctx, fmt.Sprintf("DELETE FROM %s WHERE %s", + peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), strings.Join(where, " AND "))); err != nil { + return nil, err + } } + shared.AtomicInt64Max(&lastSeenLSN, record.GetBaseRecord().CheckpointID) case <-ctx.Done(): return nil, ctx.Err() } diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index c7996fad32..4e79e419b7 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -258,6 +258,11 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou if config.Cluster != "" { settings["insert_distributed_sync"] = uint64(1) } + if allowStream, err := internal.PeerDBEnableClickHouseStream(ctx, env); err != nil { + return nil, fmt.Errorf("failed to load stream config: %w", err) + } else if allowStream { + settings["allow_experimental_lightweight_update"] = true + } conn, err := clickhouse.Open(&clickhouse.Options{ Addr: []string{shared.JoinHostPort(config.Host, config.Port)}, diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 58b9c216f1..5766223749 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -254,8 +254,15 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable( } } - if allowNullableKey { - stmtBuilder.WriteString(" SETTINGS allow_nullable_key = 1") + if allowStream, err := internal.PeerDBEnableClickHouseStream(ctx, config.Env); err != nil { + return nil, fmt.Errorf("failed to load stream config: %w", err) + } else if 
allowStream { + stmtBuilder.WriteString(" SETTINGS enable_block_number_column=1,enable_block_offset_column=1") + if allowNullableKey { + stmtBuilder.WriteString(",allow_nullable_key=1") + } + } else if allowNullableKey { + stmtBuilder.WriteString(" SETTINGS allow_nullable_key=1") } if c.Config.Cluster != "" { @@ -414,6 +421,16 @@ func (c *ClickHouseConnector) NormalizeRecords( }, nil } + enableStream, err := internal.PeerDBEnableClickHouseStream(ctx, req.Env) + if err != nil { + return model.NormalizeResponse{}, err + } else if enableStream { + return model.NormalizeResponse{ + StartBatchID: normBatchID, + EndBatchID: req.SyncBatchID, + }, nil + } + endBatchID := req.SyncBatchID groupBatches, err := internal.PeerDBGroupNormalize(ctx, req.Env) if err != nil { diff --git a/flow/model/record.go b/flow/model/record.go index e0bed285e2..cce21e3f5c 100644 --- a/flow/model/record.go +++ b/flow/model/record.go @@ -13,8 +13,8 @@ type Record[T Items] interface { GetTransactionID() uint64 GetDestinationTableName() string GetSourceTableName() string - // get columns and values for the record - GetItems() T + GetBaseRecord() BaseRecord + GetItems() T // get columns and values for the record PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts) } @@ -63,6 +63,10 @@ func (r *InsertRecord[T]) GetSourceTableName() string { return r.SourceTableName } +func (r *InsertRecord[T]) GetBaseRecord() BaseRecord { + return r.BaseRecord +} + func (r *InsertRecord[T]) GetItems() T { return r.Items } @@ -100,6 +104,10 @@ func (r *UpdateRecord[T]) GetSourceTableName() string { return r.SourceTableName } +func (r *UpdateRecord[T]) GetBaseRecord() BaseRecord { + return r.BaseRecord +} + func (r *UpdateRecord[T]) GetItems() T { return r.NewItems } @@ -135,6 +143,10 @@ func (r *DeleteRecord[T]) GetSourceTableName() string { return r.SourceTableName } +func (r *DeleteRecord[T]) GetBaseRecord() BaseRecord { + return r.BaseRecord +} + func (r *DeleteRecord[T]) GetItems() T { return r.Items } @@ -164,6 +176,10 @@ func (r *RelationRecord[T]) GetSourceTableName() string { return r.TableSchemaDelta.SrcTableName } +func (r *RelationRecord[T]) GetBaseRecord() BaseRecord { + return r.BaseRecord +} + func (r *RelationRecord[T]) GetItems() T { var none T return none @@ -190,6 +206,10 @@ func (r *MessageRecord[T]) GetSourceTableName() string { return "" } +func (r *MessageRecord[T]) GetBaseRecord() BaseRecord { + return r.BaseRecord +} + func (r *MessageRecord[T]) GetItems() T { var none T return none From 003539dfdaf6e22877c49884ddbee3864db8b792 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 19 Sep 2025 17:41:02 +0000 Subject: [PATCH 03/13] more types --- flow/connectors/clickhouse/cdc.go | 45 +++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 1a342294ba..c2c54911e1 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -13,6 +13,7 @@ import ( chproto "github.com/ClickHouse/ch-go/proto" "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/shopspring/decimal" "github.com/PeerDB-io/peerdb/flow/connectors/utils" "github.com/PeerDB-io/peerdb/flow/generated/protos" @@ -194,8 +195,52 @@ func formatQValue(value types.QValue) string { return formatSlice(v.Val, func(val float64) string { return strconv.FormatFloat(val, 'g', -1, 64) }) + case types.QValueArrayNumeric: + return formatSlice(v.Val, func(val decimal.Decimal) 
string { + return val.String() + }) + case types.QValueArrayDate: + return formatSlice(v.Val, func(val time.Time) string { + return val.Format(time.DateOnly) + }) + case types.QValueArrayTimestamp: + return formatSlice(v.Val, func(val time.Time) string { + return val.Format(time.StampNano) + }) + case types.QValueArrayTimestampTZ: + return formatSlice(v.Val, func(val time.Time) string { + return val.Format(time.StampNano) + }) case types.QValueArrayEnum: return formatSlice(v.Val, peerdb_clickhouse.QuoteLiteral) + case types.QValueBoolean: + if v.Val { + return "true" + } else { + return "false" + } + case types.QValueInt16: + return strconv.FormatInt(int64(v.Val), 10) + case types.QValueInt32: + return strconv.FormatInt(int64(v.Val), 10) + case types.QValueInt64: + return strconv.FormatInt(v.Val, 10) + case types.QValueString: + return peerdb_clickhouse.QuoteLiteral(v.Val) + case types.QValueFloat32: + return strconv.FormatFloat(float64(v.Val), 'g', -1, 32) + case types.QValueFloat64: + return strconv.FormatFloat(v.Val, 'g', -1, 64) + case types.QValueNumeric: + return v.Val.String() + case types.QValueDate: + return v.Val.Format(time.DateOnly) + case types.QValueTimestamp: + return v.Val.Format(time.StampNano) + case types.QValueTimestampTZ: + return v.Val.Format(time.StampNano) + case types.QValueEnum: + return peerdb_clickhouse.QuoteLiteral(v.Val) default: return peerdb_clickhouse.QuoteLiteral(fmt.Sprint(v.Value())) } From ed512a6cc49b5d37d3eccb6d83aff59c80abbf9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 19 Sep 2025 18:33:02 +0000 Subject: [PATCH 04/13] lint --- flow/connectors/clickhouse/cdc.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index c2c54911e1..ffbf88ced1 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -307,7 +307,9 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe values = append(values, formatQValue(val)) } if err := c.execWithLogging(ctx, fmt.Sprintf("INSERT INTO %s(%s,_peerdb_is_deleted,_peerdb_version) VALUES (%s,0,%d)", - peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), strings.Join(colNames, ","), strings.Join(values, ","), r.BaseRecord.CommitTimeNano)); err != nil { + peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), + strings.Join(colNames, ","), strings.Join(values, ","), r.BaseRecord.CommitTimeNano), + ); err != nil { return nil, err } case *model.UpdateRecord[model.RecordItems]: From ba8308647d21559100b23a6afe5f080033bc1655 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 19 Sep 2025 22:14:58 +0000 Subject: [PATCH 05/13] enable async_insert, try play nicer with UPDATE not liking null coercion --- flow/connectors/clickhouse/cdc.go | 82 ++++++++++++++++++++++-- flow/connectors/clickhouse/clickhouse.go | 1 + 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index ffbf88ced1..5d9e1943b2 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -159,11 +159,61 @@ func formatSlice[T any](values []T, f func(T) string) string { return fmt.Sprintf("[%s]", strings.Join(s, ",")) } -func formatQValue(value types.QValue) string { +func formatQValue(value types.QValue, nullable bool) string { switch v := value.(type) { case nil: return "NULL" case types.QValueNull: + if !nullable { + switch types.QValueKind(v) { + case 
types.QValueKindArrayFloat32, + types.QValueKindArrayFloat64, + types.QValueKindArrayInt16, + types.QValueKindArrayInt32, + types.QValueKindArrayInt64, + types.QValueKindArrayString, + types.QValueKindArrayEnum, + types.QValueKindArrayDate, + types.QValueKindArrayInterval, + types.QValueKindArrayTimestamp, + types.QValueKindArrayTimestampTZ, + types.QValueKindArrayBoolean, + types.QValueKindArrayJSON, + types.QValueKindArrayJSONB, + types.QValueKindArrayUUID, + types.QValueKindArrayNumeric: + return "[]" + case types.QValueKindFloat32, + types.QValueKindFloat64, + types.QValueKindInt8, + types.QValueKindInt16, + types.QValueKindInt32, + types.QValueKindInt64, + types.QValueKindInt256, + types.QValueKindUInt8, + types.QValueKindUInt16, + types.QValueKindUInt32, + types.QValueKindUInt64, + types.QValueKindUInt256, + types.QValueKindBoolean, + types.QValueKindNumeric: + return "0" + case types.QValueKindQChar, + types.QValueKindString, + types.QValueKindEnum, + types.QValueKindBytes: + return "''" + case types.QValueKindJSON, + types.QValueKindJSONB: + return "'{}'" + case types.QValueKindUUID: + return "'00000000-0000-0000-0000-000000000000'" + case types.QValueKindDate: + return "'1970-01-01'" + case types.QValueKindTimestamp, types.QValueKindTimestampTZ: + return "'1970-01-01 00:00:00'" + } + } return "NULL" case types.QValueArrayBoolean: return formatSlice(v.Val, func(val bool) string { @@ -304,7 +354,7 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe for _, col := range tableSchema.Columns { colNames = append(colNames, peerdb_clickhouse.QuoteIdentifier(col.Name)) val := r.Items.GetColumnValue(col.Name) - values = append(values, formatQValue(val)) + values = append(values, formatQValue(val, tableSchema.NullableEnabled && col.Nullable)) } if err := c.execWithLogging(ctx, fmt.Sprintf("INSERT INTO %s(%s,_peerdb_is_deleted,_peerdb_version) VALUES (%s,0,%d)", peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), @@ -318,7 +368,7 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe // TODO needs to match custom ordering key if !slices.Contains(tableSchema.PrimaryKeyColumns, col.Name) { assignments = append(assignments, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(col.Name), formatQValue(r.NewItems.GetColumnValue(col.Name)), + peerdb_clickhouse.QuoteIdentifier(col.Name), formatQValue(r.NewItems.GetColumnValue(col.Name), tableSchema.NullableEnabled && col.Nullable), )) } } @@ -328,8 +378,19 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe if item == nil { item = r.NewItems.GetColumnValue(colName) } + var nullable bool + if tableSchema.NullableEnabled { + for _, col := range tableSchema.Columns { + if col.Name == colName { + if col.Nullable { + nullable = true + } + break + } + } + } where = append(where, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(colName), formatQValue(item), + peerdb_clickhouse.QuoteIdentifier(colName), formatQValue(item, nullable), )) } if err := c.execWithLogging(ctx, fmt.Sprintf("UPDATE %s SET %s,_peerdb_is_deleted=0 WHERE %s", @@ -341,8 +402,19 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe case *model.DeleteRecord[model.RecordItems]: where := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) for _, colName := range tableSchema.PrimaryKeyColumns { + var nullable bool + if tableSchema.NullableEnabled { + for _, col := range tableSchema.Columns { + if col.Name == colName { + if col.Nullable { + nullable = 
true + } + break + } + } + } where = append(where, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(colName), formatQValue(r.Items.GetColumnValue(colName)), + peerdb_clickhouse.QuoteIdentifier(colName), formatQValue(r.Items.GetColumnValue(colName), nullable), )) } if err := c.execWithLogging(ctx, fmt.Sprintf("DELETE FROM %s WHERE %s", diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 4e79e419b7..c628c73795 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -262,6 +262,7 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou return nil, fmt.Errorf("failed to load stream config: %w", err) } else if allowStream { settings["allow_experimental_lightweight_update"] = true + settings["async_insert"] = true } conn, err := clickhouse.Open(&clickhouse.Options{ From 0a35ba2fdc3575fb94a5179b9285aeba016def00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Sat, 20 Sep 2025 03:12:31 +0000 Subject: [PATCH 06/13] lint --- flow/connectors/clickhouse/cdc.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 5d9e1943b2..81d3d0e020 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -368,7 +368,8 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe // TODO needs to match custom ordering key if !slices.Contains(tableSchema.PrimaryKeyColumns, col.Name) { assignments = append(assignments, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(col.Name), formatQValue(r.NewItems.GetColumnValue(col.Name), tableSchema.NullableEnabled && col.Nullable), + peerdb_clickhouse.QuoteIdentifier(col.Name), + formatQValue(r.NewItems.GetColumnValue(col.Name), tableSchema.NullableEnabled && col.Nullable), )) } } From c1ddeb738fa14e8e48d481e8c24181220805d340 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 26 Sep 2025 00:52:08 +0000 Subject: [PATCH 07/13] ordering key, not primary key --- flow/connectors/clickhouse/cdc.go | 34 ++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 81d3d0e020..dcf4a79ec5 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "slices" "strconv" "strings" "sync/atomic" @@ -347,6 +346,17 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe } numRecords += 1 tableSchema := req.TableNameSchemaMapping[record.GetDestinationTableName()] + orderingKey := make(map[string]struct{}) + for _, tm := range req.TableMappings { + if tm.DestinationTableIdentifier == record.GetDestinationTableName() && tm.SourceTableIdentifier == record.GetSourceTableName() { + for _, col := range tm.Columns { + if col.Ordering > 0 { + orderingKey[col.DestinationName] = struct{}{} + } + } + break + } + } switch r := record.(type) { case *model.InsertRecord[model.RecordItems]: colNames := make([]string, 0, len(tableSchema.Columns)) @@ -365,16 +375,20 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe case *model.UpdateRecord[model.RecordItems]: assignments := make([]string, 0, len(tableSchema.Columns)) for _, col := range tableSchema.Columns { - // TODO needs to match custom ordering key - if 
!slices.Contains(tableSchema.PrimaryKeyColumns, col.Name) { - assignments = append(assignments, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(col.Name), - formatQValue(r.NewItems.GetColumnValue(col.Name), tableSchema.NullableEnabled && col.Nullable), - )) + if _, unchanged := r.UnchangedToastColumns[col.Name]; unchanged { + continue } + if _, isOrdering := orderingKey[col.Name]; isOrdering { + // CH does not support UPDATE on these columns + continue + } + assignments = append(assignments, fmt.Sprintf("%s=%s", + peerdb_clickhouse.QuoteIdentifier(col.Name), + formatQValue(r.NewItems.GetColumnValue(col.Name), tableSchema.NullableEnabled && col.Nullable), + )) } - where := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) - for _, colName := range tableSchema.PrimaryKeyColumns { + where := make([]string, 0, len(orderingKey)) + for colName, _ := range orderingKey { item := r.OldItems.GetColumnValue(colName) if item == nil { item = r.NewItems.GetColumnValue(colName) @@ -402,7 +416,7 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe } case *model.DeleteRecord[model.RecordItems]: where := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) - for _, colName := range tableSchema.PrimaryKeyColumns { + for colName, _ := range orderingKey { var nullable bool if tableSchema.NullableEnabled { for _, col := range tableSchema.Columns { From bf92e305b6663bac55fc6043972ce61055d3b85a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 29 Sep 2025 14:02:27 +0000 Subject: [PATCH 08/13] also work when no custom ordering key --- flow/connectors/clickhouse/cdc.go | 17 ++++++++++++++--- flow/shared/clickhouse/escape.go | 3 +++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index dcf4a79ec5..7fc3ba2bdf 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -290,6 +290,8 @@ func formatQValue(value types.QValue, nullable bool) string { return v.Val.Format(time.StampNano) case types.QValueEnum: return peerdb_clickhouse.QuoteLiteral(v.Val) + case types.QValueBytes: + return peerdb_clickhouse.QuoteLiteral(shared.UnsafeFastReadOnlyBytesToString(v.Val)) default: return peerdb_clickhouse.QuoteLiteral(fmt.Sprint(v.Value())) } @@ -345,7 +347,11 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe break Loop } numRecords += 1 - tableSchema := req.TableNameSchemaMapping[record.GetDestinationTableName()] + tableSchema, ok := req.TableNameSchemaMapping[record.GetDestinationTableName()] + if !ok { + c.logger.Warn("missing schema for table, ignoring", slog.String("table", record.GetDestinationTableName())) + continue + } orderingKey := make(map[string]struct{}) for _, tm := range req.TableMappings { if tm.DestinationTableIdentifier == record.GetDestinationTableName() && tm.SourceTableIdentifier == record.GetSourceTableName() { @@ -357,6 +363,11 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe break } } + if len(orderingKey) == 0 { + for _, colName := range tableSchema.PrimaryKeyColumns { + orderingKey[colName] = struct{}{} + } + } switch r := record.(type) { case *model.InsertRecord[model.RecordItems]: colNames := make([]string, 0, len(tableSchema.Columns)) @@ -388,7 +399,7 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe )) } where := make([]string, 0, len(orderingKey)) - for colName, _ := range orderingKey { + for colName := range 
orderingKey { item := r.OldItems.GetColumnValue(colName) if item == nil { item = r.NewItems.GetColumnValue(colName) @@ -416,7 +427,7 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe } case *model.DeleteRecord[model.RecordItems]: where := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) - for colName, _ := range orderingKey { + for colName := range orderingKey { var nullable bool if tableSchema.NullableEnabled { for _, col := range tableSchema.Columns { diff --git a/flow/shared/clickhouse/escape.go b/flow/shared/clickhouse/escape.go index 7a3bedec61..ce2784590a 100644 --- a/flow/shared/clickhouse/escape.go +++ b/flow/shared/clickhouse/escape.go @@ -21,6 +21,9 @@ func escape(result *strings.Builder, value string) { for idx := range len(value) { if mustEscape(value[idx]) { result.WriteByte('\\') + } else if value[idx] == 0 { + result.WriteString("\\x00") + continue } result.WriteByte(value[idx]) } From 7a1b73102e1d4434db016f43510d7974ef229ad7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 30 Sep 2025 04:46:02 +0000 Subject: [PATCH 09/13] parallel --- flow/connectors/clickhouse/cdc.go | 129 +++++++++++++++++++++++------- 1 file changed, 98 insertions(+), 31 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 7fc3ba2bdf..dc925f1905 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -3,6 +3,8 @@ package connclickhouse import ( "context" "fmt" + "hash/fnv" + "io" "log/slog" "strconv" "strings" @@ -13,6 +15,7 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/shopspring/decimal" + "golang.org/x/sync/errgroup" "github.com/PeerDB-io/peerdb/flow/connectors/utils" "github.com/PeerDB-io/peerdb/flow/generated/protos" @@ -303,7 +306,15 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe return nil, err } else if enableStream { var numRecords int64 - lastSeenLSN := atomic.Int64{} + type query struct { + sql string + lsn int64 + } + var lsns [4]atomic.Int64 + var queries [4]chan query + for workerId := range len(queries) { + queries[workerId] = make(chan query) + } tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings) flushLoopDone := make(chan struct{}) go func() { @@ -323,8 +334,12 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe return // flush loop doesn't block processing new messages case <-ticker.C: - lastSeen := lastSeenLSN.Load() - if lastSeen > req.ConsumedOffset.Load() { + oldLastSeen := req.ConsumedOffset.Load() + lastSeen := oldLastSeen + for workerId := range len(lsns) { + lastSeen = min(lastSeen, lsns[workerId].Load()) + } + if lastSeen > oldLastSeen { if err := c.SetLastOffset(ctx, req.FlowJobName, model.CdcCheckpoint{ID: lastSeen}); err != nil { c.logger.Warn("SetLastOffset error", slog.Any("error", err)) } else { @@ -338,6 +353,28 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe defer close(flushLoopDone) // TODO sourceSchemaAsDestinationColumn + // TODO schema changes + fnvHash := fnv.New64a() + group, groupCtx := errgroup.WithContext(ctx) + for workerId := range len(lsns) { + group.Go(func() error { + for { + select { + case q := <-queries[workerId]: + if err := c.exec(groupCtx, q.sql); err != nil { + c.logger.Error("failed to execute %s: %w", q.sql, err) + if groupCtx.Err() != nil { + return nil + } + return err + } + 
shared.AtomicInt64Max(&lsns[workerId], q.lsn) + case <-groupCtx.Done(): + return nil + } + } + }) + } Loop: for { select { @@ -352,36 +389,48 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe c.logger.Warn("missing schema for table, ignoring", slog.String("table", record.GetDestinationTableName())) continue } - orderingKey := make(map[string]struct{}) + fnvHash.Reset() + orderingKeySlice := make([]string, 0) for _, tm := range req.TableMappings { if tm.DestinationTableIdentifier == record.GetDestinationTableName() && tm.SourceTableIdentifier == record.GetSourceTableName() { for _, col := range tm.Columns { if col.Ordering > 0 { - orderingKey[col.DestinationName] = struct{}{} + orderingKeySlice = append(orderingKeySlice, col.DestinationName) } } break } } - if len(orderingKey) == 0 { - for _, colName := range tableSchema.PrimaryKeyColumns { - orderingKey[colName] = struct{}{} - } + if len(orderingKeySlice) == 0 { + orderingKeySlice = tableSchema.PrimaryKeyColumns } + orderingKey := make(map[string]struct{}, len(orderingKeySlice)) + for _, colName := range orderingKeySlice { + orderingKey[colName] = struct{}{} + } + lsn := record.GetBaseRecord().CheckpointID switch r := record.(type) { case *model.InsertRecord[model.RecordItems]: colNames := make([]string, 0, len(tableSchema.Columns)) values := make([]string, 0, len(tableSchema.Columns)) + formattedValMap := make(map[string]string, len(orderingKeySlice)) for _, col := range tableSchema.Columns { colNames = append(colNames, peerdb_clickhouse.QuoteIdentifier(col.Name)) val := r.Items.GetColumnValue(col.Name) - values = append(values, formatQValue(val, tableSchema.NullableEnabled && col.Nullable)) + formattedVal := formatQValue(val, tableSchema.NullableEnabled && col.Nullable) + values = append(values, formattedVal) + if _, isOrdering := orderingKey[col.Name]; isOrdering { + formattedValMap[col.Name] = formattedVal + } } - if err := c.execWithLogging(ctx, fmt.Sprintf("INSERT INTO %s(%s,_peerdb_is_deleted,_peerdb_version) VALUES (%s,0,%d)", - peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), - strings.Join(colNames, ","), strings.Join(values, ","), r.BaseRecord.CommitTimeNano), - ); err != nil { - return nil, err + for _, colName := range orderingKeySlice { + io.WriteString(fnvHash, formattedValMap[colName]) + } + queries[fnvHash.Sum64()%uint64(len(queries))] <- query{ + sql: fmt.Sprintf("INSERT INTO %s(%s,_peerdb_is_deleted,_peerdb_version) VALUES (%s,0,%d)", + peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), + strings.Join(colNames, ","), strings.Join(values, ","), r.BaseRecord.CommitTimeNano), + lsn: lsn, } case *model.UpdateRecord[model.RecordItems]: assignments := make([]string, 0, len(tableSchema.Columns)) @@ -398,8 +447,8 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe formatQValue(r.NewItems.GetColumnValue(col.Name), tableSchema.NullableEnabled && col.Nullable), )) } - where := make([]string, 0, len(orderingKey)) - for colName := range orderingKey { + where := make([]string, 0, len(orderingKeySlice)) + for _, colName := range orderingKeySlice { item := r.OldItems.GetColumnValue(colName) if item == nil { item = r.NewItems.GetColumnValue(colName) @@ -415,19 +464,24 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe } } } + formattedVal := formatQValue(item, nullable) where = append(where, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(colName), formatQValue(item, nullable), + 
peerdb_clickhouse.QuoteIdentifier(colName), formattedVal, )) + if _, isOrdering := orderingKey[colName]; isOrdering { + io.WriteString(fnvHash, formattedVal) + } } - if err := c.execWithLogging(ctx, fmt.Sprintf("UPDATE %s SET %s,_peerdb_is_deleted=0 WHERE %s", - peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), - strings.Join(assignments, ","), - strings.Join(where, " AND "))); err != nil { - return nil, err + queries[fnvHash.Sum64()%uint64(len(queries))] <- query{ + sql: fmt.Sprintf("UPDATE %s SET %s,_peerdb_is_deleted=0 WHERE %s", + peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), + strings.Join(assignments, ","), + strings.Join(where, " AND ")), + lsn: lsn, } case *model.DeleteRecord[model.RecordItems]: where := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) - for colName := range orderingKey { + for _, colName := range orderingKeySlice { var nullable bool if tableSchema.NullableEnabled { for _, col := range tableSchema.Columns { @@ -439,21 +493,34 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe } } } + formattedVal := formatQValue(r.Items.GetColumnValue(colName), nullable) where = append(where, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(colName), formatQValue(r.Items.GetColumnValue(colName), nullable), + peerdb_clickhouse.QuoteIdentifier(colName), formattedVal, )) + if _, isOrdering := orderingKey[colName]; isOrdering { + io.WriteString(fnvHash, formattedVal) + } } - if err := c.execWithLogging(ctx, fmt.Sprintf("DELETE FROM %s WHERE %s", - peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), strings.Join(where, " AND "))); err != nil { - return nil, err + queries[fnvHash.Sum64()%uint64(len(queries))] <- query{ + sql: fmt.Sprintf("DELETE FROM %s WHERE %s", + peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), strings.Join(where, " AND ")), + lsn: lsn, } } - shared.AtomicInt64Max(&lastSeenLSN, record.GetBaseRecord().CheckpointID) - case <-ctx.Done(): - return nil, ctx.Err() + case <-groupCtx.Done(): + if err := ctx.Err(); err != nil { + return nil, err + } + c.logger.Error("error syncing, ending batch", slog.Any("error", groupCtx.Err())) + break Loop } } + for _, ch := range queries { + close(ch) + } + group.Wait() + lastCheckpoint := req.Records.GetLastCheckpoint() if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint); err != nil { return nil, err From c2acddbc7b7ebc81d66a944f0a277db13b3adcae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 2 Oct 2025 03:05:58 +0000 Subject: [PATCH 10/13] batch inserts, need to benchmark if batching DELETE makes sense using WHERE tuple(...) IN (...) 
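The tuple-IN batching under consideration would fold many single-row deletes against one table into a single statement. A minimal sketch of the query construction, not the actual implementation — quoteIdentifier is a stand-in for peerdb_clickhouse.QuoteIdentifier, and rows holds one slice of already-quoted ordering-key literals per queued delete:

    package main

    import (
        "fmt"
        "strings"
    )

    // quoteIdentifier is a stand-in for peerdb_clickhouse.QuoteIdentifier:
    // backquote the name, escaping backslashes and backquotes.
    func quoteIdentifier(name string) string {
        return "`" + strings.NewReplacer(`\`, `\\`, "`", "\\`").Replace(name) + "`"
    }

    // buildBatchedDelete renders DELETE FROM t WHERE (k1,k2) IN ((...),(...)).
    // keyCols are the ordering-key column names; each row lists pre-rendered
    // literal values in keyCols order.
    func buildBatchedDelete(table string, keyCols []string, rows [][]string) string {
        cols := make([]string, len(keyCols))
        for i, col := range keyCols {
            cols[i] = quoteIdentifier(col)
        }
        tuples := make([]string, len(rows))
        for i, row := range rows {
            tuples[i] = "(" + strings.Join(row, ",") + ")"
        }
        return fmt.Sprintf("DELETE FROM %s WHERE (%s) IN (%s)",
            quoteIdentifier(table), strings.Join(cols, ","), strings.Join(tuples, ","))
    }

    func main() {
        // DELETE FROM `tbl` WHERE (`id`,`day`) IN ((1,'2025-10-02'),(2,'2025-10-02'))
        fmt.Println(buildBatchedDelete("tbl", []string{"id", "day"},
            [][]string{{"1", "'2025-10-02'"}, {"2", "'2025-10-02'"}}))
    }

Whether this beats per-row DELETEs depends on how the lightweight-delete path handles large IN lists, hence the benchmarking caveat.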
--- flow/connectors/clickhouse/cdc.go | 194 ++++++++++++++++++----- flow/connectors/clickhouse/clickhouse.go | 1 + flow/connectors/clickhouse/validate.go | 3 +- 3 files changed, 159 insertions(+), 39 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index dc925f1905..7d3ec7f993 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -2,6 +2,7 @@ package connclickhouse import ( "context" + "errors" "fmt" "hash/fnv" "io" @@ -306,9 +307,20 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe return nil, err } else if enableStream { var numRecords int64 + const ( + queryTypeNone int8 = iota + queryTypeInsert + queryTypeUpdate + queryTypeDelete + ) type query struct { - sql string - lsn int64 + table string + values []string + columns []string + whereValues []string + whereColumns []string + lsn int64 + ty int8 } var lsns [4]atomic.Int64 var queries [4]chan query @@ -358,17 +370,123 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe group, groupCtx := errgroup.WithContext(ctx) for workerId := range len(lsns) { group.Go(func() error { + currentType := queryTypeNone + currentTable := "" + batchTimeout := make(<-chan time.Time) + var batch []query + finishBatch := func() error { + c.logger.Info("finishBatch", slog.Int("len", len(batch))) + if len(batch) > 0 { + switch currentType { + case queryTypeInsert: + columns := make([]string, 0, len(batch[0].columns)) + for _, colName := range batch[0].columns { + col := peerdb_clickhouse.QuoteIdentifier(colName) + columns = append(columns, col) + } + baseQuery := fmt.Sprintf("INSERT INTO %s(%s) VALUES ", + peerdb_clickhouse.QuoteIdentifier(currentTable), + strings.Join(columns, ","), + ) + // batch in inserts of 100KB + var batchIdx int + for batchIdx != -1 { + querySize := len(baseQuery) + finalValues := make([]string, 0, len(batch)) + for idx, q := range batch[batchIdx:] { + finalValue := fmt.Sprintf("(%s)", strings.Join(q.values, ",")) + finalValues = append(finalValues, finalValue) + querySize += len(finalValue) + 1 + if querySize > 100000 { + batchIdx += idx + break + } + } + batchIdx = -1 + if err := c.execWithLogging(ctx, + baseQuery+strings.Join(finalValues, ","), + ); err != nil { + return err + } + } + case queryTypeUpdate: + return errors.New("UPDATE does not support batching") + case queryTypeDelete: + return errors.New("DELETE does not support batching") + } + if currentType != queryTypeNone { + shared.AtomicInt64Max(&lsns[workerId], batch[len(batch)-1].lsn) + } + batchTimeout = make(<-chan time.Time) + batch = batch[:0] + } + currentType = queryTypeNone + currentTable = "" + return nil + } for { + c.logger.Info("top select") select { - case q := <-queries[workerId]: - if err := c.exec(groupCtx, q.sql); err != nil { - c.logger.Error("failed to execute %s: %w", q.sql, err) - if groupCtx.Err() != nil { - return nil + case q, ok := <-queries[workerId]: + if !ok { + return finishBatch() + } + if currentType != q.ty || currentTable != q.table { + if err := finishBatch(); err != nil { + return err } + } + switch q.ty { + case queryTypeUpdate: + assign := make([]string, 0, len(q.columns)) + where := make([]string, 0, len(q.whereColumns)) + for idx, colName := range q.columns { + assign = append(assign, fmt.Sprintf("%s=%s", + peerdb_clickhouse.QuoteIdentifier(colName), + q.values[idx], + )) + } + for idx, colName := range q.whereColumns { + where = append(where, fmt.Sprintf("%s=%s", + 
peerdb_clickhouse.QuoteIdentifier(colName), + q.whereValues[idx], + )) + } + if err := c.execWithLogging(ctx, fmt.Sprintf("UPDATE %s SET %s WHERE %s", + peerdb_clickhouse.QuoteIdentifier(q.table), + strings.Join(assign, ","), + strings.Join(where, " AND "), + )); err != nil { + return err + } + shared.AtomicInt64Max(&lsns[workerId], q.lsn) + case queryTypeDelete: + where := make([]string, 0, len(q.whereColumns)) + for idx, colName := range q.whereColumns { + where = append(where, fmt.Sprintf("%s=%s", + peerdb_clickhouse.QuoteIdentifier(colName), + q.whereValues[idx], + )) + } + if err := c.execWithLogging(ctx, fmt.Sprintf("DELETE FROM %s WHERE %s", + peerdb_clickhouse.QuoteIdentifier(q.table), + strings.Join(where, " AND "), + )); err != nil { + return err + } + shared.AtomicInt64Max(&lsns[workerId], q.lsn) + case queryTypeInsert: + if len(batch) == 0 { + batchTimeout = time.After(time.Second) + } + batch = append(batch, q) + currentType = q.ty + currentTable = q.table + } + case <-batchTimeout: + if err := finishBatch(); err != nil { return err } - shared.AtomicInt64Max(&lsns[workerId], q.lsn) case <-groupCtx.Done(): return nil } @@ -415,7 +533,7 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe values := make([]string, 0, len(tableSchema.Columns)) formattedValMap := make(map[string]string, len(orderingKeySlice)) for _, col := range tableSchema.Columns { - colNames = append(colNames, peerdb_clickhouse.QuoteIdentifier(col.Name)) + colNames = append(colNames, col.Name) val := r.Items.GetColumnValue(col.Name) formattedVal := formatQValue(val, tableSchema.NullableEnabled && col.Nullable) values = append(values, formattedVal) @@ -427,13 +545,15 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe io.WriteString(fnvHash, formattedValMap[colName]) } queries[fnvHash.Sum64()%uint64(len(queries))] <- query{ - sql: fmt.Sprintf("INSERT INTO %s(%s,_peerdb_is_deleted,_peerdb_version) VALUES (%s,0,%d)", - peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), - strings.Join(colNames, ","), strings.Join(values, ","), r.BaseRecord.CommitTimeNano), - lsn: lsn, + ty: queryTypeInsert, + table: r.DestinationTableName, + columns: colNames, + values: values, + lsn: lsn, } case *model.UpdateRecord[model.RecordItems]: - assignments := make([]string, 0, len(tableSchema.Columns)) + columns := make([]string, 0, len(tableSchema.Columns)) + values := make([]string, 0, len(tableSchema.Columns)) for _, col := range tableSchema.Columns { if _, unchanged := r.UnchangedToastColumns[col.Name]; unchanged { continue @@ -442,18 +562,16 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe // CH does not support UPDATE on these columns continue } - assignments = append(assignments, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(col.Name), - formatQValue(r.NewItems.GetColumnValue(col.Name), tableSchema.NullableEnabled && col.Nullable), - )) + columns = append(columns, col.Name) + values = append(values, formatQValue(r.NewItems.GetColumnValue(col.Name), tableSchema.NullableEnabled && col.Nullable)) } - where := make([]string, 0, len(orderingKeySlice)) + whereValues := make([]string, 0, len(orderingKeySlice)) for _, colName := range orderingKeySlice { item := r.OldItems.GetColumnValue(colName) if item == nil { item = r.NewItems.GetColumnValue(colName) } - var nullable bool + var nullable bool // TODO be a map lookup if tableSchema.NullableEnabled { for _, col := range tableSchema.Columns { if col.Name == colName { @@ 
-465,24 +583,24 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe } } formattedVal := formatQValue(item, nullable) - where = append(where, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(colName), formattedVal, - )) + whereValues = append(whereValues, formattedVal) if _, isOrdering := orderingKey[colName]; isOrdering { io.WriteString(fnvHash, formattedVal) } } queries[fnvHash.Sum64()%uint64(len(queries))] <- query{ - sql: fmt.Sprintf("UPDATE %s SET %s,_peerdb_is_deleted=0 WHERE %s", - peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), - strings.Join(assignments, ","), - strings.Join(where, " AND ")), - lsn: lsn, + ty: queryTypeUpdate, + table: r.DestinationTableName, + values: values, + columns: columns, + whereValues: whereValues, + whereColumns: orderingKeySlice, + lsn: lsn, } case *model.DeleteRecord[model.RecordItems]: - where := make([]string, 0, len(tableSchema.PrimaryKeyColumns)) + values := make([]string, 0, len(orderingKeySlice)) for _, colName := range orderingKeySlice { - var nullable bool + var nullable bool // TODO be a map lookup if tableSchema.NullableEnabled { for _, col := range tableSchema.Columns { if col.Name == colName { @@ -494,17 +612,17 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe } } formattedVal := formatQValue(r.Items.GetColumnValue(colName), nullable) - where = append(where, fmt.Sprintf("%s=%s", - peerdb_clickhouse.QuoteIdentifier(colName), formattedVal, - )) + values = append(values, formattedVal) if _, isOrdering := orderingKey[colName]; isOrdering { io.WriteString(fnvHash, formattedVal) } } queries[fnvHash.Sum64()%uint64(len(queries))] <- query{ - sql: fmt.Sprintf("DELETE FROM %s WHERE %s", - peerdb_clickhouse.QuoteIdentifier(r.DestinationTableName), strings.Join(where, " AND ")), - lsn: lsn, + ty: queryTypeDelete, + table: r.DestinationTableName, + whereValues: values, + whereColumns: orderingKeySlice, + lsn: lsn, } } case <-groupCtx.Done(): @@ -519,7 +637,9 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe for _, ch := range queries { close(ch) } - group.Wait() + if err := group.Wait(); err != nil { + return nil, err + } lastCheckpoint := req.Records.GetLastCheckpoint() if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint); err != nil { diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index c628c73795..cf6539c192 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -263,6 +263,7 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou } else if allowStream { settings["allow_experimental_lightweight_update"] = true settings["async_insert"] = true + settings["lightweight_delete_mode"] = "lightweight_update" } conn, err := clickhouse.Open(&clickhouse.Options{ diff --git a/flow/connectors/clickhouse/validate.go b/flow/connectors/clickhouse/validate.go index 81736a529b..f6d2d8b83a 100644 --- a/flow/connectors/clickhouse/validate.go +++ b/flow/connectors/clickhouse/validate.go @@ -18,8 +18,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination( tableNameSchemaMapping map[string]*protos.TableSchema, ) error { if internal.PeerDBOnlyClickHouseAllowed() { - err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database) - if err != nil { + if err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database); err != nil { return err } } 
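A note on the insert path added in this patch: each worker accumulates pre-rendered VALUES tuples per table and flushes them as multi-row INSERT statements once the accumulated SQL approaches 100 KB or a one-second timeout fires; UPDATE and DELETE are still executed immediately per record, and finishBatch treats a batched UPDATE or DELETE as an error. A self-contained sketch of the size-capped chunking rule, under the simplifying assumption that every element of tuples is one rendered row such as (1,'a'):

    package main

    import (
        "fmt"
        "strings"
    )

    // chunkInserts splits rendered VALUES tuples into INSERT statements whose
    // total SQL length stays near maxBytes, echoing the 100 KB cap used by
    // finishBatch in cdc.go.
    func chunkInserts(baseQuery string, tuples []string, maxBytes int) []string {
        var stmts, current []string
        size := len(baseQuery)
        for _, t := range tuples {
            if len(current) > 0 && size+len(t)+1 > maxBytes {
                stmts = append(stmts, baseQuery+strings.Join(current, ","))
                current = current[:0]
                size = len(baseQuery)
            }
            current = append(current, t)
            size += len(t) + 1 // +1 for the joining comma
        }
        if len(current) > 0 {
            stmts = append(stmts, baseQuery+strings.Join(current, ","))
        }
        return stmts
    }

    func main() {
        base := "INSERT INTO `t`(`id`) VALUES "
        // With a 40-byte cap this yields two statements: (1),(2) and (3).
        fmt.Println(chunkInserts(base, []string{"(1)", "(2)", "(3)"}, 40))
    }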
From 746bb5712b3470484ea9ff7570681a637ec17973 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 2 Oct 2025 12:58:46 +0000 Subject: [PATCH 11/13] use clickhouse normalize parallelism as parallelism parameter for streaming --- flow/connectors/clickhouse/cdc.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 7d3ec7f993..578de88410 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -322,11 +322,17 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe lsn int64 ty int8 } - var lsns [4]atomic.Int64 - var queries [4]chan query + + parallelNormalize, err := internal.PeerDBClickHouseParallelNormalize(ctx, req.Env) + if err != nil { + return nil, err + } + lsns := make([]atomic.Int64, parallelNormalize) + queries := make([]chan query, parallelNormalize) for workerId := range len(queries) { queries[workerId] = make(chan query) } + tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings) flushLoopDone := make(chan struct{}) go func() { From df8b405167cf6c6f8d130137f68140b59fa6736f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 3 Oct 2025 17:05:03 +0000 Subject: [PATCH 12/13] quote date literal --- flow/connectors/clickhouse/cdc.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 578de88410..25785d0c34 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -254,15 +254,15 @@ func formatQValue(value types.QValue, nullable bool) string { }) case types.QValueArrayDate: return formatSlice(v.Val, func(val time.Time) string { - return val.Format(time.DateOnly) + return peerdb_clickhouse.QuoteLiteral(val.Format(time.DateOnly)) }) case types.QValueArrayTimestamp: return formatSlice(v.Val, func(val time.Time) string { - return val.Format(time.StampNano) + return peerdb_clickhouse.QuoteLiteral(val.Format(time.StampNano)) }) case types.QValueArrayTimestampTZ: return formatSlice(v.Val, func(val time.Time) string { - return val.Format(time.StampNano) + return peerdb_clickhouse.QuoteLiteral(val.Format(time.StampNano)) }) case types.QValueArrayEnum: return formatSlice(v.Val, peerdb_clickhouse.QuoteLiteral) @@ -287,11 +287,11 @@ func formatQValue(value types.QValue, nullable bool) string { case types.QValueNumeric: return v.Val.String() case types.QValueDate: - return v.Val.Format(time.DateOnly) + return peerdb_clickhouse.QuoteLiteral(v.Val.Format(time.DateOnly)) case types.QValueTimestamp: - return v.Val.Format(time.StampNano) + return peerdb_clickhouse.QuoteLiteral(v.Val.Format(time.StampNano)) case types.QValueTimestampTZ: - return v.Val.Format(time.StampNano) + return peerdb_clickhouse.QuoteLiteral(v.Val.Format(time.StampNano)) case types.QValueEnum: return peerdb_clickhouse.QuoteLiteral(v.Val) case types.QValueBytes: From a9bc8d3c31738ab57dac06772f7b7be0b205059f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 3 Oct 2025 17:39:09 +0000 Subject: [PATCH 13/13] avoid division by 0 --- flow/connectors/clickhouse/cdc.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 25785d0c34..c653b36b0f 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -327,6 +327,9 @@ func (c *ClickHouseConnector) SyncRecords(ctx 
context.Context, req *model.SyncRe if err != nil { return nil, err } + if parallelNormalize < 1 { + parallelNormalize = 1 + } lsns := make([]atomic.Int64, parallelNormalize) queries := make([]chan query, parallelNormalize) for workerId := range len(queries) {
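A closing usage note on the literal quoting fixed in PATCH 12: Date and DateTime values must reach ClickHouse as quoted strings, since an unquoted 2025-10-03 parses as integer arithmetic rather than a date. A minimal illustration, with quoteLiteral standing in for peerdb_clickhouse.QuoteLiteral (not the real implementation):

    package main

    import (
        "fmt"
        "strings"
        "time"
    )

    // quoteLiteral is a stand-in for peerdb_clickhouse.QuoteLiteral:
    // single-quote the value, escaping backslashes and quotes.
    func quoteLiteral(s string) string {
        return "'" + strings.NewReplacer(`\`, `\\`, "'", `\'`).Replace(s) + "'"
    }

    func main() {
        d := time.Date(2025, 10, 3, 0, 0, 0, 0, time.UTC)
        // Unquoted, `d`=2025-10-03 would be the subtraction 2025-10-3.
        fmt.Println("WHERE `d`=" + quoteLiteral(d.Format(time.DateOnly)))
        // Output: WHERE `d`='2025-10-03'
    }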