Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/connectorstore/connectorstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ type Writer interface {
PutResources(ctx context.Context, resources ...*v2.Resource) error
PutEntitlements(ctx context.Context, entitlements ...*v2.Entitlement) error
DeleteGrant(ctx context.Context, grantId string) error

// SetExpansionStarted marks the current sync as having started expansion.
// This marker is used to detect syncs that expanded with older code that dropped annotations.
SetExpansionStarted(ctx context.Context, syncID string) error
}
114 changes: 114 additions & 0 deletions pkg/dotc1z/c1file_attached.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ func (c *C1FileAttached) UpdateSync(ctx context.Context, baseSync *reader_v2.Syn
return nil
}

// ErrOldSyncMissingExpansionMarker is the sentinel error returned by GenerateSyncDiffFromFile when the
// old sync's expansion_started_at value is NULL. A missing marker indicates the sync was expanded with
// older code that dropped grant annotations, making it unsuitable for diff-based incremental expansion.
// It is returned unwrapped, so callers can detect it with errors.Is.
var ErrOldSyncMissingExpansionMarker = errors.New("old sync is missing expansion marker; cannot generate diff from sync expanded with older code that dropped annotations")

// GenerateSyncDiffFromFile compares the old sync (in attached) with the new sync (in main)
// and generates two new syncs in the main database.
//
Expand All @@ -198,6 +203,20 @@ func (c *C1FileAttached) GenerateSyncDiffFromFile(ctx context.Context, oldSyncID
ctx, span := tracer.Start(ctx, "C1FileAttached.GenerateSyncDiffFromFile")
defer span.End()

// Check that the old sync has the expansion marker set.
// Syncs expanded with older code dropped annotations, making them unusable for diffs.
var expansionStartedAt sql.NullTime
err := c.file.db.QueryRowContext(ctx,
fmt.Sprintf("SELECT expansion_started_at FROM attached.%s WHERE sync_id = ?", syncRuns.Name()),
oldSyncID,
).Scan(&expansionStartedAt)
if err != nil {
return "", "", fmt.Errorf("failed to check expansion marker for old sync: %w", err)
}
if !expansionStartedAt.Valid {
return "", "", ErrOldSyncMissingExpansionMarker
}

// Generate unique IDs for the diff syncs
deletionsSyncID := ksuid.New().String()
upsertsSyncID := ksuid.New().String()
Expand Down Expand Up @@ -259,12 +278,29 @@ func (c *C1FileAttached) GenerateSyncDiffFromFile(ctx context.Context, oldSyncID
// - diffTableFromMainTx finds items in NEW not in OLD or modified = upserts
tables := []string{"v1_resource_types", "v1_resources", "v1_entitlements", "v1_grants"}
for _, tableName := range tables {
// Always include resource types in the upserts diff. Targeted/partial syncs may not emit a complete
// snapshot of resource types, and we do not want missing types to be interpreted as deletions.
if tableName == "v1_resource_types" {
if err := c.copyTableFromMainTx(ctx, tx, tableName, newSyncID, upsertsSyncID); err != nil {
return "", "", fmt.Errorf("failed to copy resource types for %s: %w", tableName, err)
}
continue
}
if err := c.diffTableFromAttachedTx(ctx, tx, tableName, oldSyncID, newSyncID, deletionsSyncID); err != nil {
return "", "", fmt.Errorf("failed to generate deletions for %s: %w", tableName, err)
}
if err := c.diffTableFromMainTx(ctx, tx, tableName, oldSyncID, newSyncID, upsertsSyncID); err != nil {
return "", "", fmt.Errorf("failed to generate upserts for %s: %w", tableName, err)
}
// For grants, also include the OLD version of modified rows in the deletions sync.
// This allows downstream consumers (including incremental expansion) to treat modifications
// as delete+insert and compute accurate edge/source invalidation without looking back into
// the attached database later.
if tableName == "v1_grants" {
if err := c.diffModifiedFromAttachedTx(ctx, tx, tableName, oldSyncID, newSyncID, deletionsSyncID); err != nil {
return "", "", fmt.Errorf("failed to generate modified-row deletions for %s: %w", tableName, err)
}
}
}

// End the syncs (deletions first, then upserts)
Expand Down Expand Up @@ -397,3 +433,81 @@ func (c *C1FileAttached) diffTableFromMainTx(ctx context.Context, tx *sql.Tx, ta
_, err = tx.ExecContext(ctx, query, targetSyncID, newSyncID, oldSyncID, oldSyncID)
return err
}

// diffModifiedFromAttachedTx writes, under targetSyncID in main, the OLD (attached) copy of every row
// of tableName whose external_id is also present in the NEW sync (main) with different data.
// Emitting the old row lets a modification be applied as a delete followed by an insert.
//
// The column list comes from getTableColumns; any error from that lookup or from executing the
// statement on tx is returned as-is.
func (c *C1FileAttached) diffModifiedFromAttachedTx(ctx context.Context, tx *sql.Tx, tableName string, oldSyncID string, newSyncID string, targetSyncID string) error {
	cols, err := c.getTableColumns(ctx, tableName)
	if err != nil {
		return err
	}

	// insertCols names the destination columns. selectExprs mirrors it, except that sync_id is
	// replaced by a bound placeholder so the copied rows are stamped with targetSyncID.
	var insertCols, selectExprs string
	sep := ""
	for _, name := range cols {
		expr := name
		if name == "sync_id" {
			expr = "? as sync_id"
		}
		insertCols += sep + name
		selectExprs += sep + expr
		sep = ", "
	}

	// Placeholders bind in textual order: the select-list sync_id (target), the outer WHERE
	// (old sync), then the EXISTS subquery (new sync).
	//nolint:gosec // table names are from hardcoded list, not user input
	stmt := fmt.Sprintf(`
INSERT INTO main.%s (%s)
SELECT %s
FROM attached.%s AS a
WHERE a.sync_id = ?
AND EXISTS (
SELECT 1 FROM main.%s AS m
WHERE m.external_id = a.external_id
AND m.sync_id = ?
AND a.data != m.data
)
`, tableName, insertCols, selectExprs, tableName, tableName)

	if _, err := tx.ExecContext(ctx, stmt, targetSyncID, oldSyncID, newSyncID); err != nil {
		return err
	}
	return nil
}

// copyTableFromMainTx duplicates every row of tableName that belongs to newSyncID (NEW, in main)
// into the same table under targetSyncID. It serves tables where the upserts sync should always
// carry a full snapshot (e.g., resource types) rather than a computed diff.
//
// The column list comes from getTableColumns; any error from that lookup or from executing the
// statement on tx is returned as-is.
func (c *C1FileAttached) copyTableFromMainTx(ctx context.Context, tx *sql.Tx, tableName string, newSyncID string, targetSyncID string) error {
	cols, err := c.getTableColumns(ctx, tableName)
	if err != nil {
		return err
	}

	// insertCols names the destination columns. selectExprs mirrors it, except that sync_id is
	// replaced by a bound placeholder so the copied rows are stamped with targetSyncID.
	var insertCols, selectExprs string
	sep := ""
	for _, name := range cols {
		expr := name
		if name == "sync_id" {
			expr = "? as sync_id"
		}
		insertCols += sep + name
		selectExprs += sep + expr
		sep = ", "
	}

	// Placeholders bind in textual order: the select-list sync_id (target), then the WHERE (new sync).
	//nolint:gosec // table names are from hardcoded list, not user input
	stmt := fmt.Sprintf(`
INSERT INTO main.%s (%s)
SELECT %s
FROM main.%s AS m
WHERE m.sync_id = ?
`, tableName, insertCols, selectExprs, tableName)

	if _, err := tx.ExecContext(ctx, stmt, targetSyncID, newSyncID); err != nil {
		return err
	}
	return nil
}
Loading
Loading