flow/activities/flowable.go (49 additions, 0 deletions)
@@ -281,6 +281,55 @@ func (a *FlowableActivity) CreateNormalizedTable(
}, nil
}

// SyncIndexesAndTriggers syncs indexes, triggers, and constraints from the source to the destination.
// It runs once during initial setup, not for on-the-fly schema changes, which is why it is part of the setup flow.
func (a *FlowableActivity) SyncIndexesAndTriggers(
ctx context.Context,
config *protos.SetupNormalizedTableBatchInput,
) error {
logger := internal.LoggerFromCtx(ctx)
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)

dstConn, dstClose, err := connectors.GetByNameAs[connectors.NormalizedTablesConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
logger.Info("Connector does not support normalized tables, skipping index/trigger sync")
return nil
}
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get destination connector: %w", err))
}
defer dstClose(ctx)

pgDstConn, ok := dstConn.(*connpostgres.PostgresConnector)
if !ok {
logger.Info("Destination is not Postgres, skipping index/trigger sync")
return nil
}
if config.SourcePeerName == "" {
logger.Info("Source peer name not provided, skipping index/trigger sync")
return nil
}

srcConn, srcClose, err := connectors.GetByNameAs[connectors.GetTableSchemaConnector](ctx, config.Env, a.CatalogPool, config.SourcePeerName)
if err != nil {
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get source connector: %w", err))
}
defer srcClose(ctx)

pgSrcConn, ok := srcConn.(*connpostgres.PostgresConnector)
if !ok {
logger.Info("Source is not Postgres, skipping index/trigger sync")
return nil
}
a.Alerter.LogFlowInfo(ctx, config.FlowName, "Syncing indexes, triggers, and constraints from source to destination")
if err := pgDstConn.SyncIndexesAndTriggers(ctx, config.TableMappings, pgSrcConn); err != nil {
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to sync indexes, triggers, and constraints: %w", err))
}

a.Alerter.LogFlowInfo(ctx, config.FlowName, "Successfully synced indexes, triggers, and constraints")
return nil
}
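
For orientation, here is a minimal sketch of how a setup workflow could invoke this activity. It assumes the Temporal Go SDK; the workflow function, the protos import path, and the timeout value are illustrative and not taken from this PR.

```go
package sketch

import (
	"time"

	"go.temporal.io/sdk/workflow"

	"github.com/PeerDB-io/peerdb/flow/generated/protos" // assumed import path
)

// setupIndexesAndTriggers is a hypothetical workflow step showing how the
// activity above might be scheduled once during initial setup.
func setupIndexesAndTriggers(wctx workflow.Context, config *protos.SetupNormalizedTableBatchInput) error {
	wctx = workflow.WithActivityOptions(wctx, workflow.ActivityOptions{
		StartToCloseTimeout: 30 * time.Minute, // assumption: index builds on large tables can be slow
	})
	// Activities registered on FlowableActivity are referenced by method name.
	return workflow.ExecuteActivity(wctx, "SyncIndexesAndTriggers", config).Get(wctx, nil)
}
```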

func (a *FlowableActivity) SyncFlow(
ctx context.Context,
config *protos.FlowConnectionConfigsCore,
flow/activities/flowable_core.go (71 additions, 0 deletions)
@@ -108,6 +108,70 @@ func (a *FlowableActivity) applySchemaDeltas(
return nil
}

// updateDestinationSchemaMapping refreshes the catalog's table_schema_mapping
// rows for the tables affected by the given schema deltas, using the
// destination's current schema so that normalization does not read a stale one.
func (a *FlowableActivity) updateDestinationSchemaMapping(
ctx context.Context,
config *protos.FlowConnectionConfigsCore,
options *protos.SyncFlowOptions,
schemaDeltas []*protos.TableSchemaDelta,
) error {
logger := internal.LoggerFromCtx(ctx)

filteredTableMappings := make([]*protos.TableMapping, 0, len(schemaDeltas))
for _, tableMapping := range options.TableMappings {
if slices.ContainsFunc(schemaDeltas, func(schemaDelta *protos.TableSchemaDelta) bool {
return schemaDelta.SrcTableName == tableMapping.SourceTableIdentifier &&
schemaDelta.DstTableName == tableMapping.DestinationTableIdentifier
}) {
filteredTableMappings = append(filteredTableMappings, tableMapping)
}
}

if len(filteredTableMappings) == 0 {
return nil
}

dstConn, dstClose, err := connectors.GetByNameAs[connectors.GetTableSchemaConnector](ctx, config.Env, a.CatalogPool, config.DestinationName)
if err != nil {
return fmt.Errorf("failed to get destination connector for schema update: %w", err)
}
defer dstClose(ctx)

tableNameSchemaMapping, err := dstConn.GetTableSchema(ctx, config.Env, config.Version, config.System, filteredTableMappings)
if err != nil {
return fmt.Errorf("failed to get updated schema from destination: %w", err)
}

processed := internal.BuildProcessedSchemaMapping(filteredTableMappings, tableNameSchemaMapping, logger)

tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return fmt.Errorf("failed to start transaction for schema mapping update: %w", err)
}
defer shared.RollbackTx(tx, logger)

for tableName, tableSchema := range processed {
processedBytes, err := proto.Marshal(tableSchema)
if err != nil {
return fmt.Errorf("failed to marshal table schema for %s: %w", tableName, err)
}
if _, err := tx.Exec(
ctx,
"insert into table_schema_mapping(flow_name, table_name, table_schema) values ($1, $2, $3) "+
"on conflict (flow_name, table_name) do update set table_schema = $3",
config.FlowJobName,
tableName,
processedBytes,
); err != nil {
return fmt.Errorf("failed to update schema mapping for %s: %w", tableName, err)
}
}

if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit schema mapping update: %w", err)
}
return nil
}
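
The upsert above implies a catalog table keyed by (flow_name, table_name). A plausible shape, inferred from the query alone and not from the actual catalog migrations:

```go
// Inferred sketch of the catalog table targeted by the upsert above; the real
// migration may differ in column types, constraints, or extra columns.
const tableSchemaMappingDDL = `
CREATE TABLE IF NOT EXISTS table_schema_mapping (
	flow_name    TEXT  NOT NULL,
	table_name   TEXT  NOT NULL,
	table_schema BYTEA NOT NULL, -- proto.Marshal of a protos.TableSchema
	PRIMARY KEY (flow_name, table_name) -- backs the ON CONFLICT clause
);`
```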

func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, Items model.Items](
ctx context.Context,
a *FlowableActivity,
@@ -338,6 +402,13 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncConnectorCore, Items model.Items](
return nil, err
}

if len(res.TableSchemaDeltas) > 0 {
	// Best-effort refresh: a failure here is logged but does not fail the sync,
	// at the cost that normalization may run against a stale schema mapping.
	if err := a.updateDestinationSchemaMapping(ctx, config, options, res.TableSchemaDeltas); err != nil {
		logger.Warn("Failed to update destination schema mapping, normalization may use stale schema",
			slog.Any("error", err))
	}
}

if recordBatchSync.NeedsNormalize() {
syncState.Store(shared.Ptr("normalizing"))
normRequests.Update(res.CurrentSyncBatchID)