From 90b7203bf734efae61e4dc3ec8f880abe66613f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 16 Oct 2025 19:52:37 +0000 Subject: [PATCH 1/2] pg qrep: avro schema nullable by default despite our efforts, nulled values still frequently leak through --- flow/connectors/postgres/qrep.go | 7 ++- flow/connectors/postgres/qrep_bench_test.go | 2 +- .../postgres/qrep_query_executor.go | 59 ++++++++++++------- .../postgres/qrep_query_executor_test.go | 6 +- flow/e2e/pg.go | 6 +- flow/e2e/postgres.go | 2 +- flow/internal/dynamicconf.go | 12 ++++ 7 files changed, 61 insertions(+), 33 deletions(-) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index d9fc3bd5f3..2120910ebd 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -418,7 +418,7 @@ func corePullQRepRecords( if partition.FullTablePartition { c.logger.Info("pulling full table partition", partitionIdLog) - executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName, + executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version, config.SnapshotName, config.FlowJobName, partition.PartitionId) if err != nil { return 0, 0, fmt.Errorf("failed to create query executor: %w", err) @@ -460,7 +460,8 @@ func corePullQRepRecords( return 0, 0, err } - executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName, config.FlowJobName, partition.PartitionId) + executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version, + config.SnapshotName, config.FlowJobName, partition.PartitionId) if err != nil { return 0, 0, fmt.Errorf("failed to create query executor: %w", err) } @@ -749,7 +750,7 @@ func pullXminRecordStream( queryArgs = []any{strconv.FormatInt(partition.Range.Range.(*protos.PartitionRange_IntRange).IntRange.Start&0xffffffff, 10)} } - executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName, + executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version, config.SnapshotName, config.FlowJobName, partition.PartitionId) if err != nil { return 0, 0, 0, fmt.Errorf("failed to create query executor: %w", err) diff --git a/flow/connectors/postgres/qrep_bench_test.go b/flow/connectors/postgres/qrep_bench_test.go index 9d3bd286ec..2fd547f1c8 100644 --- a/flow/connectors/postgres/qrep_bench_test.go +++ b/flow/connectors/postgres/qrep_bench_test.go @@ -18,7 +18,7 @@ func BenchmarkQRepQueryExecutor(b *testing.B) { defer connector.Close() // Create a new QRepQueryExecutor instance - qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part") + qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part") require.NoError(b, err, "error while creating QRepQueryExecutor") // Run the benchmark diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 3969d1b45d..adbf7413df 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -15,6 +15,7 @@ import ( "go.temporal.io/sdk/log" "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/internal" "github.com/PeerDB-io/peerdb/flow/model" "github.com/PeerDB-io/peerdb/flow/shared" "github.com/PeerDB-io/peerdb/flow/shared/datatypes" @@ -24,19 +25,20 @@ import ( type QRepQueryExecutor struct { *PostgresConnector logger log.Logger + env map[string]string snapshot string flowJobName string partitionID string version uint32 } -func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, version uint32, +func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, env map[string]string, version uint32, flowJobName string, partitionID string, ) (*QRepQueryExecutor, error) { - return c.NewQRepQueryExecutorSnapshot(ctx, version, "", flowJobName, partitionID) + return c.NewQRepQueryExecutorSnapshot(ctx, env, version, "", flowJobName, partitionID) } -func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, version uint32, +func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, env map[string]string, version uint32, snapshot string, flowJobName string, partitionID string, ) (*QRepQueryExecutor, error) { if _, err := c.fetchCustomTypeMapping(ctx); err != nil { @@ -49,6 +51,7 @@ func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, ve flowJobName: flowJobName, partitionID: partitionID, logger: log.With(c.logger, slog.String(string(shared.PartitionIDKey), partitionID)), + env: env, version: version, }, nil } @@ -73,13 +76,21 @@ func (qe *QRepQueryExecutor) cursorToSchema( num uint16 } + strictNullable, err := internal.PeerDBNullableStrict(ctx, qe.env) + if err != nil { + return types.QRecordSchema{}, err + } + rows, err := tx.Query(ctx, "FETCH 0 FROM "+cursorName) if err != nil { return types.QRecordSchema{}, fmt.Errorf("failed to fetch 0 for field descriptions: %w", err) } fds := rows.FieldDescriptions() tableOIDset := make(map[uint32]struct{}) - nullPointers := make(map[attId]*bool, len(fds)) + var nullPointers map[attId]*bool + if strictNullable { + nullPointers = make(map[attId]*bool, len(fds)) + } qfields := make([]types.QField, len(fds)) for i, fd := range fds { tableOIDset[fd.TableOID] = struct{}{} @@ -89,7 +100,7 @@ func (qe *QRepQueryExecutor) cursorToSchema( qfields[i] = types.QField{ Name: fd.Name, Type: ctype, - Nullable: false, + Nullable: !strictNullable, Precision: precision, Scale: scale, } @@ -97,30 +108,34 @@ func (qe *QRepQueryExecutor) cursorToSchema( qfields[i] = types.QField{ Name: fd.Name, Type: ctype, - Nullable: false, + Nullable: !strictNullable, } } - nullPointers[attId{ - relid: fd.TableOID, - num: fd.TableAttributeNumber, - }] = &qfields[i].Nullable + if nullPointers != nil { + nullPointers[attId{ + relid: fd.TableOID, + num: fd.TableAttributeNumber, + }] = &qfields[i].Nullable + } } rows.Close() - tableOIDs := slices.Collect(maps.Keys(tableOIDset)) - rows, err = tx.Query(ctx, "SELECT a.attrelid,a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull", tableOIDs) - if err != nil { - return types.QRecordSchema{}, fmt.Errorf("failed to query schema for field descriptions: %w", err) - } + if nullPointers != nil { + tableOIDs := slices.Collect(maps.Keys(tableOIDset)) + rows, err = tx.Query(ctx, "SELECT a.attrelid,a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull", tableOIDs) + if err != nil { + return types.QRecordSchema{}, fmt.Errorf("failed to query schema for field descriptions: %w", err) + } - var att attId - if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error { - if nullPointer, ok := nullPointers[att]; ok { - *nullPointer = true + var att attId + if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error { + if nullPointer, ok := nullPointers[att]; ok { + *nullPointer = true + } + return nil + }); err != nil { + return types.QRecordSchema{}, fmt.Errorf("failed to process schema for field descriptions: %w", err) } - return nil - }); err != nil { - return types.QRecordSchema{}, fmt.Errorf("failed to process schema for field descriptions: %w", err) } return types.NewQRecordSchema(qfields), nil diff --git a/flow/connectors/postgres/qrep_query_executor_test.go b/flow/connectors/postgres/qrep_query_executor_test.go index 5cb778e12e..4e5f73da1b 100644 --- a/flow/connectors/postgres/qrep_query_executor_test.go +++ b/flow/connectors/postgres/qrep_query_executor_test.go @@ -58,7 +58,7 @@ func TestExecuteAndProcessQuery(t *testing.T) { fmt.Sprintf("INSERT INTO %s.test(data) VALUES ('testdata')", utils.QuoteIdentifier(schemaName))) require.NoError(t, err, "error while inserting data") - qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part") + qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part") require.NoError(t, err, "error while creating QRepQueryExecutor") batch, err := qe.ExecuteAndProcessQuery(t.Context(), fmt.Sprintf("SELECT * FROM %s.test", utils.QuoteIdentifier(schemaName))) @@ -175,7 +175,7 @@ func TestSupportedDataTypes(t *testing.T) { ) require.NoError(t, err, "error while inserting into test table") - qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part") + qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part") require.NoError(t, err, "error while creating QRepQueryExecutor") // Select the row back out of the table batch, err := qe.ExecuteAndProcessQuery(t.Context(), @@ -674,7 +674,7 @@ func TestStringDataTypes(t *testing.T) { _, err = conn.Exec(ctx, query) require.NoError(t, err) - qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part") + qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part") require.NoError(t, err) // Select the row back out of the table batch, err := qe.ExecuteAndProcessQuery(t.Context(), diff --git a/flow/e2e/pg.go b/flow/e2e/pg.go index 33a2383ebd..c199d139c5 100644 --- a/flow/e2e/pg.go +++ b/flow/e2e/pg.go @@ -198,7 +198,7 @@ func (s *PostgresSource) Exec(ctx context.Context, sql string) error { } func (s *PostgresSource) GetRows(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error) { - pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "testflow", "testpart") + pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "testflow", "testpart") if err != nil { return nil, err } @@ -211,7 +211,7 @@ func (s *PostgresSource) GetRows(ctx context.Context, suffix string, table strin // to avoid fetching rows from "child" tables ala Postgres table inheritance func (s *PostgresSource) GetRowsOnly(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error) { - pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "testflow", "testpart") + pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "testflow", "testpart") if err != nil { return nil, err } @@ -248,7 +248,7 @@ func RevokePermissionForTableColumns(ctx context.Context, conn *pgx.Conn, tableI } func (s *PostgresSource) Query(ctx context.Context, query string) (*model.QRecordBatch, error) { - pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "testflow", "testpart") + pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "testflow", "testpart") if err != nil { return nil, err } diff --git a/flow/e2e/postgres.go b/flow/e2e/postgres.go index cfcefca65c..399880cc8c 100644 --- a/flow/e2e/postgres.go +++ b/flow/e2e/postgres.go @@ -63,7 +63,7 @@ func (s PeerFlowE2ETestSuitePG) Exec(ctx context.Context, sql string) error { func (s PeerFlowE2ETestSuitePG) GetRows(table string, cols string) (*model.QRecordBatch, error) { s.t.Helper() - pgQueryExecutor, err := s.conn.NewQRepQueryExecutor(s.t.Context(), shared.InternalVersion_Latest, "testflow", "testpart") + pgQueryExecutor, err := s.conn.NewQRepQueryExecutor(s.t.Context(), nil, shared.InternalVersion_Latest, "testflow", "testpart") if err != nil { return nil, err } diff --git a/flow/internal/dynamicconf.go b/flow/internal/dynamicconf.go index 52b8bba6af..3b87a65235 100644 --- a/flow/internal/dynamicconf.go +++ b/flow/internal/dynamicconf.go @@ -119,6 +119,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{ ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, TargetForSetting: protos.DynconfTarget_ALL, }, + { + Name: "PEERDB_NULLABLE_STRICT", + Description: "Propagate nullability in avro schema during initial load", + DefaultValue: "false", + ValueType: protos.DynconfValueType_BOOL, + ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, + TargetForSetting: protos.DynconfTarget_ALL, + }, { Name: "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", Description: "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit", @@ -588,6 +596,10 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) { return dynamicConfBool(ctx, env, "PEERDB_NULLABLE") } +func PeerDBNullableStrict(ctx context.Context, env map[string]string) (bool, error) { + return dynamicConfBool(ctx, env, "PEERDB_NULLABLE_STRICT") +} + func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) { format, err := dynLookup(ctx, env, "PEERDB_CLICKHOUSE_BINARY_FORMAT") if err != nil { From f641adcad3002f182475dfd3f00b81a3d6920d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 17 Oct 2025 20:08:53 +0000 Subject: [PATCH 2/2] test --- flow/e2e/clickhouse_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/flow/e2e/clickhouse_test.go b/flow/e2e/clickhouse_test.go index 3ea234c241..68d6975abe 100644 --- a/flow/e2e/clickhouse_test.go +++ b/flow/e2e/clickhouse_test.go @@ -1298,6 +1298,14 @@ func (s ClickHouseSuite) Test_Types_CH() { } func (s ClickHouseSuite) Test_InfiniteTimestamp() { + s.testInfiniteTimestamp(false) +} + +func (s ClickHouseSuite) Test_InfiniteTimestampStrict() { + s.testInfiniteTimestamp(true) +} + +func (s ClickHouseSuite) testInfiniteTimestamp(strictNull bool) { if _, ok := s.source.(*PostgresSource); !ok { s.t.Skip("only applies to postgres") } @@ -1330,6 +1338,9 @@ func (s ClickHouseSuite) Test_InfiniteTimestamp() { flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) flowConnConfig.DoInitialSnapshot = true flowConnConfig.Env = map[string]string{"PEERDB_NULLABLE": "true"} + if strictNull { + flowConnConfig.Env["PEERDB_NULLABLE_STRICT"] = "true" + } tc := NewTemporalClient(s.t) env := ExecutePeerflow(s.t, tc, flowConnConfig) @@ -1380,6 +1391,14 @@ func (s ClickHouseSuite) Test_InfiniteTimestamp() { } func (s ClickHouseSuite) Test_JSON_Null() { + s.testJSONNull(false) +} + +func (s ClickHouseSuite) Test_JSON_Null_Strict() { + s.testJSONNull(true) +} + +func (s ClickHouseSuite) testJSONNull(strictNull bool) { if _, ok := s.source.(*PostgresSource); !ok { s.t.Skip("only applies to postgres") } @@ -1413,6 +1432,9 @@ func (s ClickHouseSuite) Test_JSON_Null() { flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) flowConnConfig.DoInitialSnapshot = true flowConnConfig.Env = map[string]string{"PEERDB_NULLABLE": "true"} + if strictNull { + flowConnConfig.Env["PEERDB_NULLABLE_STRICT"] = "true" + } tc := NewTemporalClient(s.t) env := ExecutePeerflow(s.t, tc, flowConnConfig)