From d6f1906d781f5ede3146e388bcf954a53dc4c6ff Mon Sep 17 00:00:00 2001 From: Yogesh Date: Wed, 12 Nov 2025 18:26:59 +0530 Subject: [PATCH 1/5] Index Migration support added for initial copy - No tests Signed-off-by: Yogesh --- flow/connectors/postgres/postgres.go | 29 +++ .../postgres/postgres_index_migration.go | 230 ++++++++++++++++++ protos/flow.proto | 9 + 3 files changed, 268 insertions(+) create mode 100644 flow/connectors/postgres/postgres_index_migration.go diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 9e84eb2131..e952afe65f 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -1041,11 +1041,24 @@ func (c *PostgresConnector) getTableSchemaForTable( if err := rows.Err(); err != nil { return nil, fmt.Errorf("error iterating over table schema: %w", err) } + + rows.Close() + // if we have no pkey, we will use all columns as the pkey for the MERGE statement if replicaIdentityType == ReplicaIdentityFull && len(pKeyCols) == 0 { pKeyCols = columnNames } + // Fetch indexes for the table + indexes, err := c.getTableIndexes(ctx, schemaTable) + c.logger.Info("fetched indexes", slog.String("table", schemaTable.String()), slog.Any("indexes", indexes)) + if err != nil { + c.logger.Warn("Failed to fetch indexes for table, continuing without indexes", + slog.String("table", tm.SourceTableIdentifier), + slog.Any("error", err)) + indexes = nil + } + return &protos.TableSchema{ TableIdentifier: tm.SourceTableIdentifier, PrimaryKeyColumns: pKeyCols, @@ -1053,6 +1066,7 @@ func (c *PostgresConnector) getTableSchemaForTable( Columns: columns, NullableEnabled: nullableEnabled, System: system, + Indexes: indexes, }, nil } @@ -1109,6 +1123,21 @@ func (c *PostgresConnector) SetupNormalizedTable( return false, fmt.Errorf("error while creating normalized table: %w", err) } + if len(tableSchema.Indexes) > 0 { + c.logger.Info("Creating indexes on destination table", + slog.String("table", 
tableIdentifier), + slog.Int("indexCount", len(tableSchema.Indexes))) + + if err := c.createTableIndexesFromSchema(ctx, createNormalizedTablesTx, tableSchema, parsedNormalizedTable); err != nil { + // Log the error but don't fail the table creation + c.logger.Warn("Failed to create indexes for table", + slog.String("table", tableIdentifier), + slog.Any("error", err)) + } + } else { + c.logger.Info("No secondary indexes to create for table", slog.String("table", tableIdentifier)) + } + return false, nil } diff --git a/flow/connectors/postgres/postgres_index_migration.go b/flow/connectors/postgres/postgres_index_migration.go new file mode 100644 index 0000000000..99421f638d --- /dev/null +++ b/flow/connectors/postgres/postgres_index_migration.go @@ -0,0 +1,230 @@ +package connpostgres + +import ( + "context" + "fmt" + "log/slog" + "strings" + + "github.com/jackc/pgx/v5" + + "github.com/PeerDB-io/peerdb/flow/connectors/utils" + "github.com/PeerDB-io/peerdb/flow/generated/protos" +) + +// getTableIndexes retrieves all non-primary-key indexes for a table from the source +// Primary keys are already handled in generateCreateTableSQLForNormalizedTable +// Returns proto IndexDescription for direct use in TableSchema +func (c *PostgresConnector) getTableIndexes( + ctx context.Context, + schemaTable *utils.SchemaTable, +) ([]*protos.IndexDescription, error) { + query := ` + SELECT + i.relname AS index_name, + pg_get_indexdef(idx.indexrelid) AS index_def, + idx.indisprimary AS is_primary, + idx.indisunique AS is_unique, + idx.indisreplident AS is_replica + FROM pg_index idx + JOIN pg_class t ON idx.indrelid = t.oid + JOIN pg_class i ON idx.indexrelid = i.oid + JOIN pg_namespace n ON t.relnamespace = n.oid + WHERE n.nspname = $1 + AND t.relname = $2 + AND idx.indisprimary = false + ORDER BY i.relname + ` + + rows, err := c.conn.Query(ctx, query, schemaTable.Schema, schemaTable.Table) + if err != nil { + return nil, fmt.Errorf("error querying indexes for table %s: %w", 
schemaTable, err) + } + defer rows.Close() + + var indexes []*protos.IndexDescription + for rows.Next() { + var indexName, indexDef string + var isPrimary, isUnique, isReplica bool + if err := rows.Scan(&indexName, &indexDef, &isPrimary, &isUnique, &isReplica); err != nil { + return nil, fmt.Errorf("error scanning index definition: %w", err) + } + indexes = append(indexes, &protos.IndexDescription{ + IndexName: indexName, + IndexDef: indexDef, + IsPrimary: isPrimary, + IsUnique: isUnique, + IsReplica: isReplica, + }) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating index rows: %w", err) + } + + c.logger.Info(fmt.Sprintf("Found %d indexes to migrate for table %s", len(indexes), schemaTable), + slog.String("schema", schemaTable.Schema), + slog.String("table", schemaTable.Table)) + + return indexes, nil +} + +// createTableIndexes creates indexes on the destination table +func (c *PostgresConnector) createTableIndexes( + ctx context.Context, + tx pgx.Tx, + sourceTable *utils.SchemaTable, + destTable *utils.SchemaTable, +) error { + indexes, err := c.getTableIndexes(ctx, sourceTable) + if err != nil { + return fmt.Errorf("failed to fetch indexes: %w", err) + } + + if len(indexes) == 0 { + c.logger.Info("No secondary indexes to create for table", slog.String("table", destTable.String())) + return nil + } + + for _, idx := range indexes { + // Replace source schema.table with destination schema.table in the index definition + // pg_get_indexdef returns: CREATE [UNIQUE] INDEX index_name ON schema.table USING ... 
+ modifiedIndexDef := strings.Replace( + idx.IndexDef, + fmt.Sprintf(" ON %s.%s ", + utils.QuoteIdentifier(sourceTable.Schema), + utils.QuoteIdentifier(sourceTable.Table)), + fmt.Sprintf(" ON %s ", destTable.String()), + 1, + ) + + // Also handle case without schema qualification + modifiedIndexDef = strings.Replace( + modifiedIndexDef, + fmt.Sprintf(" ON %s ", utils.QuoteIdentifier(sourceTable.Table)), + fmt.Sprintf(" ON %s ", destTable.String()), + 1, + ) + + // Add CONCURRENTLY if not in transaction (but we're in a transaction during setup, so this won't apply) + // Just create the index normally during initial setup + c.logger.Info("Creating index on destination table", + slog.String("indexName", idx.IndexName), + slog.String("table", destTable.String()), + slog.Bool("isUnique", idx.IsUnique)) + + _, err := tx.Exec(ctx, modifiedIndexDef) + if err != nil { + // Don't fail the entire migration if one index fails + // Some indexes might have dependencies or use extensions not available + c.logger.Warn("Failed to create index, skipping", + slog.String("indexName", idx.IndexName), + slog.String("table", destTable.String()), + slog.String("indexDef", modifiedIndexDef), + slog.Any("error", err)) + continue + } + + c.logger.Info("Successfully created index", + slog.String("indexName", idx.IndexName), + slog.String("table", destTable.String())) + } + + return nil +} + +// getTableIndexCount returns the count of non-primary indexes for a table +// Useful for logging and validation +func (c *PostgresConnector) getTableIndexCount( + ctx context.Context, + schemaTable *utils.SchemaTable, +) (int, error) { + var count int + query := ` + SELECT COUNT(*) + FROM pg_index idx + JOIN pg_class t ON idx.indrelid = t.oid + JOIN pg_namespace n ON t.relnamespace = n.oid + WHERE n.nspname = $1 + AND t.relname = $2 + AND idx.indisprimary = false + ` + err := c.conn.QueryRow(ctx, query, schemaTable.Schema, schemaTable.Table).Scan(&count) + if err != nil { + return 0, fmt.Errorf("error 
counting indexes: %w", err) + } + return count, nil +} + +// createTableIndexesFromSchema creates indexes on the destination table from the TableSchema +// This method uses the indexes that were fetched from the source during SetupTableSchema +func (c *PostgresConnector) createTableIndexesFromSchema( + ctx context.Context, + tx pgx.Tx, + tableSchema *protos.TableSchema, + destTable *utils.SchemaTable, +) error { + if len(tableSchema.Indexes) == 0 { + c.logger.Info("No secondary indexes to create for table", slog.String("table", destTable.String())) + return nil + } + + // Parse source table from the table schema identifier + sourceTable, err := utils.ParseSchemaTable(tableSchema.TableIdentifier) + if err != nil { + return fmt.Errorf("failed to parse source table identifier: %w", err) + } + + for _, idx := range tableSchema.Indexes { + // Replace source schema.table with destination schema.table in the index definition + // pg_get_indexdef returns: CREATE [UNIQUE] INDEX index_name ON schema.table USING ... 
+ modifiedIndexDef := strings.Replace( + idx.IndexDef, + fmt.Sprintf(" ON %s.%s ", + utils.QuoteIdentifier(sourceTable.Schema), + utils.QuoteIdentifier(sourceTable.Table)), + fmt.Sprintf(" ON %s ", destTable.String()), + 1, + ) + + // Also handle case without schema qualification + modifiedIndexDef = strings.Replace( + modifiedIndexDef, + fmt.Sprintf(" ON %s ", utils.QuoteIdentifier(sourceTable.Table)), + fmt.Sprintf(" ON %s ", destTable.String()), + 1, + ) + + c.logger.Info("Creating index on destination table", + slog.String("indexName", idx.IndexName), + slog.String("table", destTable.String()), + slog.String("indexDef", modifiedIndexDef), + slog.Bool("isUnique", idx.IsUnique), + slog.Bool("isPrimary", idx.IsPrimary), + slog.Bool("isReplica", idx.IsReplica), + ) + + _, err := tx.Exec(ctx, modifiedIndexDef) + if err != nil { + // Don't fail the entire migration if one index fails + // Some indexes might have dependencies or use extensions not available + c.logger.Warn("Failed to create index, skipping", + slog.String("indexName", idx.IndexName), + slog.String("table", destTable.String()), + slog.String("indexDef", modifiedIndexDef), + slog.Any("error", err)) + continue + } + + c.logger.Info("Successfully created index", + slog.String("indexName", idx.IndexName), + slog.String("table", destTable.String()), + slog.String("indexDef", modifiedIndexDef), + slog.Bool("isUnique", idx.IsUnique), + slog.Bool("isPrimary", idx.IsPrimary), + slog.Bool("isReplica", idx.IsReplica), + ) + } + + return nil +} diff --git a/protos/flow.proto b/protos/flow.proto index b7b1382e44..d7fac4b757 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -233,6 +233,7 @@ message TableSchema { TypeSystem system = 4; bool nullable_enabled = 5; repeated FieldDescription columns = 6; + repeated IndexDescription indexes = 7; } message FieldDescription { @@ -242,6 +243,14 @@ message FieldDescription { bool nullable = 4; } +message IndexDescription { + string index_name = 1; + string index_def = 
2; + bool is_primary = 3; + bool is_unique = 4; + bool is_replica = 5; +} + message SetupTableSchemaBatchInput { reserved 2; map env = 1; From 103f90a4aeb86380389f5b9181669bdc113046fb Mon Sep 17 00:00:00 2001 From: Yogesh Date: Thu, 13 Nov 2025 04:05:23 +0530 Subject: [PATCH 2/5] postgres function migration Signed-off-by: Yogesh --- flow/connectors/postgres/postgres.go | 26 +++ .../postgres/postgres_function_migration.go | 188 ++++++++++++++++++ protos/flow.proto | 8 + 3 files changed, 222 insertions(+) create mode 100644 flow/connectors/postgres/postgres_function_migration.go diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index e952afe65f..ce02a73634 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -1059,6 +1059,16 @@ func (c *PostgresConnector) getTableSchemaForTable( indexes = nil } + // Fetch functions for the table schema + functions, err := c.getTableFunctions(ctx, schemaTable) + c.logger.Info("fetched functions", slog.String("table", schemaTable.String()), slog.Any("functions", functions)) + if err != nil { + c.logger.Warn("Failed to fetch functions for table, continuing without functions", + slog.String("table", tm.SourceTableIdentifier), + slog.Any("error", err)) + functions = nil + } + return &protos.TableSchema{ TableIdentifier: tm.SourceTableIdentifier, PrimaryKeyColumns: pKeyCols, @@ -1067,6 +1077,7 @@ func (c *PostgresConnector) getTableSchemaForTable( NullableEnabled: nullableEnabled, System: system, Indexes: indexes, + Functions: functions, }, nil } @@ -1138,6 +1149,21 @@ func (c *PostgresConnector) SetupNormalizedTable( c.logger.Info("No secondary indexes to create for table", slog.String("table", tableIdentifier)) } + if len(tableSchema.Functions) > 0 { + c.logger.Info("Creating functions on destination schema", + slog.String("schema", parsedNormalizedTable.Schema), + slog.Int("functionCount", len(tableSchema.Functions))) + + if err := 
c.createTableFunctionsFromSchema(ctx, createNormalizedTablesTx, tableSchema, parsedNormalizedTable); err != nil { + // Log the error but don't fail the table creation + c.logger.Warn("Failed to create functions for schema", + slog.String("schema", parsedNormalizedTable.Schema), + slog.Any("error", err)) + } + } else { + c.logger.Info("No functions to create for schema", slog.String("schema", parsedNormalizedTable.Schema)) + } + return false, nil } diff --git a/flow/connectors/postgres/postgres_function_migration.go b/flow/connectors/postgres/postgres_function_migration.go new file mode 100644 index 0000000000..2639eebee0 --- /dev/null +++ b/flow/connectors/postgres/postgres_function_migration.go @@ -0,0 +1,188 @@ +package connpostgres + +import ( + "context" + "fmt" + "log/slog" + "strings" + + "github.com/jackc/pgx/v5" + + "github.com/PeerDB-io/peerdb/flow/connectors/utils" + "github.com/PeerDB-io/peerdb/flow/generated/protos" +) + +// getTableFunctions retrieves all functions associated with a table's schema +// This includes functions that might be used in triggers, constraints, or generated columns +// Returns proto FunctionDescription for direct use in TableSchema +func (c *PostgresConnector) getTableFunctions( + ctx context.Context, + schemaTable *utils.SchemaTable, +) ([]*protos.FunctionDescription, error) { + query := ` + SELECT + p.proname AS function_name, + pg_get_functiondef(p.oid) AS function_def, + l.lanname AS language, + pg_get_function_result(p.oid) AS return_type + FROM pg_proc p + JOIN pg_namespace n ON p.pronamespace = n.oid + JOIN pg_language l ON p.prolang = l.oid + WHERE n.nspname = $1 + AND l.lanname IN ('plpgsql', 'sql', 'plpython3u', 'plperl', 'pltcl') + ORDER BY p.proname + ` + + rows, err := c.conn.Query(ctx, query, schemaTable.Schema) + if err != nil { + return nil, fmt.Errorf("error querying functions for schema %s: %w", schemaTable.Schema, err) + } + defer rows.Close() + + var functions []*protos.FunctionDescription + for rows.Next() 
{ + var functionName, functionDef, language, returnType string + if err := rows.Scan(&functionName, &functionDef, &language, &returnType); err != nil { + return nil, fmt.Errorf("error scanning function definition: %w", err) + } + functions = append(functions, &protos.FunctionDescription{ + FunctionName: functionName, + FunctionDef: functionDef, + Language: language, + ReturnType: returnType, + }) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating function rows: %w", err) + } + + c.logger.Info(fmt.Sprintf("Found %d functions to migrate for schema %s", len(functions), schemaTable.Schema), + slog.String("schema", schemaTable.Schema), + slog.String("table", schemaTable.Table)) + + return functions, nil +} + +// createTableFunctions creates functions on the destination schema +func (c *PostgresConnector) createTableFunctions( + ctx context.Context, + tx pgx.Tx, + sourceTable *utils.SchemaTable, + destTable *utils.SchemaTable, +) error { + functions, err := c.getTableFunctions(ctx, sourceTable) + if err != nil { + return fmt.Errorf("failed to fetch functions: %w", err) + } + + if len(functions) == 0 { + c.logger.Info("No functions to create for schema", slog.String("schema", destTable.Schema)) + return nil + } + + for _, fn := range functions { + // Replace source schema with destination schema in the function definition + // pg_get_functiondef returns: CREATE OR REPLACE FUNCTION schema.function_name(...) ... 
+ modifiedFunctionDef := strings.Replace( + fn.FunctionDef, + fmt.Sprintf("FUNCTION %s.", utils.QuoteIdentifier(sourceTable.Schema)), + fmt.Sprintf("FUNCTION %s.", utils.QuoteIdentifier(destTable.Schema)), + 1, + ) + + // Also handle case without schema qualification in function body + // Replace references to source schema with destination schema + modifiedFunctionDef = strings.ReplaceAll( + modifiedFunctionDef, + fmt.Sprintf("%s.", sourceTable.Schema), + fmt.Sprintf("%s.", destTable.Schema), + ) + + c.logger.Info("Creating function on destination schema", + slog.String("functionName", fn.FunctionName), + slog.String("schema", destTable.Schema), + slog.String("language", fn.Language), + slog.String("returnType", fn.ReturnType)) + + _, err := tx.Exec(ctx, modifiedFunctionDef) + if err != nil { + // Don't fail the entire migration if one function fails + // Log as warning and continue + c.logger.Warn("Failed to create function on destination, continuing...", + slog.String("functionName", fn.FunctionName), + slog.String("schema", destTable.Schema), + slog.Any("error", err)) + continue + } + + c.logger.Info("Successfully created function", + slog.String("functionName", fn.FunctionName), + slog.String("schema", destTable.Schema)) + } + + return nil +} + +// createTableFunctionsFromSchema creates functions on the destination using pre-fetched function definitions +// This method uses the functions that were fetched from the source during SetupTableSchema +func (c *PostgresConnector) createTableFunctionsFromSchema( + ctx context.Context, + tx pgx.Tx, + tableSchema *protos.TableSchema, + destTable *utils.SchemaTable, +) error { + if len(tableSchema.Functions) == 0 { + c.logger.Info("No functions to create for schema", slog.String("schema", destTable.Schema)) + return nil + } + + // Parse source table from the table schema identifier + sourceTable, err := utils.ParseSchemaTable(tableSchema.TableIdentifier) + if err != nil { + return fmt.Errorf("failed to parse source table 
identifier: %w", err) + } + + for _, fn := range tableSchema.Functions { + // Replace source schema with destination schema in the function definition + // pg_get_functiondef returns: CREATE OR REPLACE FUNCTION schema.function_name(...) ... + modifiedFunctionDef := strings.Replace( + fn.FunctionDef, + fmt.Sprintf("FUNCTION %s.", utils.QuoteIdentifier(sourceTable.Schema)), + fmt.Sprintf("FUNCTION %s.", utils.QuoteIdentifier(destTable.Schema)), + 1, + ) + + // Also handle case without schema qualification in function body + // Replace references to source schema with destination schema + modifiedFunctionDef = strings.ReplaceAll( + modifiedFunctionDef, + fmt.Sprintf("%s.", sourceTable.Schema), + fmt.Sprintf("%s.", destTable.Schema), + ) + + c.logger.Info("Creating function on destination schema", + slog.String("functionName", fn.FunctionName), + slog.String("schema", destTable.Schema), + slog.String("functionDef", modifiedFunctionDef), + slog.String("language", fn.Language), + slog.String("returnType", fn.ReturnType)) + + _, err := tx.Exec(ctx, modifiedFunctionDef) + if err != nil { + // Don't fail the entire migration if one function fails + // Log as warning and continue + c.logger.Warn("Failed to create function on destination, continuing...", + slog.String("functionName", fn.FunctionName), + slog.String("schema", destTable.Schema), + slog.Any("error", err)) + continue + } + + c.logger.Info("Successfully created function", + slog.String("functionName", fn.FunctionName), + slog.String("schema", destTable.Schema)) + } + + return nil +} diff --git a/protos/flow.proto b/protos/flow.proto index d7fac4b757..3fbfc281d9 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -234,6 +234,7 @@ message TableSchema { bool nullable_enabled = 5; repeated FieldDescription columns = 6; repeated IndexDescription indexes = 7; + repeated FunctionDescription functions = 8; } message FieldDescription { @@ -251,6 +252,13 @@ message IndexDescription { bool is_replica = 5; } +message 
FunctionDescription { + string function_name = 1; + string function_def = 2; + string language = 3; + string return_type = 4; +} + message SetupTableSchemaBatchInput { reserved 2; map env = 1; From 548cd5051866cac5530a2f45f6cb8a70be9f1e90 Mon Sep 17 00:00:00 2001 From: Yogesh Date: Fri, 14 Nov 2025 04:05:22 +0530 Subject: [PATCH 3/5] Added trigger migrations for postgres to postgres migration Signed-off-by: Yogesh --- flow/connectors/postgres/postgres.go | 30 +++ .../postgres/postgres_trigger_migration.go | 231 ++++++++++++++++++ protos/flow.proto | 9 + 3 files changed, 270 insertions(+) create mode 100644 flow/connectors/postgres/postgres_trigger_migration.go diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index ce02a73634..ed518c64de 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -1069,6 +1069,16 @@ func (c *PostgresConnector) getTableSchemaForTable( functions = nil } + // Fetch triggers for the table schema + triggers, err := c.getTableTriggers(ctx, schemaTable) + c.logger.Info("fetched triggers", slog.String("table", schemaTable.String()), slog.Any("triggers", triggers)) + if err != nil { + c.logger.Warn("Failed to fetch triggers for table, continuing without triggers", + slog.String("table", tm.SourceTableIdentifier), + slog.Any("error", err)) + triggers = nil + } + return &protos.TableSchema{ TableIdentifier: tm.SourceTableIdentifier, PrimaryKeyColumns: pKeyCols, @@ -1078,6 +1088,7 @@ func (c *PostgresConnector) getTableSchemaForTable( System: system, Indexes: indexes, Functions: functions, + Triggers: triggers, }, nil } @@ -1164,6 +1175,25 @@ func (c *PostgresConnector) SetupNormalizedTable( c.logger.Info("No functions to create for schema", slog.String("schema", parsedNormalizedTable.Schema)) } + if len(tableSchema.Triggers) > 0 { + c.logger.Info("Creating triggers on destination table", + slog.String("schema", parsedNormalizedTable.Schema), + slog.String("table", 
parsedNormalizedTable.Table), + slog.Int("triggerCount", len(tableSchema.Triggers))) + + if err := c.createTableTriggersFromSchema(ctx, createNormalizedTablesTx, tableSchema, parsedNormalizedTable); err != nil { + // Log the error but don't fail the table creation + c.logger.Warn("Failed to create triggers for table", + slog.String("schema", parsedNormalizedTable.Schema), + slog.String("table", parsedNormalizedTable.Table), + slog.Any("error", err)) + } + } else { + c.logger.Info("No triggers to create for table", + slog.String("schema", parsedNormalizedTable.Schema), + slog.String("table", parsedNormalizedTable.Table)) + } + return false, nil } diff --git a/flow/connectors/postgres/postgres_trigger_migration.go b/flow/connectors/postgres/postgres_trigger_migration.go new file mode 100644 index 0000000000..3ec3e267d2 --- /dev/null +++ b/flow/connectors/postgres/postgres_trigger_migration.go @@ -0,0 +1,231 @@ +package connpostgres + +import ( + "context" + "fmt" + "log/slog" + "strings" + + "github.com/jackc/pgx/v5" + + "github.com/PeerDB-io/peerdb/flow/connectors/utils" + "github.com/PeerDB-io/peerdb/flow/generated/protos" +) + +// getTableTriggers retrieves all triggers associated with a specific table +// Returns proto TriggerDescription for direct use in TableSchema +func (c *PostgresConnector) getTableTriggers( + ctx context.Context, + schemaTable *utils.SchemaTable, +) ([]*protos.TriggerDescription, error) { + query := ` + SELECT + t.tgname AS trigger_name, + pg_get_triggerdef(t.oid) AS trigger_def, + CASE + WHEN t.tgtype & 4 = 4 THEN 'UPDATE' + WHEN t.tgtype & 8 = 8 THEN 'DELETE' + WHEN t.tgtype & 16 = 16 THEN 'INSERT' + WHEN t.tgtype & 32 = 32 THEN 'TRUNCATE' + ELSE 'UNKNOWN' + END AS event_manipulation, + CASE + WHEN t.tgtype & 2 = 2 THEN 'BEFORE' + WHEN t.tgtype & 64 = 64 THEN 'INSTEAD OF' + ELSE 'AFTER' + END AS action_timing, + CASE + WHEN t.tgtype & 1 = 1 THEN 'ROW' + ELSE 'STATEMENT' + END AS action_orientation + FROM pg_trigger t + JOIN pg_class c ON 
t.tgrelid = c.oid + JOIN pg_namespace n ON c.relnamespace = n.oid + WHERE n.nspname = $1 + AND c.relname = $2 + AND NOT t.tgisinternal + ORDER BY t.tgname + ` + + rows, err := c.conn.Query(ctx, query, schemaTable.Schema, schemaTable.Table) + if err != nil { + return nil, fmt.Errorf("error querying triggers for table %s.%s: %w", + schemaTable.Schema, schemaTable.Table, err) + } + defer rows.Close() + + var triggers []*protos.TriggerDescription + for rows.Next() { + var triggerName, triggerDef, eventManipulation, actionTiming, actionOrientation string + if err := rows.Scan(&triggerName, &triggerDef, &eventManipulation, &actionTiming, &actionOrientation); err != nil { + return nil, fmt.Errorf("error scanning trigger definition: %w", err) + } + triggers = append(triggers, &protos.TriggerDescription{ + TriggerName: triggerName, + TriggerDef: triggerDef, + EventManipulation: eventManipulation, + ActionTiming: actionTiming, + ActionOrientation: actionOrientation, + }) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating trigger rows: %w", err) + } + + c.logger.Info(fmt.Sprintf("Found %d triggers to migrate for table %s.%s", + len(triggers), schemaTable.Schema, schemaTable.Table), + slog.String("schema", schemaTable.Schema), + slog.String("table", schemaTable.Table)) + + return triggers, nil +} + +// createTableTriggers creates triggers on the destination table +func (c *PostgresConnector) createTableTriggers( + ctx context.Context, + tx pgx.Tx, + sourceTable *utils.SchemaTable, + destTable *utils.SchemaTable, +) error { + triggers, err := c.getTableTriggers(ctx, sourceTable) + if err != nil { + return fmt.Errorf("failed to fetch triggers: %w", err) + } + + if len(triggers) == 0 { + c.logger.Info("No triggers to create for table", + slog.String("schema", destTable.Schema), + slog.String("table", destTable.Table)) + return nil + } + + for _, trig := range triggers { + // Replace source table reference with destination table in the trigger 
definition + // pg_get_triggerdef returns: CREATE TRIGGER trigger_name ... ON schema.table_name ... + modifiedTriggerDef := strings.Replace( + trig.TriggerDef, + fmt.Sprintf("ON %s.%s", utils.QuoteIdentifier(sourceTable.Schema), utils.QuoteIdentifier(sourceTable.Table)), + fmt.Sprintf("ON %s.%s", utils.QuoteIdentifier(destTable.Schema), utils.QuoteIdentifier(destTable.Table)), + 1, + ) + + // Also handle case without schema qualification + modifiedTriggerDef = strings.Replace( + modifiedTriggerDef, + fmt.Sprintf("ON %s", utils.QuoteIdentifier(sourceTable.Table)), + fmt.Sprintf("ON %s.%s", utils.QuoteIdentifier(destTable.Schema), utils.QuoteIdentifier(destTable.Table)), + 1, + ) + + // Replace schema references in function calls within the trigger + modifiedTriggerDef = strings.ReplaceAll( + modifiedTriggerDef, + fmt.Sprintf("%s.", sourceTable.Schema), + fmt.Sprintf("%s.", destTable.Schema), + ) + + c.logger.Info("Creating trigger on destination table", + slog.String("triggerName", trig.TriggerName), + slog.String("schema", destTable.Schema), + slog.String("table", destTable.Table), + slog.String("eventManipulation", trig.EventManipulation), + slog.String("actionTiming", trig.ActionTiming), + slog.String("actionOrientation", trig.ActionOrientation)) + + _, err := tx.Exec(ctx, modifiedTriggerDef) + if err != nil { + // Don't fail the entire migration if one trigger fails + // Log as warning and continue + c.logger.Warn("Failed to create trigger on destination, continuing...", + slog.String("triggerName", trig.TriggerName), + slog.String("schema", destTable.Schema), + slog.String("table", destTable.Table), + slog.Any("error", err)) + continue + } + + c.logger.Info("Successfully created trigger", + slog.String("triggerName", trig.TriggerName), + slog.String("schema", destTable.Schema), + slog.String("table", destTable.Table)) + } + + return nil +} + +// createTableTriggersFromSchema creates triggers on the destination using pre-fetched trigger definitions +// This 
method uses the triggers that were fetched from the source during SetupTableSchema +func (c *PostgresConnector) createTableTriggersFromSchema( + ctx context.Context, + tx pgx.Tx, + tableSchema *protos.TableSchema, + destTable *utils.SchemaTable, +) error { + if len(tableSchema.Triggers) == 0 { + c.logger.Info("No triggers to create for table", + slog.String("schema", destTable.Schema), + slog.String("table", destTable.Table)) + return nil + } + + // Parse source table from the table schema identifier + sourceTable, err := utils.ParseSchemaTable(tableSchema.TableIdentifier) + if err != nil { + return fmt.Errorf("failed to parse source table identifier: %w", err) + } + + for _, trig := range tableSchema.Triggers { + // Replace source table reference with destination table in the trigger definition + // pg_get_triggerdef returns: CREATE TRIGGER trigger_name ... ON schema.table_name ... + modifiedTriggerDef := strings.Replace( + trig.TriggerDef, + fmt.Sprintf("ON %s.%s", utils.QuoteIdentifier(sourceTable.Schema), utils.QuoteIdentifier(sourceTable.Table)), + fmt.Sprintf("ON %s.%s", utils.QuoteIdentifier(destTable.Schema), utils.QuoteIdentifier(destTable.Table)), + 1, + ) + + // Also handle case without schema qualification + modifiedTriggerDef = strings.Replace( + modifiedTriggerDef, + fmt.Sprintf("ON %s", utils.QuoteIdentifier(sourceTable.Table)), + fmt.Sprintf("ON %s.%s", utils.QuoteIdentifier(destTable.Schema), utils.QuoteIdentifier(destTable.Table)), + 1, + ) + + // Replace schema references in function calls within the trigger + modifiedTriggerDef = strings.ReplaceAll( + modifiedTriggerDef, + fmt.Sprintf("%s.", sourceTable.Schema), + fmt.Sprintf("%s.", destTable.Schema), + ) + + c.logger.Info("Creating trigger on destination table", + slog.String("triggerName", trig.TriggerName), + slog.String("schema", destTable.Schema), + slog.String("table", destTable.Table), + slog.String("triggerDef", modifiedTriggerDef), + slog.String("eventManipulation", 
trig.EventManipulation), + slog.String("actionTiming", trig.ActionTiming), + slog.String("actionOrientation", trig.ActionOrientation)) + + _, err := tx.Exec(ctx, modifiedTriggerDef) + if err != nil { + // Don't fail the entire migration if one trigger fails + // Log as warning and continue + c.logger.Warn("Failed to create trigger on destination, continuing...", + slog.String("triggerName", trig.TriggerName), + slog.String("schema", destTable.Schema), + slog.String("table", destTable.Table), + slog.Any("error", err)) + continue + } + + c.logger.Info("Successfully created trigger", + slog.String("triggerName", trig.TriggerName), + slog.String("schema", destTable.Schema), + slog.String("table", destTable.Table)) + } + + return nil +} diff --git a/protos/flow.proto b/protos/flow.proto index 3fbfc281d9..04e1de7f46 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -235,6 +235,7 @@ message TableSchema { repeated FieldDescription columns = 6; repeated IndexDescription indexes = 7; repeated FunctionDescription functions = 8; + repeated TriggerDescription triggers = 9; } message FieldDescription { @@ -259,6 +260,14 @@ message FunctionDescription { string return_type = 4; } +message TriggerDescription { + string trigger_name = 1; + string trigger_def = 2; + string event_manipulation = 3; + string action_timing = 4; + string action_orientation = 5; +} + message SetupTableSchemaBatchInput { reserved 2; map env = 1; From 166a21ad489ae0aec1bdca5d01c880ea5336a55c Mon Sep 17 00:00:00 2001 From: Yogesh Date: Fri, 14 Nov 2025 13:55:53 +0530 Subject: [PATCH 4/5] added dependency check for triggers Signed-off-by: Yogesh --- flow/connectors/postgres/postgres.go | 27 ++-- .../postgres/postgres_function_migration.go | 66 ++++++++++ .../postgres/postgres_trigger_migration.go | 119 +++++++++++++++++- protos/flow.proto | 1 + 4 files changed, 203 insertions(+), 10 deletions(-) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 
ed518c64de..cf4dd0fcfb 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -1145,6 +1145,9 @@ func (c *PostgresConnector) SetupNormalizedTable( return false, fmt.Errorf("error while creating normalized table: %w", err) } + // Track which functions were created successfully (needed for trigger dependency checking) + createdFunctions := make(map[string]bool) + if len(tableSchema.Indexes) > 0 { c.logger.Info("Creating indexes on destination table", slog.String("table", tableIdentifier), @@ -1165,12 +1168,21 @@ func (c *PostgresConnector) SetupNormalizedTable( slog.String("schema", parsedNormalizedTable.Schema), slog.Int("functionCount", len(tableSchema.Functions))) - if err := c.createTableFunctionsFromSchema(ctx, createNormalizedTablesTx, tableSchema, parsedNormalizedTable); err != nil { - // Log the error but don't fail the table creation - c.logger.Warn("Failed to create functions for schema", - slog.String("schema", parsedNormalizedTable.Schema), - slog.Any("error", err)) + // Create functions and track which ones succeeded + createdFunctions = c.createTableFunctionsAndTrack(ctx, createNormalizedTablesTx, tableSchema, parsedNormalizedTable) + + successCount := 0 + for _, success := range createdFunctions { + if success { + successCount++ + } } + + c.logger.Info("Function creation summary", + slog.String("schema", parsedNormalizedTable.Schema), + slog.Int("total", len(tableSchema.Functions)), + slog.Int("successful", successCount), + slog.Int("failed", len(tableSchema.Functions)-successCount)) } else { c.logger.Info("No functions to create for schema", slog.String("schema", parsedNormalizedTable.Schema)) } @@ -1181,9 +1193,10 @@ func (c *PostgresConnector) SetupNormalizedTable( slog.String("table", parsedNormalizedTable.Table), slog.Int("triggerCount", len(tableSchema.Triggers))) - if err := c.createTableTriggersFromSchema(ctx, createNormalizedTablesTx, tableSchema, parsedNormalizedTable); err != nil { + // Create 
triggers with dependency checking + if err := c.createTableTriggersWithDependencyCheck(ctx, createNormalizedTablesTx, tableSchema, parsedNormalizedTable, createdFunctions); err != nil { // Log the error but don't fail the table creation - c.logger.Warn("Failed to create triggers for table", + c.logger.Warn("Failed to create some triggers for table", slog.String("schema", parsedNormalizedTable.Schema), slog.String("table", parsedNormalizedTable.Table), slog.Any("error", err)) diff --git a/flow/connectors/postgres/postgres_function_migration.go b/flow/connectors/postgres/postgres_function_migration.go index 2639eebee0..600e45b374 100644 --- a/flow/connectors/postgres/postgres_function_migration.go +++ b/flow/connectors/postgres/postgres_function_migration.go @@ -186,3 +186,69 @@ func (c *PostgresConnector) createTableFunctionsFromSchema( return nil }
+
+// createTableFunctionsAndTrack creates functions and returns a map tracking which ones succeeded
+// This is used by trigger migration to check if function dependencies are satisfied
+func (c *PostgresConnector) createTableFunctionsAndTrack(
+	ctx context.Context,
+	tx pgx.Tx,
+	tableSchema *protos.TableSchema,
+	destTable *utils.SchemaTable,
+) map[string]bool {
+	// The returned map is keyed by function name: true means the CREATE statement
+	// succeeded, false means it was attempted and failed. Names that were never
+	// attempted (e.g. because the source identifier could not be parsed) are absent.
+	createdFunctions := make(map[string]bool, len(tableSchema.Functions))
+
+	if len(tableSchema.Functions) == 0 {
+		return createdFunctions
+	}
+
+	// Parse source table from the table schema identifier
+	sourceTable, err := utils.ParseSchemaTable(tableSchema.TableIdentifier)
+	if err != nil {
+		c.logger.Warn("Failed to parse source table identifier",
+			slog.Any("error", err))
+		return createdFunctions
+	}
+
+	// These only depend on the source/destination schemas, so build them once.
+	quotedSourcePrefix := fmt.Sprintf("FUNCTION %s.", utils.QuoteIdentifier(sourceTable.Schema))
+	quotedDestPrefix := fmt.Sprintf("FUNCTION %s.", utils.QuoteIdentifier(destTable.Schema))
+	sourceSchemaPrefix := fmt.Sprintf("%s.", sourceTable.Schema)
+	destSchemaPrefix := fmt.Sprintf("%s.", destTable.Schema)
+
+	for _, fn := range tableSchema.Functions {
+		// Replace source schema with destination schema in the function definition
+		modifiedFunctionDef := strings.Replace(fn.FunctionDef, quotedSourcePrefix, quotedDestPrefix, 1)
+
+		// Also rewrite unquoted "<schema>." references, including those in the function body.
+		// NOTE(review): this is a plain text substitution, so it also rewrites matches that are
+		// part of a longer identifier or a string literal - confirm that is acceptable.
+		modifiedFunctionDef = strings.ReplaceAll(modifiedFunctionDef, sourceSchemaPrefix, destSchemaPrefix)
+
+		c.logger.Info("Creating function on destination schema",
+			slog.String("functionName", fn.FunctionName),
+			slog.String("schema", destTable.Schema),
+			slog.String("functionDef", modifiedFunctionDef),
+			slog.String("language", fn.Language),
+			slog.String("returnType", fn.ReturnType))
+
+		// A failed statement puts the enclosing PostgreSQL transaction into the aborted
+		// state, after which every later statement (and the final commit) fails too.
+		// Beginning a nested pgx transaction creates a savepoint, so a failure can be
+		// rolled back on its own and the remaining functions still get a chance.
+		savepoint, err := tx.Begin(ctx)
+		if err != nil {
+			createdFunctions[fn.FunctionName] = false
+			c.logger.Warn("Failed to create savepoint for function creation",
+				slog.String("functionName", fn.FunctionName),
+				slog.String("schema", destTable.Schema),
+				slog.Any("error", err))
+			continue
+		}
+
+		if _, err := savepoint.Exec(ctx, modifiedFunctionDef); err != nil {
+			// Track failure but continue with other functions
+			createdFunctions[fn.FunctionName] = false
+			c.logger.Warn("Failed to create function on destination",
+				slog.String("functionName", fn.FunctionName),
+				slog.String("schema", destTable.Schema),
+				slog.Any("error", err))
+			if rollbackErr := savepoint.Rollback(ctx); rollbackErr != nil {
+				c.logger.Warn("Failed to roll back savepoint after function creation failure",
+					slog.String("functionName", fn.FunctionName),
+					slog.String("schema", destTable.Schema),
+					slog.Any("error", rollbackErr))
+			}
+			continue
+		}
+
+		if err := savepoint.Commit(ctx); err != nil {
+			createdFunctions[fn.FunctionName] = false
+			c.logger.Warn("Failed to release savepoint after function creation",
+				slog.String("functionName", fn.FunctionName),
+				slog.String("schema", destTable.Schema),
+				slog.Any("error", err))
+			continue
+		}
+
+		// Track success
+		createdFunctions[fn.FunctionName] = true
+		c.logger.Info("Successfully created function",
+			slog.String("functionName", fn.FunctionName),
+			slog.String("schema", destTable.Schema))
+	}
+
+	return createdFunctions
+}
diff --git a/flow/connectors/postgres/postgres_trigger_migration.go b/flow/connectors/postgres/postgres_trigger_migration.go index 3ec3e267d2..63fde1f607 100644 --- a/flow/connectors/postgres/postgres_trigger_migration.go +++ b/flow/connectors/postgres/postgres_trigger_migration.go @@ -37,10 +37,12 @@ func (c *PostgresConnector) getTableTriggers( CASE WHEN t.tgtype & 1 = 1 THEN 'ROW' ELSE 'STATEMENT' - END AS action_orientation + END AS action_orientation, + p.proname AS function_name FROM pg_trigger t JOIN pg_class c ON t.tgrelid = c.oid JOIN pg_namespace n ON c.relnamespace = n.oid + JOIN pg_proc p ON t.tgfoid = p.oid WHERE n.nspname = $1 AND c.relname = $2 AND NOT t.tgisinternal @@ -56,8 +58,8 @@ func (c *PostgresConnector) getTableTriggers( var triggers []*protos.TriggerDescription for rows.Next() { - var triggerName, triggerDef, eventManipulation, actionTiming, actionOrientation string - if err :=
rows.Scan(&triggerName, &triggerDef, &eventManipulation, &actionTiming, &actionOrientation); err != nil { + var triggerName, triggerDef, eventManipulation, actionTiming, actionOrientation, functionName string + if err := rows.Scan(&triggerName, &triggerDef, &eventManipulation, &actionTiming, &actionOrientation, &functionName); err != nil { return nil, fmt.Errorf("error scanning trigger definition: %w", err) } triggers = append(triggers, &protos.TriggerDescription{ @@ -66,6 +68,7 @@ func (c *PostgresConnector) getTableTriggers( EventManipulation: eventManipulation, ActionTiming: actionTiming, ActionOrientation: actionOrientation, + FunctionName: functionName, }) } @@ -229,3 +232,113 @@ func (c *PostgresConnector) createTableTriggersFromSchema( return nil }
+
+// checkTriggerDependency checks if the function a trigger depends on was created successfully
+// Returns true if the function exists and was created successfully
+func checkTriggerDependency(functionName string, createdFunctions map[string]bool) bool {
+	// Check if function was created successfully
+	if created, exists := createdFunctions[functionName]; exists && created {
+		return true
+	}
+	return false
+}
+
+// createTableTriggersWithDependencyCheck creates triggers but checks function dependencies first
+func (c *PostgresConnector) createTableTriggersWithDependencyCheck(
+	ctx context.Context,
+	tx pgx.Tx,
+	tableSchema *protos.TableSchema,
+	destTable *utils.SchemaTable,
+	createdFunctions map[string]bool,
+) error {
+	// Triggers whose function is not marked as created in createdFunctions are skipped.
+	// Every remaining trigger is attempted; a non-nil error is returned at the end if at
+	// least one trigger was skipped or failed, or if the source identifier cannot be parsed.
+	if len(tableSchema.Triggers) == 0 {
+		return nil
+	}
+
+	// Parse source table from the table schema identifier
+	sourceTable, err := utils.ParseSchemaTable(tableSchema.TableIdentifier)
+	if err != nil {
+		return fmt.Errorf("failed to parse source table identifier: %w", err)
+	}
+
+	// These only depend on the source/destination tables, so build them once.
+	qualifiedSourceRef := fmt.Sprintf("ON %s.%s",
+		utils.QuoteIdentifier(sourceTable.Schema), utils.QuoteIdentifier(sourceTable.Table))
+	unqualifiedSourceRef := fmt.Sprintf("ON %s", utils.QuoteIdentifier(sourceTable.Table))
+	qualifiedDestRef := fmt.Sprintf("ON %s.%s",
+		utils.QuoteIdentifier(destTable.Schema), utils.QuoteIdentifier(destTable.Table))
+	sourceSchemaPrefix := fmt.Sprintf("%s.", sourceTable.Schema)
+	destSchemaPrefix := fmt.Sprintf("%s.", destTable.Schema)
+
+	successCount := 0
+	skippedCount := 0
+	failedCount := 0
+
+	for _, trig := range tableSchema.Triggers {
+		// Check dependencies before creating trigger using the function_name field from proto
+		if trig.FunctionName != "" && !checkTriggerDependency(trig.FunctionName, createdFunctions) {
+			c.logger.Warn("Skipping trigger creation due to missing function dependency",
+				slog.String("triggerName", trig.TriggerName),
+				slog.String("schema", destTable.Schema),
+				slog.String("table", destTable.Table),
+				slog.String("requiredFunction", trig.FunctionName))
+			skippedCount++
+			continue
+		}
+
+		// Replace source table reference with destination table in the trigger definition
+		modifiedTriggerDef := strings.Replace(trig.TriggerDef, qualifiedSourceRef, qualifiedDestRef, 1)
+
+		// Also handle case without schema qualification
+		modifiedTriggerDef = strings.Replace(modifiedTriggerDef, unqualifiedSourceRef, qualifiedDestRef, 1)
+
+		// Replace schema references in function calls within the trigger.
+		// NOTE(review): plain text substitution of "<schema>."; it also rewrites matches that
+		// are part of a longer identifier or a string literal - confirm that is acceptable.
+		modifiedTriggerDef = strings.ReplaceAll(modifiedTriggerDef, sourceSchemaPrefix, destSchemaPrefix)
+
+		c.logger.Info("Creating trigger on destination table",
+			slog.String("triggerName", trig.TriggerName),
+			slog.String("schema", destTable.Schema),
+			slog.String("table", destTable.Table),
+			slog.String("triggerDef", modifiedTriggerDef),
+			slog.String("eventManipulation", trig.EventManipulation),
+			slog.String("actionTiming", trig.ActionTiming),
+			slog.String("actionOrientation", trig.ActionOrientation))
+
+		// A failed statement aborts the enclosing PostgreSQL transaction, which would make
+		// every later CREATE TRIGGER (and the caller's commit) fail as well. A nested pgx
+		// transaction is a savepoint, so one bad trigger can be undone without losing the rest.
+		savepoint, err := tx.Begin(ctx)
+		if err != nil {
+			c.logger.Warn("Failed to create savepoint for trigger creation",
+				slog.String("triggerName", trig.TriggerName),
+				slog.String("schema", destTable.Schema),
+				slog.String("table", destTable.Table),
+				slog.Any("error", err))
+			failedCount++
+			continue
+		}
+
+		if _, err := savepoint.Exec(ctx, modifiedTriggerDef); err != nil {
+			// Log as warning and continue with other triggers
+			c.logger.Warn("Failed to create trigger on destination",
+				slog.String("triggerName", trig.TriggerName),
+				slog.String("schema", destTable.Schema),
+				slog.String("table", destTable.Table),
+				slog.Any("error", err))
+			if rollbackErr := savepoint.Rollback(ctx); rollbackErr != nil {
+				c.logger.Warn("Failed to roll back savepoint after trigger creation failure",
+					slog.String("triggerName", trig.TriggerName),
+					slog.String("schema", destTable.Schema),
+					slog.String("table", destTable.Table),
+					slog.Any("error", rollbackErr))
+			}
+			failedCount++
+			continue
+		}
+
+		if err := savepoint.Commit(ctx); err != nil {
+			c.logger.Warn("Failed to release savepoint after trigger creation",
+				slog.String("triggerName", trig.TriggerName),
+				slog.String("schema", destTable.Schema),
+				slog.String("table", destTable.Table),
+				slog.Any("error", err))
+			failedCount++
+			continue
+		}
+
+		c.logger.Info("Successfully created trigger",
+			slog.String("triggerName", trig.TriggerName),
+			slog.String("schema", destTable.Schema),
+			slog.String("table", destTable.Table))
+		successCount++
+	}
+
+	c.logger.Info("Trigger creation summary",
+		slog.String("schema", destTable.Schema),
+		slog.String("table", destTable.Table),
+		slog.Int("total", len(tableSchema.Triggers)),
+		slog.Int("successful", successCount),
+		slog.Int("skipped", skippedCount),
+		slog.Int("failed", failedCount))
+
+	if failedCount > 0 || skippedCount > 0 {
+		return fmt.Errorf("some triggers were not created: %d skipped due to dependencies, %d failed", skippedCount, failedCount)
+	}
+
+	return nil
+}
diff --git a/protos/flow.proto b/protos/flow.proto index 04e1de7f46..778110e1af 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -266,6 +266,7 @@ message TriggerDescription { string event_manipulation = 3; string action_timing = 4; string action_orientation = 5; + string function_name = 6; // The function called by EXECUTE FUNCTION } message SetupTableSchemaBatchInput { From 41fa22504263ab918112ccb2b107d9a5b19079e7 Mon Sep 17 00:00:00 2001 From: Yogesh Date: Fri, 14 Nov 2025 21:17:32 +0530 Subject: [PATCH 5/5] added proper flow of indexes functions and triggers Signed-off-by: Yogesh --- flow/activities/flowable.go | 309 ++++++++++++++++++ flow/connectors/postgres/postgres.go | 62 +--- .../postgres/postgres_function_migration.go | 4 +- .../postgres/postgres_index_migration.go | 4 +- .../postgres/postgres_trigger_migration.go | 4 +- flow/workflows/setup_flow.go | 15 + 6 files changed, 331 insertions(+), 67 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index e86f15c758..ff29ea4899 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -281,6 +281,315 @@ func (a *FlowableActivity) CreateNormalizedTable( }, nil } +// CreateNormalizedTableIndexes creates
indexes on normalized tables (Postgres-specific).
+// Each table mapping is handled in its own transaction so that a failure on one table
+// does not affect the others. Index failures are reported as flow warnings only; the
+// activity returns an error only when the connector or the table schemas cannot be
+// loaded, or when the context is cancelled.
+func (a *FlowableActivity) CreateNormalizedTableIndexes(
+	ctx context.Context,
+	config *protos.SetupNormalizedTableBatchInput,
+) error {
+	numTablesProcessed := atomic.Uint32{}
+	numTablesToProcess := atomic.Int32{}
+
+	shutdown := heartbeatRoutine(ctx, func() string {
+		return fmt.Sprintf("creating indexes - %d of %d tables processed", numTablesProcessed.Load(), numTablesToProcess.Load())
+	})
+	defer shutdown()
+
+	logger := internal.LoggerFromCtx(ctx)
+	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
+
+	conn, connClose, err := connectors.GetByNameAs[connectors.NormalizedTablesConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
+	if err != nil {
+		return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get connector: %w", err))
+	}
+	defer connClose(ctx)
+
+	// Check if this is a Postgres connector - only Postgres supports index migration
+	pgConn, ok := conn.(*connpostgres.PostgresConnector)
+	if !ok {
+		logger.Info("Connector does not support index migration, skipping",
+			slog.String("flowName", config.FlowName))
+		return nil
+	}
+
+	tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowName)
+	if err != nil {
+		return err
+	}
+
+	// The loop below walks config.TableMappings, so that is the total to report.
+	numTablesToProcess.Store(int32(len(config.TableMappings)))
+
+	// createIndexes migrates the indexes of one destination table inside its own
+	// transaction and returns the first error that prevented it from completing.
+	createIndexes := func(tableIdentifier string) error {
+		// A missing map entry yields a nil schema; guard before touching its fields.
+		tableSchema := tableNameSchemaMapping[tableIdentifier]
+		if tableSchema == nil {
+			return fmt.Errorf("no table schema found for table %s, skipping index creation", tableIdentifier)
+		}
+
+		if len(tableSchema.Indexes) == 0 {
+			logger.Info("No indexes to create for table", slog.String("table", tableIdentifier))
+			return nil
+		}
+
+		// Parse before opening the transaction so a bad identifier costs no round trip.
+		parsedTable, err := utils.ParseSchemaTable(tableIdentifier)
+		if err != nil {
+			return fmt.Errorf("failed to parse table identifier %s: %w", tableIdentifier, err)
+		}
+
+		tx, err := pgConn.Conn().Begin(ctx)
+		if err != nil {
+			return fmt.Errorf("failed to start transaction for index creation on table %s: %w", tableIdentifier, err)
+		}
+
+		logger.Info("Creating indexes on destination table",
+			slog.String("table", tableIdentifier),
+			slog.Int("indexCount", len(tableSchema.Indexes)))
+
+		if err := pgConn.CreateTableIndexesFromSchema(ctx, tx, tableSchema, parsedTable); err != nil {
+			shared.RollbackTx(tx, logger)
+			return fmt.Errorf("failed to create indexes for table %s: %w", tableIdentifier, err)
+		}
+
+		if err := tx.Commit(ctx); err != nil {
+			return fmt.Errorf("failed to commit index creation for table %s: %w", tableIdentifier, err)
+		}
+
+		logger.Info("Successfully created indexes for table", slog.String("table", tableIdentifier))
+		return nil
+	}
+
+	// Process each table's indexes in a separate transaction for fault isolation
+	for _, tableMapping := range config.TableMappings {
+		// Stop early instead of emitting one warning per table once the activity is cancelled.
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+
+		if err := createIndexes(tableMapping.DestinationTableIdentifier); err != nil {
+			// Log warning but don't fail - indexes are optional
+			a.Alerter.LogFlowWarning(ctx, config.FlowName, err)
+		}
+
+		// Count every table, including failed ones, so the heartbeat reaches the total.
+		numTablesProcessed.Add(1)
+	}
+
+	a.Alerter.LogFlowInfo(ctx, config.FlowName, "Index creation completed for all tables")
+	return nil
+}
+
+// CreateNormalizedTableFunctions creates functions on normalized table schemas (Postgres-specific).
+// Functions are schema-level objects, so the per-table function lists are merged per
+// (source schema, destination schema) pair and each pair is created in its own
+// transaction. Function failures are reported as warnings only.
+func (a *FlowableActivity) CreateNormalizedTableFunctions(
+	ctx context.Context,
+	config *protos.SetupNormalizedTableBatchInput,
+) error {
+	numSchemasProcessed := atomic.Uint32{}
+
+	shutdown := heartbeatRoutine(ctx, func() string {
+		return fmt.Sprintf("creating functions - %d schemas processed", numSchemasProcessed.Load())
+	})
+	defer shutdown()
+
+	logger := internal.LoggerFromCtx(ctx)
+	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
+
+	conn, connClose, err := connectors.GetByNameAs[connectors.NormalizedTablesConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
+	if err != nil {
+		return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get connector: %w", err))
+	}
+	defer connClose(ctx)
+
+	// Check if this is a Postgres connector - only Postgres supports function migration
+	pgConn, ok := conn.(*connpostgres.PostgresConnector)
+	if !ok {
+		logger.Info("Connector does not support function migration, skipping",
+			slog.String("flowName", config.FlowName))
+		return nil
+	}
+
+	tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowName)
+	if err != nil {
+		return err
+	}
+
+	// schemaPair identifies one source-schema -> destination-schema rewrite.
+	type schemaPair struct {
+		source      string
+		destination string
+	}
+	// schemaBatch collects the deduplicated functions to create for one schemaPair.
+	type schemaBatch struct {
+		sourceIdentifier string
+		destTable        *utils.SchemaTable
+		functions        map[string]*protos.FunctionDescription
+	}
+
+	// Group tables by schema since functions are schema-level objects.
+	// The index and trigger activities look this map up by destination table identifier,
+	// so the key is parsed for the destination schema, while TableSchema.TableIdentifier is
+	// parsed for the source schema (CreateTableFunctionsAndTrack does the same).
+	// NOTE(review): confirm getTableNameSchemaMapping is keyed by destination identifier.
+	batches := make(map[schemaPair]*schemaBatch)
+	for destIdentifier, tableSchema := range tableNameSchemaMapping {
+		if tableSchema == nil || len(tableSchema.Functions) == 0 {
+			continue
+		}
+
+		destTable, err := utils.ParseSchemaTable(destIdentifier)
+		if err != nil {
+			logger.Warn("Failed to parse table identifier",
+				slog.String("table", destIdentifier),
+				slog.Any("error", err))
+			continue
+		}
+
+		sourceTable, err := utils.ParseSchemaTable(tableSchema.TableIdentifier)
+		if err != nil {
+			logger.Warn("Failed to parse table identifier",
+				slog.String("table", tableSchema.TableIdentifier),
+				slog.Any("error", err))
+			continue
+		}
+
+		key := schemaPair{source: sourceTable.Schema, destination: destTable.Schema}
+		batch, found := batches[key]
+		if !found {
+			batch = &schemaBatch{
+				sourceIdentifier: tableSchema.TableIdentifier,
+				destTable:        destTable,
+				functions:        make(map[string]*protos.FunctionDescription),
+			}
+			batches[key] = batch
+		}
+		for _, fn := range tableSchema.Functions {
+			// Use function name as key to deduplicate (same function might be on multiple tables)
+			batch.functions[fn.FunctionName] = fn
+		}
+	}
+
+	if len(batches) == 0 {
+		logger.Info("No functions to create for flow", slog.String("flowName", config.FlowName))
+	}
+
+	// Process each schema's functions in a separate transaction
+	for key, batch := range batches {
+		// Stop early instead of emitting one warning per schema once the activity is cancelled.
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+
+		// Create a temporary table schema with all functions for this schema
+		schemaFunctions := &protos.TableSchema{
+			TableIdentifier: batch.sourceIdentifier,
+			Functions:       make([]*protos.FunctionDescription, 0, len(batch.functions)),
+		}
+		for _, fn := range batch.functions {
+			schemaFunctions.Functions = append(schemaFunctions.Functions, fn)
+		}
+
+		// Each schema gets its own transaction for function creation
+		tx, err := pgConn.Conn().Begin(ctx)
+		if err != nil {
+			a.Alerter.LogFlowWarning(ctx, config.FlowName,
+				fmt.Errorf("failed to start transaction for function creation on schema %s: %w", key.destination, err))
+			numSchemasProcessed.Add(1)
+			continue
+		}
+
+		logger.Info("Creating functions on destination schema",
+			slog.String("schema", key.destination),
+			slog.Int("functionCount", len(schemaFunctions.Functions)))
+
+		createdFunctions := pgConn.CreateTableFunctionsAndTrack(ctx, tx, schemaFunctions, batch.destTable)
+
+		successCount := 0
+		for _, success := range createdFunctions {
+			if success {
+				successCount++
+			}
+		}
+
+		if err := tx.Commit(ctx); err != nil {
+			a.Alerter.LogFlowWarning(ctx, config.FlowName,
+				fmt.Errorf("failed to commit function creation for schema %s: %w", key.destination, err))
+		} else {
+			logger.Info("Function creation summary",
+				slog.String("schema", key.destination),
+				slog.Int("total", len(schemaFunctions.Functions)),
+				slog.Int("successful", successCount),
+				slog.Int("failed", len(schemaFunctions.Functions)-successCount))
+		}
+
+		numSchemasProcessed.Add(1)
+	}
+
+	a.Alerter.LogFlowInfo(ctx, config.FlowName, "Function creation completed for all schemas")
+	return nil
+}
+
+// CreateNormalizedTableTriggers creates triggers on normalized tables (Postgres-specific).
+// It must run AFTER CreateNormalizedTableFunctions since triggers may depend on functions.
+// Each table mapping is handled in its own transaction. Trigger failures are reported as
+// flow warnings only, and triggers that were created successfully are still committed.
+func (a *FlowableActivity) CreateNormalizedTableTriggers(
+	ctx context.Context,
+	config *protos.SetupNormalizedTableBatchInput,
+) error {
+	numTablesProcessed := atomic.Uint32{}
+	numTablesToProcess := atomic.Int32{}
+
+	shutdown := heartbeatRoutine(ctx, func() string {
+		return fmt.Sprintf("creating triggers - %d of %d tables processed", numTablesProcessed.Load(), numTablesToProcess.Load())
+	})
+	defer shutdown()
+
+	logger := internal.LoggerFromCtx(ctx)
+	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
+
+	conn, connClose, err := connectors.GetByNameAs[connectors.NormalizedTablesConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
+	if err != nil {
+		return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get connector: %w", err))
+	}
+	defer connClose(ctx)
+
+	// Check if this is a Postgres connector - only Postgres supports trigger migration
+	pgConn, ok := conn.(*connpostgres.PostgresConnector)
+	if !ok {
+		logger.Info("Connector does not support trigger migration, skipping",
+			slog.String("flowName", config.FlowName))
+		return nil
+	}
+
+	tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowName)
+	if err != nil {
+		return err
+	}
+
+	// The loop below walks config.TableMappings, so that is the total to report.
+	numTablesToProcess.Store(int32(len(config.TableMappings)))
+
+	// Collect the distinct function names the triggers depend on.
+	var requiredFunctions []string
+	seenFunctions := make(map[string]struct{})
+	for _, tableSchema := range tableNameSchemaMapping {
+		if tableSchema == nil {
+			continue
+		}
+		for _, trig := range tableSchema.Triggers {
+			if trig.FunctionName == "" {
+				continue
+			}
+			if _, seen := seenFunctions[trig.FunctionName]; seen {
+				continue
+			}
+			seenFunctions[trig.FunctionName] = struct{}{}
+			requiredFunctions = append(requiredFunctions, trig.FunctionName)
+		}
+	}
+
+	// lookupExistingFunctions asks the destination catalog which of the given function
+	// names exist. The match is by name only (any schema, any signature), mirroring the
+	// name-only key used by the dependency check; a wrong match surfaces later as a
+	// failed CREATE TRIGGER, which is logged.
+	lookupExistingFunctions := func(names []string) (map[string]bool, error) {
+		existing := make(map[string]bool, len(names))
+		if len(names) == 0 {
+			return existing, nil
+		}
+
+		rows, err := pgConn.Conn().Query(ctx,
+			"SELECT DISTINCT p.proname::text FROM pg_catalog.pg_proc p WHERE p.proname::text = ANY($1::text[])",
+			names)
+		if err != nil {
+			return nil, fmt.Errorf("failed to query destination functions: %w", err)
+		}
+		defer rows.Close()
+
+		for rows.Next() {
+			var name string
+			if err := rows.Scan(&name); err != nil {
+				return nil, fmt.Errorf("failed to scan destination function name: %w", err)
+			}
+			existing[name] = true
+		}
+		if err := rows.Err(); err != nil {
+			return nil, fmt.Errorf("failed to read destination functions: %w", err)
+		}
+		return existing, nil
+	}
+
+	// Build a global map of created functions by checking what exists in the destination
+	createdFunctions, err := lookupExistingFunctions(requiredFunctions)
+	if err != nil {
+		// Fall back to assuming every function recorded in the table schemas was created.
+		a.Alerter.LogFlowWarning(ctx, config.FlowName,
+			fmt.Errorf("could not verify trigger functions on destination, assuming all were created: %w", err))
+		createdFunctions = make(map[string]bool)
+		for _, tableSchema := range tableNameSchemaMapping {
+			if tableSchema == nil {
+				continue
+			}
+			for _, fn := range tableSchema.Functions {
+				createdFunctions[fn.FunctionName] = true
+			}
+		}
+	}
+
+	// createTriggers migrates the triggers of one destination table inside its own
+	// transaction and returns the first error that prevented it from completing.
+	createTriggers := func(tableIdentifier string) error {
+		// A missing map entry yields a nil schema; guard before touching its fields.
+		tableSchema := tableNameSchemaMapping[tableIdentifier]
+		if tableSchema == nil {
+			return fmt.Errorf("no table schema found for table %s, skipping trigger creation", tableIdentifier)
+		}
+
+		if len(tableSchema.Triggers) == 0 {
+			logger.Info("No triggers to create for table", slog.String("table", tableIdentifier))
+			return nil
+		}
+
+		// Parse before opening the transaction so a bad identifier costs no round trip.
+		parsedTable, err := utils.ParseSchemaTable(tableIdentifier)
+		if err != nil {
+			return fmt.Errorf("failed to parse table identifier %s: %w", tableIdentifier, err)
+		}
+
+		tx, err := pgConn.Conn().Begin(ctx)
+		if err != nil {
+			return fmt.Errorf("failed to start transaction for trigger creation on table %s: %w", tableIdentifier, err)
+		}
+
+		logger.Info("Creating triggers on destination table",
+			slog.String("table", tableIdentifier),
+			slog.Int("triggerCount", len(tableSchema.Triggers)))
+
+		// The callee reports skipped or failed triggers as an error but keeps going, so
+		// the transaction is committed either way to keep the triggers that were created.
+		triggerErr := pgConn.CreateTableTriggersWithDependencyCheck(ctx, tx, tableSchema, parsedTable, createdFunctions)
+		if triggerErr != nil {
+			a.Alerter.LogFlowWarning(ctx, config.FlowName,
+				fmt.Errorf("failed to create triggers for table %s: %w", tableIdentifier, triggerErr))
+		}
+
+		if err := tx.Commit(ctx); err != nil {
+			return fmt.Errorf("failed to commit trigger creation for table %s: %w", tableIdentifier, err)
+		}
+
+		if triggerErr == nil {
+			logger.Info("Successfully created triggers for table", slog.String("table", tableIdentifier))
+		}
+		return nil
+	}
+
+	for _, tableMapping := range config.TableMappings {
+		// Stop early instead of emitting one warning per table once the activity is cancelled.
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+
+		if err := createTriggers(tableMapping.DestinationTableIdentifier); err != nil {
+			// Log warning but don't fail - triggers are optional
+			a.Alerter.LogFlowWarning(ctx, config.FlowName, err)
+		}
+
+		// Count every table, including failed ones, so the heartbeat reaches the total.
+		numTablesProcessed.Add(1)
+	}
+
+	a.Alerter.LogFlowInfo(ctx, config.FlowName, "Trigger creation completed for all tables")
+	return nil
+}
+
 func (a *FlowableActivity) SyncFlow( ctx context.Context, config *protos.FlowConnectionConfigsCore, diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index cf4dd0fcfb..6da2a66998
100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -1145,67 +1145,7 @@ func (c *PostgresConnector) SetupNormalizedTable( return false, fmt.Errorf("error while creating normalized table: %w", err) } - // Track which functions were created successfully (needed for trigger dependency checking) - createdFunctions := make(map[string]bool) - - if len(tableSchema.Indexes) > 0 { - c.logger.Info("Creating indexes on destination table", - slog.String("table", tableIdentifier), - slog.Int("indexCount", len(tableSchema.Indexes))) - - if err := c.createTableIndexesFromSchema(ctx, createNormalizedTablesTx, tableSchema, parsedNormalizedTable); err != nil { - // Log the error but don't fail the table creation - c.logger.Warn("Failed to create indexes for table", - slog.String("table", tableIdentifier), - slog.Any("error", err)) - } - } else { - c.logger.Info("No secondary indexes to create for table", slog.String("table", tableIdentifier)) - } - - if len(tableSchema.Functions) > 0 { - c.logger.Info("Creating functions on destination schema", - slog.String("schema", parsedNormalizedTable.Schema), - slog.Int("functionCount", len(tableSchema.Functions))) - - // Create functions and track which ones succeeded - createdFunctions = c.createTableFunctionsAndTrack(ctx, createNormalizedTablesTx, tableSchema, parsedNormalizedTable) - - successCount := 0 - for _, success := range createdFunctions { - if success { - successCount++ - } - } - - c.logger.Info("Function creation summary", - slog.String("schema", parsedNormalizedTable.Schema), - slog.Int("total", len(tableSchema.Functions)), - slog.Int("successful", successCount), - slog.Int("failed", len(tableSchema.Functions)-successCount)) - } else { - c.logger.Info("No functions to create for schema", slog.String("schema", parsedNormalizedTable.Schema)) - } - - if len(tableSchema.Triggers) > 0 { - c.logger.Info("Creating triggers on destination table", - slog.String("schema", 
parsedNormalizedTable.Schema), - slog.String("table", parsedNormalizedTable.Table), - slog.Int("triggerCount", len(tableSchema.Triggers))) - - // Create triggers with dependency checking - if err := c.createTableTriggersWithDependencyCheck(ctx, createNormalizedTablesTx, tableSchema, parsedNormalizedTable, createdFunctions); err != nil { - // Log the error but don't fail the table creation - c.logger.Warn("Failed to create some triggers for table", - slog.String("schema", parsedNormalizedTable.Schema), - slog.String("table", parsedNormalizedTable.Table), - slog.Any("error", err)) - } - } else { - c.logger.Info("No triggers to create for table", - slog.String("schema", parsedNormalizedTable.Schema), - slog.String("table", parsedNormalizedTable.Table)) - } + c.logger.Info("Successfully created normalized table", slog.String("table", tableIdentifier)) return false, nil } diff --git a/flow/connectors/postgres/postgres_function_migration.go b/flow/connectors/postgres/postgres_function_migration.go index 600e45b374..74039e9be8 100644 --- a/flow/connectors/postgres/postgres_function_migration.go +++ b/flow/connectors/postgres/postgres_function_migration.go @@ -187,9 +187,9 @@ func (c *PostgresConnector) createTableFunctionsFromSchema( return nil } -// createTableFunctionsAndTrack creates functions and returns a map tracking which ones succeeded +// CreateTableFunctionsAndTrack creates functions and returns a map tracking which ones succeeded // This is used by trigger migration to check if function dependencies are satisfied -func (c *PostgresConnector) createTableFunctionsAndTrack( +func (c *PostgresConnector) CreateTableFunctionsAndTrack( ctx context.Context, tx pgx.Tx, tableSchema *protos.TableSchema, diff --git a/flow/connectors/postgres/postgres_index_migration.go b/flow/connectors/postgres/postgres_index_migration.go index 99421f638d..0389a34c69 100644 --- a/flow/connectors/postgres/postgres_index_migration.go +++ 
b/flow/connectors/postgres/postgres_index_migration.go @@ -156,9 +156,9 @@ func (c *PostgresConnector) getTableIndexCount( return count, nil } -// createTableIndexesFromSchema creates indexes on the destination table from the TableSchema +// CreateTableIndexesFromSchema creates indexes on the destination table from the TableSchema // This method uses the indexes that were fetched from the source during SetupTableSchema -func (c *PostgresConnector) createTableIndexesFromSchema( +func (c *PostgresConnector) CreateTableIndexesFromSchema( ctx context.Context, tx pgx.Tx, tableSchema *protos.TableSchema, diff --git a/flow/connectors/postgres/postgres_trigger_migration.go b/flow/connectors/postgres/postgres_trigger_migration.go index 63fde1f607..a2ad2ce088 100644 --- a/flow/connectors/postgres/postgres_trigger_migration.go +++ b/flow/connectors/postgres/postgres_trigger_migration.go @@ -243,8 +243,8 @@ func checkTriggerDependency(functionName string, createdFunctions map[string]boo return false } -// createTableTriggersWithDependencyCheck creates triggers but checks function dependencies first -func (c *PostgresConnector) createTableTriggersWithDependencyCheck( +// CreateTableTriggersWithDependencyCheck creates triggers but checks function dependencies first +func (c *PostgresConnector) CreateTableTriggersWithDependencyCheck( ctx context.Context, tx pgx.Tx, tableSchema *protos.TableSchema, diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index cae1ffd1c0..f21a144a7f 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -221,6 +221,21 @@ func (s *SetupFlowExecution) setupNormalizedTables( return fmt.Errorf("failed to create normalized tables: %w", err) } + if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTableIndexes, setupConfig).Get(ctx, nil); err != nil { + s.Error("failed to create normalized table indexes", slog.Any("error", err)) + return fmt.Errorf("failed to create normalized table indexes: %w", err) 
+ } + + if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTableFunctions, setupConfig).Get(ctx, nil); err != nil { + s.Error("failed to create normalized table functions", slog.Any("error", err)) + return fmt.Errorf("failed to create normalized table functions: %w", err) + } + + if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTableTriggers, setupConfig).Get(ctx, nil); err != nil { + s.Error("failed to create normalized table triggers", slog.Any("error", err)) + return fmt.Errorf("failed to create normalized table triggers: %w", err) + } + s.Info("finished setting up normalized tables for peer flow") return nil }