Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func corePullQRepRecords(

if partition.FullTablePartition {
c.logger.Info("pulling full table partition", partitionIdLog)
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName,
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version, config.SnapshotName,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, 0, fmt.Errorf("failed to create query executor: %w", err)
Expand Down Expand Up @@ -460,7 +460,8 @@ func corePullQRepRecords(
return 0, 0, err
}

executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName, config.FlowJobName, partition.PartitionId)
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version,
config.SnapshotName, config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, 0, fmt.Errorf("failed to create query executor: %w", err)
}
Expand Down Expand Up @@ -749,7 +750,7 @@ func pullXminRecordStream(
queryArgs = []any{strconv.FormatInt(partition.Range.Range.(*protos.PartitionRange_IntRange).IntRange.Start&0xffffffff, 10)}
}

executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Version, config.SnapshotName,
executor, err := c.NewQRepQueryExecutorSnapshot(ctx, config.Env, config.Version, config.SnapshotName,
config.FlowJobName, partition.PartitionId)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to create query executor: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func BenchmarkQRepQueryExecutor(b *testing.B) {
defer connector.Close()

// Create a new QRepQueryExecutor instance
qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
require.NoError(b, err, "error while creating QRepQueryExecutor")

// Run the benchmark
Expand Down
59 changes: 37 additions & 22 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.temporal.io/sdk/log"

"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/internal"
"github.com/PeerDB-io/peerdb/flow/model"
"github.com/PeerDB-io/peerdb/flow/shared"
"github.com/PeerDB-io/peerdb/flow/shared/datatypes"
Expand All @@ -24,19 +25,20 @@ import (
// QRepQueryExecutor runs queries on behalf of a single flow partition using
// the embedded PostgresConnector. Instances are created through
// NewQRepQueryExecutor or NewQRepQueryExecutorSnapshot.
type QRepQueryExecutor struct {
// Embedded connector; its methods and fields are promoted onto the executor.
*PostgresConnector
// logger is derived from the connector's logger with the partition ID attached.
logger log.Logger
// env holds per-flow setting overrides; cursorToSchema passes it to
// internal.PeerDBNullableStrict. Callers in tests pass nil.
env map[string]string
// snapshot is the snapshot name supplied to NewQRepQueryExecutorSnapshot;
// NewQRepQueryExecutor supplies the empty string.
// NOTE(review): presumably empty means "no exported snapshot" — confirm at the use site.
snapshot string
// flowJobName and partitionID identify the flow and partition this executor was created for.
flowJobName string
partitionID string
// version is the internal version supplied by the caller
// (e.g. config.Version or shared.InternalVersion_Latest).
version uint32
}

func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, version uint32,
func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, env map[string]string, version uint32,
flowJobName string, partitionID string,
) (*QRepQueryExecutor, error) {
return c.NewQRepQueryExecutorSnapshot(ctx, version, "", flowJobName, partitionID)
return c.NewQRepQueryExecutorSnapshot(ctx, env, version, "", flowJobName, partitionID)
}

func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, version uint32,
func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, env map[string]string, version uint32,
snapshot string, flowJobName string, partitionID string,
) (*QRepQueryExecutor, error) {
if _, err := c.fetchCustomTypeMapping(ctx); err != nil {
Expand All @@ -49,6 +51,7 @@ func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, ve
flowJobName: flowJobName,
partitionID: partitionID,
logger: log.With(c.logger, slog.String(string(shared.PartitionIDKey), partitionID)),
env: env,
version: version,
}, nil
}
Expand All @@ -73,13 +76,21 @@ func (qe *QRepQueryExecutor) cursorToSchema(
num uint16
}

strictNullable, err := internal.PeerDBNullableStrict(ctx, qe.env)
if err != nil {
return types.QRecordSchema{}, err
}

rows, err := tx.Query(ctx, "FETCH 0 FROM "+cursorName)
if err != nil {
return types.QRecordSchema{}, fmt.Errorf("failed to fetch 0 for field descriptions: %w", err)
}
fds := rows.FieldDescriptions()
tableOIDset := make(map[uint32]struct{})
nullPointers := make(map[attId]*bool, len(fds))
var nullPointers map[attId]*bool
if strictNullable {
nullPointers = make(map[attId]*bool, len(fds))
}
qfields := make([]types.QField, len(fds))
for i, fd := range fds {
tableOIDset[fd.TableOID] = struct{}{}
Expand All @@ -89,38 +100,42 @@ func (qe *QRepQueryExecutor) cursorToSchema(
qfields[i] = types.QField{
Name: fd.Name,
Type: ctype,
Nullable: false,
Nullable: !strictNullable,
Precision: precision,
Scale: scale,
}
} else {
qfields[i] = types.QField{
Name: fd.Name,
Type: ctype,
Nullable: false,
Nullable: !strictNullable,
}
}
nullPointers[attId{
relid: fd.TableOID,
num: fd.TableAttributeNumber,
}] = &qfields[i].Nullable
if nullPointers != nil {
nullPointers[attId{
relid: fd.TableOID,
num: fd.TableAttributeNumber,
}] = &qfields[i].Nullable
}
}
rows.Close()
tableOIDs := slices.Collect(maps.Keys(tableOIDset))

rows, err = tx.Query(ctx, "SELECT a.attrelid,a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull", tableOIDs)
if err != nil {
return types.QRecordSchema{}, fmt.Errorf("failed to query schema for field descriptions: %w", err)
}
if nullPointers != nil {
tableOIDs := slices.Collect(maps.Keys(tableOIDset))
rows, err = tx.Query(ctx, "SELECT a.attrelid,a.attnum FROM pg_attribute a WHERE a.attrelid = ANY($1) AND NOT a.attnotnull", tableOIDs)
if err != nil {
return types.QRecordSchema{}, fmt.Errorf("failed to query schema for field descriptions: %w", err)
}

var att attId
if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error {
if nullPointer, ok := nullPointers[att]; ok {
*nullPointer = true
var att attId
if _, err := pgx.ForEachRow(rows, []any{&att.relid, &att.num}, func() error {
if nullPointer, ok := nullPointers[att]; ok {
*nullPointer = true
}
return nil
}); err != nil {
return types.QRecordSchema{}, fmt.Errorf("failed to process schema for field descriptions: %w", err)
}
return nil
}); err != nil {
return types.QRecordSchema{}, fmt.Errorf("failed to process schema for field descriptions: %w", err)
}

return types.NewQRecordSchema(qfields), nil
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/postgres/qrep_query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestExecuteAndProcessQuery(t *testing.T) {
fmt.Sprintf("INSERT INTO %s.test(data) VALUES ('testdata')", utils.QuoteIdentifier(schemaName)))
require.NoError(t, err, "error while inserting data")

qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
require.NoError(t, err, "error while creating QRepQueryExecutor")

batch, err := qe.ExecuteAndProcessQuery(t.Context(), fmt.Sprintf("SELECT * FROM %s.test", utils.QuoteIdentifier(schemaName)))
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestSupportedDataTypes(t *testing.T) {
)
require.NoError(t, err, "error while inserting into test table")

qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
require.NoError(t, err, "error while creating QRepQueryExecutor")
// Select the row back out of the table
batch, err := qe.ExecuteAndProcessQuery(t.Context(),
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestStringDataTypes(t *testing.T) {
_, err = conn.Exec(ctx, query)
require.NoError(t, err)

qe, err := connector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "test flow", "test part")
qe, err := connector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "test flow", "test part")
require.NoError(t, err)
// Select the row back out of the table
batch, err := qe.ExecuteAndProcessQuery(t.Context(),
Expand Down
22 changes: 22 additions & 0 deletions flow/e2e/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,14 @@ func (s ClickHouseSuite) Test_Types_CH() {
}

// Test_InfiniteTimestamp runs the shared infinite-timestamp scenario without
// enabling PEERDB_NULLABLE_STRICT in the flow environment.
func (s ClickHouseSuite) Test_InfiniteTimestamp() {
	const strictNull = false
	s.testInfiniteTimestamp(strictNull)
}

// Test_InfiniteTimestampStrict runs the shared infinite-timestamp scenario
// with PEERDB_NULLABLE_STRICT set to "true" in the flow environment.
func (s ClickHouseSuite) Test_InfiniteTimestampStrict() {
	const strictNull = true
	s.testInfiniteTimestamp(strictNull)
}

func (s ClickHouseSuite) testInfiniteTimestamp(strictNull bool) {
if _, ok := s.source.(*PostgresSource); !ok {
s.t.Skip("only applies to postgres")
}
Expand Down Expand Up @@ -1330,6 +1338,9 @@ func (s ClickHouseSuite) Test_InfiniteTimestamp() {
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true
flowConnConfig.Env = map[string]string{"PEERDB_NULLABLE": "true"}
if strictNull {
flowConnConfig.Env["PEERDB_NULLABLE_STRICT"] = "true"
}

tc := NewTemporalClient(s.t)
env := ExecutePeerflow(s.t, tc, flowConnConfig)
Expand Down Expand Up @@ -1380,6 +1391,14 @@ func (s ClickHouseSuite) Test_InfiniteTimestamp() {
}

// Test_JSON_Null runs the shared JSON-null scenario without enabling
// PEERDB_NULLABLE_STRICT in the flow environment.
func (s ClickHouseSuite) Test_JSON_Null() {
	const strictNull = false
	s.testJSONNull(strictNull)
}

// Test_JSON_Null_Strict runs the shared JSON-null scenario with
// PEERDB_NULLABLE_STRICT set to "true" in the flow environment.
func (s ClickHouseSuite) Test_JSON_Null_Strict() {
	const strictNull = true
	s.testJSONNull(strictNull)
}

func (s ClickHouseSuite) testJSONNull(strictNull bool) {
if _, ok := s.source.(*PostgresSource); !ok {
s.t.Skip("only applies to postgres")
}
Expand Down Expand Up @@ -1413,6 +1432,9 @@ func (s ClickHouseSuite) Test_JSON_Null() {
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true
flowConnConfig.Env = map[string]string{"PEERDB_NULLABLE": "true"}
if strictNull {
flowConnConfig.Env["PEERDB_NULLABLE_STRICT"] = "true"
}

tc := NewTemporalClient(s.t)
env := ExecutePeerflow(s.t, tc, flowConnConfig)
Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (s *PostgresSource) Exec(ctx context.Context, sql string) error {
}

func (s *PostgresSource) GetRows(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error) {
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "testflow", "testpart")
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "testflow", "testpart")
if err != nil {
return nil, err
}
Expand All @@ -211,7 +211,7 @@ func (s *PostgresSource) GetRows(ctx context.Context, suffix string, table strin

// to avoid fetching rows from "child" tables ala Postgres table inheritance
func (s *PostgresSource) GetRowsOnly(ctx context.Context, suffix string, table string, cols string) (*model.QRecordBatch, error) {
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "testflow", "testpart")
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "testflow", "testpart")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -248,7 +248,7 @@ func RevokePermissionForTableColumns(ctx context.Context, conn *pgx.Conn, tableI
}

func (s *PostgresSource) Query(ctx context.Context, query string) (*model.QRecordBatch, error) {
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, shared.InternalVersion_Latest, "testflow", "testpart")
pgQueryExecutor, err := s.PostgresConnector.NewQRepQueryExecutor(ctx, nil, shared.InternalVersion_Latest, "testflow", "testpart")
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s PeerFlowE2ETestSuitePG) Exec(ctx context.Context, sql string) error {

func (s PeerFlowE2ETestSuitePG) GetRows(table string, cols string) (*model.QRecordBatch, error) {
s.t.Helper()
pgQueryExecutor, err := s.conn.NewQRepQueryExecutor(s.t.Context(), shared.InternalVersion_Latest, "testflow", "testpart")
pgQueryExecutor, err := s.conn.NewQRepQueryExecutor(s.t.Context(), nil, shared.InternalVersion_Latest, "testflow", "testpart")
if err != nil {
return nil, err
}
Expand Down
12 changes: 12 additions & 0 deletions flow/internal/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_NULLABLE_STRICT",
Description: "Propagate nullability in avro schema during initial load",
DefaultValue: "false",
ValueType: protos.DynconfValueType_BOOL,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_SNOWFLAKE_MERGE_PARALLELISM",
Description: "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit",
Expand Down Expand Up @@ -588,6 +596,10 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) {
return dynamicConfBool(ctx, env, "PEERDB_NULLABLE")
}

// PeerDBNullableStrict reports the boolean value of the
// PEERDB_NULLABLE_STRICT dynamic setting, resolved through dynamicConfBool
// with the supplied env map. Any lookup error is returned unchanged.
func PeerDBNullableStrict(ctx context.Context, env map[string]string) (bool, error) {
	const settingName = "PEERDB_NULLABLE_STRICT"
	return dynamicConfBool(ctx, env, settingName)
}

func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) {
format, err := dynLookup(ctx, env, "PEERDB_CLICKHOUSE_BINARY_FORMAT")
if err != nil {
Expand Down
Loading