Skip to content

Commit 21fc6ec

Browse files
authored
pg qrep: config for avro schema nullable, debug logging (#3750)
Current situation:

1. The error `failed to sync records: failed to write records to S3: failed to write records to OCF writer: failed to write record to OCF: some_column_100: avro: *avro.null is unsupported for Avro long` occurs about once a month for one customer.
2. The column in question is a nullable integer, all values in it are null, and it is inherited from a parent table.
3. The parent table has the same name but lives in a different schema.
4. The parent and child attnums for the column differ, so column deletions were possibly involved, and the child also seems to have started inheriting from the parent some time after it was created.
5. I was unable to reproduce the error with that information. It is possible the schema has changed since, so it would be nice to capture the exact data at the moment the error happens.

This change is a spiritual successor of #3613, but it defaults to the strict behavior (which has been working fine for everyone else) and puts the lax behavior behind a config setting. When lax mode is enabled, it collects all the inputs that go into deciding whether a column would be nullable under the strict behavior, plus some extra information about table inheritance, and logs all of it later if any mismatch with the strict result was detected.

Tested that the new logic runs and logs when the code is deliberately adjusted to under-report nullability; as-is, the test doesn't exercise much, because the issue is not cleanly reproducible yet.

Also adding a generic code notification metric that's easy to emit from anywhere; I will set up a non-paging alert on it once this goes in.

The plan is to enable the setting for just one service, leave it to bake for another month or two, then check back when the notification fires. After the issue is sorted out, all the null tracking can be removed.
1 parent a7f7b63 commit 21fc6ec

File tree

15 files changed

+386
-36
lines changed

15 files changed

+386
-36
lines changed

flow/connectors/clickhouse/avro_sync.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ func (s *ClickHouseAvroSyncMethod) pushDataToS3ForSnapshot(
203203
createChunkedSubstream := func(done *atomic.Bool) (*model.QRecordStream, *model.QRecordAvroChunkSizeTracker) {
204204
substream := model.NewQRecordStream(0)
205205
substream.SetSchema(schema)
206+
substream.SetSchemaDebug(stream.SchemaDebug())
206207
sizeTracker := model.QRecordAvroChunkSizeTracker{TrackUncompressed: trackUncompressed}
207208
go func() {
208209
recordsDone := true

flow/connectors/postgres/qrep.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func corePullQRepRecords(
348348

349349
if partition.FullTablePartition {
350350
c.logger.Info("pulling full table partition", partitionIdLog)
351-
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName,
351+
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version, config.SnapshotName,
352352
config.FlowJobName, partition.PartitionId)
353353
if err != nil {
354354
return 0, 0, fmt.Errorf("failed to create query executor: %w", err)
@@ -390,7 +390,8 @@ func corePullQRepRecords(
390390
return 0, 0, err
391391
}
392392

393-
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName, config.FlowJobName, partition.PartitionId)
393+
executor, err := c.NewQRepQueryExecutorSnapshot(
394+
ctx, config.Env, config.Version, config.SnapshotName, config.FlowJobName, partition.PartitionId)
394395
if err != nil {
395396
return 0, 0, fmt.Errorf("failed to create query executor: %w", err)
396397
}
@@ -679,7 +680,7 @@ func pullXminRecordStream(
679680
queryArgs = []any{strconv.FormatInt(partition.Range.Range.(*protos.PartitionRange_IntRange).IntRange.Start&0xffffffff, 10)}
680681
}
681682

682-
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName,
683+
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version, config.SnapshotName,
683684
config.FlowJobName, partition.PartitionId)
684685
if err != nil {
685686
return 0, 0, 0, fmt.Errorf("failed to create query executor: %w", err)

flow/connectors/postgres/qrep_bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func BenchmarkQRepQueryExecutor(b *testing.B) {
1818
defer connector.Close()
1919

2020
// Create a new QRepQueryExecutor instance
21-
qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
21+
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
2222
require.NoError(b, err, "error while creating QRepQueryExecutor")
2323

2424
// Run the benchmark

flow/connectors/postgres/qrep_query_executor.go

Lines changed: 173 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.temporal.io/sdk/log"
1616

1717
"github.com/PeerDB-io/peerdb/flow/generated/protos"
18+
"github.com/PeerDB-io/peerdb/flow/internal"
1819
"github.com/PeerDB-io/peerdb/flow/model"
1920
"github.com/PeerDB-io/peerdb/flow/shared"
2021
"github.com/PeerDB-io/peerdb/flow/shared/datatypes"
@@ -24,19 +25,20 @@ import (
2425
type QRepQueryExecutor struct {
2526
*PostgresConnector
2627
logger log.Logger
28+
env map[string]string
2729
snapshot string
2830
flowJobName string
2931
partitionID string
3032
version uint32
3133
}
3234

33-
func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, version uint32,
35+
func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, env map[string]string, version uint32,
3436
flowJobName string, partitionID string,
3537
) (*QRepQueryExecutor, error) {
36-
return c.NewQRepQueryExecutorSnapshot(ctx, version, "", flowJobName, partitionID)
38+
return c.NewQRepQueryExecutorSnapshot(ctx, env, version, "", flowJobName, partitionID)
3739
}
3840

39-
func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, version uint32,
41+
func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, env map[string]string, version uint32,
4042
snapshot string, flowJobName string, partitionID string,
4143
) (*QRepQueryExecutor, error) {
4244
if _, err := c.fetchCustomTypeMapping(ctx); err != nil {
@@ -45,6 +47,7 @@ func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, ve
4547
}
4648
return &QRepQueryExecutor{
4749
PostgresConnector: c,
50+
env: env,
4851
snapshot: snapshot,
4952
flowJobName: flowJobName,
5053
partitionID: partitionID,
@@ -67,63 +70,209 @@ func (qe *QRepQueryExecutor) cursorToSchema(
6770
ctx context.Context,
6871
tx pgx.Tx,
6972
cursorName string,
70-
) (types.QRecordSchema, error) {
71-
type attId struct {
72-
relid uint32
73-
num uint16
73+
) (types.QRecordSchema, *types.NullableSchemaDebug, error) {
74+
laxMode, err := internal.PeerDBAvroNullableLax(ctx, qe.env)
75+
if err != nil {
76+
return types.QRecordSchema{}, nil, err
7477
}
7578

7679
rows, err := tx.Query(ctx, "FETCH 0 FROM "+cursorName)
7780
if err != nil {
78-
return types.QRecordSchema{}, fmt.Errorf("failed to fetch 0 for field descriptions: %w", err)
81+
return types.QRecordSchema{}, nil, fmt.Errorf("failed to fetch 0 for field descriptions: %w", err)
7982
}
8083
fds := rows.FieldDescriptions()
84+
rows.Close()
85+
8186
tableOIDset := make(map[uint32]struct{})
82-
nullPointers := make(map[attId]*bool, len(fds))
8387
qfields := make([]types.QField, len(fds))
88+
89+
// In lax mode: track debug info and map attIds to field indices
90+
// In strict mode: track pointers to nullable fields
91+
var schemaDebug *types.NullableSchemaDebug
92+
var attIdToFieldIdx map[attId][]int // lax mode
93+
var nullPointers map[attId]*bool // strict mode
94+
95+
if laxMode {
96+
schemaDebug = &types.NullableSchemaDebug{
97+
PgxFields: make([]types.PgxFieldDebug, len(fds)),
98+
StrictNullable: make([]bool, len(fds)),
99+
}
100+
attIdToFieldIdx = make(map[attId][]int, len(fds))
101+
} else {
102+
nullPointers = make(map[attId]*bool, len(fds))
103+
}
104+
84105
for i, fd := range fds {
85106
tableOIDset[fd.TableOID] = struct{}{}
86107
ctype := qe.postgresOIDToQValueKind(fd.DataTypeOID, qe.customTypeMapping, qe.version)
108+
87109
if ctype == types.QValueKindNumeric || ctype == types.QValueKindArrayNumeric {
88110
precision, scale := datatypes.ParseNumericTypmod(fd.TypeModifier)
89111
qfields[i] = types.QField{
90112
Name: fd.Name,
91113
Type: ctype,
92-
Nullable: false,
114+
Nullable: laxMode, // lax=true, strict=false (until pg_attribute says otherwise)
93115
Precision: precision,
94116
Scale: scale,
95117
}
96118
} else {
97119
qfields[i] = types.QField{
98120
Name: fd.Name,
99121
Type: ctype,
100-
Nullable: false,
122+
Nullable: laxMode,
123+
}
124+
}
125+
126+
key := attId{relid: fd.TableOID, num: fd.TableAttributeNumber}
127+
if laxMode {
128+
schemaDebug.PgxFields[i] = types.PgxFieldDebug{
129+
Name: fd.Name,
130+
TableOID: fd.TableOID,
131+
TableAttributeNumber: fd.TableAttributeNumber,
132+
DataTypeOID: fd.DataTypeOID,
101133
}
134+
attIdToFieldIdx[key] = append(attIdToFieldIdx[key], i)
135+
} else {
136+
nullPointers[key] = &qfields[i].Nullable
102137
}
103-
nullPointers[attId{
104-
relid: fd.TableOID,
105-
num: fd.TableAttributeNumber,
106-
}] = &qfields[i].Nullable
107138
}
108-
rows.Close()
139+
109140
tableOIDs := slices.Collect(maps.Keys(tableOIDset))
141+
if laxMode {
142+
schemaDebug.QueriedTableOIDs = tableOIDs
143+
}
144+
145+
// Query pg_attribute - different queries for lax vs strict
146+
if laxMode {
147+
if err := qe.populateLaxModeDebugInfo(ctx, tx, tableOIDs, schemaDebug, attIdToFieldIdx); err != nil {
148+
return types.QRecordSchema{}, nil, err
149+
}
150+
} else {
151+
// Strict mode: minimal query, just need nullable columns
152+
rows, err := tx.Query(ctx,
153+
"SELECT a.attrelid, a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull",
154+
tableOIDs)
155+
if err != nil {
156+
return types.QRecordSchema{}, nil, fmt.Errorf("failed to query pg_attribute: %w", err)
157+
}
158+
159+
var att attId
160+
if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error {
161+
if nullPointer, ok := nullPointers[att]; ok {
162+
*nullPointer = true
163+
}
164+
return nil
165+
}); err != nil {
166+
return types.QRecordSchema{}, nil, fmt.Errorf("failed to process pg_attribute: %w", err)
167+
}
168+
}
110169

111-
rows, err = tx.Query(ctx, "SELECT a.attrelid,a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull", tableOIDs)
170+
return types.NewQRecordSchema(qfields), schemaDebug, nil
171+
}
172+
173+
type attId struct {
174+
relid uint32
175+
num uint16
176+
}
177+
178+
// populateLaxModeDebugInfo populates debug info for diagnosing nullable mismatches.
179+
// The aim is to capture enough data that the customer can change the schema in any way
180+
// after the snapshot transaction is done and we still have a way to debug.
181+
func (qe *QRepQueryExecutor) populateLaxModeDebugInfo(
182+
ctx context.Context,
183+
tx pgx.Tx,
184+
tableOIDs []uint32,
185+
schemaDebug *types.NullableSchemaDebug,
186+
attIdToFieldIdx map[attId][]int,
187+
) error {
188+
// First, expand tableOIDs to include all parent tables (for full column info)
189+
allTableOIDs := make(map[uint32]struct{})
190+
for _, oid := range tableOIDs {
191+
allTableOIDs[oid] = struct{}{}
192+
}
193+
parentOIDByTableOID := make(map[uint32]uint32)
194+
195+
// Iteratively find all parent tables
196+
oidsToQuery := tableOIDs
197+
for len(oidsToQuery) > 0 {
198+
rows, err := tx.Query(ctx, `SELECT inhrelid, inhparent FROM pg_inherits WHERE inhrelid = ANY($1)`, oidsToQuery)
199+
if err != nil {
200+
return fmt.Errorf("failed to query pg_inherits: %w", err)
201+
}
202+
var childOID, parentOID uint32
203+
var nextOids []uint32
204+
if _, err := pgx.ForEachRow(rows, []any{&childOID, &parentOID}, func() error {
205+
parentOIDByTableOID[childOID] = parentOID
206+
if _, seen := allTableOIDs[parentOID]; !seen {
207+
allTableOIDs[parentOID] = struct{}{}
208+
nextOids = append(nextOids, parentOID)
209+
}
210+
return nil
211+
}); err != nil {
212+
return fmt.Errorf("failed to process pg_inherits: %w", err)
213+
}
214+
oidsToQuery = nextOids
215+
}
216+
217+
allOIDSlice := slices.Collect(maps.Keys(allTableOIDs))
218+
219+
// Query pg_attribute for ALL tables (children + parents)
220+
rows, err := tx.Query(ctx, `
221+
SELECT a.attrelid, a.attnum, a.attname, a.attnotnull, a.atttypid, a.attinhcount, a.attislocal
222+
FROM pg_attribute a
223+
WHERE a.attrelid = ANY($1) AND a.attnum > 0 AND NOT a.attisdropped
224+
ORDER BY a.attrelid, a.attnum`,
225+
allOIDSlice)
112226
if err != nil {
113-
return types.QRecordSchema{}, fmt.Errorf("failed to query schema for field descriptions: %w", err)
227+
return fmt.Errorf("failed to query pg_attribute: %w", err)
114228
}
115229

116-
var att attId
117-
if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error {
118-
if nullPointer, ok := nullPointers[att]; ok {
119-
*nullPointer = true
230+
var row types.PgAttributeDebug
231+
if _, err := pgx.ForEachRow(rows, []any{
232+
&row.AttRelID, &row.AttNum, &row.AttName, &row.AttNotNull, &row.AttTypID, &row.AttInhCount, &row.AttIsLocal,
233+
}, func() error {
234+
schemaDebug.PgAttributeRows = append(schemaDebug.PgAttributeRows, row)
235+
236+
// Compute strict nullable: if NOT attnotnull and matches a field, mark it nullable
237+
if !row.AttNotNull {
238+
key := attId{relid: row.AttRelID, num: uint16(row.AttNum)}
239+
if indices, ok := attIdToFieldIdx[key]; ok {
240+
for _, idx := range indices {
241+
schemaDebug.StrictNullable[idx] = true
242+
}
243+
}
120244
}
121245
return nil
122246
}); err != nil {
123-
return types.QRecordSchema{}, fmt.Errorf("failed to process schema for field descriptions: %w", err)
247+
return fmt.Errorf("failed to process pg_attribute: %w", err)
248+
}
249+
250+
// Query table names and schemas for all tables
251+
rows, err = tx.Query(ctx, `
252+
SELECT c.oid, c.relname, n.nspname
253+
FROM pg_class c
254+
JOIN pg_namespace n ON c.relnamespace = n.oid
255+
WHERE c.oid = ANY($1)`,
256+
allOIDSlice)
257+
if err != nil {
258+
return fmt.Errorf("failed to query pg_class: %w", err)
259+
}
260+
261+
var oid uint32
262+
var tableName, schemaName string
263+
if _, err := pgx.ForEachRow(rows, []any{&oid, &tableName, &schemaName}, func() error {
264+
schemaDebug.Tables = append(schemaDebug.Tables, types.TableDebug{
265+
OID: oid,
266+
TableName: tableName,
267+
SchemaName: schemaName,
268+
ParentOID: parentOIDByTableOID[oid],
269+
})
270+
return nil
271+
}); err != nil {
272+
return fmt.Errorf("failed to process pg_class: %w", err)
124273
}
125274

126-
return types.NewQRecordSchema(qfields), nil
275+
return nil
127276
}
128277

129278
func (qe *QRepQueryExecutor) processRowsStream(

flow/connectors/postgres/qrep_query_executor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func TestExecuteAndProcessQuery(t *testing.T) {
5858
fmt.Sprintf("INSERT INTO %s.test(data) VALUES ('testdata')", utils.QuoteIdentifier(schemaName)))
5959
require.NoError(t, err, "error while inserting data")
6060

61-
qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
61+
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
6262
require.NoError(t, err, "error while creating QRepQueryExecutor")
6363

6464
batch, err := qe.ExecuteAndProcessQuery(t.Context(), fmt.Sprintf("SELECT * FROM %s.test", utils.QuoteIdentifier(schemaName)))
@@ -175,7 +175,7 @@ func TestSupportedDataTypes(t *testing.T) {
175175
)
176176
require.NoError(t, err, "error while inserting into test table")
177177

178-
qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
178+
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
179179
require.NoError(t, err, "error while creating QRepQueryExecutor")
180180
// Select the row back out of the table
181181
batch, err := qe.ExecuteAndProcessQuery(t.Context(),
@@ -674,7 +674,7 @@ func TestStringDataTypes(t *testing.T) {
674674
_, err = conn.Exec(ctx, query)
675675
require.NoError(t, err)
676676

677-
qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
677+
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
678678
require.NoError(t, err)
679679
// Select the row back out of the table
680680
batch, err := qe.ExecuteAndProcessQuery(t.Context(),

flow/connectors/postgres/sink_q.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,12 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
6262
slog.Int("channelLen", len(stream.Records)))
6363

6464
if !stream.IsSchemaSet() {
65-
schema, err := qe.cursorToSchema(ctx, tx, cursorName)
65+
schema, schemaDebug, err := qe.cursorToSchema(ctx, tx, cursorName)
6666
if err != nil {
6767
return 0, 0, err
6868
}
6969
stream.SetSchema(schema)
70+
stream.SetSchemaDebug(schemaDebug)
7071
}
7172

7273
var totalNumRows int64

flow/connectors/utils/avro_writer.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,9 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(
231231
return 0, err
232232
}
233233

234+
// Create null mismatch tracker if in nullable lax mode
235+
avroConverter.NullMismatchTracker = model.NewNullMismatchTracker(p.stream.SchemaDebug())
236+
234237
logger.Info("writing records to OCF start",
235238
slog.Int("channelLen", len(p.stream.Records)))
236239

@@ -281,6 +284,10 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(
281284
slog.String("compression", string(p.avroCompressionCodec)),
282285
)
283286

287+
if avroConverter.NullMismatchTracker != nil {
288+
avroConverter.NullMismatchTracker.LogIfMismatch(ctx, logger)
289+
}
290+
284291
if err := p.stream.Err(); err != nil {
285292
logger.Error("Failed to get record from stream", slog.Any("error", err))
286293
return numRows.Load(), fmt.Errorf("failed to get record from stream: %w", err)

0 commit comments

Comments
 (0)