Conversation
Walkthrough

Require old syncs to have an expansion marker to generate diffs; snapshot resource types into diff upserts; emit OLD rows for modified grants; add grants expansion metadata and needs_expansion bookkeeping; implement an incremental expansion pipeline (edge delta, invalidation, marking, targeted expansion), a syncer fast path, and accompanying tests.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Syncer as Syncer
participant Diff as DiffGenerator
participant C1 as C1File/DB
participant Incr as IncrOrchestrator
participant Store as GrantStore
participant Graph as EntitlementGraph
Syncer->>C1: SetExpansionStarted(syncID)
Syncer->>Diff: GenerateSyncDiffFromFile(oldSync,newSync)
Diff->>C1: Check expansion_started_at on oldSync (error if missing)
Diff->>C1: copy `v1_resource_types` -> upsertsSync
Diff->>C1: emit OLD rows for modified `v1_grants` -> deletionsSync
Diff-->>Syncer: upsertsSyncID, deletionsSyncID
Syncer->>Incr: ApplyIncrementalExpansionFromDiff(upsertsSync,deletionsSync)
Incr->>C1: ListDistinctGrantEntitlementIDsForSync(upserts/deletions)
Incr->>Incr: EdgeDeltaFromDiffSyncs(upserts,deletions)
Incr->>Incr: AffectedEntitlements(delta) (BFS)
Incr->>Store: InvalidateRemovedEdges(targetSyncID, delta.Removed)
Incr->>Store: InvalidateChangedSourceEntitlements(targetSyncID, changedSources)
Incr->>Store: MarkNeedsExpansionForAffectedEdges(targetSyncID, affected)
Incr->>Graph: ExpandDirtySubgraph(targetSyncID)
Graph->>C1: ListExpandableGrants(needs_expansion_only)
C1-->>Graph: ExpandableGrantDef[] (paged)
Graph->>Graph: Build nodes & edges, FixCycles, RunExpander
Graph->>C1: ClearNeedsExpansionForSync(targetSyncID)
```
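The EdgeDeltaFromDiffSyncs step above computes which expansion edges were added or removed between syncs. A minimal, self-contained sketch of that set-difference idea follows; the `edge` struct and `edgeDelta` helper are simplified stand-ins for illustration, not the SDK's actual types:

```go
package main

import (
	"fmt"
	"strings"
)

// edge is a simplified stand-in for an expansion edge between two entitlements.
type edge struct {
	src, dst string
	shallow  bool
}

// key builds a collision-resistant identity for an edge, mirroring the
// "join fields with an unlikely separator" approach discussed in this review.
func (e edge) key() string {
	shallow := "0"
	if e.shallow {
		shallow = "1"
	}
	return strings.Join([]string{e.src, e.dst, shallow}, "\x1f")
}

// edgeDelta returns edges present only in newEdges (added) and only in
// oldEdges (removed), keyed by their identity string.
func edgeDelta(oldEdges, newEdges []edge) (added, removed []edge) {
	oldSet := make(map[string]edge, len(oldEdges))
	for _, e := range oldEdges {
		oldSet[e.key()] = e
	}
	newSet := make(map[string]edge, len(newEdges))
	for _, e := range newEdges {
		newSet[e.key()] = e
	}
	for k, e := range newSet {
		if _, ok := oldSet[k]; !ok {
			added = append(added, e)
		}
	}
	for k, e := range oldSet {
		if _, ok := newSet[k]; !ok {
			removed = append(removed, e)
		}
	}
	return added, removed
}

func main() {
	oldEdges := []edge{{"e1", "e2", false}, {"e2", "e3", false}}
	newEdges := []edge{{"e2", "e3", false}, {"e3", "e4", true}}
	added, removed := edgeDelta(oldEdges, newEdges)
	fmt.Println(len(added), len(removed)) // 1 1
}
```

The invalidation and marking steps then operate only on the entitlements reachable from these added/removed edges, which is what keeps the incremental path cheap.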
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@pkg/dotc1z/c1file_attached.go`:
- Around line 261-276: The v1_resource_types path can produce duplicate inserts
into upsertsSyncID because diffTableFromMainTx may already add the same rows
that copyTableFromMainTx then re-inserts; to fix, special-case the
"v1_resource_types" table: skip calling diffTableFromMainTx for that table
(i.e., call diffTableFromAttachedTx, then directly call copyTableFromMainTx) OR
alternatively change copyTableFromMainTx to perform INSERT OR REPLACE (or
equivalent upsert) so inserting all rows into upsertsSyncID cannot violate the
(external_id, sync_id) unique constraint; update the logic around
diffTableFromMainTx, copyTableFromMainTx, and the tableName ==
"v1_resource_types" branch accordingly.
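The INSERT OR REPLACE alternative works because SQLite's upsert semantics make a second insert for the same key overwrite the row rather than violate the `(external_id, sync_id)` unique constraint. A toy model of the two behaviors, using a keyed map in place of the real table (the `insertStrict`/`insertOrReplace` helpers are illustrative, not the actual dotc1z code):

```go
package main

import "fmt"

// rowKey mirrors the (external_id, sync_id) unique constraint on v1_resource_types.
type rowKey struct{ externalID, syncID string }

// insertOrReplace emulates SQLite's INSERT OR REPLACE: a duplicate key
// overwrites the existing row instead of failing the unique constraint.
func insertOrReplace(table map[rowKey]string, externalID, syncID, data string) {
	table[rowKey{externalID, syncID}] = data
}

// insertStrict emulates a plain INSERT: a duplicate key is an error.
func insertStrict(table map[rowKey]string, externalID, syncID, data string) error {
	k := rowKey{externalID, syncID}
	if _, exists := table[k]; exists {
		return fmt.Errorf("UNIQUE constraint failed: (%s, %s)", externalID, syncID)
	}
	table[k] = data
	return nil
}

func main() {
	table := map[rowKey]string{}
	// diffTableFromMainTx inserts the row first...
	_ = insertStrict(table, "rt-user", "sync-upserts", "v1")
	// ...then copyTableFromMainTx re-inserts it: a plain INSERT fails,
	if err := insertStrict(table, "rt-user", "sync-upserts", "v1"); err != nil {
		fmt.Println("plain INSERT:", err)
	}
	// while INSERT OR REPLACE is idempotent.
	insertOrReplace(table, "rt-user", "sync-upserts", "v1")
	fmt.Println("rows:", len(table))
}
```

Either fix (skipping the duplicate insert path, or switching to upsert semantics) restores the invariant that each `(external_id, sync_id)` pair appears once in the upserts sync.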
In `@pkg/dotc1z/sql_helpers.go`:
- Around line 298-319: Remove the unnecessary //nolint:gosec directives that
precede fmt.Sprintf calls in tableHasColumn and ensureRowHashColumn (they're
unused and trigger nolintlint); delete those comments and similarly remove the
same //nolint:gosec occurrences in backfillRowHash so the linter no longer
complains while leaving the fmt.Sprintf usage unchanged in each function.
🧹 Nitpick comments (1)
pkg/dotc1z/c1file_attached.go (1)
366-387: Static analysis flagged SQL string formatting - acknowledged as safe.

The `hasRowHash` closure at Line 369 uses `fmt.Sprintf` to construct a SQL query. The static analysis tool flagged this as G201 (SQL string formatting). However, this is a false positive because:

- `tableName` comes from the hardcoded list at Line 260
- `schema` is either "main" or "attached" (internal values)

Consider adding a `//nolint:gosec` directive with a comment for consistency with other similar patterns in this file, or refactoring to use parameterized queries if the linter configuration requires it.
Actionable comments posted: 9
🤖 Fix all issues with AI agents
In `@pkg/sync/incrementalexpansion/affected_subgraph.go`:
- Around line 1-8: Run goimports on the file to fix import ordering and unused
imports; specifically in package incrementalexpansion adjust the import block
that currently lists "context", "fmt", and
"github.com/conductorone/baton-sdk/pkg/dotc1z" so that standard library imports
are grouped and sorted, and remove "fmt" if it is unused (or use it where
intended) to satisfy goimports/CI.
In `@pkg/sync/incrementalexpansion/apply.go`:
- Around line 1-8: Run goimports on the apply.go source in package
incrementalexpansion to fix import ordering and remove any unused import entries
(the file currently imports context, fmt and
github.com/conductorone/baton-sdk/pkg/dotc1z); update the import block and save
the file so CI passes.
In `@pkg/sync/incrementalexpansion/edge_delta.go`:
- Around line 93-94: The file edge_delta.go in package incrementalexpansion has
extraneous blank line(s) at the end which violates goimports formatting; open
edge_delta.go and remove the trailing blank line(s) so the file ends immediately
after the final token (e.g., after the last function, type, or variable
declaration such as any functions or types in edge_delta.go), then run
goimports/gofmt to confirm no further formatting issues.
In `@pkg/sync/incrementalexpansion/expand_dirty.go`:
- Around line 1-11: Run goimports on the file to fix import formatting and
ordering in package incrementalexpansion: organize and group standard library
imports (context, fmt) before third-party imports, remove any unused imports,
and let goimports add any missing import paths; ensure the import block
containing context, fmt, v2, reader_v2, dotc1z, and expand is reformatted by
goimports so CI passes.
- Around line 43-49: The loop over def.SrcEntitlementIDs currently swallows
every error from c1f.GetEntitlement; change it to only ignore "not found" errors
and propagate any other failures: after calling c1f.GetEntitlement (using
reader_v2.EntitlementsReaderServiceGetEntitlementRequest_builder), detect
whether err represents a NotFound (or equivalent) and continue the loop only in
that case, otherwise return or propagate the err up from the enclosing function
so callers see real failures.
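The error-classification pattern this prompt describes can be sketched as follows. Here `errNotFound` and `getEntitlement` are hypothetical stand-ins; how `c1f.GetEntitlement` actually signals not-found (a sentinel error, `sql.ErrNoRows`, or a gRPC NotFound status) should be verified before adopting this shape:

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for whatever sentinel the real lookup returns for a
// missing entitlement; verify the actual sentinel before using this pattern.
var errNotFound = errors.New("entitlement not found")

// getEntitlement is a fake lookup used to exercise both failure modes.
func getEntitlement(id string) (string, error) {
	switch id {
	case "missing":
		return "", fmt.Errorf("get %q: %w", id, errNotFound)
	case "broken":
		return "", errors.New("db connection lost")
	}
	return "entitlement:" + id, nil
}

// collectSources skips not-found sources but propagates every other failure,
// instead of swallowing all errors as the original loop did.
func collectSources(ids []string) ([]string, error) {
	var out []string
	for _, id := range ids {
		ent, err := getEntitlement(id)
		if errors.Is(err, errNotFound) {
			continue // a deleted source entitlement is expected; skip it
		}
		if err != nil {
			return nil, err // real failures must surface to the caller
		}
		out = append(out, ent)
	}
	return out, nil
}

func main() {
	got, err := collectSources([]string{"e1", "missing", "e2"})
	fmt.Println(got, err)
}
```

The key point is that `errors.Is` sees through wrapping, so callers can wrap the lookup error with context and still classify it correctly.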
In `@pkg/sync/incrementalexpansion/incremental_correctness_test.go`:
- Around line 1306-1314: Create a tracked issue for the failing behavior and
annotate the test TestIncrementalExpansion_DirectGrantBecomesSourceless with
that issue ID instead of an undocumented skip: open a bug/issue describing the
incremental expansion adding self-source E2, add a TODO comment referencing the
new issue above the test, and replace the current t.Skip("BUG:...") with
t.Skipf("skipping due to known bug: %s", "ISSUE-<ID>") or a URL so CI shows the
issue link; keep the test in place so it can be re-enabled once the issue is
resolved.
- Around line 59-62: The call to c1f.SetSyncID currently discards errors which
can leave ctx stale before calling ViewSync; change the `_ = c1f.SetSyncID(ctx,
"")` line to check the returned error and return it (e.g., if err :=
c1f.SetSyncID(ctx, ""); err != nil { return nil, err }) so SetSyncID failures
are propagated instead of ignored before calling ViewSync.
- Around line 37-42: The test currently ignores errors from graph.AddEdge which
can leave the graph incomplete; update the loop that iterates defs and
def.SrcEntitlementIDs to capture the error returned by graph.AddEdge(ctx, src,
def.DstEntitlementID, def.Shallow, def.PrincipalResourceTypeIDs) and fail the
test on error (e.g., t.Fatalf or use your test helper like require.NoError) so
any AddEdge failure during setup surfaces immediately; keep the existing calls
to graph.AddEntitlementID and ctx as-is but ensure the AddEdge error is checked
and handled.
In `@pkg/sync/incrementalexpansion/invalidate.go`:
- Around line 21-27: The top-level comment for InvalidateRemovedEdges is missing
a terminal period; update the comment block describing "InvalidateRemovedEdges
removes only the specific source keys implied by removed edges" so the final
sentence ends with a period (ensure each bullet line also ends with proper
punctuation if required), locating the comment above the InvalidateRemovedEdges
function and adding the trailing period to satisfy godot lint.
🧹 Nitpick comments (8)
pkg/dotc1z/grants_needs_expansion.go (1)
72-74: Remove trailing blank lines to fix formatting.

Static analysis indicates the file has trailing blank lines that fail the `goimports` check.

🧹 Proposed fix

```diff
 	c.dbUpdated = true
 	return nil
 }
-
-
```

pkg/dotc1z/grants_diff_helpers.go (2)
131-134: Remove misleading compile check and fix formatting.

The comment claims this is a "sanity compile check" for table existence, but `var _ = fmt.Sprintf` only ensures the `fmt` package is used—it doesn't validate table references. The static analysis also flags:

- Line 131: Comment should end in a period
- Lines 133-134: Trailing blank lines

🧹 Proposed fix: Remove the misleading check and trailing lines

```diff
 	return out, nil
 }
-
-// sanity compile check: ensure tables referenced exist in this package
-var _ = fmt.Sprintf
-
```
10-49: Consider extracting a shared helper to reduce duplication.

The three `ListDistinct*` methods share nearly identical structure, differing only in table name and column. While acceptable for now, a generic helper could reduce maintenance burden:

```go
func (c *C1File) listDistinctColumn(ctx context.Context, tableName, columnName, syncID string) ([]string, error)
```

This is optional since the current implementation is clear and the methods are unlikely to diverge.
Also applies to: 51-89, 91-129
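A sketch of what that shared helper could look like, with the DB call abstracted behind a function value so the example runs without a SQLite driver. The `queryFunc` abstraction and the query shape are assumptions for illustration, not the actual goqu-based implementation:

```go
package main

import "fmt"

// queryFunc abstracts the DB call so the sketch runs without a live database;
// in C1File it would be c.db.QueryContext with a prepared query.
type queryFunc func(query string, args ...any) ([]string, error)

// listDistinctColumn is the shared shape of the three ListDistinct* methods,
// differing only in table and column name. Interpolating identifiers with
// Sprintf is acceptable here only because both come from hardcoded lists,
// matching the pattern already used in this package.
func listDistinctColumn(q queryFunc, tableName, columnName, syncID string) ([]string, error) {
	query := fmt.Sprintf("SELECT DISTINCT %s FROM %s WHERE sync_id = ?", columnName, tableName)
	return q(query, syncID)
}

func main() {
	fake := func(query string, args ...any) ([]string, error) {
		fmt.Println(query, args)
		return []string{"ent-1", "ent-2"}, nil
	}
	ids, err := listDistinctColumn(fake, "v1_grants", "entitlement_id", "sync-123")
	fmt.Println(ids, err)
}
```

Each `ListDistinct*` method would then become a one-line call with its table and column names fixed.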
pkg/dotc1z/grants_expandable_query.go (2)
15-25: Fix formatting to address goimports failure.

Static analysis indicates a formatting issue at line 16. Run `goimports` on this file.
156-163: Add defensive handling for potentially empty JSON strings.

The schema defaults `expandable_src_entitlement_ids_json` and `expandable_principal_resource_type_ids_json` to `'[]'`, but edge cases during data migration could produce empty strings. Since `json.Unmarshal` fails on an empty string rather than `"[]"`, consider checking `if input != ""` before unmarshaling, consistent with similar patterns elsewhere in the codebase (e.g., `pkg/pagination/pagination.go`, `pkg/sync/state.go`).

pkg/dotc1z/grants.go (1)
228-240: Fix formatting to address goimports failure.

Static analysis indicates a formatting issue at line 229. Run `goimports` on this file.

pkg/dotc1z/c1file_attached.go (1)
374-395: Consider extracting `hasRowHash` to avoid duplication.

The `hasRowHash` closure and `compareCol` selection logic is duplicated between `diffTableFromMainTx` and `diffModifiedFromAttachedTx`. Consider extracting to a shared helper method.

♻️ Proposed refactor

```go
// hasRowHashColumn checks if both main and attached schemas have a row_hash column for the given table.
func (c *C1FileAttached) hasRowHashColumn(ctx context.Context, tx *sql.Tx, tableName string) (bool, error) {
	check := func(schema string) (bool, error) {
		q := fmt.Sprintf("select count(*) from %s.pragma_table_info('%s') where name='row_hash'", schema, tableName)
		var n int
		if err := tx.QueryRowContext(ctx, q).Scan(&n); err != nil {
			return false, err
		}
		return n > 0, nil
	}
	mainHas, err := check("main")
	if err != nil {
		return false, err
	}
	attachedHas, err := check("attached")
	if err != nil {
		return false, err
	}
	return mainHas && attachedHas, nil
}
```

Then in both methods:

```go
useRowHash, err := c.hasRowHashColumn(ctx, tx, tableName)
if err != nil {
	return err
}
compareCol := "data"
if useRowHash {
	compareCol = "row_hash"
}
```

Also applies to: 449-470
pkg/sync/incrementalexpansion/edge_delta.go (1)
21-34: Consider sorting `PrincipalResourceTypeIDs` for key stability.

The key includes `PrincipalResourceTypeIDs` in slice order. If the same edge has these IDs in different orders (e.g., from different query results or data sources), the keys won't match, potentially causing incorrect delta computation.

Proposed fix to sort before joining

```diff
+import (
+	"context"
+	"fmt"
+	"slices"
+	"strings"
+
+	"github.com/conductorone/baton-sdk/pkg/dotc1z"
+)

 func (e Edge) Key() string {
 	// Use an unlikely separator to avoid accidental collisions.
 	sep := "\x1f"
 	shallow := "0"
 	if e.Shallow {
 		shallow = "1"
 	}
+	sortedTypes := slices.Clone(e.PrincipalResourceTypeIDs)
+	slices.Sort(sortedTypes)
 	return strings.Join([]string{
 		e.SrcEntitlementID,
 		e.DstEntitlementID,
 		shallow,
-		strings.Join(e.PrincipalResourceTypeIDs, sep),
+		strings.Join(sortedTypes, sep),
 	}, sep)
 }
```

Please verify whether `PrincipalResourceTypeIDs` ordering is guaranteed to be consistent when retrieved from the database. If ordering is guaranteed, this refactor is unnecessary.
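A runnable distillation of the proposed fix, showing that sorting makes the key order-insensitive while the shallow flag still distinguishes edges. Here `edgeKey` is a free-function stand-in for `Edge.Key`:

```go
package main

import (
	"fmt"
	"slices"
	"strings"
)

// edgeKey joins the identity fields with an unlikely separator, sorting the
// principal resource type IDs so two edges that differ only in slice order
// produce the same key (the concern raised for Edge.Key above).
func edgeKey(src, dst string, shallow bool, principalResourceTypeIDs []string) string {
	sep := "\x1f"
	s := "0"
	if shallow {
		s = "1"
	}
	// Clone before sorting so the caller's slice is not mutated.
	sorted := slices.Clone(principalResourceTypeIDs)
	slices.Sort(sorted)
	return strings.Join([]string{src, dst, s, strings.Join(sorted, sep)}, sep)
}

func main() {
	a := edgeKey("e1", "e2", false, []string{"user", "group"})
	b := edgeKey("e1", "e2", false, []string{"group", "user"})
	fmt.Println(a == b) // true: ordering no longer affects identity
}
```
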
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@pkg/dotc1z/grants.go`:
- Around line 251-268: The function grantExpandableColumns currently marks
grants as expandable before canonicalizing entitlement and resource-type IDs, so
whitespace-only values can be stripped to empty slices yet still be treated as
expandable; update grantExpandableColumns to check srcEntIDs (and
principalRTIDs) after calling canonicalizeStringSlice and before marshalling: if
len(srcEntIDs)==0 (or len(principalRTIDs)==0) treat the grant as non-expandable
and return 0, "[]", 0, "[]", 0, nil (same non-expandable return shape used
earlier), avoiding needs_expansion=1 for empty canonicalized lists; reference
the variables srcEntIDs, principalRTIDs, srcEntIDsJSON, principalRTIDsJSON and
the function grantExpandableColumns to locate where to add this guard.
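The guard order this prompt asks for (canonicalize first, then decide expandability) can be sketched like this. The `canonicalizeStringSlice` here trims whitespace and drops empty entries, a plausible but assumed behavior of the real helper, and `expandableColumns` is a reduced stand-in for `grantExpandableColumns`:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// canonicalizeStringSlice trims whitespace and drops empty entries, an
// assumed stand-in for the canonicalization grantExpandableColumns performs.
func canonicalizeStringSlice(in []string) []string {
	out := make([]string, 0, len(in))
	for _, s := range in {
		if t := strings.TrimSpace(s); t != "" {
			out = append(out, t)
		}
	}
	return out
}

// expandableColumns checks expandability only after canonicalization, so
// whitespace-only IDs never mark a grant as needing expansion.
func expandableColumns(srcEntIDs []string) (hasExpandable int, srcJSON string) {
	srcEntIDs = canonicalizeStringSlice(srcEntIDs)
	if len(srcEntIDs) == 0 {
		return 0, "[]" // non-expandable shape, mirroring the early return
	}
	b, err := json.Marshal(srcEntIDs)
	if err != nil {
		return 0, "[]"
	}
	return 1, string(b)
}

func main() {
	has, j := expandableColumns([]string{"  ", "\t"})
	fmt.Println(has, j)
	has, j = expandableColumns([]string{" e1 ", "e2"})
	fmt.Println(has, j)
}
```

Moving the length check after canonicalization is the whole fix: the marshalled JSON then can never be `[]` while `has_expandable` is 1.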
🧹 Nitpick comments (1)
pkg/dotc1z/c1file_attached.go (1)
417-459: Consider extracting the column list building logic.

The column list construction pattern (lines 426-440) is duplicated across four methods: `diffTableFromAttachedTx`, `diffTableFromMainTx`, `diffModifiedFromAttachedTx`, and `copyTableFromMainTx`. This could be extracted into a helper function to reduce duplication.

♻️ Suggested helper extraction

```go
// buildColumnLists returns columnList and selectList for INSERT statements.
// The selectList substitutes "? as sync_id" for the sync_id column.
func buildColumnLists(columns []string) (columnList, selectList string) {
	for i, col := range columns {
		if i > 0 {
			columnList += ", "
			selectList += ", "
		}
		columnList += col
		if col == "sync_id" {
			selectList += "? as sync_id"
		} else {
			selectList += col
		}
	}
	return columnList, selectList
}
```

Then in each method:

```diff
-	columnList := ""
-	selectList := ""
-	for i, col := range columns {
-		if i > 0 {
-			columnList += ", "
-			selectList += ", "
-		}
-		columnList += col
-		if col == "sync_id" {
-			selectList += "? as sync_id"
-		} else {
-			selectList += col
-		}
-	}
+	columnList, selectList := buildColumnLists(columns)
```
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@pkg/dotc1z/grants_test.go`:
- Around line 49-83: Run goimports (or gofmt + goimports) on the test file to
fix the linter formatting error: format pkg/dotc1z/grants_test.go so imports are
organized and the file is gofmt-compliant, then re-run tests; specifically
reformat the TestGrantExpandableColumns_MixedWhitespaceAndValidIDs test and any
related declarations (e.g., grantExpandableColumns usage) so the linter stops
flagging the file, and commit the updated file.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@pkg/sync/syncer.go`:
- Around line 1764-1779: The error handling in processGrantForGraph is
inconsistent with ListExpandableGrants: change the branch that currently logs
and continues on any error to only continue when errors.Is(err, sql.ErrNoRows)
and otherwise return the error so non-not-found failures are propagated; locate
the error-handling block in processGrantForGraph (the one that logs "source
entitlement not found, skipping edge" and currently continues) and modify it to
mirror the logic used in ListExpandableGrants (check sql.ErrNoRows, l.Debug for
not-found and continue, otherwise l.Error with zap.Error(err) and return err).
🧹 Nitpick comments (2)
pkg/sync/incrementalexpansion/expand_dirty.go (1)
58-65: Add identifiers to mismatch errors for easier triage.

The mismatch errors lack context, which will make production debugging painful. Consider including IDs in the error message.
🛠️ Suggested improvement
```diff
-	sourceEntitlementResourceID := srcEntitlement.GetEntitlement().GetResource().GetId()
-	if sourceEntitlementResourceID == nil {
-		return fmt.Errorf("source entitlement resource id was nil")
-	}
-	if principalID.GetResourceType() != sourceEntitlementResourceID.GetResourceType() ||
-		principalID.GetResource() != sourceEntitlementResourceID.GetResource() {
-		return fmt.Errorf("source entitlement resource id did not match grant principal id")
-	}
+	sourceEntitlementResourceID := srcEntitlement.GetEntitlement().GetResource().GetId()
+	if sourceEntitlementResourceID == nil {
+		return fmt.Errorf("source entitlement resource id was nil for src entitlement %q", srcEntitlementID)
+	}
+	if principalID.GetResourceType() != sourceEntitlementResourceID.GetResourceType() ||
+		principalID.GetResource() != sourceEntitlementResourceID.GetResource() {
+		return fmt.Errorf(
+			"source entitlement resource id did not match grant principal id (srcType=%q src=%q principalType=%q principal=%q)",
+			sourceEntitlementResourceID.GetResourceType(),
+			sourceEntitlementResourceID.GetResource(),
+			principalID.GetResourceType(),
+			principalID.GetResource(),
+		)
+	}
```

pkg/sync/syncer.go (1)
1728-1733: Consider reusing the existing interface definition.

The AI summary indicates that `expandableGrantLister` is also defined in `pkg/sync/incrementalexpansion/edge_delta.go`. Defining the same interface inline here creates duplication. Consider importing and reusing the existing definition to maintain a single source of truth.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@pkg/sync/incrementalexpansion/incremental_correctness_test.go`:
- Around line 93-210: Several test file Close calls (e.g., oldFile.Close(ctx),
newFile.Close(ctx), expectedFile.Close(ctx)) are deferred without checking
errors; replace those defer Close(ctx) calls with t.Cleanup wrappers that assert
the Close succeeded (e.g., t.Cleanup(func(){ require.NoError(t,
oldFile.Close(ctx)) })) and do the same for newFile and expectedFile (and any
other deferred Close calls in this test file) so cleanup failures fail the test.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@pkg/dotc1z/grants.go`:
- Around line 313-401: The loop can spin forever because rows that contain the
text but yield hasExpandable==0 are skipped and never updated; change
backfillGrantExpandableColumns so that when grantExpandableColumns returns
hasExpandable==0 you still execute the UPDATE (use a sentinel like -1 for
has_expandable and set the other expandable columns to null/empty) instead of
continuing, so the row is marked as "inspected but not expandable" and won't be
selected again; modify the branch around hasExpandable in
backfillGrantExpandableColumns (the code that currently does "if hasExpandable
== 0 { continue }") to call stmt.ExecContext(ctx, int64(-1), nil/empty values,
shallowInt, nil/empty, r.id) (adjust types to match existing columns) so the SQL
update persists the sentinel, leaving the selection query unchanged.
🧹 Nitpick comments (1)
pkg/dotc1z/grants.go (1)
333-345: Consider using defer for rows.Close() to prevent resource leak on early returns.

If an error occurs during `rows.Scan`, `rows.Close()` is called before return. However, using `defer` immediately after obtaining `rows` would be more robust and idiomatic.

Suggested refactor

```diff
 	if err != nil {
 		return err
 	}
+	defer rows.Close()
 	type row struct {
 		id   int64
 		data []byte
 	}
 	batch := make([]row, 0, 1000)
 	for rows.Next() {
 		var r row
 		if err := rows.Scan(&r.id, &r.data); err != nil {
-			_ = rows.Close()
 			return err
 		}
 		batch = append(batch, r)
 	}
 	if err := rows.Err(); err != nil {
-		_ = rows.Close()
 		return err
 	}
-	_ = rows.Close()
```
```go
func backfillGrantExpandableColumns(ctx context.Context, db *goqu.Database, tableName string) error {
	// Scan for rows that contain "GrantExpandable" in the proto blob but haven't been
	// backfilled yet (has_expandable=0). The LIKE filter skips the 99%+ of rows that
	// don't have expandable annotations, making this fast even on large tables.
	for {
		rows, err := db.QueryContext(ctx, fmt.Sprintf(
			`SELECT id, data FROM %s
			WHERE has_expandable=0 AND data LIKE '%%GrantExpandable%%'
			LIMIT 1000`,
			tableName,
		))
		if err != nil {
			return err
		}

		type row struct {
			id   int64
			data []byte
		}
		batch := make([]row, 0, 1000)
		for rows.Next() {
			var r row
			if err := rows.Scan(&r.id, &r.data); err != nil {
				_ = rows.Close()
				return err
			}
			batch = append(batch, r)
		}
		if err := rows.Err(); err != nil {
			_ = rows.Close()
			return err
		}
		_ = rows.Close()

		if len(batch) == 0 {
			return nil
		}

		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			return err
		}

		stmt, err := tx.PrepareContext(ctx, fmt.Sprintf(
			`UPDATE %s
			SET has_expandable=?,
				expandable_src_entitlement_ids_json=?,
				expandable_shallow=?,
				expandable_principal_resource_type_ids_json=?,
				needs_expansion=?
			WHERE id=?`,
			tableName,
		))
		if err != nil {
			_ = tx.Rollback()
			return err
		}

		for _, r := range batch {
			g := &v2.Grant{}
			if err := proto.Unmarshal(r.data, g); err != nil {
				_ = stmt.Close()
				_ = tx.Rollback()
				return err
			}
			hasExpandable, srcEntsJSON, shallowInt, prRTsJSON, needsExpansion, err := grantExpandableColumns(g)
			if err != nil {
				_ = stmt.Close()
				_ = tx.Rollback()
				return err
			}
			// Only update if we found a valid expandable annotation.
			// Rows with "GrantExpandable" in the blob but no valid annotation are skipped.
			if hasExpandable == 0 {
				continue
			}
			if _, err := stmt.ExecContext(ctx, hasExpandable, srcEntsJSON, shallowInt, prRTsJSON, needsExpansion, r.id); err != nil {
				_ = stmt.Close()
				_ = tx.Rollback()
				return err
			}
		}

		_ = stmt.Close()
		if err := tx.Commit(); err != nil {
			return err
		}
	}
}
```
Infinite loop when blob contains "GrantExpandable" text but annotation is invalid.
The query selects rows where `has_expandable=0 AND data LIKE '%GrantExpandable%'`. When `grantExpandableColumns` returns `hasExpandable == 0` (e.g., empty entitlement IDs after canonicalization), the row is skipped without being updated (lines 386-388). Since `has_expandable` remains 0, the same row will be selected again indefinitely.
This can occur when:
- The blob contains "GrantExpandable" as text but the actual annotation has empty/whitespace-only entitlement IDs
- Proto unmarshal succeeds but annotation extraction fails silently
Proposed fix: Mark inspected rows to prevent reprocessing
Use a sentinel value (e.g., -1) to indicate "inspected but not expandable":
```diff
 	// Only update if we found a valid expandable annotation.
-	// Rows with "GrantExpandable" in the blob but no valid annotation are skipped.
-	if hasExpandable == 0 {
-		continue
+	if hasExpandable == 0 {
+		// Mark as inspected to prevent infinite reprocessing.
+		hasExpandable = -1
 	}
 	if _, err := stmt.ExecContext(ctx, hasExpandable, srcEntsJSON, shallowInt, prRTsJSON, needsExpansion, r.id); err != nil {
```

Then update any code that checks `has_expandable` to treat -1 and 0 equivalently for "not expandable" semantics. The selection query (`WHERE has_expandable=0 AND data LIKE '%GrantExpandable%'`) can stay unchanged, since sentinel rows no longer match `has_expandable=0` and are not rescanned.

Alternatively, track processed IDs within the batch and exclude them, or treat only `has_expandable > 0` as expandable elsewhere if -1 is used as the sentinel.
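The sentinel's invariants can be stated as a tiny runnable check (helper names here are illustrative, not from the codebase): expandability tests use `> 0`, while the backfill selection keeps matching only `0`:

```go
package main

import "fmt"

// isExpandable treats the -1 sentinel ("inspected, not expandable") and 0
// ("not yet inspected") both as not expandable, so checks use > 0, not != 0.
func isExpandable(hasExpandable int64) bool {
	return hasExpandable > 0
}

// needsBackfill matches the selection query's WHERE has_expandable=0;
// sentinel rows (-1) no longer match and are never rescanned.
func needsBackfill(hasExpandable int64) bool {
	return hasExpandable == 0
}

func main() {
	for _, v := range []int64{-1, 0, 1} {
		fmt.Printf("has_expandable=%d expandable=%v rescanned=%v\n", v, isExpandable(v), needsBackfill(v))
	}
}
```
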
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@pkg/dotc1z/grants_expandable_query.go`:
- Around line 57-185: In ListExpandableGrants, handle the case where
annotations.Annotations(g.GetAnnotations()).Pick(ge) returns ok==false instead
of ignoring it; update the annos.Pick call (in function ListExpandableGrants
within pkg/dotc1z/grants_expandable_query.go) to capture the boolean result and
if ok is false return an explicit error (including the grant external ID)
indicating the GrantExpandable annotation is missing/corrupt so we don't
silently emit empty SrcEntitlementIDs or other fields for rows marked
expandable.
```go
// ListExpandableGrants lists expandable grants using the grants table's queryable columns.
// It avoids scanning/unmarshalling all grants.
func (c *C1File) ListExpandableGrants(ctx context.Context, opts ...ListExpandableGrantsOption) ([]*ExpandableGrantDef, string, error) {
	ctx, span := tracer.Start(ctx, "C1File.ListExpandableGrants")
	defer span.End()

	if err := c.validateDb(ctx); err != nil {
		return nil, "", err
	}

	o := &listExpandableGrantsOptions{}
	for _, opt := range opts {
		opt(o)
	}

	syncID, err := c.resolveSyncIDForInternalQuery(ctx, o.syncID)
	if err != nil {
		return nil, "", err
	}

	q := c.db.From(grants.Name()).Prepared(true)
	q = q.Select(
		"id",
		"external_id",
		"entitlement_id",
		"principal_resource_type_id",
		"principal_resource_id",
		"data",
		"needs_expansion",
	)
	q = q.Where(goqu.C("sync_id").Eq(syncID))
	q = q.Where(goqu.C("is_expandable").Eq(1))
	if o.needsExpansionOnly {
		q = q.Where(goqu.C("needs_expansion").Eq(1))
	}

	if o.pageToken != "" {
		// Page token is the grants table row ID.
		id, err := strconv.ParseInt(o.pageToken, 10, 64)
		if err != nil {
			return nil, "", fmt.Errorf("invalid expandable grants page token %q: %w", o.pageToken, err)
		}
		q = q.Where(goqu.C("id").Gte(id))
	}

	pageSize := o.pageSize
	if pageSize > maxPageSize || pageSize == 0 {
		pageSize = maxPageSize
	}
	q = q.Order(goqu.C("id").Asc()).Limit(uint(pageSize + 1))

	query, args, err := q.ToSQL()
	if err != nil {
		return nil, "", err
	}

	rows, err := c.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, "", err
	}
	defer rows.Close()

	defs := make([]*ExpandableGrantDef, 0, pageSize)
	var (
		count   uint32
		lastRow int64
	)
	for rows.Next() {
		count++
		if count > pageSize {
			break
		}

		var (
			rowID             int64
			externalID        string
			dstEntitlementID  string
			principalRTID     string
			principalRID      string
			dataBlob          []byte
			needsExpansionInt int
		)

		if err := rows.Scan(
			&rowID,
			&externalID,
			&dstEntitlementID,
			&principalRTID,
			&principalRID,
			&dataBlob,
			&needsExpansionInt,
		); err != nil {
			return nil, "", err
		}
		lastRow = rowID

		var g v2.Grant
		if err := proto.Unmarshal(dataBlob, &g); err != nil {
			return nil, "", fmt.Errorf("invalid grant data for %q: %w", externalID, err)
		}

		annos := annotations.Annotations(g.GetAnnotations())
		ge := &v2.GrantExpandable{}
		if _, err := annos.Pick(ge); err != nil {
			return nil, "", fmt.Errorf("failed to extract GrantExpandable from grant %q: %w", externalID, err)
		}

		defs = append(defs, &ExpandableGrantDef{
			RowID:                    rowID,
			GrantExternalID:          externalID,
			DstEntitlementID:         dstEntitlementID,
			PrincipalResourceTypeID:  principalRTID,
			PrincipalResourceID:      principalRID,
			SrcEntitlementIDs:        ge.GetEntitlementIds(),
			Shallow:                  ge.GetShallow(),
			PrincipalResourceTypeIDs: ge.GetResourceTypeIds(),
			NeedsExpansion:           needsExpansionInt != 0,
		})
	}
	if err := rows.Err(); err != nil {
		return nil, "", err
	}

	nextPageToken := ""
	if count > pageSize {
		nextPageToken = strconv.FormatInt(lastRow+1, 10)
	}
	return defs, nextPageToken, nil
}
```
Handle missing GrantExpandable annotations explicitly.
`annos.Pick` returns an `ok` flag; ignoring it can silently return empty source entitlements if a row is marked expandable but the annotation is missing/corrupt.
🛠️ Proposed fix

```diff
-	if _, err := annos.Pick(ge); err != nil {
+	ok, err := annos.Pick(ge)
+	if err != nil {
 		return nil, "", fmt.Errorf("failed to extract GrantExpandable from grant %q: %w", externalID, err)
 	}
+	if !ok {
+		return nil, "", fmt.Errorf("missing GrantExpandable annotation for grant %q", externalID)
+	}
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
// ListExpandableGrants lists expandable grants using the grants table's queryable columns.
// It avoids scanning/unmarshalling all grants.
func (c *C1File) ListExpandableGrants(ctx context.Context, opts ...ListExpandableGrantsOption) ([]*ExpandableGrantDef, string, error) {
	ctx, span := tracer.Start(ctx, "C1File.ListExpandableGrants")
	defer span.End()

	if err := c.validateDb(ctx); err != nil {
		return nil, "", err
	}

	o := &listExpandableGrantsOptions{}
	for _, opt := range opts {
		opt(o)
	}

	syncID, err := c.resolveSyncIDForInternalQuery(ctx, o.syncID)
	if err != nil {
		return nil, "", err
	}

	q := c.db.From(grants.Name()).Prepared(true)
	q = q.Select(
		"id",
		"external_id",
		"entitlement_id",
		"principal_resource_type_id",
		"principal_resource_id",
		"data",
		"needs_expansion",
	)
	q = q.Where(goqu.C("sync_id").Eq(syncID))
	q = q.Where(goqu.C("is_expandable").Eq(1))
	if o.needsExpansionOnly {
		q = q.Where(goqu.C("needs_expansion").Eq(1))
	}

	if o.pageToken != "" {
		// Page token is the grants table row ID.
		id, err := strconv.ParseInt(o.pageToken, 10, 64)
		if err != nil {
			return nil, "", fmt.Errorf("invalid expandable grants page token %q: %w", o.pageToken, err)
		}
		q = q.Where(goqu.C("id").Gte(id))
	}

	pageSize := o.pageSize
	if pageSize > maxPageSize || pageSize == 0 {
		pageSize = maxPageSize
	}
	q = q.Order(goqu.C("id").Asc()).Limit(uint(pageSize + 1))

	query, args, err := q.ToSQL()
	if err != nil {
		return nil, "", err
	}

	rows, err := c.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, "", err
	}
	defer rows.Close()

	defs := make([]*ExpandableGrantDef, 0, pageSize)
	var (
		count   uint32
		lastRow int64
	)
	for rows.Next() {
		count++
		if count > pageSize {
			break
		}

		var (
			rowID             int64
			externalID        string
			dstEntitlementID  string
			principalRTID     string
			principalRID      string
			dataBlob          []byte
			needsExpansionInt int
		)

		if err := rows.Scan(
			&rowID,
			&externalID,
			&dstEntitlementID,
			&principalRTID,
			&principalRID,
			&dataBlob,
			&needsExpansionInt,
		); err != nil {
			return nil, "", err
		}
		lastRow = rowID

		var g v2.Grant
		if err := proto.Unmarshal(dataBlob, &g); err != nil {
			return nil, "", fmt.Errorf("invalid grant data for %q: %w", externalID, err)
		}

		annos := annotations.Annotations(g.GetAnnotations())
		ge := &v2.GrantExpandable{}
		ok, err := annos.Pick(ge)
		if err != nil {
			return nil, "", fmt.Errorf("failed to extract GrantExpandable from grant %q: %w", externalID, err)
		}
		if !ok {
			return nil, "", fmt.Errorf("missing GrantExpandable annotation for grant %q", externalID)
		}

		defs = append(defs, &ExpandableGrantDef{
			RowID:                    rowID,
			GrantExternalID:          externalID,
			DstEntitlementID:         dstEntitlementID,
			PrincipalResourceTypeID:  principalRTID,
			PrincipalResourceID:      principalRID,
			SrcEntitlementIDs:        ge.GetEntitlementIds(),
			Shallow:                  ge.GetShallow(),
			PrincipalResourceTypeIDs: ge.GetResourceTypeIds(),
			NeedsExpansion:           needsExpansionInt != 0,
		})
	}
	if err := rows.Err(); err != nil {
		return nil, "", err
	}

	nextPageToken := ""
	if count > pageSize {
		nextPageToken = strconv.FormatInt(lastRow+1, 10)
	}
	return defs, nextPageToken, nil
}
```
```go
// ListExpandableGrants lists expandable grants using the grants table's queryable columns.
// It avoids scanning/unmarshalling all grants.
func (c *C1File) ListExpandableGrants(ctx context.Context, opts ...ListExpandableGrantsOption) ([]*ExpandableGrantDef, string, error) {
	ctx, span := tracer.Start(ctx, "C1File.ListExpandableGrants")
	defer span.End()
	if err := c.validateDb(ctx); err != nil {
		return nil, "", err
	}
	o := &listExpandableGrantsOptions{}
	for _, opt := range opts {
		opt(o)
	}
	syncID, err := c.resolveSyncIDForInternalQuery(ctx, o.syncID)
	if err != nil {
		return nil, "", err
	}
	q := c.db.From(grants.Name()).Prepared(true)
	q = q.Select(
		"id",
		"external_id",
		"entitlement_id",
		"principal_resource_type_id",
		"principal_resource_id",
		"data",
		"needs_expansion",
	)
	q = q.Where(goqu.C("sync_id").Eq(syncID))
	q = q.Where(goqu.C("is_expandable").Eq(1))
	if o.needsExpansionOnly {
		q = q.Where(goqu.C("needs_expansion").Eq(1))
	}
	if o.pageToken != "" {
		// Page token is the grants table row ID.
		id, err := strconv.ParseInt(o.pageToken, 10, 64)
		if err != nil {
			return nil, "", fmt.Errorf("invalid expandable grants page token %q: %w", o.pageToken, err)
		}
		q = q.Where(goqu.C("id").Gte(id))
	}
	pageSize := o.pageSize
	if pageSize > maxPageSize || pageSize == 0 {
		pageSize = maxPageSize
	}
	q = q.Order(goqu.C("id").Asc()).Limit(uint(pageSize + 1))
	query, args, err := q.ToSQL()
	if err != nil {
		return nil, "", err
	}
	rows, err := c.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, "", err
	}
	defer rows.Close()
	defs := make([]*ExpandableGrantDef, 0, pageSize)
	var (
		count   uint32
		lastRow int64
	)
	for rows.Next() {
		count++
		if count > pageSize {
			break
		}
		var (
			rowID             int64
			externalID        string
			dstEntitlementID  string
			principalRTID     string
			principalRID      string
			dataBlob          []byte
			needsExpansionInt int
		)
		if err := rows.Scan(
			&rowID,
			&externalID,
			&dstEntitlementID,
			&principalRTID,
			&principalRID,
			&dataBlob,
			&needsExpansionInt,
		); err != nil {
			return nil, "", err
		}
		lastRow = rowID
		var g v2.Grant
		if err := proto.Unmarshal(dataBlob, &g); err != nil {
			return nil, "", fmt.Errorf("invalid grant data for %q: %w", externalID, err)
		}
		annos := annotations.Annotations(g.GetAnnotations())
		ge := &v2.GrantExpandable{}
		ok, err := annos.Pick(ge)
		if err != nil {
			return nil, "", fmt.Errorf("failed to extract GrantExpandable from grant %q: %w", externalID, err)
		}
		if !ok {
			return nil, "", fmt.Errorf("missing GrantExpandable annotation for grant %q", externalID)
		}
		defs = append(defs, &ExpandableGrantDef{
			RowID:                    rowID,
			GrantExternalID:          externalID,
			DstEntitlementID:         dstEntitlementID,
			PrincipalResourceTypeID:  principalRTID,
			PrincipalResourceID:      principalRID,
			SrcEntitlementIDs:        ge.GetEntitlementIds(),
			Shallow:                  ge.GetShallow(),
			PrincipalResourceTypeIDs: ge.GetResourceTypeIds(),
			NeedsExpansion:           needsExpansionInt != 0,
		})
	}
	if err := rows.Err(); err != nil {
		return nil, "", err
	}
	nextPageToken := ""
	if count > pageSize {
		nextPageToken = strconv.FormatInt(lastRow+1, 10)
	}
	return defs, nextPageToken, nil
}
```
🤖 Prompt for AI Agents
In `@pkg/dotc1z/grants_expandable_query.go` around lines 57 - 185, In
ListExpandableGrants, handle the case where
annotations.Annotations(g.GetAnnotations()).Pick(ge) returns ok==false instead
of ignoring it; update the annos.Pick call (in function ListExpandableGrants
within pkg/dotc1z/grants_expandable_query.go) to capture the boolean result and
if ok is false return an explicit error (including the grant external ID)
indicating the GrantExpandable annotation is missing/corrupt so we don't
silently emit empty SrcEntitlementIDs or other fields for rows marked
expandable.
a5a7e1d to 6220648 (Compare)
1. Just include resource types in upsert syncs because every uplift step wants them and this is easier for now.
2. Track the expandable state at the row level so we don't have to unmarshal literally every proto.
3. Adds logic for graph invalidation for diffing (for partial expansion of grants).
6220648 to bf502f1 (Compare)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@pkg/sync/incrementalexpansion/invalidate.go`:
- Around line 51-112: The buffered grant updates in the loop over delta.Removed
can clobber each other when multiple removed edges touch the same grant; to fix
this, ensure you flush the pending updates after finishing each edge so removals
accumulate correctly: after the inner paging loop for each entry in
delta.Removed (the block that reads via store.ListGrantsForEntitlement and
appends to updates), call flush() (the same function already used when updates
>= chunkSize) to persist and clear the updates buffer before moving to the next
edge; keep existing logic for DeleteGrant and GrantImmutable checks (g.GetId(),
annotations.Annotations, GrantImmutable) unchanged.
🧹 Nitpick comments (4)
pkg/sync/expand/graph.go (1)
205-226: Avoid duplicated node-add logic to prevent future drift.
`AddEntitlementID` repeats the same steps as `AddEntitlement`. Consider consolidating into a shared helper (or have `AddEntitlement` call `AddEntitlementID`) so any future changes stay consistent.

♻️ Possible refactor (route AddEntitlement through AddEntitlementID)

```diff
 func (g *EntitlementGraph) AddEntitlement(entitlement *v2.Entitlement) {
-	// If the entitlement is already in the graph, fail silently.
-	found := g.GetNode(entitlement.GetId())
-	if found != nil {
-		return
-	}
-	g.HasNoCycles = false // Reset this since we're changing the graph.
-
-	// Start at 1 in case we don't initialize something and try to get node 0.
-	g.NextNodeID++
-
-	// Create a new node.
-	node := Node{
-		Id:             g.NextNodeID,
-		EntitlementIDs: []string{entitlement.GetId()},
-	}
-
-	// Add the node to the data structures.
-	g.Nodes[node.Id] = node
-	g.EntitlementsToNodes[entitlement.GetId()] = node.Id
+	g.AddEntitlementID(entitlement.GetId())
 }
```

pkg/dotc1z/grants_needs_expansion.go (1)
10-43: Consider batching the IN clause if `grantExternalIDs` becomes large, but note this pattern exists elsewhere in the codebase without batching.

SQLite has a default parameter limit (~999 placeholders), and large `grantExternalIDs` slices could exceed this. However, `GetMany()` in `session_store.go` uses the same IN clause pattern without batching, suggesting this may not be a practical concern for the application's current usage patterns. If parameter limits become an issue, batching should be applied consistently across similar operations (`GetMany`, `SetNeedsExpansionForGrants`, etc.) rather than in isolation.
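If batching ever becomes necessary, one way it could look is the helper below. This is a hedged sketch, not code from the PR: the `chunkStrings` name and the 999 cutoff (SQLite's historical default for `SQLITE_MAX_VARIABLE_NUMBER`) are assumptions chosen for illustration.

```go
package main

import "fmt"

// chunkStrings splits ids into batches of at most n elements, so each batch
// can be bound as one IN (...) clause without exceeding the parameter limit.
func chunkStrings(ids []string, n int) [][]string {
	if n <= 0 {
		n = 999 // assumed SQLite default for SQLITE_MAX_VARIABLE_NUMBER
	}
	var out [][]string
	for len(ids) > n {
		out = append(out, ids[:n])
		ids = ids[n:]
	}
	if len(ids) > 0 {
		out = append(out, ids)
	}
	return out
}

func main() {
	ids := make([]string, 2500)
	for i := range ids {
		ids[i] = fmt.Sprintf("grant-%d", i)
	}
	// 2500 IDs with a 999-parameter cap yields batches of 999, 999, 502.
	fmt.Println(len(chunkStrings(ids, 999)))
}
```

Each returned batch would then be issued as its own UPDATE/SELECT with one `IN (...)` clause, keeping behavior identical while staying under the placeholder limit.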
131-132: Consider removing or clarifying the misleading compile-time check.

The comment says "sanity compile check: ensure tables referenced exist in this package" but `var _ = fmt.Sprintf` only verifies that `fmt` is imported; it doesn't validate that `grants`, `entitlements`, or `resources` table descriptors exist or are correctly referenced.

Consider removing or fixing:

```diff
-// sanity compile check: ensure tables referenced exist in this package.
-var _ = fmt.Sprintf
```

Or if you want actual compile-time checks:

```go
// Compile-time checks: ensure table descriptors are accessible.
var (
	_ = grants.Name
	_ = entitlements.Name
	_ = resources.Name
)
```

pkg/sync/incrementalexpansion/edge_delta.go (1)
21-33: Make `Edge.Key` deterministic across ordering of `PrincipalResourceTypeIDs`.

If the slice order isn't stable between syncs, identical edges can hash to different keys and cause spurious add/remove churn. Sorting a copy avoids that.

♻️ Suggested fix

```diff
 import (
 	"context"
 	"fmt"
+	"sort"
 	"strings"

 	"github.com/conductorone/baton-sdk/pkg/dotc1z"
 )
@@
 func (e Edge) Key() string {
 	// Use an unlikely separator to avoid accidental collisions.
 	sep := "\x1f"
+	ids := append([]string(nil), e.PrincipalResourceTypeIDs...)
+	sort.Strings(ids)
 	shallow := "0"
 	if e.Shallow {
 		shallow = "1"
 	}
 	return strings.Join([]string{
 		e.SrcEntitlementID,
 		e.DstEntitlementID,
 		shallow,
-		strings.Join(e.PrincipalResourceTypeIDs, sep),
+		strings.Join(ids, sep),
 	}, sep)
 }
```
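To see why sorting a copy makes the key order-insensitive, here is a standalone sketch of the same idea. The `edgeKey` function is illustrative only, not the PR's actual `Edge.Key` method.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// edgeKey builds an order-insensitive key for an expansion edge by sorting a
// copy of the resource type IDs before joining, so equal sets always produce
// the same key regardless of slice order between syncs.
func edgeKey(src, dst string, shallow bool, resourceTypeIDs []string) string {
	sep := "\x1f"
	ids := append([]string(nil), resourceTypeIDs...) // copy: leave caller's slice untouched
	sort.Strings(ids)
	s := "0"
	if shallow {
		s = "1"
	}
	return strings.Join([]string{src, dst, s, strings.Join(ids, sep)}, sep)
}

func main() {
	a := edgeKey("e1", "e2", false, []string{"group", "user"})
	b := edgeKey("e1", "e2", false, []string{"user", "group"})
	fmt.Println(a == b) // true: ordering no longer matters
}
```

Sorting a copy (rather than the original slice) matters because mutating `PrincipalResourceTypeIDs` in a read path could surprise other holders of the slice.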
```go
	for _, edge := range delta.Removed {
		// Entitlement is only used for filtering by entitlement_id; ID is sufficient.
		ent := v2.Entitlement_builder{Id: edge.DstEntitlementID}.Build()
		pageToken := ""
		for {
			resp, err := store.ListGrantsForEntitlement(ctx, reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{
				Entitlement:              ent,
				PageToken:                pageToken,
				PrincipalResourceTypeIds: edge.PrincipalResourceTypeIDs,
			}.Build())
			if err != nil {
				return err
			}

			for _, g := range resp.GetList() {
				srcs := g.GetSources().GetSources()
				if len(srcs) == 0 {
					continue
				}
				if _, ok := srcs[edge.SrcEntitlementID]; !ok {
					continue
				}

				delete(srcs, edge.SrcEntitlementID)

				// The expander adds a "self-source" (destination entitlement ID) to mark
				// that a grant was originally direct. When all expansion sources are removed,
				// we should also remove the self-source so the grant matches a fresh full
				// expansion (which would have no sources for a direct grant).
				selfSourceID := g.GetEntitlement().GetId()
				if len(srcs) == 1 {
					delete(srcs, selfSourceID)
				}

				if len(srcs) == 0 {
					annos := annotations.Annotations(g.GetAnnotations())
					if annos.Contains(&v2.GrantImmutable{}) {
						if err := store.DeleteGrant(ctx, g.GetId()); err != nil {
							return err
						}
						continue
					}
					// Direct grant: clear sources entirely.
					g.SetSources(nil)
				} else {
					g.SetSources(v2.GrantSources_builder{Sources: srcs}.Build())
				}

				updates = append(updates, g)
				if len(updates) >= chunkSize {
					if err := flush(); err != nil {
						return err
					}
				}
			}

			pageToken = resp.GetNextPageToken()
			if pageToken == "" {
				break
			}
		}
	}
```
Prevent clobbered grant updates when multiple removed edges touch the same grant.
Because updates are buffered across edges, a grant can be read and updated multiple times from stale data. That can overwrite earlier removals, and for GrantImmutable it can even reinsert a grant after a later delete. Consider flushing per edge (or deduplicating updates by grant ID) so removals accumulate correctly.
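The deduplication alternative mentioned above could be sketched as follows: keep at most one pending copy per grant ID and serve re-reads from that copy, so a later edge mutates the already-edited grant instead of a stale row re-read from the store. The `grant` and `updateBuffer` types here are simplified stand-ins for illustration, not the SDK's actual types.

```go
package main

import "fmt"

type grant struct {
	ID      string
	Sources map[string]struct{}
}

// updateBuffer keeps at most one pending copy per grant ID. Edges that touch
// the same grant mutate the buffered copy, so removals accumulate instead of
// clobbering each other.
type updateBuffer struct {
	pending map[string]*grant
}

// get returns the buffered copy if present, otherwise reads from the store
// and buffers the result for future edges.
func (b *updateBuffer) get(id string, readStore func(string) *grant) *grant {
	if g, ok := b.pending[id]; ok {
		return g
	}
	g := readStore(id)
	b.pending[id] = g
	return g
}

// flush drains the buffer; a real implementation would persist the grants here.
func (b *updateBuffer) flush() []*grant {
	out := make([]*grant, 0, len(b.pending))
	for _, g := range b.pending {
		out = append(out, g)
	}
	b.pending = map[string]*grant{}
	return out
}

func main() {
	store := map[string]*grant{
		"g1": {ID: "g1", Sources: map[string]struct{}{"e1": {}, "e2": {}}},
	}
	read := func(id string) *grant {
		// Simulate a re-read of a possibly stale row by copying the stored value.
		src := store[id]
		cp := map[string]struct{}{}
		for k := range src.Sources {
			cp[k] = struct{}{}
		}
		return &grant{ID: src.ID, Sources: cp}
	}
	b := &updateBuffer{pending: map[string]*grant{}}
	delete(b.get("g1", read).Sources, "e1") // edge 1 removes source e1
	delete(b.get("g1", read).Sources, "e2") // edge 2 edits the same buffered copy
	g := b.flush()[0]
	fmt.Println(len(g.Sources)) // 0: both removals survived
}
```

The trade-off versus flushing per edge is fewer writes at the cost of holding more pending state in memory; it also assumes reads always go through the buffer.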
✅ Minimal correctness fix (flush per edge)

```diff
 for _, edge := range delta.Removed {
 	// Entitlement is only used for filtering by entitlement_id; ID is sufficient.
 	ent := v2.Entitlement_builder{Id: edge.DstEntitlementID}.Build()
 	pageToken := ""
 	for {
 		resp, err := store.ListGrantsForEntitlement(ctx, reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{
 			Entitlement:              ent,
 			PageToken:                pageToken,
 			PrincipalResourceTypeIds: edge.PrincipalResourceTypeIDs,
 		}.Build())
 		if err != nil {
 			return err
 		}
@@
 		if pageToken == "" {
 			break
 		}
 	}
+	// Ensure updates from this edge are persisted before processing the next edge.
+	if err := flush(); err != nil {
+		return err
+	}
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
	for _, edge := range delta.Removed {
		// Entitlement is only used for filtering by entitlement_id; ID is sufficient.
		ent := v2.Entitlement_builder{Id: edge.DstEntitlementID}.Build()
		pageToken := ""
		for {
			resp, err := store.ListGrantsForEntitlement(ctx, reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{
				Entitlement:              ent,
				PageToken:                pageToken,
				PrincipalResourceTypeIds: edge.PrincipalResourceTypeIDs,
			}.Build())
			if err != nil {
				return err
			}
			for _, g := range resp.GetList() {
				srcs := g.GetSources().GetSources()
				if len(srcs) == 0 {
					continue
				}
				if _, ok := srcs[edge.SrcEntitlementID]; !ok {
					continue
				}
				delete(srcs, edge.SrcEntitlementID)
				// The expander adds a "self-source" (destination entitlement ID) to mark
				// that a grant was originally direct. When all expansion sources are removed,
				// we should also remove the self-source so the grant matches a fresh full
				// expansion (which would have no sources for a direct grant).
				selfSourceID := g.GetEntitlement().GetId()
				if len(srcs) == 1 {
					delete(srcs, selfSourceID)
				}
				if len(srcs) == 0 {
					annos := annotations.Annotations(g.GetAnnotations())
					if annos.Contains(&v2.GrantImmutable{}) {
						if err := store.DeleteGrant(ctx, g.GetId()); err != nil {
							return err
						}
						continue
					}
					// Direct grant: clear sources entirely.
					g.SetSources(nil)
				} else {
					g.SetSources(v2.GrantSources_builder{Sources: srcs}.Build())
				}
				updates = append(updates, g)
				if len(updates) >= chunkSize {
					if err := flush(); err != nil {
						return err
					}
				}
			}
			pageToken = resp.GetNextPageToken()
			if pageToken == "" {
				break
			}
		}
	}
```
```go
	for _, edge := range delta.Removed {
		// Entitlement is only used for filtering by entitlement_id; ID is sufficient.
		ent := v2.Entitlement_builder{Id: edge.DstEntitlementID}.Build()
		pageToken := ""
		for {
			resp, err := store.ListGrantsForEntitlement(ctx, reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest_builder{
				Entitlement:              ent,
				PageToken:                pageToken,
				PrincipalResourceTypeIds: edge.PrincipalResourceTypeIDs,
			}.Build())
			if err != nil {
				return err
			}
			for _, g := range resp.GetList() {
				srcs := g.GetSources().GetSources()
				if len(srcs) == 0 {
					continue
				}
				if _, ok := srcs[edge.SrcEntitlementID]; !ok {
					continue
				}
				delete(srcs, edge.SrcEntitlementID)
				// The expander adds a "self-source" (destination entitlement ID) to mark
				// that a grant was originally direct. When all expansion sources are removed,
				// we should also remove the self-source so the grant matches a fresh full
				// expansion (which would have no sources for a direct grant).
				selfSourceID := g.GetEntitlement().GetId()
				if len(srcs) == 1 {
					delete(srcs, selfSourceID)
				}
				if len(srcs) == 0 {
					annos := annotations.Annotations(g.GetAnnotations())
					if annos.Contains(&v2.GrantImmutable{}) {
						if err := store.DeleteGrant(ctx, g.GetId()); err != nil {
							return err
						}
						continue
					}
					// Direct grant: clear sources entirely.
					g.SetSources(nil)
				} else {
					g.SetSources(v2.GrantSources_builder{Sources: srcs}.Build())
				}
				updates = append(updates, g)
				if len(updates) >= chunkSize {
					if err := flush(); err != nil {
						return err
					}
				}
			}
			pageToken = resp.GetNextPageToken()
			if pageToken == "" {
				break
			}
		}
		// Ensure updates from this edge are persisted before processing the next edge.
		if err := flush(); err != nil {
			return err
		}
	}
```
🤖 Prompt for AI Agents
In `@pkg/sync/incrementalexpansion/invalidate.go` around lines 51 - 112, The
buffered grant updates in the loop over delta.Removed can clobber each other
when multiple removed edges touch the same grant; to fix this, ensure you flush
the pending updates after finishing each edge so removals accumulate correctly:
after the inner paging loop for each entry in delta.Removed (the block that
reads via store.ListGrantsForEntitlement and appends to updates), call flush()
(the same function already used when updates >= chunkSize) to persist and clear
the updates buffer before moving to the next edge; keep existing logic for
DeleteGrant and GrantImmutable checks (g.GetId(), annotations.Annotations,
GrantImmutable) unchanged.
Summary by CodeRabbit