From 74e058885a9e3c6808be5b257416cd3598efaf46 Mon Sep 17 00:00:00 2001 From: Matt Kaniaris Date: Wed, 4 Feb 2026 13:17:25 -0700 Subject: [PATCH 1/9] Add is_expandable and needs_expansion columns to grants table for expansion graph --- pkg/dotc1z/grants.go | 220 +++++++++++++++++++++++++- pkg/dotc1z/grants_expandable_query.go | 205 ++++++++++++++++++++++++ pkg/dotc1z/grants_test.go | 75 +++++++++ pkg/sync/expand/graph.go | 23 +++ 4 files changed, 520 insertions(+), 3 deletions(-) create mode 100644 pkg/dotc1z/grants_expandable_query.go create mode 100644 pkg/dotc1z/grants_test.go diff --git a/pkg/dotc1z/grants.go b/pkg/dotc1z/grants.go index c091d7471..94edf9687 100644 --- a/pkg/dotc1z/grants.go +++ b/pkg/dotc1z/grants.go @@ -3,8 +3,10 @@ package dotc1z import ( "context" "fmt" + "strings" "github.com/doug-martin/goqu/v9" + "google.golang.org/protobuf/proto" v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" reader_v2 "github.com/conductorone/baton-sdk/pb/c1/reader/v2" @@ -22,6 +24,8 @@ create table if not exists %s ( principal_resource_type_id text not null, principal_resource_id text not null, external_id text not null, + is_expandable integer not null default 0, -- 1 if data contains a GrantExpandable annotation; used to build the expansion graph. + needs_expansion integer not null default 0, -- 1 if grant should be processed during expansion. data blob not null, sync_id text not null, discovered_at datetime not null @@ -59,8 +63,37 @@ func (r *grantsTable) Schema() (string, []any) { } } +// isAlreadyExistsError returns true if err is a SQLite "duplicate column name" error. +func isAlreadyExistsError(err error) bool { + return err != nil && strings.Contains(err.Error(), "duplicate column name") +} + func (r *grantsTable) Migrations(ctx context.Context, db *goqu.Database) error { - return nil + // Add is_expandable column if missing (for older files). + if _, err := db.ExecContext(ctx, fmt.Sprintf( + "alter table %s add column is_expandable integer not null default 0", r.Name(), + )); err != nil && !isAlreadyExistsError(err) { + return err + } + + // Add needs_expansion column if missing. + if _, err := db.ExecContext(ctx, fmt.Sprintf( + "alter table %s add column needs_expansion integer not null default 0", r.Name(), + )); err != nil && !isAlreadyExistsError(err) { + return err + } + + // Create the index only after the columns exist. + if _, err := db.ExecContext(ctx, fmt.Sprintf( + "create index if not exists %s on %s (sync_id, needs_expansion)", + fmt.Sprintf("idx_grants_sync_needs_expansion_v%s", r.Version()), + r.Name(), + )); err != nil { + return err + } + + // Backfill from stored grant bytes for rows that haven't been classified yet. + return backfillGrantExpandableColumns(ctx, db, r.Name()) } func (c *C1File) ListGrants(ctx context.Context, request *v2.GrantsServiceListGrantsRequest) (*v2.GrantsServiceListGrantsResponse, error) { @@ -154,14 +187,14 @@ func (c *C1File) PutGrants(ctx context.Context, bulkGrants ...*v2.Grant) error { ctx, span := tracer.Start(ctx, "C1File.PutGrants") defer span.End() - return c.putGrantsInternal(ctx, bulkPutConnectorObject, bulkGrants...) + return c.putGrantsInternal(ctx, bulkPutGrants, bulkGrants...) } func (c *C1File) PutGrantsIfNewer(ctx context.Context, bulkGrants ...*v2.Grant) error { ctx, span := tracer.Start(ctx, "C1File.PutGrantsIfNewer") defer span.End() - return c.putGrantsInternal(ctx, bulkPutConnectorObjectIfNewer, bulkGrants...) + return c.putGrantsInternal(ctx, bulkPutGrantsIfNewer, bulkGrants...) } type grantPutFunc func(context.Context, *C1File, string, func(m *v2.Grant) (goqu.Record, error), ...*v2.Grant) error @@ -173,12 +206,16 @@ func (c *C1File) putGrantsInternal(ctx context.Context, f grantPutFunc, bulkGran err := f(ctx, c, grants.Name(), func(grant *v2.Grant) (goqu.Record, error) { + isExpandable, needsExpansion := grantExpandableColumns(grant) + return goqu.Record{ "resource_type_id": grant.GetEntitlement().GetResource().GetId().GetResourceType(), "resource_id": grant.GetEntitlement().GetResource().GetId().GetResource(), "entitlement_id": grant.GetEntitlement().GetId(), "principal_resource_type_id": grant.GetPrincipal().GetId().GetResourceType(), "principal_resource_id": grant.GetPrincipal().GetId().GetResource(), + "is_expandable": isExpandable, + "needs_expansion": needsExpansion, }, nil }, bulkGrants..., @@ -190,6 +227,183 @@ func (c *C1File) putGrantsInternal(ctx context.Context, f grantPutFunc, bulkGran return nil } +// grantExpandableColumns returns (is_expandable, needs_expansion). +// is_expandable is 1 if the grant has a valid GrantExpandable annotation, 0 otherwise. +func grantExpandableColumns(grant *v2.Grant) (int, int) { + annos := annotations.Annotations(grant.GetAnnotations()) + expandable := &v2.GrantExpandable{} + ok, err := annos.Pick(expandable) + if err != nil || !ok || len(expandable.GetEntitlementIds()) == 0 { + return 0, 0 + } + + // Check that there's at least one non-whitespace entitlement ID. + for _, id := range expandable.GetEntitlementIds() { + if strings.TrimSpace(id) != "" { + // On initial insert, we want expandable grants to be picked up by expansion. + // On updates, bulkPutGrants* preserves needs_expansion unless is_expandable changes. + return 1, 1 + } + } + return 0, 0 +} + +func backfillGrantExpandableColumns(ctx context.Context, db *goqu.Database, tableName string) error { + // Scan for rows that contain "GrantExpandable" in the proto blob but haven't been + // backfilled yet (is_expandable=0). The LIKE filter skips the 99%+ of rows that + // don't have expandable annotations, making this fast even on large tables. + for { + rows, err := db.QueryContext(ctx, fmt.Sprintf( + `SELECT id, data FROM %s + WHERE is_expandable=0 AND data LIKE '%%GrantExpandable%%' + LIMIT 1000`, + tableName, + )) + if err != nil { + return err + } + + type row struct { + id int64 + data []byte + } + batch := make([]row, 0, 1000) + for rows.Next() { + var r row + if err := rows.Scan(&r.id, &r.data); err != nil { + _ = rows.Close() + return err + } + batch = append(batch, r) + } + if err := rows.Err(); err != nil { + _ = rows.Close() + return err + } + _ = rows.Close() + + if len(batch) == 0 { + return nil + } + + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + stmt, err := tx.PrepareContext(ctx, fmt.Sprintf( + `UPDATE %s SET is_expandable=?, needs_expansion=? WHERE id=?`, + tableName, + )) + if err != nil { + _ = tx.Rollback() + return err + } + + for _, r := range batch { + g := &v2.Grant{} + if err := proto.Unmarshal(r.data, g); err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return err + } + isExpandable, needsExpansion := grantExpandableColumns(g) + // Only update if we found a valid expandable annotation. + // Rows with "GrantExpandable" in the blob but no valid annotation are skipped. + if isExpandable == 0 { + continue + } + if _, err := stmt.ExecContext(ctx, isExpandable, needsExpansion, r.id); err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return err + } + } + + _ = stmt.Close() + if err := tx.Commit(); err != nil { + return err + } + } +} + +func bulkPutGrants( + ctx context.Context, c *C1File, + tableName string, + extractFields func(m *v2.Grant) (goqu.Record, error), + msgs ...*v2.Grant, +) error { + return bulkPutGrantsInternal(ctx, c, tableName, extractFields, false, msgs...) +} + +func bulkPutGrantsIfNewer( + ctx context.Context, c *C1File, + tableName string, + extractFields func(m *v2.Grant) (goqu.Record, error), + msgs ...*v2.Grant, +) error { + return bulkPutGrantsInternal(ctx, c, tableName, extractFields, true, msgs...) +} + +func bulkPutGrantsInternal( + ctx context.Context, c *C1File, + tableName string, + extractFields func(m *v2.Grant) (goqu.Record, error), + ifNewer bool, + msgs ...*v2.Grant, +) error { + if len(msgs) == 0 { + return nil + } + ctx, span := tracer.Start(ctx, "C1File.bulkPutGrants") + defer span.End() + + if err := c.validateSyncDb(ctx); err != nil { + return err + } + + // Prepare rows. + rows, err := prepareConnectorObjectRows(c, msgs, extractFields) + if err != nil { + return err + } + + // needs_expansion should only flip to 1 when is_expandable changes. + // If a grant is no longer expandable (is_expandable=0), needs_expansion should be forced to 0. + needsExpansionExpr := goqu.L( + `CASE + WHEN EXCLUDED.is_expandable = 0 THEN 0 + WHEN EXCLUDED.is_expandable != ?.is_expandable THEN 1 + ELSE ?.needs_expansion + END`, + goqu.I(tableName), goqu.I(tableName), + ) + + buildQueryFn := func(insertDs *goqu.InsertDataset, chunkedRows []*goqu.Record) (*goqu.InsertDataset, error) { + update := goqu.Record{ + "data": goqu.I("EXCLUDED.data"), + "is_expandable": goqu.I("EXCLUDED.is_expandable"), + "needs_expansion": needsExpansionExpr, + } + if ifNewer { + update["discovered_at"] = goqu.I("EXCLUDED.discovered_at") + return insertDs. + OnConflict(goqu.DoUpdate("external_id, sync_id", update).Where( + goqu.L("EXCLUDED.discovered_at > ?.discovered_at", goqu.I(tableName)), + )). + Rows(chunkedRows). + Prepared(true), nil + } + + return insertDs. + OnConflict(goqu.DoUpdate("external_id, sync_id", update)). + Rows(chunkedRows). + Prepared(true), nil + } + + return executeChunkedInsert(ctx, c, tableName, rows, buildQueryFn) +} + func (c *C1File) DeleteGrant(ctx context.Context, grantId string) error { ctx, span := tracer.Start(ctx, "C1File.DeleteGrant") defer span.End() diff --git a/pkg/dotc1z/grants_expandable_query.go b/pkg/dotc1z/grants_expandable_query.go new file mode 100644 index 000000000..5c17c58ec --- /dev/null +++ b/pkg/dotc1z/grants_expandable_query.go @@ -0,0 +1,205 @@ +package dotc1z + +import ( + "context" + "database/sql" + "fmt" + "strconv" + + "github.com/doug-martin/goqu/v9" + "google.golang.org/protobuf/proto" + + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" + "github.com/conductorone/baton-sdk/pkg/annotations" +) + +// ExpandableGrantDef is a lightweight representation of an expandable grant row, +// using queryable columns instead of unmarshalling the full grant proto. +type ExpandableGrantDef struct { + RowID int64 + GrantExternalID string + DstEntitlementID string + PrincipalResourceTypeID string + PrincipalResourceID string + SrcEntitlementIDs []string + Shallow bool + PrincipalResourceTypeIDs []string + NeedsExpansion bool +} + +type listExpandableGrantsOptions struct { + pageToken string + pageSize uint32 + needsExpansionOnly bool + syncID string +} + +type ListExpandableGrantsOption func(*listExpandableGrantsOptions) + +func WithExpandableGrantsPageToken(t string) ListExpandableGrantsOption { + return func(o *listExpandableGrantsOptions) { o.pageToken = t } +} + +func WithExpandableGrantsPageSize(n uint32) ListExpandableGrantsOption { + return func(o *listExpandableGrantsOptions) { o.pageSize = n } +} + +func WithExpandableGrantsNeedsExpansionOnly(b bool) ListExpandableGrantsOption { + return func(o *listExpandableGrantsOptions) { o.needsExpansionOnly = b } +} + +// WithExpandableGrantsSyncID forces listing expandable grants for a specific sync id. +// If omitted, we default to the current sync id, then view sync id, then latest finished sync. +func WithExpandableGrantsSyncID(syncID string) ListExpandableGrantsOption { + return func(o *listExpandableGrantsOptions) { o.syncID = syncID } +} + +// ListExpandableGrants lists expandable grants using the grants table's queryable columns. +// It avoids scanning/unmarshalling all grants. +func (c *C1File) ListExpandableGrants(ctx context.Context, opts ...ListExpandableGrantsOption) ([]*ExpandableGrantDef, string, error) { + ctx, span := tracer.Start(ctx, "C1File.ListExpandableGrants") + defer span.End() + + if err := c.validateDb(ctx); err != nil { + return nil, "", err + } + + o := &listExpandableGrantsOptions{} + for _, opt := range opts { + opt(o) + } + + syncID, err := c.resolveSyncIDForInternalQuery(ctx, o.syncID) + if err != nil { + return nil, "", err + } + + q := c.db.From(grants.Name()).Prepared(true) + q = q.Select( + "id", + "external_id", + "entitlement_id", + "principal_resource_type_id", + "principal_resource_id", + "data", + "needs_expansion", + ) + q = q.Where(goqu.C("sync_id").Eq(syncID)) + q = q.Where(goqu.C("is_expandable").Eq(1)) + if o.needsExpansionOnly { + q = q.Where(goqu.C("needs_expansion").Eq(1)) + } + + if o.pageToken != "" { + // Page token is the grants table row ID. + id, err := strconv.ParseInt(o.pageToken, 10, 64) + if err != nil { + return nil, "", fmt.Errorf("invalid expandable grants page token %q: %w", o.pageToken, err) + } + q = q.Where(goqu.C("id").Gte(id)) + } + + pageSize := o.pageSize + if pageSize > maxPageSize || pageSize == 0 { + pageSize = maxPageSize + } + q = q.Order(goqu.C("id").Asc()).Limit(uint(pageSize + 1)) + + query, args, err := q.ToSQL() + if err != nil { + return nil, "", err + } + + rows, err := c.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, "", err + } + defer rows.Close() + + defs := make([]*ExpandableGrantDef, 0, pageSize) + var ( + count uint32 + lastRow int64 + ) + for rows.Next() { + count++ + if count > pageSize { + break + } + + var ( + rowID int64 + externalID string + dstEntitlementID string + principalRTID string + principalRID string + dataBlob []byte + needsExpansionInt int + ) + + if err := rows.Scan( + &rowID, + &externalID, + &dstEntitlementID, + &principalRTID, + &principalRID, + &dataBlob, + &needsExpansionInt, + ); err != nil { + return nil, "", err + } + lastRow = rowID + + var g v2.Grant + if err := proto.Unmarshal(dataBlob, &g); err != nil { + return nil, "", fmt.Errorf("invalid grant data for %q: %w", externalID, err) + } + + annos := annotations.Annotations(g.GetAnnotations()) + ge := &v2.GrantExpandable{} + if _, err := annos.Pick(ge); err != nil { + return nil, "", fmt.Errorf("failed to extract GrantExpandable from grant %q: %w", externalID, err) + } + + defs = append(defs, &ExpandableGrantDef{ + RowID: rowID, + GrantExternalID: externalID, + DstEntitlementID: dstEntitlementID, + PrincipalResourceTypeID: principalRTID, + PrincipalResourceID: principalRID, + SrcEntitlementIDs: ge.GetEntitlementIds(), + Shallow: ge.GetShallow(), + PrincipalResourceTypeIDs: ge.GetResourceTypeIds(), + NeedsExpansion: needsExpansionInt != 0, + }) + } + if err := rows.Err(); err != nil { + return nil, "", err + } + + nextPageToken := "" + if count > pageSize { + nextPageToken = strconv.FormatInt(lastRow+1, 10) + } + return defs, nextPageToken, nil +} + +func (c *C1File) resolveSyncIDForInternalQuery(ctx context.Context, forced string) (string, error) { + switch { + case forced != "": + return forced, nil + case c.currentSyncID != "": + return c.currentSyncID, nil + case c.viewSyncID != "": + return c.viewSyncID, nil + default: + latest, err := c.getCachedViewSyncRun(ctx) + if err != nil { + return "", err + } + if latest == nil { + return "", sql.ErrNoRows + } + return latest.ID, nil + } +} diff --git a/pkg/dotc1z/grants_test.go b/pkg/dotc1z/grants_test.go new file mode 100644 index 000000000..c4310d261 --- /dev/null +++ b/pkg/dotc1z/grants_test.go @@ -0,0 +1,75 @@ +package dotc1z + +import ( + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/anypb" + + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" +) + +func TestGrantExpandableColumns_WhitespaceOnlyEntitlementIDs(t *testing.T) { + // Create a GrantExpandable annotation with only whitespace entitlement IDs. + // After checking, this should result in is_expandable=0 because + // there are no valid source entitlements. + expandable := v2.GrantExpandable_builder{ + EntitlementIds: []string{" ", "\t", " \n "}, + }.Build() + + expandableAny, err := anypb.New(expandable) + require.NoError(t, err) + + grant := v2.Grant_builder{ + Id: "test-grant", + Entitlement: v2.Entitlement_builder{ + Id: "test-entitlement", + }.Build(), + Principal: v2.Resource_builder{ + Id: v2.ResourceId_builder{ + ResourceType: "user", + Resource: "user1", + }.Build(), + }.Build(), + Annotations: []*anypb.Any{expandableAny}, + }.Build() + + isExpandable, needsExpansion := grantExpandableColumns(grant) + + // After checking, this grant should NOT be marked as expandable + // since there are no valid source entitlements. + require.Equal(t, 0, isExpandable, "grant with only whitespace entitlement IDs should not be expandable") + require.Equal(t, 0, needsExpansion, "grant with no valid source entitlements should not need expansion") +} + +func TestGrantExpandableColumns_MixedWhitespaceAndValidIDs(t *testing.T) { + // Create a GrantExpandable annotation with a mix of whitespace and valid IDs. + // The grant should still be expandable with only the valid IDs. + expandable := v2.GrantExpandable_builder{ + EntitlementIds: []string{" ", "valid-entitlement-id", "\t"}, + Shallow: true, + }.Build() + + expandableAny, err := anypb.New(expandable) + require.NoError(t, err) + + grant := v2.Grant_builder{ + Id: "test-grant", + Entitlement: v2.Entitlement_builder{ + Id: "test-entitlement", + }.Build(), + Principal: v2.Resource_builder{ + Id: v2.ResourceId_builder{ + ResourceType: "user", + Resource: "user1", + }.Build(), + }.Build(), + Annotations: []*anypb.Any{expandableAny}, + }.Build() + + isExpandable, needsExpansion := grantExpandableColumns(grant) + + // Should still be expandable because there's at least one valid entitlement ID. + require.Equal(t, 1, isExpandable, "grant with valid entitlement ID should be expandable") + require.Equal(t, 1, needsExpansion, "expandable grant should need expansion on insert") +} diff --git a/pkg/sync/expand/graph.go b/pkg/sync/expand/graph.go index 4dce1bafd..6ff1051f3 100644 --- a/pkg/sync/expand/graph.go +++ b/pkg/sync/expand/graph.go @@ -202,6 +202,29 @@ func (g *EntitlementGraph) AddEntitlement(entitlement *v2.Entitlement) { g.EntitlementsToNodes[entitlement.GetId()] = node.Id } +// AddEntitlementID adds an entitlement ID as an unconnected node in the graph. +// This is a convenience for callers that already have entitlement IDs and do not +// want to fetch/unmarshal full entitlement protos. +func (g *EntitlementGraph) AddEntitlementID(entitlementID string) { + // If the entitlement is already in the graph, fail silently. + found := g.GetNode(entitlementID) + if found != nil { + return + } + g.HasNoCycles = false // Reset this since we're changing the graph. + + // Start at 1 in case we don't initialize something and try to get node 0. + g.NextNodeID++ + + node := Node{ + Id: g.NextNodeID, + EntitlementIDs: []string{entitlementID}, + } + + g.Nodes[node.Id] = node + g.EntitlementsToNodes[entitlementID] = node.Id +} + // GetEntitlements returns a combined list of _all_ entitlements from all nodes. func (g *EntitlementGraph) GetEntitlements() []string { var entitlements []string From f16c9a885ce2579fb6e1408095e747abc4cfa873 Mon Sep 17 00:00:00 2001 From: Matt Kaniaris Date: Wed, 4 Feb 2026 14:46:11 -0700 Subject: [PATCH 2/9] expand graph using the new functionality (columns) --- pkg/connectorstore/connectorstore.go | 7 ++ pkg/dotc1z/grants.go | 25 +++-- pkg/dotc1z/grants_test.go | 22 ++--- pkg/dotc1z/sync_runs.go | 78 ++++++++++++---- pkg/sync/syncer.go | 131 +++++++++++++++++++++------ pkg/sync/syncer_test.go | 9 +- 6 files changed, 189 insertions(+), 83 deletions(-) diff --git a/pkg/connectorstore/connectorstore.go b/pkg/connectorstore/connectorstore.go index 65793949e..0718eb987 100644 --- a/pkg/connectorstore/connectorstore.go +++ b/pkg/connectorstore/connectorstore.go @@ -70,3 +70,10 @@ type Writer interface { PutEntitlements(ctx context.Context, entitlements ...*v2.Entitlement) error DeleteGrant(ctx context.Context, grantId string) error } + +// ExpansionStore provides methods for grant expansion operations. +// Not all store implementations support expansion; callers should type-assert. +type ExpansionStore interface { + // SetExpansionStarted marks the given sync as having started expansion. + SetExpansionStarted(ctx context.Context, syncID string) error +} diff --git a/pkg/dotc1z/grants.go b/pkg/dotc1z/grants.go index 94edf9687..e99af42f5 100644 --- a/pkg/dotc1z/grants.go +++ b/pkg/dotc1z/grants.go @@ -206,7 +206,7 @@ func (c *C1File) putGrantsInternal(ctx context.Context, f grantPutFunc, bulkGran err := f(ctx, c, grants.Name(), func(grant *v2.Grant) (goqu.Record, error) { - isExpandable, needsExpansion := grantExpandableColumns(grant) + expandable := isGrantExpandable(grant) return goqu.Record{ "resource_type_id": grant.GetEntitlement().GetResource().GetId().GetResourceType(), @@ -214,8 +214,8 @@ func (c *C1File) putGrantsInternal(ctx context.Context, f grantPutFunc, bulkGran "entitlement_id": grant.GetEntitlement().GetId(), "principal_resource_type_id": grant.GetPrincipal().GetId().GetResourceType(), "principal_resource_id": grant.GetPrincipal().GetId().GetResource(), - "is_expandable": isExpandable, - "needs_expansion": needsExpansion, + "is_expandable": expandable, + "needs_expansion": expandable, }, nil }, bulkGrants..., @@ -227,25 +227,23 @@ func (c *C1File) putGrantsInternal(ctx context.Context, f grantPutFunc, bulkGran return nil } -// grantExpandableColumns returns (is_expandable, needs_expansion). -// is_expandable is 1 if the grant has a valid GrantExpandable annotation, 0 otherwise. -func grantExpandableColumns(grant *v2.Grant) (int, int) { +// isGrantExpandable returns true if the grant has a valid GrantExpandable annotation +// with at least one non-whitespace entitlement ID. +func isGrantExpandable(grant *v2.Grant) bool { annos := annotations.Annotations(grant.GetAnnotations()) expandable := &v2.GrantExpandable{} ok, err := annos.Pick(expandable) if err != nil || !ok || len(expandable.GetEntitlementIds()) == 0 { - return 0, 0 + return false } // Check that there's at least one non-whitespace entitlement ID. for _, id := range expandable.GetEntitlementIds() { if strings.TrimSpace(id) != "" { - // On initial insert, we want expandable grants to be picked up by expansion. - // On updates, bulkPutGrants* preserves needs_expansion unless is_expandable changes. - return 1, 1 + return true } } - return 0, 0 + return false } func backfillGrantExpandableColumns(ctx context.Context, db *goqu.Database, tableName string) error { @@ -307,13 +305,12 @@ func backfillGrantExpandableColumns(ctx context.Context, db *goqu.Database, tabl _ = tx.Rollback() return err } - isExpandable, needsExpansion := grantExpandableColumns(g) // Only update if we found a valid expandable annotation. // Rows with "GrantExpandable" in the blob but no valid annotation are skipped. - if isExpandable == 0 { + if !isGrantExpandable(g) { continue } - if _, err := stmt.ExecContext(ctx, isExpandable, needsExpansion, r.id); err != nil { + if _, err := stmt.ExecContext(ctx, true, true, r.id); err != nil { _ = stmt.Close() _ = tx.Rollback() return err diff --git a/pkg/dotc1z/grants_test.go b/pkg/dotc1z/grants_test.go index c4310d261..6b5e4d754 100644 --- a/pkg/dotc1z/grants_test.go +++ b/pkg/dotc1z/grants_test.go @@ -9,10 +9,9 @@ import ( v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" ) -func TestGrantExpandableColumns_WhitespaceOnlyEntitlementIDs(t *testing.T) { +func TestIsGrantExpandable_WhitespaceOnlyEntitlementIDs(t *testing.T) { // Create a GrantExpandable annotation with only whitespace entitlement IDs. - // After checking, this should result in is_expandable=0 because - // there are no valid source entitlements. + // This should return false because there are no valid source entitlements. expandable := v2.GrantExpandable_builder{ EntitlementIds: []string{" ", "\t", " \n "}, }.Build() @@ -34,17 +33,12 @@ func TestGrantExpandableColumns_WhitespaceOnlyEntitlementIDs(t *testing.T) { Annotations: []*anypb.Any{expandableAny}, }.Build() - isExpandable, needsExpansion := grantExpandableColumns(grant) - - // After checking, this grant should NOT be marked as expandable - // since there are no valid source entitlements. - require.Equal(t, 0, isExpandable, "grant with only whitespace entitlement IDs should not be expandable") - require.Equal(t, 0, needsExpansion, "grant with no valid source entitlements should not need expansion") + require.False(t, isGrantExpandable(grant), "grant with only whitespace entitlement IDs should not be expandable") } -func TestGrantExpandableColumns_MixedWhitespaceAndValidIDs(t *testing.T) { +func TestIsGrantExpandable_MixedWhitespaceAndValidIDs(t *testing.T) { // Create a GrantExpandable annotation with a mix of whitespace and valid IDs. - // The grant should still be expandable with only the valid IDs. + // The grant should still be expandable because there's at least one valid ID. expandable := v2.GrantExpandable_builder{ EntitlementIds: []string{" ", "valid-entitlement-id", "\t"}, Shallow: true, @@ -67,9 +61,5 @@ func TestGrantExpandableColumns_MixedWhitespaceAndValidIDs(t *testing.T) { Annotations: []*anypb.Any{expandableAny}, }.Build() - isExpandable, needsExpansion := grantExpandableColumns(grant) - - // Should still be expandable because there's at least one valid entitlement ID. - require.Equal(t, 1, isExpandable, "grant with valid entitlement ID should be expandable") - require.Equal(t, 1, needsExpansion, "expandable grant should need expansion on insert") + require.True(t, isGrantExpandable(grant), "grant with valid entitlement ID should be expandable") } diff --git a/pkg/dotc1z/sync_runs.go b/pkg/dotc1z/sync_runs.go index 4426a3e57..9408e5157 100644 --- a/pkg/dotc1z/sync_runs.go +++ b/pkg/dotc1z/sync_runs.go @@ -33,7 +33,8 @@ create table if not exists %s ( sync_token text not null, sync_type text not null default 'full', parent_sync_id text not null default '', - linked_sync_id text not null default '' + linked_sync_id text not null default '', + expansion_started_at datetime ); create unique index if not exists %s on %s (sync_id);` @@ -97,17 +98,31 @@ func (r *syncRunsTable) Migrations(ctx context.Context, db *goqu.Database) error } } + // Check if expansion_started_at column exists + var expansionStartedAtExists int + err = db.QueryRowContext(ctx, fmt.Sprintf("select count(*) from pragma_table_info('%s') where name='expansion_started_at'", r.Name())).Scan(&expansionStartedAtExists) + if err != nil { + return err + } + if expansionStartedAtExists == 0 { + _, err = db.ExecContext(ctx, fmt.Sprintf("alter table %s add column expansion_started_at datetime", r.Name())) + if err != nil { + return err + } + } + return nil } type syncRun struct { - ID string - StartedAt *time.Time - EndedAt *time.Time - SyncToken string - Type connectorstore.SyncType - ParentSyncID string - LinkedSyncID string + ID string + StartedAt *time.Time + EndedAt *time.Time + SyncToken string + Type connectorstore.SyncType + ParentSyncID string + LinkedSyncID string + ExpansionStartedAt *time.Time } // getCachedViewSyncRun returns the cached sync run for read operations. @@ -159,7 +174,7 @@ func (c *C1File) getLatestUnfinishedSync(ctx context.Context, syncType connector oneWeekAgo := time.Now().AddDate(0, 0, -7) ret := &syncRun{} q := c.db.From(syncRuns.Name()) - q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id") + q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "expansion_started_at") q = q.Where(goqu.C("ended_at").IsNull()) q = q.Where(goqu.C("started_at").Gte(oneWeekAgo)) q = q.Order(goqu.C("started_at").Desc()) @@ -175,7 +190,7 @@ func (c *C1File) getLatestUnfinishedSync(ctx context.Context, syncType connector row := c.db.QueryRowContext(ctx, query, args...) - err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID) + err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID, &ret.ExpansionStartedAt) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil @@ -202,7 +217,7 @@ func (c *C1File) getFinishedSync(ctx context.Context, offset uint, syncType conn ret := &syncRun{} q := c.db.From(syncRuns.Name()) - q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id") + q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "expansion_started_at") q = q.Where(goqu.C("ended_at").IsNotNull()) if syncType != connectorstore.SyncTypeAny { q = q.Where(goqu.C("sync_type").Eq(syncType)) @@ -221,7 +236,7 @@ func (c *C1File) getFinishedSync(ctx context.Context, offset uint, syncType conn row := c.db.QueryRowContext(ctx, query, args...) - err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID) + err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID, &ret.ExpansionStartedAt) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil @@ -242,7 +257,7 @@ func (c *C1File) ListSyncRuns(ctx context.Context, pageToken string, pageSize ui } q := c.db.From(syncRuns.Name()).Prepared(true) - q = q.Select("id", "sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id") + q = q.Select("id", "sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "expansion_started_at") if pageToken != "" { q = q.Where(goqu.C("id").Gte(pageToken)) @@ -277,7 +292,7 @@ func (c *C1File) ListSyncRuns(ctx context.Context, pageToken string, pageSize ui } rowId := 0 data := &syncRun{} - err := rows.Scan(&rowId, &data.ID, &data.StartedAt, &data.EndedAt, &data.SyncToken, &data.Type, &data.ParentSyncID, &data.LinkedSyncID) + err := rows.Scan(&rowId, &data.ID, &data.StartedAt, &data.EndedAt, &data.SyncToken, &data.Type, &data.ParentSyncID, &data.LinkedSyncID, &data.ExpansionStartedAt) if err != nil { return nil, "", err } @@ -366,7 +381,7 @@ func (c *C1File) getSync(ctx context.Context, syncID string) (*syncRun, error) { ret := &syncRun{} q := c.db.From(syncRuns.Name()) - q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id") + q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "expansion_started_at") q = q.Where(goqu.C("sync_id").Eq(syncID)) query, args, err := q.ToSQL() @@ -374,7 +389,7 @@ func (c *C1File) getSync(ctx context.Context, syncID string) (*syncRun, error) { return nil, err } row := c.db.QueryRowContext(ctx, query, args...) - err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID) + err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID, &ret.ExpansionStartedAt) if err != nil { return nil, err } @@ -658,6 +673,37 @@ func (c *C1File) endSyncRun(ctx context.Context, syncID string) error { return nil } +// SetExpansionStarted marks the given sync as having started expansion. +// This marker is used to detect syncs that expanded with older code that dropped annotations. +func (c *C1File) SetExpansionStarted(ctx context.Context, syncID string) error { + ctx, span := tracer.Start(ctx, "C1File.SetExpansionStarted") + defer span.End() + + if c.readOnly { + return ErrReadOnly + } + + q := c.db.Update(syncRuns.Name()) + q = q.Set(goqu.Record{ + "expansion_started_at": time.Now().Format("2006-01-02 15:04:05.999999999"), + }) + q = q.Where(goqu.C("sync_id").Eq(syncID)) + q = q.Where(goqu.C("expansion_started_at").IsNull()) + + query, args, err := q.ToSQL() + if err != nil { + return err + } + + _, err = c.db.ExecContext(ctx, query, args...) + if err != nil { + return err + } + c.dbUpdated = true + + return nil +} + func (c *C1File) Cleanup(ctx context.Context) error { ctx, span := tracer.Start(ctx, "C1File.Cleanup") defer span.End() diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 532cc8e47..281aaba23 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -704,6 +704,15 @@ func (s *syncer) Sync(ctx context.Context) error { continue case SyncGrantExpansionOp: + // Mark the sync as having reached the expansion phase. This is set unconditionally + // so that diffs can detect syncs created with older code that dropped annotations. + if expansionStore, ok := s.store.(connectorstore.ExpansionStore); ok { + if err := expansionStore.SetExpansionStarted(ctx, s.syncID); err != nil { + l.Error("failed to set expansion started marker", zap.Error(err)) + return err + } + } + if s.dontExpandGrants || !s.state.NeedsExpansion() { l.Debug("skipping grant expansion, no grants to expand") s.state.FinishAction(ctx) @@ -1725,6 +1734,88 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle s.handleInitialActionForStep(ctx, *s.state.Current()) } + // Prefer a fast-path that scans only expandable grants using queryable columns, + // avoiding full protobuf unmarshalling across all grants. + type expandableGrantLister interface { + ListExpandableGrants(ctx context.Context, opts ...dotc1z.ListExpandableGrantsOption) ([]*dotc1z.ExpandableGrantDef, string, error) + } + if lister, ok := s.store.(expandableGrantLister); ok { + defs, nextPageToken, err := lister.ListExpandableGrants( + ctx, + dotc1z.WithExpandableGrantsPageToken(pageToken), + dotc1z.WithExpandableGrantsNeedsExpansionOnly(true), + ) + if err != nil { + return err + } + + // Handle pagination + if nextPageToken != "" { + if err := s.state.NextPage(ctx, nextPageToken); err != nil { + return err + } + } else { + l.Debug("Finished loading expandable grants to expand") + graph.Loaded = true + } + + for _, def := range defs { + principalID := v2.ResourceId_builder{ + ResourceType: def.PrincipalResourceTypeID, + Resource: def.PrincipalResourceID, + }.Build() + + for _, srcEntitlementID := range def.SrcEntitlementIDs { + // Preserve the same validation semantics as processGrantForGraph. + srcEntitlement, err := s.store.GetEntitlement(ctx, reader_v2.EntitlementsReaderServiceGetEntitlementRequest_builder{ + EntitlementId: srcEntitlementID, + }.Build()) + if err != nil { + // Only skip not-found entitlements; propagate other errors + // to avoid silently dropping edges and yielding incorrect expansions. + if errors.Is(err, sql.ErrNoRows) { + l.Debug("source entitlement not found, skipping edge", + zap.String("src_entitlement_id", srcEntitlementID), + zap.String("dst_entitlement_id", def.DstEntitlementID), + ) + continue + } + l.Error("error fetching source entitlement", + zap.String("src_entitlement_id", srcEntitlementID), + zap.String("dst_entitlement_id", def.DstEntitlementID), + zap.Error(err), + ) + return err + } + + sourceEntitlementResourceID := srcEntitlement.GetEntitlement().GetResource().GetId() + if sourceEntitlementResourceID == nil { + return fmt.Errorf("source entitlement resource id was nil") + } + if principalID.GetResourceType() != sourceEntitlementResourceID.GetResourceType() || + principalID.GetResource() != sourceEntitlementResourceID.GetResource() { + l.Error( + "source entitlement resource id did not match grant principal id", + zap.String("grant_principal_id", principalID.String()), + zap.String("source_entitlement_resource_id", sourceEntitlementResourceID.String())) + + return fmt.Errorf("source entitlement resource id did not match grant principal id") + } + + graph.AddEntitlementID(def.DstEntitlementID) + graph.AddEntitlementID(srcEntitlementID) + if err := graph.AddEdge(ctx, srcEntitlementID, def.DstEntitlementID, def.Shallow, def.PrincipalResourceTypeIDs); err != nil { + return fmt.Errorf("error adding edge to graph: %w", err) + } + } + } + + if graph.Loaded { + l.Info("Finished loading entitlement graph", zap.Int("edges", len(graph.Edges))) + } + return nil + } + resp, err := s.store.ListGrants(ctx, v2.GrantsServiceListGrantsRequest_builder{PageToken: pageToken}.Build()) if err != nil { return err @@ -1742,40 +1833,11 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle } // Process grants and add edges to the graph - updatedGrants := make([]*v2.Grant, 0) for _, grant := range resp.GetList() { err := s.processGrantForGraph(ctx, grant, graph) if err != nil { return err } - - // Remove expandable annotation from descendant grant now that we've added it to the graph. - // That way if this sync is part of a compaction, expanding grants at the end of compaction won't redo work. - newAnnos := make(annotations.Annotations, 0) - updated := false - for _, anno := range grant.GetAnnotations() { - if anno.MessageIs(&v2.GrantExpandable{}) { - updated = true - } else { - newAnnos = append(newAnnos, anno) - } - } - if !updated { - continue - } - - grant.SetAnnotations(newAnnos) - l.Debug("removed expandable annotation from grant", zap.String("grant_id", grant.GetId())) - updatedGrants = append(updatedGrants, grant) - updatedGrants, err = expand.PutGrantsInChunks(ctx, s.store, updatedGrants, 10000) - if err != nil { - return err - } - } - - _, err = expand.PutGrantsInChunks(ctx, s.store, updatedGrants, 0) - if err != nil { - return err } if graph.Loaded { @@ -1814,12 +1876,21 @@ func (s *syncer) processGrantForGraph(ctx context.Context, grant *v2.Grant, grap EntitlementId: srcEntitlementID, }.Build()) if err != nil { + // Only skip not-found entitlements; propagate other errors + // to avoid silently dropping edges and yielding incorrect expansions. + if errors.Is(err, sql.ErrNoRows) { + l.Debug("source entitlement not found, skipping edge", + zap.String("src_entitlement_id", srcEntitlementID), + zap.String("dst_entitlement_id", grant.GetEntitlement().GetId()), + ) + continue + } l.Error("error fetching source entitlement", zap.String("src_entitlement_id", srcEntitlementID), zap.String("dst_entitlement_id", grant.GetEntitlement().GetId()), zap.Error(err), ) - continue + return err } // The expand annotation points at entitlements by id. Those entitlements' resource should match diff --git a/pkg/sync/syncer_test.go b/pkg/sync/syncer_test.go index 8ea1b85f8..2324c3a25 100644 --- a/pkg/sync/syncer_test.go +++ b/pkg/sync/syncer_test.go @@ -108,13 +108,8 @@ func TestExpandGrants(t *testing.T) { } } require.Len(t, allGrants, expectedGrantCount, "should have %d grants but got %d", expectedGrantCount, len(allGrants)) - for _, grant := range allGrants { - annos := annotations.Annotations(grant.GetAnnotations()) - expandable := &v2.GrantExpandable{} - ok, err := annos.Pick(expandable) - require.NoError(t, err) - require.False(t, ok, "grants are expanded, but grant %s has expandable annotation with entitlement ids %v", grant.GetId(), expandable.GetEntitlementIds()) - } + // Note: We no longer strip GrantExpandable from stored grants during expansion. + // Expansion bookkeeping lives outside the grant proto so diffs can safely compare data bytes. } func TestInvalidResourceTypeFilter(t *testing.T) { From e792376185e9ba2c8bfde881111266a2696410c5 Mon Sep 17 00:00:00 2001 From: Matt Kaniaris Date: Wed, 4 Feb 2026 15:12:40 -0700 Subject: [PATCH 3/9] remove dead code --- pkg/sync/syncer.go | 215 ++++++++++++--------------------------------- 1 file changed, 56 insertions(+), 159 deletions(-) diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 281aaba23..80dbfac1c 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -1739,188 +1739,85 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle type expandableGrantLister interface { ListExpandableGrants(ctx context.Context, opts ...dotc1z.ListExpandableGrantsOption) ([]*dotc1z.ExpandableGrantDef, string, error) } - if lister, ok := s.store.(expandableGrantLister); ok { - defs, nextPageToken, err := lister.ListExpandableGrants( - ctx, - dotc1z.WithExpandableGrantsPageToken(pageToken), - dotc1z.WithExpandableGrantsNeedsExpansionOnly(true), - ) - if err != nil { - return err - } - // Handle pagination - if nextPageToken != "" { - if err := s.state.NextPage(ctx, nextPageToken); err != nil { - return err - } - } else { - l.Debug("Finished loading expandable grants to expand") - graph.Loaded = true - } - - for _, def := range defs { - principalID := v2.ResourceId_builder{ - ResourceType: def.PrincipalResourceTypeID, - Resource: def.PrincipalResourceID, - }.Build() - - for _, srcEntitlementID := range def.SrcEntitlementIDs { - // Preserve the same validation semantics as processGrantForGraph. - srcEntitlement, err := s.store.GetEntitlement(ctx, reader_v2.EntitlementsReaderServiceGetEntitlementRequest_builder{ - EntitlementId: srcEntitlementID, - }.Build()) - if err != nil { - // Only skip not-found entitlements; propagate other errors - // to avoid silently dropping edges and yielding incorrect expansions. - if errors.Is(err, sql.ErrNoRows) { - l.Debug("source entitlement not found, skipping edge", - zap.String("src_entitlement_id", srcEntitlementID), - zap.String("dst_entitlement_id", def.DstEntitlementID), - ) - continue - } - l.Error("error fetching source entitlement", - zap.String("src_entitlement_id", srcEntitlementID), - zap.String("dst_entitlement_id", def.DstEntitlementID), - zap.Error(err), - ) - return err - } - - sourceEntitlementResourceID := srcEntitlement.GetEntitlement().GetResource().GetId() - if sourceEntitlementResourceID == nil { - return fmt.Errorf("source entitlement resource id was nil") - } - if principalID.GetResourceType() != sourceEntitlementResourceID.GetResourceType() || - principalID.GetResource() != sourceEntitlementResourceID.GetResource() { - l.Error( - "source entitlement resource id did not match grant principal id", - zap.String("grant_principal_id", principalID.String()), - zap.String("source_entitlement_resource_id", sourceEntitlementResourceID.String())) - - return fmt.Errorf("source entitlement resource id did not match grant principal id") - } - - graph.AddEntitlementID(def.DstEntitlementID) - graph.AddEntitlementID(srcEntitlementID) - if err := graph.AddEdge(ctx, srcEntitlementID, def.DstEntitlementID, def.Shallow, def.PrincipalResourceTypeIDs); err != nil { - return fmt.Errorf("error adding edge to graph: %w", err) - } - } - } - - if graph.Loaded { - l.Info("Finished loading entitlement graph", zap.Int("edges", len(graph.Edges))) - } - return nil + lister, ok := s.store.(expandableGrantLister) + if !ok { + return fmt.Errorf("store does not support ListExpandableGrants") } - resp, err := s.store.ListGrants(ctx, v2.GrantsServiceListGrantsRequest_builder{PageToken: pageToken}.Build()) + defs, nextPageToken, err := lister.ListExpandableGrants( + ctx, + dotc1z.WithExpandableGrantsPageToken(pageToken), + dotc1z.WithExpandableGrantsNeedsExpansionOnly(true), + ) if err != nil { return err } // Handle pagination - if resp.GetNextPageToken() != "" { - err = s.state.NextPage(ctx, resp.GetNextPageToken()) - if err != nil { + if nextPageToken != "" { + if err := s.state.NextPage(ctx, nextPageToken); err != nil { return err } } else { - l.Debug("Finished loading grants to expand") + l.Debug("Finished loading expandable grants to expand") graph.Loaded = true } - // Process grants and add edges to the graph - for _, grant := range resp.GetList() { - err := s.processGrantForGraph(ctx, grant, graph) - if err != nil { - return err - } - } - - if graph.Loaded { - l.Info("Finished loading entitlement graph", zap.Int("edges", len(graph.Edges))) - } - return nil -} - -// processGrantForGraph examines a grant for expandable annotations and adds edges to the graph. -func (s *syncer) processGrantForGraph(ctx context.Context, grant *v2.Grant, graph *expand.EntitlementGraph) error { - l := ctxzap.Extract(ctx) - - annos := annotations.Annotations(grant.GetAnnotations()) - expandable := &v2.GrantExpandable{} - _, err := annos.Pick(expandable) - if err != nil { - return err - } - if len(expandable.GetEntitlementIds()) == 0 { - return nil - } - - principalID := grant.GetPrincipal().GetId() - if principalID == nil { - return fmt.Errorf("principal id was nil") - } - - for _, srcEntitlementID := range expandable.GetEntitlementIds() { - l.Debug( - "Expandable entitlement found", - zap.String("src_entitlement_id", srcEntitlementID), - zap.String("dst_entitlement_id", grant.GetEntitlement().GetId()), - ) + for _, def := range defs { + principalID := v2.ResourceId_builder{ + ResourceType: def.PrincipalResourceTypeID, + Resource: def.PrincipalResourceID, + }.Build() - srcEntitlement, err := s.store.GetEntitlement(ctx, reader_v2.EntitlementsReaderServiceGetEntitlementRequest_builder{ - EntitlementId: srcEntitlementID, - }.Build()) - if err != nil { - // Only skip not-found entitlements; propagate other errors - // to avoid silently dropping edges and yielding incorrect expansions. - if errors.Is(err, sql.ErrNoRows) { - l.Debug("source entitlement not found, skipping edge", + for _, srcEntitlementID := range def.SrcEntitlementIDs { + // Validate that the source entitlement's resource matches the grant's principal. + srcEntitlement, err := s.store.GetEntitlement(ctx, reader_v2.EntitlementsReaderServiceGetEntitlementRequest_builder{ + EntitlementId: srcEntitlementID, + }.Build()) + if err != nil { + // Only skip not-found entitlements; propagate other errors + // to avoid silently dropping edges and yielding incorrect expansions. + if errors.Is(err, sql.ErrNoRows) { + l.Debug("source entitlement not found, skipping edge", + zap.String("src_entitlement_id", srcEntitlementID), + zap.String("dst_entitlement_id", def.DstEntitlementID), + ) + continue + } + l.Error("error fetching source entitlement", zap.String("src_entitlement_id", srcEntitlementID), - zap.String("dst_entitlement_id", grant.GetEntitlement().GetId()), + zap.String("dst_entitlement_id", def.DstEntitlementID), + zap.Error(err), ) - continue + return err } - l.Error("error fetching source entitlement", - zap.String("src_entitlement_id", srcEntitlementID), - zap.String("dst_entitlement_id", grant.GetEntitlement().GetId()), - zap.Error(err), - ) - return err - } - // The expand annotation points at entitlements by id. Those entitlements' resource should match - // the current grant's principal, so we don't allow expanding arbitrary entitlements. - sourceEntitlementResourceID := srcEntitlement.GetEntitlement().GetResource().GetId() - if sourceEntitlementResourceID == nil { - return fmt.Errorf("source entitlement resource id was nil") - } - if principalID.GetResourceType() != sourceEntitlementResourceID.GetResourceType() || - principalID.GetResource() != sourceEntitlementResourceID.GetResource() { - l.Error( - "source entitlement resource id did not match grant principal id", - zap.String("grant_principal_id", principalID.String()), - zap.String("source_entitlement_resource_id", sourceEntitlementResourceID.String())) + sourceEntitlementResourceID := srcEntitlement.GetEntitlement().GetResource().GetId() + if sourceEntitlementResourceID == nil { + return fmt.Errorf("source entitlement resource id was nil") + } + if principalID.GetResourceType() != sourceEntitlementResourceID.GetResourceType() || + principalID.GetResource() != sourceEntitlementResourceID.GetResource() { + l.Error( + "source entitlement resource id did not match grant principal id", + zap.String("grant_principal_id", principalID.String()), + zap.String("source_entitlement_resource_id", sourceEntitlementResourceID.String())) - return fmt.Errorf("source entitlement resource id did not match grant principal id") - } + return fmt.Errorf("source entitlement resource id did not match grant principal id") + } - graph.AddEntitlement(grant.GetEntitlement()) - graph.AddEntitlement(srcEntitlement.GetEntitlement()) - err = graph.AddEdge(ctx, - srcEntitlement.GetEntitlement().GetId(), - grant.GetEntitlement().GetId(), - expandable.GetShallow(), - expandable.GetResourceTypeIds(), - ) - if err != nil { - return fmt.Errorf("error adding edge to graph: %w", err) + graph.AddEntitlementID(def.DstEntitlementID) + graph.AddEntitlementID(srcEntitlementID) + if err := graph.AddEdge(ctx, srcEntitlementID, def.DstEntitlementID, def.Shallow, def.PrincipalResourceTypeIDs); err != nil { + return fmt.Errorf("error adding edge to graph: %w", err) + } } } + + if graph.Loaded { + l.Info("Finished loading entitlement graph", zap.Int("edges", len(graph.Edges))) + } return nil } From d5e3e47371e0d505b2f76c48284ef26c58856afe Mon Sep 17 00:00:00 2001 From: Matt Kaniaris Date: Wed, 4 Feb 2026 16:10:06 -0700 Subject: [PATCH 4/9] cleanup and a test --- pkg/dotc1z/grants_expandable_query_test.go | 177 +++++++++++++++++++++ pkg/sync/syncer.go | 3 +- 2 files changed, 179 insertions(+), 1 deletion(-) create mode 100644 pkg/dotc1z/grants_expandable_query_test.go diff --git a/pkg/dotc1z/grants_expandable_query_test.go b/pkg/dotc1z/grants_expandable_query_test.go new file mode 100644 index 000000000..473442545 --- /dev/null +++ b/pkg/dotc1z/grants_expandable_query_test.go @@ -0,0 +1,177 @@ +package dotc1z + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/anypb" + + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" + "github.com/conductorone/baton-sdk/pkg/annotations" +) + +// TestListExpandableGrants_MatchesFullScan verifies that ListExpandableGrants +// returns the same grants as manually scanning all grants and filtering by GrantExpandable annotation. +func TestListExpandableGrants_MatchesFullScan(t *testing.T) { + ctx := context.Background() + + tmpFile, err := os.CreateTemp("", "test-expandable-*.c1z") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + tmpFile.Close() + + c1f, err := NewC1ZFile(ctx, tmpFile.Name()) + require.NoError(t, err) + defer c1f.Close(ctx) + + _, err = c1f.StartNewSync(ctx, "full", "") + require.NoError(t, err) + + // Create test data: mix of expandable and non-expandable grants + userRT := v2.ResourceType_builder{Id: "user"}.Build() + groupRT := v2.ResourceType_builder{Id: "group"}.Build() + require.NoError(t, c1f.PutResourceTypes(ctx, userRT, groupRT)) + + group1 := v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "group", Resource: "g1"}.Build()}.Build() + group2 := v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "group", Resource: "g2"}.Build()}.Build() + user1 := v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "u1"}.Build()}.Build() + require.NoError(t, c1f.PutResources(ctx, group1, group2, user1)) + + ent1 := v2.Entitlement_builder{Id: "ent1", Resource: group1}.Build() + ent2 := v2.Entitlement_builder{Id: "ent2", Resource: group2}.Build() + ent3 := v2.Entitlement_builder{Id: "ent3", Resource: group1}.Build() + require.NoError(t, c1f.PutEntitlements(ctx, ent1, ent2, ent3)) + + // Grant 1: WITHOUT expandable annotation + normalGrant := v2.Grant_builder{ + Id: "grant-normal", + Entitlement: ent1, + Principal: user1, + }.Build() + + // Grant 2: WITH expandable annotation (single source) + expandableAnno1, err := anypb.New(v2.GrantExpandable_builder{ + EntitlementIds: []string{"ent1"}, + Shallow: true, + ResourceTypeIds: []string{"user"}, + }.Build()) + require.NoError(t, err) + expandableGrant1 := v2.Grant_builder{ + Id: "grant-expandable-1", + Entitlement: ent2, + Principal: group1, + Annotations: []*anypb.Any{expandableAnno1}, + }.Build() + + // Grant 3: WITH expandable annotation (multiple sources) + expandableAnno2, err := anypb.New(v2.GrantExpandable_builder{ + EntitlementIds: []string{"ent1", "ent2"}, + Shallow: false, + ResourceTypeIds: []string{"user", "group"}, + }.Build()) + require.NoError(t, err) + expandableGrant2 := v2.Grant_builder{ + Id: "grant-expandable-2", + Entitlement: ent3, + Principal: group2, + Annotations: []*anypb.Any{expandableAnno2}, + }.Build() + + // Grant 4: WITH empty expandable annotation (should NOT be expandable) + emptyExpandableAnno, err := anypb.New(v2.GrantExpandable_builder{ + EntitlementIds: []string{}, + }.Build()) + require.NoError(t, err) + emptyExpandableGrant := v2.Grant_builder{ + Id: "grant-empty-expandable", + Entitlement: ent1, + Principal: group2, + Annotations: []*anypb.Any{emptyExpandableAnno}, + }.Build() + + require.NoError(t, c1f.PutGrants(ctx, normalGrant, expandableGrant1, expandableGrant2, emptyExpandableGrant)) + + // Method 1: ListExpandableGrants (new optimized path) + var expandableFromQuery []*ExpandableGrantDef + pageToken := "" + for { + defs, nextToken, err := c1f.ListExpandableGrants(ctx, WithExpandableGrantsPageToken(pageToken)) + require.NoError(t, err) + expandableFromQuery = append(expandableFromQuery, defs...) + if nextToken == "" { + break + } + pageToken = nextToken + } + + // Method 2: Full scan and filter (simulates old behavior) + type scannedGrant struct { + externalID string + srcEntitlementIDs []string + shallow bool + resourceTypeIDs []string + dstEntitlementID string + principalRTID string + principalRID string + } + expandableFromScan := make(map[string]*scannedGrant) + pageToken = "" + for { + resp, err := c1f.ListGrants(ctx, v2.GrantsServiceListGrantsRequest_builder{PageToken: pageToken}.Build()) + require.NoError(t, err) + for _, g := range resp.GetList() { + annos := annotations.Annotations(g.GetAnnotations()) + ge := &v2.GrantExpandable{} + if _, err := annos.Pick(ge); err == nil && len(ge.GetEntitlementIds()) > 0 { + expandableFromScan[g.GetId()] = &scannedGrant{ + externalID: g.GetId(), + srcEntitlementIDs: ge.GetEntitlementIds(), + shallow: ge.GetShallow(), + resourceTypeIDs: ge.GetResourceTypeIds(), + dstEntitlementID: g.GetEntitlement().GetId(), + principalRTID: g.GetPrincipal().GetId().GetResourceType(), + principalRID: g.GetPrincipal().GetId().GetResource(), + } + } + } + pageToken = resp.GetNextPageToken() + if pageToken == "" { + break + } + } + + // Compare results: same count + require.Len(t, expandableFromQuery, len(expandableFromScan), + "ListExpandableGrants returned %d grants, full scan found %d", + len(expandableFromQuery), len(expandableFromScan)) + + // Compare results: same grants with matching fields + for _, def := range expandableFromQuery { + scanned, ok := expandableFromScan[def.GrantExternalID] + require.True(t, ok, "grant %s found by ListExpandableGrants but not by full scan", def.GrantExternalID) + + require.Equal(t, scanned.dstEntitlementID, def.DstEntitlementID, + "DstEntitlementID mismatch for grant %s", def.GrantExternalID) + require.Equal(t, scanned.principalRTID, def.PrincipalResourceTypeID, + "PrincipalResourceTypeID mismatch for grant %s", def.GrantExternalID) + require.Equal(t, scanned.principalRID, def.PrincipalResourceID, + "PrincipalResourceID mismatch for grant %s", def.GrantExternalID) + require.ElementsMatch(t, scanned.srcEntitlementIDs, def.SrcEntitlementIDs, + "SrcEntitlementIDs mismatch for grant %s", def.GrantExternalID) + require.Equal(t, scanned.shallow, def.Shallow, + "Shallow mismatch for grant %s", def.GrantExternalID) + require.ElementsMatch(t, scanned.resourceTypeIDs, def.PrincipalResourceTypeIDs, + "PrincipalResourceTypeIDs mismatch for grant %s", def.GrantExternalID) + } + + // Verify we found exactly the expected expandable grants + require.Len(t, expandableFromQuery, 2, "should have exactly 2 expandable grants") + + grantIDs := make([]string, len(expandableFromQuery)) + for i, def := range expandableFromQuery { + grantIDs[i] = def.GrantExternalID + } + require.ElementsMatch(t, []string{"grant-expandable-1", "grant-expandable-2"}, grantIDs) +} diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 80dbfac1c..142c5d42d 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -1809,7 +1809,8 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle graph.AddEntitlementID(def.DstEntitlementID) graph.AddEntitlementID(srcEntitlementID) - if err := graph.AddEdge(ctx, srcEntitlementID, def.DstEntitlementID, def.Shallow, def.PrincipalResourceTypeIDs); err != nil { + err = graph.AddEdge(ctx, srcEntitlementID, def.DstEntitlementID, def.Shallow, def.PrincipalResourceTypeIDs) + if err != nil { return fmt.Errorf("error adding edge to graph: %w", err) } } From 1cecec8baf8aa84eaefcd7ddff05176322682248 Mon Sep 17 00:00:00 2001 From: Matt Kaniaris Date: Wed, 4 Feb 2026 16:15:37 -0700 Subject: [PATCH 5/9] dont explode sync if we dont have the right interface --- pkg/connectorstore/connectorstore.go | 4 +- pkg/dotc1z/sync_runs.go | 58 ++++++++++++---------------- pkg/sync/syncer.go | 39 ++++++++++--------- 3 files changed, 48 insertions(+), 53 deletions(-) diff --git a/pkg/connectorstore/connectorstore.go b/pkg/connectorstore/connectorstore.go index 0718eb987..0b85801ce 100644 --- a/pkg/connectorstore/connectorstore.go +++ b/pkg/connectorstore/connectorstore.go @@ -74,6 +74,6 @@ type Writer interface { // ExpansionStore provides methods for grant expansion operations. // Not all store implementations support expansion; callers should type-assert. type ExpansionStore interface { - // SetExpansionStarted marks the given sync as having started expansion. - SetExpansionStarted(ctx context.Context, syncID string) error + // SetSupportsDiff marks the sync as supporting diff operations. + SetSupportsDiff(ctx context.Context, syncID string) error } diff --git a/pkg/dotc1z/sync_runs.go b/pkg/dotc1z/sync_runs.go index 9408e5157..a94e8faec 100644 --- a/pkg/dotc1z/sync_runs.go +++ b/pkg/dotc1z/sync_runs.go @@ -34,7 +34,7 @@ create table if not exists %s ( sync_type text not null default 'full', parent_sync_id text not null default '', linked_sync_id text not null default '', - expansion_started_at datetime + supports_diff integer not null default 0 ); create unique index if not exists %s on %s (sync_id);` @@ -98,31 +98,23 @@ func (r *syncRunsTable) Migrations(ctx context.Context, db *goqu.Database) error } } - // Check if expansion_started_at column exists - var expansionStartedAtExists int - err = db.QueryRowContext(ctx, fmt.Sprintf("select count(*) from pragma_table_info('%s') where name='expansion_started_at'", r.Name())).Scan(&expansionStartedAtExists) - if err != nil { + // Add supports_diff column if missing (for older files). + if _, err = db.ExecContext(ctx, fmt.Sprintf("alter table %s add column supports_diff integer not null default 0", r.Name())); err != nil && !isAlreadyExistsError(err) { return err } - if expansionStartedAtExists == 0 { - _, err = db.ExecContext(ctx, fmt.Sprintf("alter table %s add column expansion_started_at datetime", r.Name())) - if err != nil { - return err - } - } return nil } type syncRun struct { - ID string - StartedAt *time.Time - EndedAt *time.Time - SyncToken string - Type connectorstore.SyncType - ParentSyncID string - LinkedSyncID string - ExpansionStartedAt *time.Time + ID string + StartedAt *time.Time + EndedAt *time.Time + SyncToken string + Type connectorstore.SyncType + ParentSyncID string + LinkedSyncID string + SupportsDiff bool } // getCachedViewSyncRun returns the cached sync run for read operations. @@ -174,7 +166,7 @@ func (c *C1File) getLatestUnfinishedSync(ctx context.Context, syncType connector oneWeekAgo := time.Now().AddDate(0, 0, -7) ret := &syncRun{} q := c.db.From(syncRuns.Name()) - q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "expansion_started_at") + q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "supports_diff") q = q.Where(goqu.C("ended_at").IsNull()) q = q.Where(goqu.C("started_at").Gte(oneWeekAgo)) q = q.Order(goqu.C("started_at").Desc()) @@ -190,7 +182,7 @@ func (c *C1File) getLatestUnfinishedSync(ctx context.Context, syncType connector row := c.db.QueryRowContext(ctx, query, args...) - err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID, &ret.ExpansionStartedAt) + err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID, &ret.SupportsDiff) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil @@ -217,7 +209,7 @@ func (c *C1File) getFinishedSync(ctx context.Context, offset uint, syncType conn ret := &syncRun{} q := c.db.From(syncRuns.Name()) - q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "expansion_started_at") + q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "supports_diff") q = q.Where(goqu.C("ended_at").IsNotNull()) if syncType != connectorstore.SyncTypeAny { q = q.Where(goqu.C("sync_type").Eq(syncType)) @@ -236,7 +228,7 @@ func (c *C1File) getFinishedSync(ctx context.Context, offset uint, syncType conn row := c.db.QueryRowContext(ctx, query, args...) - err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID, &ret.ExpansionStartedAt) + err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID, &ret.SupportsDiff) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil @@ -257,7 +249,7 @@ func (c *C1File) ListSyncRuns(ctx context.Context, pageToken string, pageSize ui } q := c.db.From(syncRuns.Name()).Prepared(true) - q = q.Select("id", "sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "expansion_started_at") + q = q.Select("id", "sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "supports_diff") if pageToken != "" { q = q.Where(goqu.C("id").Gte(pageToken)) @@ -292,7 +284,7 @@ func (c *C1File) ListSyncRuns(ctx context.Context, pageToken string, pageSize ui } rowId := 0 data := &syncRun{} - err := rows.Scan(&rowId, &data.ID, &data.StartedAt, &data.EndedAt, &data.SyncToken, &data.Type, &data.ParentSyncID, &data.LinkedSyncID, &data.ExpansionStartedAt) + err := rows.Scan(&rowId, &data.ID, &data.StartedAt, &data.EndedAt, &data.SyncToken, &data.Type, &data.ParentSyncID, &data.LinkedSyncID, &data.SupportsDiff) if err != nil { return nil, "", err } @@ -381,7 +373,7 @@ func (c *C1File) getSync(ctx context.Context, syncID string) (*syncRun, error) { ret := &syncRun{} q := c.db.From(syncRuns.Name()) - q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "expansion_started_at") + q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "supports_diff") q = q.Where(goqu.C("sync_id").Eq(syncID)) query, args, err := q.ToSQL() @@ -389,7 +381,7 @@ func (c *C1File) getSync(ctx context.Context, syncID string) (*syncRun, error) { return nil, err } row := c.db.QueryRowContext(ctx, query, args...) - err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID, &ret.ExpansionStartedAt) + err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID, &ret.SupportsDiff) if err != nil { return nil, err } @@ -673,10 +665,10 @@ func (c *C1File) endSyncRun(ctx context.Context, syncID string) error { return nil } -// SetExpansionStarted marks the given sync as having started expansion. -// This marker is used to detect syncs that expanded with older code that dropped annotations. -func (c *C1File) SetExpansionStarted(ctx context.Context, syncID string) error { - ctx, span := tracer.Start(ctx, "C1File.SetExpansionStarted") +// SetSupportsDiff marks the given sync as supporting diff operations. +// This indicates the sync has SQL-layer grant metadata (is_expandable) properly populated. +func (c *C1File) SetSupportsDiff(ctx context.Context, syncID string) error { + ctx, span := tracer.Start(ctx, "C1File.SetSupportsDiff") defer span.End() if c.readOnly { @@ -685,10 +677,10 @@ func (c *C1File) SetExpansionStarted(ctx context.Context, syncID string) error { q := c.db.Update(syncRuns.Name()) q = q.Set(goqu.Record{ - "expansion_started_at": time.Now().Format("2006-01-02 15:04:05.999999999"), + "supports_diff": 1, }) q = q.Where(goqu.C("sync_id").Eq(syncID)) - q = q.Where(goqu.C("expansion_started_at").IsNull()) + q = q.Where(goqu.C("supports_diff").Eq(0)) query, args, err := q.ToSQL() if err != nil { diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 142c5d42d..554bf27cd 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -704,12 +704,17 @@ func (s *syncer) Sync(ctx context.Context) error { continue case SyncGrantExpansionOp: - // Mark the sync as having reached the expansion phase. This is set unconditionally - // so that diffs can detect syncs created with older code that dropped annotations. - if expansionStore, ok := s.store.(connectorstore.ExpansionStore); ok { - if err := expansionStore.SetExpansionStarted(ctx, s.syncID); err != nil { - l.Error("failed to set expansion started marker", zap.Error(err)) - return err + // Mark the sync as supporting diff, but only if we're starting fresh. + // If we're resuming (graph has edges or a page token), we may be continuing + // from old code that didn't have this marker, so we must not set it. + entitlementGraph := s.state.EntitlementGraph(ctx) + isResumingExpansion := entitlementGraph.Loaded || len(entitlementGraph.Edges) > 0 || s.state.PageToken(ctx) != "" + if !isResumingExpansion { + if expansionStore, ok := s.store.(connectorstore.ExpansionStore); ok { + if err := expansionStore.SetSupportsDiff(ctx, s.syncID); err != nil { + l.Error("failed to set supports_diff marker", zap.Error(err)) + return err + } } } @@ -1742,7 +1747,8 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle lister, ok := s.store.(expandableGrantLister) if !ok { - return fmt.Errorf("store does not support ListExpandableGrants") + l.Warn("store does not support ListExpandableGrants") + return nil } defs, nextPageToken, err := lister.ListExpandableGrants( @@ -1754,16 +1760,6 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle return err } - // Handle pagination - if nextPageToken != "" { - if err := s.state.NextPage(ctx, nextPageToken); err != nil { - return err - } - } else { - l.Debug("Finished loading expandable grants to expand") - graph.Loaded = true - } - for _, def := range defs { principalID := v2.ResourceId_builder{ ResourceType: def.PrincipalResourceTypeID, @@ -1816,9 +1812,16 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle } } - if graph.Loaded { + // Handle pagination + if nextPageToken != "" { + if err := s.state.NextPage(ctx, nextPageToken); err != nil { + return err + } + } else { + graph.Loaded = true l.Info("Finished loading entitlement graph", zap.Int("edges", len(graph.Edges))) } + return nil } From 91ff1c883f7d5ebce3d756b238bc5677746c70bc Mon Sep 17 00:00:00 2001 From: Matt Kaniaris Date: Wed, 4 Feb 2026 17:48:58 -0700 Subject: [PATCH 6/9] update existing interface instead of making a new one --- pb/c1/connector/v2/grant.pb.go | 54 +++++- pb/c1/connector/v2/grant.pb.validate.go | 4 + pb/c1/connector/v2/grant_protoopaque.pb.go | 52 +++++- pkg/dotc1z/grants_expandable_query.go | 205 --------------------- pkg/dotc1z/grants_expandable_query_test.go | 177 ------------------ pkg/dotc1z/sql_helpers.go | 18 ++ pkg/sync/syncer.go | 52 +++--- proto/c1/connector/v2/grant.proto | 4 + 8 files changed, 137 insertions(+), 429 deletions(-) delete mode 100644 pkg/dotc1z/grants_expandable_query.go delete mode 100644 pkg/dotc1z/grants_expandable_query_test.go diff --git a/pb/c1/connector/v2/grant.pb.go b/pb/c1/connector/v2/grant.pb.go index 485decbee..8562ea93d 100644 --- a/pb/c1/connector/v2/grant.pb.go +++ b/pb/c1/connector/v2/grant.pb.go @@ -228,14 +228,18 @@ func (b0 Grant_builder) Build() *Grant { } type GrantsServiceListGrantsRequest struct { - state protoimpl.MessageState `protogen:"hybrid.v1"` - Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` - PageSize uint32 `protobuf:"varint,2,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"` - PageToken string `protobuf:"bytes,3,opt,name=page_token,json=pageToken,proto3" json:"page_token,omitempty"` - Annotations []*anypb.Any `protobuf:"bytes,4,rep,name=annotations,proto3" json:"annotations,omitempty"` - ActiveSyncId string `protobuf:"bytes,5,opt,name=active_sync_id,json=activeSyncId,proto3" json:"active_sync_id,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"hybrid.v1"` + Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` + PageSize uint32 `protobuf:"varint,2,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"` + PageToken string `protobuf:"bytes,3,opt,name=page_token,json=pageToken,proto3" json:"page_token,omitempty"` + Annotations []*anypb.Any `protobuf:"bytes,4,rep,name=annotations,proto3" json:"annotations,omitempty"` + ActiveSyncId string `protobuf:"bytes,5,opt,name=active_sync_id,json=activeSyncId,proto3" json:"active_sync_id,omitempty"` + // If true, only return grants that are expandable (have GrantExpandable annotation). + ExpandableOnly bool `protobuf:"varint,6,opt,name=expandable_only,json=expandableOnly,proto3" json:"expandable_only,omitempty"` + // If true, only return grants that need expansion processing. + NeedsExpansionOnly bool `protobuf:"varint,7,opt,name=needs_expansion_only,json=needsExpansionOnly,proto3" json:"needs_expansion_only,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *GrantsServiceListGrantsRequest) Reset() { @@ -298,6 +302,20 @@ func (x *GrantsServiceListGrantsRequest) GetActiveSyncId() string { return "" } +func (x *GrantsServiceListGrantsRequest) GetExpandableOnly() bool { + if x != nil { + return x.ExpandableOnly + } + return false +} + +func (x *GrantsServiceListGrantsRequest) GetNeedsExpansionOnly() bool { + if x != nil { + return x.NeedsExpansionOnly + } + return false +} + func (x *GrantsServiceListGrantsRequest) SetResource(v *Resource) { x.Resource = v } @@ -318,6 +336,14 @@ func (x *GrantsServiceListGrantsRequest) SetActiveSyncId(v string) { x.ActiveSyncId = v } +func (x *GrantsServiceListGrantsRequest) SetExpandableOnly(v bool) { + x.ExpandableOnly = v +} + +func (x *GrantsServiceListGrantsRequest) SetNeedsExpansionOnly(v bool) { + x.NeedsExpansionOnly = v +} + func (x *GrantsServiceListGrantsRequest) HasResource() bool { if x == nil { return false @@ -337,6 +363,10 @@ type GrantsServiceListGrantsRequest_builder struct { PageToken string Annotations []*anypb.Any ActiveSyncId string + // If true, only return grants that are expandable (have GrantExpandable annotation). + ExpandableOnly bool + // If true, only return grants that need expansion processing. + NeedsExpansionOnly bool } func (b0 GrantsServiceListGrantsRequest_builder) Build() *GrantsServiceListGrantsRequest { @@ -348,6 +378,8 @@ func (b0 GrantsServiceListGrantsRequest_builder) Build() *GrantsServiceListGrant x.PageToken = b.PageToken x.Annotations = b.Annotations x.ActiveSyncId = b.ActiveSyncId + x.ExpandableOnly = b.ExpandableOnly + x.NeedsExpansionOnly = b.NeedsExpansionOnly return m0 } @@ -813,7 +845,7 @@ const file_c1_connector_v2_grant_proto_rawDesc = "" + "\x02id\x18\x03 \x01(\tB\n" + "\xfaB\ar\x05 \x01(\x80\bR\x02id\x12A\n" + "\asources\x18\x05 \x01(\v2\x1d.c1.connector.v2.GrantSourcesB\b\xfaB\x05\x8a\x01\x02\x10\x00R\asources\x126\n" + - "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\"\xa6\x02\n" + + "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\"\x81\x03\n" + "\x1eGrantsServiceListGrantsRequest\x12?\n" + "\bresource\x18\x01 \x01(\v2\x19.c1.connector.v2.ResourceB\b\xfaB\x05\x8a\x01\x02\x10\x01R\bresource\x12'\n" + "\tpage_size\x18\x02 \x01(\rB\n" + @@ -822,7 +854,9 @@ const file_c1_connector_v2_grant_proto_rawDesc = "" + "page_token\x18\x03 \x01(\tB\x0e\xfaB\vr\t \x01(\x80\x80@\xd0\x01\x01R\tpageToken\x126\n" + "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\x123\n" + "\x0eactive_sync_id\x18\x05 \x01(\tB\r\xfaB\n" + - "r\b \x01(\x80\b\xd0\x01\x01R\factiveSyncId\"\xbd\x01\n" + + "r\b \x01(\x80\b\xd0\x01\x01R\factiveSyncId\x12'\n" + + "\x0fexpandable_only\x18\x06 \x01(\bR\x0eexpandableOnly\x120\n" + + "\x14needs_expansion_only\x18\a \x01(\bR\x12needsExpansionOnly\"\xbd\x01\n" + "\x1fGrantsServiceListGrantsResponse\x12*\n" + "\x04list\x18\x01 \x03(\v2\x16.c1.connector.v2.GrantR\x04list\x126\n" + "\x0fnext_page_token\x18\x02 \x01(\tB\x0e\xfaB\vr\t \x01(\x80\x80@\xd0\x01\x01R\rnextPageToken\x126\n" + diff --git a/pb/c1/connector/v2/grant.pb.validate.go b/pb/c1/connector/v2/grant.pb.validate.go index a66db3e1f..3ca3c1108 100644 --- a/pb/c1/connector/v2/grant.pb.validate.go +++ b/pb/c1/connector/v2/grant.pb.validate.go @@ -573,6 +573,10 @@ func (m *GrantsServiceListGrantsRequest) validate(all bool) error { } + // no validation rules for ExpandableOnly + + // no validation rules for NeedsExpansionOnly + if len(errors) > 0 { return GrantsServiceListGrantsRequestMultiError(errors) } diff --git a/pb/c1/connector/v2/grant_protoopaque.pb.go b/pb/c1/connector/v2/grant_protoopaque.pb.go index 2aa8d446e..19d3c97cd 100644 --- a/pb/c1/connector/v2/grant_protoopaque.pb.go +++ b/pb/c1/connector/v2/grant_protoopaque.pb.go @@ -230,14 +230,16 @@ func (b0 Grant_builder) Build() *Grant { } type GrantsServiceListGrantsRequest struct { - state protoimpl.MessageState `protogen:"opaque.v1"` - xxx_hidden_Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3"` - xxx_hidden_PageSize uint32 `protobuf:"varint,2,opt,name=page_size,json=pageSize,proto3"` - xxx_hidden_PageToken string `protobuf:"bytes,3,opt,name=page_token,json=pageToken,proto3"` - xxx_hidden_Annotations *[]*anypb.Any `protobuf:"bytes,4,rep,name=annotations,proto3"` - xxx_hidden_ActiveSyncId string `protobuf:"bytes,5,opt,name=active_sync_id,json=activeSyncId,proto3"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"opaque.v1"` + xxx_hidden_Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3"` + xxx_hidden_PageSize uint32 `protobuf:"varint,2,opt,name=page_size,json=pageSize,proto3"` + xxx_hidden_PageToken string `protobuf:"bytes,3,opt,name=page_token,json=pageToken,proto3"` + xxx_hidden_Annotations *[]*anypb.Any `protobuf:"bytes,4,rep,name=annotations,proto3"` + xxx_hidden_ActiveSyncId string `protobuf:"bytes,5,opt,name=active_sync_id,json=activeSyncId,proto3"` + xxx_hidden_ExpandableOnly bool `protobuf:"varint,6,opt,name=expandable_only,json=expandableOnly,proto3"` + xxx_hidden_NeedsExpansionOnly bool `protobuf:"varint,7,opt,name=needs_expansion_only,json=needsExpansionOnly,proto3"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *GrantsServiceListGrantsRequest) Reset() { @@ -302,6 +304,20 @@ func (x *GrantsServiceListGrantsRequest) GetActiveSyncId() string { return "" } +func (x *GrantsServiceListGrantsRequest) GetExpandableOnly() bool { + if x != nil { + return x.xxx_hidden_ExpandableOnly + } + return false +} + +func (x *GrantsServiceListGrantsRequest) GetNeedsExpansionOnly() bool { + if x != nil { + return x.xxx_hidden_NeedsExpansionOnly + } + return false +} + func (x *GrantsServiceListGrantsRequest) SetResource(v *Resource) { x.xxx_hidden_Resource = v } @@ -322,6 +338,14 @@ func (x *GrantsServiceListGrantsRequest) SetActiveSyncId(v string) { x.xxx_hidden_ActiveSyncId = v } +func (x *GrantsServiceListGrantsRequest) SetExpandableOnly(v bool) { + x.xxx_hidden_ExpandableOnly = v +} + +func (x *GrantsServiceListGrantsRequest) SetNeedsExpansionOnly(v bool) { + x.xxx_hidden_NeedsExpansionOnly = v +} + func (x *GrantsServiceListGrantsRequest) HasResource() bool { if x == nil { return false @@ -341,6 +365,10 @@ type GrantsServiceListGrantsRequest_builder struct { PageToken string Annotations []*anypb.Any ActiveSyncId string + // If true, only return grants that are expandable (have GrantExpandable annotation). + ExpandableOnly bool + // If true, only return grants that need expansion processing. + NeedsExpansionOnly bool } func (b0 GrantsServiceListGrantsRequest_builder) Build() *GrantsServiceListGrantsRequest { @@ -352,6 +380,8 @@ func (b0 GrantsServiceListGrantsRequest_builder) Build() *GrantsServiceListGrant x.xxx_hidden_PageToken = b.PageToken x.xxx_hidden_Annotations = &b.Annotations x.xxx_hidden_ActiveSyncId = b.ActiveSyncId + x.xxx_hidden_ExpandableOnly = b.ExpandableOnly + x.xxx_hidden_NeedsExpansionOnly = b.NeedsExpansionOnly return m0 } @@ -831,7 +861,7 @@ const file_c1_connector_v2_grant_proto_rawDesc = "" + "\x02id\x18\x03 \x01(\tB\n" + "\xfaB\ar\x05 \x01(\x80\bR\x02id\x12A\n" + "\asources\x18\x05 \x01(\v2\x1d.c1.connector.v2.GrantSourcesB\b\xfaB\x05\x8a\x01\x02\x10\x00R\asources\x126\n" + - "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\"\xa6\x02\n" + + "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\"\x81\x03\n" + "\x1eGrantsServiceListGrantsRequest\x12?\n" + "\bresource\x18\x01 \x01(\v2\x19.c1.connector.v2.ResourceB\b\xfaB\x05\x8a\x01\x02\x10\x01R\bresource\x12'\n" + "\tpage_size\x18\x02 \x01(\rB\n" + @@ -840,7 +870,9 @@ const file_c1_connector_v2_grant_proto_rawDesc = "" + "page_token\x18\x03 \x01(\tB\x0e\xfaB\vr\t \x01(\x80\x80@\xd0\x01\x01R\tpageToken\x126\n" + "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\x123\n" + "\x0eactive_sync_id\x18\x05 \x01(\tB\r\xfaB\n" + - "r\b \x01(\x80\b\xd0\x01\x01R\factiveSyncId\"\xbd\x01\n" + + "r\b \x01(\x80\b\xd0\x01\x01R\factiveSyncId\x12'\n" + + "\x0fexpandable_only\x18\x06 \x01(\bR\x0eexpandableOnly\x120\n" + + "\x14needs_expansion_only\x18\a \x01(\bR\x12needsExpansionOnly\"\xbd\x01\n" + "\x1fGrantsServiceListGrantsResponse\x12*\n" + "\x04list\x18\x01 \x03(\v2\x16.c1.connector.v2.GrantR\x04list\x126\n" + "\x0fnext_page_token\x18\x02 \x01(\tB\x0e\xfaB\vr\t \x01(\x80\x80@\xd0\x01\x01R\rnextPageToken\x126\n" + diff --git a/pkg/dotc1z/grants_expandable_query.go b/pkg/dotc1z/grants_expandable_query.go deleted file mode 100644 index 5c17c58ec..000000000 --- a/pkg/dotc1z/grants_expandable_query.go +++ /dev/null @@ -1,205 +0,0 @@ -package dotc1z - -import ( - "context" - "database/sql" - "fmt" - "strconv" - - "github.com/doug-martin/goqu/v9" - "google.golang.org/protobuf/proto" - - v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" - "github.com/conductorone/baton-sdk/pkg/annotations" -) - -// ExpandableGrantDef is a lightweight representation of an expandable grant row, -// using queryable columns instead of unmarshalling the full grant proto. -type ExpandableGrantDef struct { - RowID int64 - GrantExternalID string - DstEntitlementID string - PrincipalResourceTypeID string - PrincipalResourceID string - SrcEntitlementIDs []string - Shallow bool - PrincipalResourceTypeIDs []string - NeedsExpansion bool -} - -type listExpandableGrantsOptions struct { - pageToken string - pageSize uint32 - needsExpansionOnly bool - syncID string -} - -type ListExpandableGrantsOption func(*listExpandableGrantsOptions) - -func WithExpandableGrantsPageToken(t string) ListExpandableGrantsOption { - return func(o *listExpandableGrantsOptions) { o.pageToken = t } -} - -func WithExpandableGrantsPageSize(n uint32) ListExpandableGrantsOption { - return func(o *listExpandableGrantsOptions) { o.pageSize = n } -} - -func WithExpandableGrantsNeedsExpansionOnly(b bool) ListExpandableGrantsOption { - return func(o *listExpandableGrantsOptions) { o.needsExpansionOnly = b } -} - -// WithExpandableGrantsSyncID forces listing expandable grants for a specific sync id. -// If omitted, we default to the current sync id, then view sync id, then latest finished sync. -func WithExpandableGrantsSyncID(syncID string) ListExpandableGrantsOption { - return func(o *listExpandableGrantsOptions) { o.syncID = syncID } -} - -// ListExpandableGrants lists expandable grants using the grants table's queryable columns. -// It avoids scanning/unmarshalling all grants. -func (c *C1File) ListExpandableGrants(ctx context.Context, opts ...ListExpandableGrantsOption) ([]*ExpandableGrantDef, string, error) { - ctx, span := tracer.Start(ctx, "C1File.ListExpandableGrants") - defer span.End() - - if err := c.validateDb(ctx); err != nil { - return nil, "", err - } - - o := &listExpandableGrantsOptions{} - for _, opt := range opts { - opt(o) - } - - syncID, err := c.resolveSyncIDForInternalQuery(ctx, o.syncID) - if err != nil { - return nil, "", err - } - - q := c.db.From(grants.Name()).Prepared(true) - q = q.Select( - "id", - "external_id", - "entitlement_id", - "principal_resource_type_id", - "principal_resource_id", - "data", - "needs_expansion", - ) - q = q.Where(goqu.C("sync_id").Eq(syncID)) - q = q.Where(goqu.C("is_expandable").Eq(1)) - if o.needsExpansionOnly { - q = q.Where(goqu.C("needs_expansion").Eq(1)) - } - - if o.pageToken != "" { - // Page token is the grants table row ID. - id, err := strconv.ParseInt(o.pageToken, 10, 64) - if err != nil { - return nil, "", fmt.Errorf("invalid expandable grants page token %q: %w", o.pageToken, err) - } - q = q.Where(goqu.C("id").Gte(id)) - } - - pageSize := o.pageSize - if pageSize > maxPageSize || pageSize == 0 { - pageSize = maxPageSize - } - q = q.Order(goqu.C("id").Asc()).Limit(uint(pageSize + 1)) - - query, args, err := q.ToSQL() - if err != nil { - return nil, "", err - } - - rows, err := c.db.QueryContext(ctx, query, args...) - if err != nil { - return nil, "", err - } - defer rows.Close() - - defs := make([]*ExpandableGrantDef, 0, pageSize) - var ( - count uint32 - lastRow int64 - ) - for rows.Next() { - count++ - if count > pageSize { - break - } - - var ( - rowID int64 - externalID string - dstEntitlementID string - principalRTID string - principalRID string - dataBlob []byte - needsExpansionInt int - ) - - if err := rows.Scan( - &rowID, - &externalID, - &dstEntitlementID, - &principalRTID, - &principalRID, - &dataBlob, - &needsExpansionInt, - ); err != nil { - return nil, "", err - } - lastRow = rowID - - var g v2.Grant - if err := proto.Unmarshal(dataBlob, &g); err != nil { - return nil, "", fmt.Errorf("invalid grant data for %q: %w", externalID, err) - } - - annos := annotations.Annotations(g.GetAnnotations()) - ge := &v2.GrantExpandable{} - if _, err := annos.Pick(ge); err != nil { - return nil, "", fmt.Errorf("failed to extract GrantExpandable from grant %q: %w", externalID, err) - } - - defs = append(defs, &ExpandableGrantDef{ - RowID: rowID, - GrantExternalID: externalID, - DstEntitlementID: dstEntitlementID, - PrincipalResourceTypeID: principalRTID, - PrincipalResourceID: principalRID, - SrcEntitlementIDs: ge.GetEntitlementIds(), - Shallow: ge.GetShallow(), - PrincipalResourceTypeIDs: ge.GetResourceTypeIds(), - NeedsExpansion: needsExpansionInt != 0, - }) - } - if err := rows.Err(); err != nil { - return nil, "", err - } - - nextPageToken := "" - if count > pageSize { - nextPageToken = strconv.FormatInt(lastRow+1, 10) - } - return defs, nextPageToken, nil -} - -func (c *C1File) resolveSyncIDForInternalQuery(ctx context.Context, forced string) (string, error) { - switch { - case forced != "": - return forced, nil - case c.currentSyncID != "": - return c.currentSyncID, nil - case c.viewSyncID != "": - return c.viewSyncID, nil - default: - latest, err := c.getCachedViewSyncRun(ctx) - if err != nil { - return "", err - } - if latest == nil { - return "", sql.ErrNoRows - } - return latest.ID, nil - } -} diff --git a/pkg/dotc1z/grants_expandable_query_test.go b/pkg/dotc1z/grants_expandable_query_test.go deleted file mode 100644 index 473442545..000000000 --- a/pkg/dotc1z/grants_expandable_query_test.go +++ /dev/null @@ -1,177 +0,0 @@ -package dotc1z - -import ( - "context" - "os" - "testing" - - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/types/known/anypb" - - v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" - "github.com/conductorone/baton-sdk/pkg/annotations" -) - -// TestListExpandableGrants_MatchesFullScan verifies that ListExpandableGrants -// returns the same grants as manually scanning all grants and filtering by GrantExpandable annotation. -func TestListExpandableGrants_MatchesFullScan(t *testing.T) { - ctx := context.Background() - - tmpFile, err := os.CreateTemp("", "test-expandable-*.c1z") - require.NoError(t, err) - defer os.Remove(tmpFile.Name()) - tmpFile.Close() - - c1f, err := NewC1ZFile(ctx, tmpFile.Name()) - require.NoError(t, err) - defer c1f.Close(ctx) - - _, err = c1f.StartNewSync(ctx, "full", "") - require.NoError(t, err) - - // Create test data: mix of expandable and non-expandable grants - userRT := v2.ResourceType_builder{Id: "user"}.Build() - groupRT := v2.ResourceType_builder{Id: "group"}.Build() - require.NoError(t, c1f.PutResourceTypes(ctx, userRT, groupRT)) - - group1 := v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "group", Resource: "g1"}.Build()}.Build() - group2 := v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "group", Resource: "g2"}.Build()}.Build() - user1 := v2.Resource_builder{Id: v2.ResourceId_builder{ResourceType: "user", Resource: "u1"}.Build()}.Build() - require.NoError(t, c1f.PutResources(ctx, group1, group2, user1)) - - ent1 := v2.Entitlement_builder{Id: "ent1", Resource: group1}.Build() - ent2 := v2.Entitlement_builder{Id: "ent2", Resource: group2}.Build() - ent3 := v2.Entitlement_builder{Id: "ent3", Resource: group1}.Build() - require.NoError(t, c1f.PutEntitlements(ctx, ent1, ent2, ent3)) - - // Grant 1: WITHOUT expandable annotation - normalGrant := v2.Grant_builder{ - Id: "grant-normal", - Entitlement: ent1, - Principal: user1, - }.Build() - - // Grant 2: WITH expandable annotation (single source) - expandableAnno1, err := anypb.New(v2.GrantExpandable_builder{ - EntitlementIds: []string{"ent1"}, - Shallow: true, - ResourceTypeIds: []string{"user"}, - }.Build()) - require.NoError(t, err) - expandableGrant1 := v2.Grant_builder{ - Id: "grant-expandable-1", - Entitlement: ent2, - Principal: group1, - Annotations: []*anypb.Any{expandableAnno1}, - }.Build() - - // Grant 3: WITH expandable annotation (multiple sources) - expandableAnno2, err := anypb.New(v2.GrantExpandable_builder{ - EntitlementIds: []string{"ent1", "ent2"}, - Shallow: false, - ResourceTypeIds: []string{"user", "group"}, - }.Build()) - require.NoError(t, err) - expandableGrant2 := v2.Grant_builder{ - Id: "grant-expandable-2", - Entitlement: ent3, - Principal: group2, - Annotations: []*anypb.Any{expandableAnno2}, - }.Build() - - // Grant 4: WITH empty expandable annotation (should NOT be expandable) - emptyExpandableAnno, err := anypb.New(v2.GrantExpandable_builder{ - EntitlementIds: []string{}, - }.Build()) - require.NoError(t, err) - emptyExpandableGrant := v2.Grant_builder{ - Id: "grant-empty-expandable", - Entitlement: ent1, - Principal: group2, - Annotations: []*anypb.Any{emptyExpandableAnno}, - }.Build() - - require.NoError(t, c1f.PutGrants(ctx, normalGrant, expandableGrant1, expandableGrant2, emptyExpandableGrant)) - - // Method 1: ListExpandableGrants (new optimized path) - var expandableFromQuery []*ExpandableGrantDef - pageToken := "" - for { - defs, nextToken, err := c1f.ListExpandableGrants(ctx, WithExpandableGrantsPageToken(pageToken)) - require.NoError(t, err) - expandableFromQuery = append(expandableFromQuery, defs...) - if nextToken == "" { - break - } - pageToken = nextToken - } - - // Method 2: Full scan and filter (simulates old behavior) - type scannedGrant struct { - externalID string - srcEntitlementIDs []string - shallow bool - resourceTypeIDs []string - dstEntitlementID string - principalRTID string - principalRID string - } - expandableFromScan := make(map[string]*scannedGrant) - pageToken = "" - for { - resp, err := c1f.ListGrants(ctx, v2.GrantsServiceListGrantsRequest_builder{PageToken: pageToken}.Build()) - require.NoError(t, err) - for _, g := range resp.GetList() { - annos := annotations.Annotations(g.GetAnnotations()) - ge := &v2.GrantExpandable{} - if _, err := annos.Pick(ge); err == nil && len(ge.GetEntitlementIds()) > 0 { - expandableFromScan[g.GetId()] = &scannedGrant{ - externalID: g.GetId(), - srcEntitlementIDs: ge.GetEntitlementIds(), - shallow: ge.GetShallow(), - resourceTypeIDs: ge.GetResourceTypeIds(), - dstEntitlementID: g.GetEntitlement().GetId(), - principalRTID: g.GetPrincipal().GetId().GetResourceType(), - principalRID: g.GetPrincipal().GetId().GetResource(), - } - } - } - pageToken = resp.GetNextPageToken() - if pageToken == "" { - break - } - } - - // Compare results: same count - require.Len(t, expandableFromQuery, len(expandableFromScan), - "ListExpandableGrants returned %d grants, full scan found %d", - len(expandableFromQuery), len(expandableFromScan)) - - // Compare results: same grants with matching fields - for _, def := range expandableFromQuery { - scanned, ok := expandableFromScan[def.GrantExternalID] - require.True(t, ok, "grant %s found by ListExpandableGrants but not by full scan", def.GrantExternalID) - - require.Equal(t, scanned.dstEntitlementID, def.DstEntitlementID, - "DstEntitlementID mismatch for grant %s", def.GrantExternalID) - require.Equal(t, scanned.principalRTID, def.PrincipalResourceTypeID, - "PrincipalResourceTypeID mismatch for grant %s", def.GrantExternalID) - require.Equal(t, scanned.principalRID, def.PrincipalResourceID, - "PrincipalResourceID mismatch for grant %s", def.GrantExternalID) - require.ElementsMatch(t, scanned.srcEntitlementIDs, def.SrcEntitlementIDs, - "SrcEntitlementIDs mismatch for grant %s", def.GrantExternalID) - require.Equal(t, scanned.shallow, def.Shallow, - "Shallow mismatch for grant %s", def.GrantExternalID) - require.ElementsMatch(t, scanned.resourceTypeIDs, def.PrincipalResourceTypeIDs, - "PrincipalResourceTypeIDs mismatch for grant %s", def.GrantExternalID) - } - - // Verify we found exactly the expected expandable grants - require.Len(t, expandableFromQuery, 2, "should have exactly 2 expandable grants") - - grantIDs := make([]string, len(expandableFromQuery)) - for i, def := range expandableFromQuery { - grantIDs[i] = def.GrantExternalID - } - require.ElementsMatch(t, []string{"grant-expandable-1", "grant-expandable-2"}, grantIDs) -} diff --git a/pkg/dotc1z/sql_helpers.go b/pkg/dotc1z/sql_helpers.go index 8528537b5..218cb1488 100644 --- a/pkg/dotc1z/sql_helpers.go +++ b/pkg/dotc1z/sql_helpers.go @@ -88,6 +88,16 @@ type hasParentResourceIdListRequest interface { GetParentResourceId() *v2.ResourceId } +type hasExpandableOnlyListRequest interface { + listRequest + GetExpandableOnly() bool +} + +type hasNeedsExpansionOnlyListRequest interface { + listRequest + GetNeedsExpansionOnly() bool +} + type protoHasID interface { proto.Message GetId() string @@ -202,6 +212,14 @@ func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, table } } + if expandableReq, ok := req.(hasExpandableOnlyListRequest); ok && expandableReq.GetExpandableOnly() { + q = q.Where(goqu.C("is_expandable").Eq(1)) + } + + if needsExpansionReq, ok := req.(hasNeedsExpansionOnlyListRequest); ok && needsExpansionReq.GetNeedsExpansionOnly() { + q = q.Where(goqu.C("needs_expansion").Eq(1)) + } + // If a sync is running, be sure we only select from the current values switch { case reqSyncID != "": diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 554bf27cd..e6ab1ed1e 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -1739,34 +1739,31 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle s.handleInitialActionForStep(ctx, *s.state.Current()) } - // Prefer a fast-path that scans only expandable grants using queryable columns, - // avoiding full protobuf unmarshalling across all grants. - type expandableGrantLister interface { - ListExpandableGrants(ctx context.Context, opts ...dotc1z.ListExpandableGrantsOption) ([]*dotc1z.ExpandableGrantDef, string, error) - } - - lister, ok := s.store.(expandableGrantLister) - if !ok { - l.Warn("store does not support ListExpandableGrants") - return nil - } - - defs, nextPageToken, err := lister.ListExpandableGrants( - ctx, - dotc1z.WithExpandableGrantsPageToken(pageToken), - dotc1z.WithExpandableGrantsNeedsExpansionOnly(true), - ) + // List only expandable grants that need expansion using SQL-layer filtering. + resp, err := s.store.ListGrants(ctx, v2.GrantsServiceListGrantsRequest_builder{ + PageToken: pageToken, + ExpandableOnly: true, + NeedsExpansionOnly: true, + }.Build()) if err != nil { return err } - for _, def := range defs { - principalID := v2.ResourceId_builder{ - ResourceType: def.PrincipalResourceTypeID, - Resource: def.PrincipalResourceID, - }.Build() + for _, grant := range resp.GetList() { + // Extract GrantExpandable annotation from the grant. + annos := annotations.Annotations(grant.GetAnnotations()) + expandable := &v2.GrantExpandable{} + ok, err := annos.Pick(expandable) + if err != nil || !ok { + // This shouldn't happen since we filtered by is_expandable=1, + // but skip gracefully if the annotation is missing. + continue + } + + principalID := grant.GetPrincipal().GetId() + dstEntitlementID := grant.GetEntitlement().GetId() - for _, srcEntitlementID := range def.SrcEntitlementIDs { + for _, srcEntitlementID := range expandable.GetEntitlementIds() { // Validate that the source entitlement's resource matches the grant's principal. srcEntitlement, err := s.store.GetEntitlement(ctx, reader_v2.EntitlementsReaderServiceGetEntitlementRequest_builder{ EntitlementId: srcEntitlementID, @@ -1777,13 +1774,13 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle if errors.Is(err, sql.ErrNoRows) { l.Debug("source entitlement not found, skipping edge", zap.String("src_entitlement_id", srcEntitlementID), - zap.String("dst_entitlement_id", def.DstEntitlementID), + zap.String("dst_entitlement_id", dstEntitlementID), ) continue } l.Error("error fetching source entitlement", zap.String("src_entitlement_id", srcEntitlementID), - zap.String("dst_entitlement_id", def.DstEntitlementID), + zap.String("dst_entitlement_id", dstEntitlementID), zap.Error(err), ) return err @@ -1803,9 +1800,9 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle return fmt.Errorf("source entitlement resource id did not match grant principal id") } - graph.AddEntitlementID(def.DstEntitlementID) + graph.AddEntitlementID(dstEntitlementID) graph.AddEntitlementID(srcEntitlementID) - err = graph.AddEdge(ctx, srcEntitlementID, def.DstEntitlementID, def.Shallow, def.PrincipalResourceTypeIDs) + err = graph.AddEdge(ctx, srcEntitlementID, dstEntitlementID, expandable.GetShallow(), expandable.GetResourceTypeIds()) if err != nil { return fmt.Errorf("error adding edge to graph: %w", err) } @@ -1813,6 +1810,7 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle } // Handle pagination + nextPageToken := resp.GetNextPageToken() if nextPageToken != "" { if err := s.state.NextPage(ctx, nextPageToken); err != nil { return err diff --git a/proto/c1/connector/v2/grant.proto b/proto/c1/connector/v2/grant.proto index 4cb3a47f4..6fa347b4d 100644 --- a/proto/c1/connector/v2/grant.proto +++ b/proto/c1/connector/v2/grant.proto @@ -46,6 +46,10 @@ message GrantsServiceListGrantsRequest { min_bytes: 1 max_bytes: 1024 }]; + // If true, only return grants that are expandable (have GrantExpandable annotation). + bool expandable_only = 6; + // If true, only return grants that need expansion processing. + bool needs_expansion_only = 7; } message GrantsServiceListGrantsResponse { From 7492ccacf83398fd5657752241201a21445754b7 Mon Sep 17 00:00:00 2001 From: Matt Kaniaris Date: Wed, 4 Feb 2026 18:10:18 -0700 Subject: [PATCH 7/9] hoist the expandable annotation to teh row level --- pkg/dotc1z/c1file_attached.go | 17 ++- pkg/dotc1z/grants.go | 249 ++++++++++++++++++++++++++++++---- pkg/dotc1z/grants_test.go | 17 ++- pkg/dotc1z/sql_helpers.go | 2 +- 4 files changed, 247 insertions(+), 38 deletions(-) diff --git a/pkg/dotc1z/c1file_attached.go b/pkg/dotc1z/c1file_attached.go index 54e64f4f2..b0c45e5a3 100644 --- a/pkg/dotc1z/c1file_attached.go +++ b/pkg/dotc1z/c1file_attached.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "strings" "time" reader_v2 "github.com/conductorone/baton-sdk/pb/c1/reader/v2" @@ -374,6 +375,18 @@ func (c *C1FileAttached) diffTableFromMainTx(ctx context.Context, tx *sql.Tx, ta // 1. Not in attached (OLD) - additions // 2. In attached but with different data - modifications // newSyncID is in main, oldSyncID is in attached + // + // For grants, we also compare the expansion column since GrantExpandable + // annotation is stored separately from data. + var dataCompare string + if strings.Contains(tableName, "grants") { + // For grants: compare both data AND expansion columns. + // Use IFNULL to handle NULL expansion values. + dataCompare = "(a.data != m.data OR IFNULL(a.expansion, X'') != IFNULL(m.expansion, X''))" + } else { + dataCompare = "a.data != m.data" + } + //nolint:gosec // table names are from hardcoded list, not user input query := fmt.Sprintf(` INSERT INTO main.%s (%s) @@ -389,10 +402,10 @@ func (c *C1FileAttached) diffTableFromMainTx(ctx context.Context, tx *sql.Tx, ta SELECT 1 FROM attached.%s AS a WHERE a.external_id = m.external_id AND a.sync_id = ? - AND a.data != m.data + AND %s ) ) - `, tableName, columnList, selectList, tableName, tableName, tableName) + `, tableName, columnList, selectList, tableName, tableName, tableName, dataCompare) _, err = tx.ExecContext(ctx, query, targetSyncID, newSyncID, oldSyncID, oldSyncID) return err diff --git a/pkg/dotc1z/grants.go b/pkg/dotc1z/grants.go index e99af42f5..74879199e 100644 --- a/pkg/dotc1z/grants.go +++ b/pkg/dotc1z/grants.go @@ -24,7 +24,7 @@ create table if not exists %s ( principal_resource_type_id text not null, principal_resource_id text not null, external_id text not null, - is_expandable integer not null default 0, -- 1 if data contains a GrantExpandable annotation; used to build the expansion graph. + expansion blob, -- Serialized GrantExpandable proto; NULL if grant is not expandable. needs_expansion integer not null default 0, -- 1 if grant should be processed during expansion. data blob not null, sync_id text not null, @@ -69,9 +69,9 @@ func isAlreadyExistsError(err error) bool { } func (r *grantsTable) Migrations(ctx context.Context, db *goqu.Database) error { - // Add is_expandable column if missing (for older files). + // Add expansion column if missing (for older files). if _, err := db.ExecContext(ctx, fmt.Sprintf( - "alter table %s add column is_expandable integer not null default 0", r.Name(), + "alter table %s add column expansion blob", r.Name(), )); err != nil && !isAlreadyExistsError(err) { return err } @@ -83,7 +83,16 @@ func (r *grantsTable) Migrations(ctx context.Context, db *goqu.Database) error { return err } - // Create the index only after the columns exist. + // Create partial index for efficient queries on expandable grants. + if _, err := db.ExecContext(ctx, fmt.Sprintf( + "create index if not exists %s on %s (sync_id) where expansion is not null", + fmt.Sprintf("idx_grants_sync_expansion_v%s", r.Version()), + r.Name(), + )); err != nil { + return err + } + + // Create index for needs_expansion queries. if _, err := db.ExecContext(ctx, fmt.Sprintf( "create index if not exists %s on %s (sync_id, needs_expansion)", fmt.Sprintf("idx_grants_sync_needs_expansion_v%s", r.Version()), @@ -92,15 +101,15 @@ func (r *grantsTable) Migrations(ctx context.Context, db *goqu.Database) error { return err } - // Backfill from stored grant bytes for rows that haven't been classified yet. - return backfillGrantExpandableColumns(ctx, db, r.Name()) + // Backfill expansion column from stored grant bytes. + return backfillGrantExpansionColumn(ctx, db, r.Name()) } func (c *C1File) ListGrants(ctx context.Context, request *v2.GrantsServiceListGrantsRequest) (*v2.GrantsServiceListGrantsResponse, error) { ctx, span := tracer.Start(ctx, "C1File.ListGrants") defer span.End() - ret, nextPageToken, err := listConnectorObjects(ctx, c, grants.Name(), request, func() *v2.Grant { return &v2.Grant{} }) + ret, nextPageToken, err := listGrantsWithExpansion(ctx, c, grants.Name(), request) if err != nil { return nil, fmt.Errorf("error listing grants: %w", err) } @@ -111,6 +120,154 @@ func (c *C1File) ListGrants(ctx context.Context, request *v2.GrantsServiceListGr }.Build(), nil } +// listGrantsWithExpansion lists grants and re-attaches the expansion annotation from the +// separate column back to the grant's annotations. This allows callers to access the +// GrantExpandable annotation as if it were stored in the data blob. +func listGrantsWithExpansion( + ctx context.Context, + c *C1File, + tableName string, + request *v2.GrantsServiceListGrantsRequest, +) ([]*v2.Grant, string, error) { + // Use generic listing but post-process to attach expansion + grants, nextPageToken, err := listGrantsInternal(ctx, c, tableName, request) + if err != nil { + return nil, "", err + } + return grants, nextPageToken, nil +} + +// listGrantsInternal performs the actual grant listing with expansion column handling. +func listGrantsInternal( + ctx context.Context, + c *C1File, + tableName string, + req *v2.GrantsServiceListGrantsRequest, +) ([]*v2.Grant, string, error) { + annoSyncID, err := annotations.GetSyncIdFromAnnotations(req.GetAnnotations()) + if err != nil { + return nil, "", fmt.Errorf("error getting sync id from annotations for list request: %w", err) + } + + var reqSyncID string + switch { + case annoSyncID != "": + reqSyncID = annoSyncID + case c.currentSyncID != "": + reqSyncID = c.currentSyncID + case c.viewSyncID != "": + reqSyncID = c.viewSyncID + default: + reqSyncID = "" + } + + q := c.db.From(tableName).Prepared(true) + q = q.Select("id", "data", "expansion") + + // Apply resource filters + r := req.GetResource() + if r != nil { + q = q.Where(goqu.C("resource_id").Eq(r.GetId().GetResource())) + q = q.Where(goqu.C("resource_type_id").Eq(r.GetId().GetResourceType())) + } + + // Apply expandable/needs_expansion filters + if req.GetExpandableOnly() { + q = q.Where(goqu.C("expansion").IsNotNull()) + } + if req.GetNeedsExpansionOnly() { + q = q.Where(goqu.C("needs_expansion").Eq(1)) + } + + // Apply sync ID filter + switch { + case reqSyncID != "": + q = q.Where(goqu.C("sync_id").Eq(reqSyncID)) + default: + latestSyncRun, err := c.getCachedViewSyncRun(ctx) + if err != nil { + return nil, "", err + } + if latestSyncRun != nil { + q = q.Where(goqu.C("sync_id").Eq(latestSyncRun.ID)) + } + } + + // Handle pagination + if req.GetPageToken() != "" { + q = q.Where(goqu.C("id").Gte(req.GetPageToken())) + } + + pageSize := req.GetPageSize() + if pageSize > maxPageSize || pageSize == 0 { + pageSize = maxPageSize + } + q = q.Order(goqu.C("id").Asc()) + q = q.Limit(uint(pageSize + 1)) + + query, args, err := q.ToSQL() + if err != nil { + return nil, "", err + } + + rows, err := c.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, "", err + } + defer rows.Close() + + var unmarshalerOptions = proto.UnmarshalOptions{ + Merge: true, + DiscardUnknown: true, + } + + var count uint32 + var lastRow int + var ret []*v2.Grant + + for rows.Next() { + count++ + if count > pageSize { + break + } + + var id int + var data []byte + var expansion []byte + if err := rows.Scan(&id, &data, &expansion); err != nil { + return nil, "", err + } + lastRow = id + + grant := &v2.Grant{} + if err := unmarshalerOptions.Unmarshal(data, grant); err != nil { + return nil, "", err + } + + // Re-attach expansion annotation if present. + if len(expansion) > 0 { + expandable := &v2.GrantExpandable{} + if err := proto.Unmarshal(expansion, expandable); err != nil { + return nil, "", fmt.Errorf("failed to unmarshal expansion: %w", err) + } + annos := annotations.Annotations(grant.GetAnnotations()) + annos.Append(expandable) + grant.SetAnnotations(annos) + } + + ret = append(ret, grant) + } + if err := rows.Err(); err != nil { + return nil, "", err + } + + nextPageToken := "" + if count > pageSize { + nextPageToken = fmt.Sprintf("%d", lastRow+1) + } + return ret, nextPageToken, nil +} + func (c *C1File) GetGrant(ctx context.Context, request *reader_v2.GrantsReaderServiceGetGrantRequest) (*reader_v2.GrantsReaderServiceGetGrantResponse, error) { ctx, span := tracer.Start(ctx, "C1File.GetGrant") defer span.End() @@ -206,7 +363,7 @@ func (c *C1File) putGrantsInternal(ctx context.Context, f grantPutFunc, bulkGran err := f(ctx, c, grants.Name(), func(grant *v2.Grant) (goqu.Record, error) { - expandable := isGrantExpandable(grant) + expansionBytes, needsExpansion := extractAndStripExpansion(grant) return goqu.Record{ "resource_type_id": grant.GetEntitlement().GetResource().GetId().GetResourceType(), @@ -214,8 +371,8 @@ func (c *C1File) putGrantsInternal(ctx context.Context, f grantPutFunc, bulkGran "entitlement_id": grant.GetEntitlement().GetId(), "principal_resource_type_id": grant.GetPrincipal().GetId().GetResourceType(), "principal_resource_id": grant.GetPrincipal().GetId().GetResource(), - "is_expandable": expandable, - "needs_expansion": expandable, + "expansion": expansionBytes, // nil for non-expandable grants + "needs_expansion": needsExpansion, }, nil }, bulkGrants..., @@ -227,33 +384,54 @@ func (c *C1File) putGrantsInternal(ctx context.Context, f grantPutFunc, bulkGran return nil } -// isGrantExpandable returns true if the grant has a valid GrantExpandable annotation -// with at least one non-whitespace entitlement ID. -func isGrantExpandable(grant *v2.Grant) bool { +// extractAndStripExpansion extracts the GrantExpandable annotation from the grant, +// removes it from the grant's annotations, and returns the serialized proto bytes. +// Returns (nil, false) if the grant has no valid GrantExpandable annotation. +func extractAndStripExpansion(grant *v2.Grant) ([]byte, bool) { annos := annotations.Annotations(grant.GetAnnotations()) expandable := &v2.GrantExpandable{} ok, err := annos.Pick(expandable) if err != nil || !ok || len(expandable.GetEntitlementIds()) == 0 { - return false + return nil, false } // Check that there's at least one non-whitespace entitlement ID. + hasValid := false for _, id := range expandable.GetEntitlementIds() { if strings.TrimSpace(id) != "" { - return true + hasValid = true + break } } - return false + if !hasValid { + return nil, false + } + + // Strip the GrantExpandable annotation from the grant by filtering it out. + filtered := annotations.Annotations{} + for _, a := range annos { + if !a.MessageIs(expandable) { + filtered = append(filtered, a) + } + } + grant.SetAnnotations(filtered) + + // Serialize the expandable annotation. + data, err := proto.Marshal(expandable) + if err != nil { + return nil, false + } + return data, true } -func backfillGrantExpandableColumns(ctx context.Context, db *goqu.Database, tableName string) error { +func backfillGrantExpansionColumn(ctx context.Context, db *goqu.Database, tableName string) error { // Scan for rows that contain "GrantExpandable" in the proto blob but haven't been - // backfilled yet (is_expandable=0). The LIKE filter skips the 99%+ of rows that + // backfilled yet (expansion IS NULL). The LIKE filter skips the 99%+ of rows that // don't have expandable annotations, making this fast even on large tables. for { rows, err := db.QueryContext(ctx, fmt.Sprintf( `SELECT id, data FROM %s - WHERE is_expandable=0 AND data LIKE '%%GrantExpandable%%' + WHERE expansion IS NULL AND data LIKE '%%GrantExpandable%%' LIMIT 1000`, tableName, )) @@ -290,7 +468,7 @@ func backfillGrantExpandableColumns(ctx context.Context, db *goqu.Database, tabl } stmt, err := tx.PrepareContext(ctx, fmt.Sprintf( - `UPDATE %s SET is_expandable=?, needs_expansion=? WHERE id=?`, + `UPDATE %s SET expansion=?, needs_expansion=?, data=? WHERE id=?`, tableName, )) if err != nil { @@ -305,12 +483,22 @@ func backfillGrantExpandableColumns(ctx context.Context, db *goqu.Database, tabl _ = tx.Rollback() return err } - // Only update if we found a valid expandable annotation. - // Rows with "GrantExpandable" in the blob but no valid annotation are skipped. - if !isGrantExpandable(g) { + + expansionBytes, needsExpansion := extractAndStripExpansion(g) + if expansionBytes == nil { + // No valid expandable annotation - skip. continue } - if _, err := stmt.ExecContext(ctx, true, true, r.id); err != nil { + + // Re-serialize the grant with the annotation stripped. + newData, err := proto.Marshal(g) + if err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return err + } + + if _, err := stmt.ExecContext(ctx, expansionBytes, needsExpansion, newData, r.id); err != nil { _ = stmt.Close() _ = tx.Rollback() return err @@ -365,21 +553,22 @@ func bulkPutGrantsInternal( return err } - // needs_expansion should only flip to 1 when is_expandable changes. - // If a grant is no longer expandable (is_expandable=0), needs_expansion should be forced to 0. + // needs_expansion should only flip to 1 when expansion changes from NULL to non-NULL. + // If a grant is no longer expandable (expansion IS NULL), needs_expansion should be forced to 0. needsExpansionExpr := goqu.L( `CASE - WHEN EXCLUDED.is_expandable = 0 THEN 0 - WHEN EXCLUDED.is_expandable != ?.is_expandable THEN 1 + WHEN EXCLUDED.expansion IS NULL THEN 0 + WHEN ?.expansion IS NULL AND EXCLUDED.expansion IS NOT NULL THEN 1 + WHEN ?.expansion IS NOT NULL AND EXCLUDED.expansion IS NOT NULL AND ?.expansion != EXCLUDED.expansion THEN 1 ELSE ?.needs_expansion END`, - goqu.I(tableName), goqu.I(tableName), + goqu.I(tableName), goqu.I(tableName), goqu.I(tableName), goqu.I(tableName), ) buildQueryFn := func(insertDs *goqu.InsertDataset, chunkedRows []*goqu.Record) (*goqu.InsertDataset, error) { update := goqu.Record{ "data": goqu.I("EXCLUDED.data"), - "is_expandable": goqu.I("EXCLUDED.is_expandable"), + "expansion": goqu.I("EXCLUDED.expansion"), "needs_expansion": needsExpansionExpr, } if ifNewer { diff --git a/pkg/dotc1z/grants_test.go b/pkg/dotc1z/grants_test.go index 6b5e4d754..1b17b73b8 100644 --- a/pkg/dotc1z/grants_test.go +++ b/pkg/dotc1z/grants_test.go @@ -9,9 +9,9 @@ import ( v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" ) -func TestIsGrantExpandable_WhitespaceOnlyEntitlementIDs(t *testing.T) { +func TestExtractAndStripExpansion_WhitespaceOnlyEntitlementIDs(t *testing.T) { // Create a GrantExpandable annotation with only whitespace entitlement IDs. - // This should return false because there are no valid source entitlements. + // This should return nil because there are no valid source entitlements. expandable := v2.GrantExpandable_builder{ EntitlementIds: []string{" ", "\t", " \n "}, }.Build() @@ -33,10 +33,12 @@ func TestIsGrantExpandable_WhitespaceOnlyEntitlementIDs(t *testing.T) { Annotations: []*anypb.Any{expandableAny}, }.Build() - require.False(t, isGrantExpandable(grant), "grant with only whitespace entitlement IDs should not be expandable") + expansionBytes, isExpandable := extractAndStripExpansion(grant) + require.False(t, isExpandable, "grant with only whitespace entitlement IDs should not be expandable") + require.Nil(t, expansionBytes, "expansion bytes should be nil for non-expandable grant") } -func TestIsGrantExpandable_MixedWhitespaceAndValidIDs(t *testing.T) { +func TestExtractAndStripExpansion_MixedWhitespaceAndValidIDs(t *testing.T) { // Create a GrantExpandable annotation with a mix of whitespace and valid IDs. // The grant should still be expandable because there's at least one valid ID. expandable := v2.GrantExpandable_builder{ @@ -61,5 +63,10 @@ func TestIsGrantExpandable_MixedWhitespaceAndValidIDs(t *testing.T) { Annotations: []*anypb.Any{expandableAny}, }.Build() - require.True(t, isGrantExpandable(grant), "grant with valid entitlement ID should be expandable") + expansionBytes, isExpandable := extractAndStripExpansion(grant) + require.True(t, isExpandable, "grant with valid entitlement ID should be expandable") + require.NotNil(t, expansionBytes, "expansion bytes should not be nil for expandable grant") + + // Verify that the annotation was stripped from the grant. + require.Len(t, grant.GetAnnotations(), 0, "GrantExpandable annotation should be stripped from grant") } diff --git a/pkg/dotc1z/sql_helpers.go b/pkg/dotc1z/sql_helpers.go index 218cb1488..860daa914 100644 --- a/pkg/dotc1z/sql_helpers.go +++ b/pkg/dotc1z/sql_helpers.go @@ -213,7 +213,7 @@ func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, table } if expandableReq, ok := req.(hasExpandableOnlyListRequest); ok && expandableReq.GetExpandableOnly() { - q = q.Where(goqu.C("is_expandable").Eq(1)) + q = q.Where(goqu.C("expansion").IsNotNull()) } if needsExpansionReq, ok := req.(hasNeedsExpansionOnlyListRequest); ok && needsExpansionReq.GetNeedsExpansionOnly() { From f504eced01e059ca6aadc9fa5e8e23baccf0b74a Mon Sep 17 00:00:00 2001 From: Matt Kaniaris Date: Wed, 4 Feb 2026 18:17:44 -0700 Subject: [PATCH 8/9] relax constraint on resource for listing grants --- pb/c1/connector/v2/grant.pb.go | 6 +++--- pb/c1/connector/v2/grant.pb.validate.go | 11 ----------- pb/c1/connector/v2/grant_protoopaque.pb.go | 6 +++--- pkg/connectorbuilder/resource_syncer.go | 7 +++++++ pkg/ratelimit/grpc.go | 12 +++++++----- proto/c1/connector/v2/grant.proto | 2 +- 6 files changed, 21 insertions(+), 23 deletions(-) diff --git a/pb/c1/connector/v2/grant.pb.go b/pb/c1/connector/v2/grant.pb.go index 8562ea93d..5fb9a29b1 100644 --- a/pb/c1/connector/v2/grant.pb.go +++ b/pb/c1/connector/v2/grant.pb.go @@ -845,9 +845,9 @@ const file_c1_connector_v2_grant_proto_rawDesc = "" + "\x02id\x18\x03 \x01(\tB\n" + "\xfaB\ar\x05 \x01(\x80\bR\x02id\x12A\n" + "\asources\x18\x05 \x01(\v2\x1d.c1.connector.v2.GrantSourcesB\b\xfaB\x05\x8a\x01\x02\x10\x00R\asources\x126\n" + - "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\"\x81\x03\n" + - "\x1eGrantsServiceListGrantsRequest\x12?\n" + - "\bresource\x18\x01 \x01(\v2\x19.c1.connector.v2.ResourceB\b\xfaB\x05\x8a\x01\x02\x10\x01R\bresource\x12'\n" + + "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\"\xf7\x02\n" + + "\x1eGrantsServiceListGrantsRequest\x125\n" + + "\bresource\x18\x01 \x01(\v2\x19.c1.connector.v2.ResourceR\bresource\x12'\n" + "\tpage_size\x18\x02 \x01(\rB\n" + "\xfaB\a*\x05\x18\xfa\x01@\x01R\bpageSize\x12-\n" + "\n" + diff --git a/pb/c1/connector/v2/grant.pb.validate.go b/pb/c1/connector/v2/grant.pb.validate.go index 3ca3c1108..8d62e13c4 100644 --- a/pb/c1/connector/v2/grant.pb.validate.go +++ b/pb/c1/connector/v2/grant.pb.validate.go @@ -454,17 +454,6 @@ func (m *GrantsServiceListGrantsRequest) validate(all bool) error { var errors []error - if m.GetResource() == nil { - err := GrantsServiceListGrantsRequestValidationError{ - field: "Resource", - reason: "value is required", - } - if !all { - return err - } - errors = append(errors, err) - } - if all { switch v := interface{}(m.GetResource()).(type) { case interface{ ValidateAll() error }: diff --git a/pb/c1/connector/v2/grant_protoopaque.pb.go b/pb/c1/connector/v2/grant_protoopaque.pb.go index 19d3c97cd..540ba93ec 100644 --- a/pb/c1/connector/v2/grant_protoopaque.pb.go +++ b/pb/c1/connector/v2/grant_protoopaque.pb.go @@ -861,9 +861,9 @@ const file_c1_connector_v2_grant_proto_rawDesc = "" + "\x02id\x18\x03 \x01(\tB\n" + "\xfaB\ar\x05 \x01(\x80\bR\x02id\x12A\n" + "\asources\x18\x05 \x01(\v2\x1d.c1.connector.v2.GrantSourcesB\b\xfaB\x05\x8a\x01\x02\x10\x00R\asources\x126\n" + - "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\"\x81\x03\n" + - "\x1eGrantsServiceListGrantsRequest\x12?\n" + - "\bresource\x18\x01 \x01(\v2\x19.c1.connector.v2.ResourceB\b\xfaB\x05\x8a\x01\x02\x10\x01R\bresource\x12'\n" + + "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\"\xf7\x02\n" + + "\x1eGrantsServiceListGrantsRequest\x125\n" + + "\bresource\x18\x01 \x01(\v2\x19.c1.connector.v2.ResourceR\bresource\x12'\n" + "\tpage_size\x18\x02 \x01(\rB\n" + "\xfaB\a*\x05\x18\xfa\x01@\x01R\bpageSize\x12-\n" + "\n" + diff --git a/pkg/connectorbuilder/resource_syncer.go b/pkg/connectorbuilder/resource_syncer.go index 4378730f9..b72796741 100644 --- a/pkg/connectorbuilder/resource_syncer.go +++ b/pkg/connectorbuilder/resource_syncer.go @@ -286,6 +286,13 @@ func (b *builder) ListGrants(ctx context.Context, request *v2.GrantsServiceListG start := b.nowFunc() tt := tasks.ListGrantsType + + if request.GetResource() == nil { + err := fmt.Errorf("error: list grants requires a resource") + b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start), err) + return nil, err + } + rid := request.GetResource().GetId() rb, ok := b.resourceSyncers[rid.GetResourceType()] if !ok { diff --git a/pkg/ratelimit/grpc.go b/pkg/ratelimit/grpc.go index cd6351cb7..411af3f6f 100644 --- a/pkg/ratelimit/grpc.go +++ b/pkg/ratelimit/grpc.go @@ -63,11 +63,13 @@ func getRatelimitDescriptors(ctx context.Context, method string, in interface{}, // ListEntitlements, ListGrants if req, ok := in.(hasResource); ok { - if resourceType := req.GetResource().GetId().GetResourceType(); resourceType != "" { - ret.SetEntries(append(ret.GetEntries(), ratelimitV1.RateLimitDescriptors_Entry_builder{ - Key: descriptorKeyConnectorResourceType, - Value: resourceType, - }.Build())) + if r := req.GetResource(); r != nil { + if resourceType := r.GetId().GetResourceType(); resourceType != "" { + ret.SetEntries(append(ret.GetEntries(), ratelimitV1.RateLimitDescriptors_Entry_builder{ + Key: descriptorKeyConnectorResourceType, + Value: resourceType, + }.Build())) + } } return ret } diff --git a/proto/c1/connector/v2/grant.proto b/proto/c1/connector/v2/grant.proto index 6fa347b4d..9af90c153 100644 --- a/proto/c1/connector/v2/grant.proto +++ b/proto/c1/connector/v2/grant.proto @@ -30,7 +30,7 @@ message Grant { } message GrantsServiceListGrantsRequest { - c1.connector.v2.Resource resource = 1 [(validate.rules).message = {required: true}]; + c1.connector.v2.Resource resource = 1; uint32 page_size = 2 [(validate.rules).uint32 = { ignore_empty: true lte: 250 From 5d369aa0f6e78569056a55a9f0524844f35bfd5f Mon Sep 17 00:00:00 2001 From: Matt Kaniaris Date: Thu, 5 Feb 2026 10:01:06 -0700 Subject: [PATCH 9/9] dont run migration forever --- pkg/dotc1z/c1file_attached.go | 4 ++++ pkg/dotc1z/grants.go | 41 +++++++++++++++++++++++++++++------ pkg/dotc1z/sql_helpers.go | 2 +- pkg/dotc1z/sync_runs.go | 1 + 4 files changed, 40 insertions(+), 8 deletions(-) diff --git a/pkg/dotc1z/c1file_attached.go b/pkg/dotc1z/c1file_attached.go index b0c45e5a3..56adccd4a 100644 --- a/pkg/dotc1z/c1file_attached.go +++ b/pkg/dotc1z/c1file_attached.go @@ -228,6 +228,8 @@ func (c *C1FileAttached) GenerateSyncDiffFromFile(ctx context.Context, oldSyncID "sync_type": connectorstore.SyncTypePartialDeletions, "parent_sync_id": oldSyncID, "linked_sync_id": upsertsSyncID, + // This sync is generated by the new SQL-layer diff logic, so it is safe for diff operations. + "supports_diff": 1, }) query, args, err := deletionsInsert.ToSQL() if err != nil { @@ -245,6 +247,8 @@ func (c *C1FileAttached) GenerateSyncDiffFromFile(ctx context.Context, oldSyncID "sync_type": connectorstore.SyncTypePartialUpserts, "parent_sync_id": oldSyncID, "linked_sync_id": deletionsSyncID, + // This sync is generated by the new SQL-layer diff logic, so it is safe for diff operations. + "supports_diff": 1, }) query, args, err = upsertsInsert.ToSQL() if err != nil { diff --git a/pkg/dotc1z/grants.go b/pkg/dotc1z/grants.go index 74879199e..7651ae917 100644 --- a/pkg/dotc1z/grants.go +++ b/pkg/dotc1z/grants.go @@ -425,15 +425,21 @@ func extractAndStripExpansion(grant *v2.Grant) ([]byte, bool) { } func backfillGrantExpansionColumn(ctx context.Context, db *goqu.Database, tableName string) error { - // Scan for rows that contain "GrantExpandable" in the proto blob but haven't been - // backfilled yet (expansion IS NULL). The LIKE filter skips the 99%+ of rows that - // don't have expandable annotations, making this fast even on large tables. + // Only backfill grants from syncs that don't support diff (old syncs created before + // this code change). New syncs set supports_diff=1 at creation and write grants with + // the expansion column populated correctly, so they don't need backfilling. + // + // The LIKE filter skips the 99%+ of rows that don't have expandable annotations, + // making this fast even on large tables. for { rows, err := db.QueryContext(ctx, fmt.Sprintf( - `SELECT id, data FROM %s - WHERE expansion IS NULL AND data LIKE '%%GrantExpandable%%' + `SELECT g.id, g.data FROM %s g + JOIN %s sr ON g.sync_id = sr.sync_id + WHERE g.expansion IS NULL + AND g.data LIKE '%%GrantExpandable%%' + AND sr.supports_diff = 0 LIMIT 1000`, - tableName, + tableName, syncRuns.Name(), )) if err != nil { return err @@ -486,7 +492,28 @@ func backfillGrantExpansionColumn(ctx context.Context, db *goqu.Database, tableN expansionBytes, needsExpansion := extractAndStripExpansion(g) if expansionBytes == nil { - // No valid expandable annotation - skip. + // Strip GrantExpandable so this row won't be retried forever. + annos := annotations.Annotations(g.GetAnnotations()) + filtered := annotations.Annotations{} + expandable := &v2.GrantExpandable{} + for _, a := range annos { + if !a.MessageIs(expandable) { + filtered = append(filtered, a) + } + } + g.SetAnnotations(filtered) + + newData, err := proto.Marshal(g) + if err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return err + } + if _, err := stmt.ExecContext(ctx, nil, 0, newData, r.id); err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return err + } continue } diff --git a/pkg/dotc1z/sql_helpers.go b/pkg/dotc1z/sql_helpers.go index 860daa914..e6828b5e4 100644 --- a/pkg/dotc1z/sql_helpers.go +++ b/pkg/dotc1z/sql_helpers.go @@ -33,8 +33,8 @@ var allTableDescriptors = []tableDescriptor{ resourceTypes, resources, entitlements, + syncRuns, // Must be before grants since grants migration joins sync_runs. grants, - syncRuns, assets, sessionStore, } diff --git a/pkg/dotc1z/sync_runs.go b/pkg/dotc1z/sync_runs.go index a94e8faec..e0b6614ab 100644 --- a/pkg/dotc1z/sync_runs.go +++ b/pkg/dotc1z/sync_runs.go @@ -596,6 +596,7 @@ func (c *C1File) insertSyncRunWithLink(ctx context.Context, syncID string, syncT "sync_type": syncType, "parent_sync_id": parentSyncID, "linked_sync_id": linkedSyncID, + "supports_diff": 1, // New code writes grants with expansion column populated correctly. }) query, args, err := q.ToSQL()