Skip to content

Commit 90b7203

Browse files
committed
pg qrep: avro schema nullable by default
despite our efforts, nulled values still frequently leak through
1 parent a6d0527 commit 90b7203

File tree

7 files changed

+61
-33
lines changed

7 files changed

+61
-33
lines changed

flow/connectors/postgres/qrep.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ func corePullQRepRecords(
418418

419419
if partition.FullTablePartition {
420420
c.logger.Info("pulling full table partition", partitionIdLog)
421-
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName,
421+
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version, config.SnapshotName,
422422
config.FlowJobName, partition.PartitionId)
423423
if err != nil {
424424
return 0, 0, fmt.Errorf("failed to create query executor: %w", err)
@@ -460,7 +460,8 @@ func corePullQRepRecords(
460460
return 0, 0, err
461461
}
462462

463-
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName, config.FlowJobName, partition.PartitionId)
463+
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version,
464+
config.SnapshotName, config.FlowJobName, partition.PartitionId)
464465
if err != nil {
465466
return 0, 0, fmt.Errorf("failed to create query executor: %w", err)
466467
}
@@ -749,7 +750,7 @@ func pullXminRecordStream(
749750
queryArgs = []any{strconv.FormatInt(partition.Range.Range.(*protos.PartitionRange_IntRange).IntRange.Start&0xffffffff, 10)}
750751
}
751752

752-
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName,
753+
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version, config.SnapshotName,
753754
config.FlowJobName, partition.PartitionId)
754755
if err != nil {
755756
return 0, 0, 0, fmt.Errorf("failed to create query executor: %w", err)

flow/connectors/postgres/qrep_bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func BenchmarkQRepQueryExecutor(b *testing.B) {
1818
defer connector.Close()
1919

2020
// Create a new QRepQueryExecutor instance
21-
qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
21+
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
2222
require.NoError(b, err, "error while creating QRepQueryExecutor")
2323

2424
// Run the benchmark

flow/connectors/postgres/qrep_query_executor.go

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.temporal.io/sdk/log"
1616

1717
"github.com/PeerDB-io/peerdb/flow/generated/protos"
18+
"github.com/PeerDB-io/peerdb/flow/internal"
1819
"github.com/PeerDB-io/peerdb/flow/model"
1920
"github.com/PeerDB-io/peerdb/flow/shared"
2021
"github.com/PeerDB-io/peerdb/flow/shared/datatypes"
@@ -24,19 +25,20 @@ import (
2425
type QRepQueryExecutor struct {
2526
*PostgresConnector
2627
logger log.Logger
28+
env map[string]string
2729
snapshot string
2830
flowJobName string
2931
partitionID string
3032
version uint32
3133
}
3234

33-
func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, version uint32,
35+
func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, env map[string]string, version uint32,
3436
flowJobName string, partitionID string,
3537
) (*QRepQueryExecutor, error) {
36-
return c.NewQRepQueryExecutorSnapshot(ctx, version, "", flowJobName, partitionID)
38+
return c.NewQRepQueryExecutorSnapshot(ctx, env, version, "", flowJobName, partitionID)
3739
}
3840

39-
func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, version uint32,
41+
func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, env map[string]string, version uint32,
4042
snapshot string, flowJobName string, partitionID string,
4143
) (*QRepQueryExecutor, error) {
4244
if _, err := c.fetchCustomTypeMapping(ctx); err != nil {
@@ -49,6 +51,7 @@ func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, ve
4951
flowJobName: flowJobName,
5052
partitionID: partitionID,
5153
logger: log.With(c.logger, slog.String(string(shared.PartitionIDKey), partitionID)),
54+
env: env,
5255
version: version,
5356
}, nil
5457
}
@@ -73,13 +76,21 @@ func (qe *QRepQueryExecutor) cursorToSchema(
7376
num uint16
7477
}
7578

79+
strictNullable, err := internal.PeerDBNullableStrict(ctx, qe.env)
80+
if err != nil {
81+
return types.QRecordSchema{}, err
82+
}
83+
7684
rows, err := tx.Query(ctx, "FETCH 0 FROM "+cursorName)
7785
if err != nil {
7886
return types.QRecordSchema{}, fmt.Errorf("failed to fetch 0 for field descriptions: %w", err)
7987
}
8088
fds := rows.FieldDescriptions()
8189
tableOIDset := make(map[uint32]struct{})
82-
nullPointers := make(map[attId]*bool, len(fds))
90+
var nullPointers map[attId]*bool
91+
if strictNullable {
92+
nullPointers = make(map[attId]*bool, len(fds))
93+
}
8394
qfields := make([]types.QField, len(fds))
8495
for i, fd := range fds {
8596
tableOIDset[fd.TableOID] = struct{}{}
@@ -89,38 +100,42 @@ func (qe *QRepQueryExecutor) cursorToSchema(
89100
qfields[i] = types.QField{
90101
Name: fd.Name,
91102
Type: ctype,
92-
Nullable: false,
103+
Nullable: !strictNullable,
93104
Precision: precision,
94105
Scale: scale,
95106
}
96107
} else {
97108
qfields[i] = types.QField{
98109
Name: fd.Name,
99110
Type: ctype,
100-
Nullable: false,
111+
Nullable: !strictNullable,
101112
}
102113
}
103-
nullPointers[attId{
104-
relid: fd.TableOID,
105-
num: fd.TableAttributeNumber,
106-
}] = &qfields[i].Nullable
114+
if nullPointers != nil {
115+
nullPointers[attId{
116+
relid: fd.TableOID,
117+
num: fd.TableAttributeNumber,
118+
}] = &qfields[i].Nullable
119+
}
107120
}
108121
rows.Close()
109-
tableOIDs := slices.Collect(maps.Keys(tableOIDset))
110122

111-
rows, err = tx.Query(ctx, "SELECT a.attrelid,a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull", tableOIDs)
112-
if err != nil {
113-
return types.QRecordSchema{}, fmt.Errorf("failed to query schema for field descriptions: %w", err)
114-
}
123+
if nullPointers != nil {
124+
tableOIDs := slices.Collect(maps.Keys(tableOIDset))
125+
rows, err = tx.Query(ctx, "SELECT a.attrelid,a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull", tableOIDs)
126+
if err != nil {
127+
return types.QRecordSchema{}, fmt.Errorf("failed to query schema for field descriptions: %w", err)
128+
}
115129

116-
var att attId
117-
if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error {
118-
if nullPointer, ok := nullPointers[att]; ok {
119-
*nullPointer = true
130+
var att attId
131+
if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error {
132+
if nullPointer, ok := nullPointers[att]; ok {
133+
*nullPointer = true
134+
}
135+
return nil
136+
}); err != nil {
137+
return types.QRecordSchema{}, fmt.Errorf("failed to process schema for field descriptions: %w", err)
120138
}
121-
return nil
122-
}); err != nil {
123-
return types.QRecordSchema{}, fmt.Errorf("failed to process schema for field descriptions: %w", err)
124139
}
125140

126141
return types.NewQRecordSchema(qfields), nil

flow/connectors/postgres/qrep_query_executor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func TestExecuteAndProcessQuery(t *testing.T) {
5858
fmt.Sprintf("INSERT INTO %s.test(data) VALUES ('testdata')", utils.QuoteIdentifier(schemaName)))
5959
require.NoError(t, err, "error while inserting data")
6060

61-
qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
61+
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
6262
require.NoError(t, err, "error while creating QRepQueryExecutor")
6363

6464
batch, err := qe.ExecuteAndProcessQuery(t.Context(), fmt.Sprintf("SELECT * FROM %s.test", utils.QuoteIdentifier(schemaName)))
@@ -175,7 +175,7 @@ func TestSupportedDataTypes(t *testing.T) {
175175
)
176176
require.NoError(t, err, "error while inserting into test table")
177177

178-
qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
178+
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
179179
require.NoError(t, err, "error while creating QRepQueryExecutor")
180180
// Select the row back out of the table
181181
batch, err := qe.ExecuteAndProcessQuery(t.Context(),
@@ -674,7 +674,7 @@ func TestStringDataTypes(t *testing.T) {
674674
_, err = conn.Exec(ctx, query)
675675
require.NoError(t, err)
676676

677-
qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
677+
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
678678
require.NoError(t, err)
679679
// Select the row back out of the table
680680
batch, err := qe.ExecuteAndProcessQuery(t.Context(),

flow/e2e/pg.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func (s *PostgresSource) Exec(ctx context.Context, sql string) error {
198198
}
199199

200200
func (s *PostgresSource) GetRows(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error) {
201-
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "testflow", "testpart")
201+
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "testflow", "testpart")
202202
if err != nil {
203203
return nil, err
204204
}
@@ -211,7 +211,7 @@ func (s *PostgresSource) GetRows(ctx context.Context, suffix string, table strin
211211

212212
// to avoid fetching rows from "child" tables ala Postgres table inheritance
213213
func (s *PostgresSource) GetRowsOnly(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error) {
214-
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "testflow", "testpart")
214+
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "testflow", "testpart")
215215
if err != nil {
216216
return nil, err
217217
}
@@ -248,7 +248,7 @@ func RevokePermissionForTableColumns(ctx context.Context, conn *pgx.Conn, tableI
248248
}
249249

250250
func (s *PostgresSource) Query(ctx context.Context, query string) (*model.QRecordBatch, error) {
251-
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "testflow", "testpart")
251+
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "testflow", "testpart")
252252
if err != nil {
253253
return nil, err
254254
}

flow/e2e/postgres.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (s PeerFlowE2ETestSuitePG) Exec(ctx context.Context, sql string) error {
6363

6464
func (s PeerFlowE2ETestSuitePG) GetRows(table string, cols string) (*model.QRecordBatch, error) {
6565
s.t.Helper()
66-
pgQueryExecutor, err := s.conn.NewQRepQueryExecutor(s.t.Context(), shared.InternalVersion_Latest, "testflow", "testpart")
66+
pgQueryExecutor, err := s.conn.NewQRepQueryExecutor(s.t.Context(), nil, shared.InternalVersion_Latest, "testflow", "testpart")
6767
if err != nil {
6868
return nil, err
6969
}

flow/internal/dynamicconf.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
119119
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
120120
TargetForSetting: protos.DynconfTarget_ALL,
121121
},
122+
{
123+
Name: "PEERDB_NULLABLE_STRICT",
124+
Description: "Propagate nullability in avro schema during initial load",
125+
DefaultValue: "false",
126+
ValueType: protos.DynconfValueType_BOOL,
127+
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
128+
TargetForSetting: protos.DynconfTarget_ALL,
129+
},
122130
{
123131
Name: "PEERDB_SNOWFLAKE_MERGE_PARALLELISM",
124132
Description: "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit",
@@ -588,6 +596,10 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) {
588596
return dynamicConfBool(ctx, env, "PEERDB_NULLABLE")
589597
}
590598

599+
func PeerDBNullableStrict(ctx context.Context, env map[string]string) (bool, error) {
600+
return dynamicConfBool(ctx, env, "PEERDB_NULLABLE_STRICT")
601+
}
602+
591603
func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) {
592604
format, err := dynLookup(ctx, env, "PEERDB_CLICKHOUSE_BINARY_FORMAT")
593605
if err != nil {

0 commit comments

Comments
 (0)