diff --git a/pb/c1/connector/v2/grant.pb.go b/pb/c1/connector/v2/grant.pb.go index 485decbee..5fb9a29b1 100644 --- a/pb/c1/connector/v2/grant.pb.go +++ b/pb/c1/connector/v2/grant.pb.go @@ -228,14 +228,18 @@ func (b0 Grant_builder) Build() *Grant { } type GrantsServiceListGrantsRequest struct { - state protoimpl.MessageState `protogen:"hybrid.v1"` - Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` - PageSize uint32 `protobuf:"varint,2,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"` - PageToken string `protobuf:"bytes,3,opt,name=page_token,json=pageToken,proto3" json:"page_token,omitempty"` - Annotations []*anypb.Any `protobuf:"bytes,4,rep,name=annotations,proto3" json:"annotations,omitempty"` - ActiveSyncId string `protobuf:"bytes,5,opt,name=active_sync_id,json=activeSyncId,proto3" json:"active_sync_id,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"hybrid.v1"` + Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` + PageSize uint32 `protobuf:"varint,2,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"` + PageToken string `protobuf:"bytes,3,opt,name=page_token,json=pageToken,proto3" json:"page_token,omitempty"` + Annotations []*anypb.Any `protobuf:"bytes,4,rep,name=annotations,proto3" json:"annotations,omitempty"` + ActiveSyncId string `protobuf:"bytes,5,opt,name=active_sync_id,json=activeSyncId,proto3" json:"active_sync_id,omitempty"` + // If true, only return grants that are expandable (have GrantExpandable annotation). + ExpandableOnly bool `protobuf:"varint,6,opt,name=expandable_only,json=expandableOnly,proto3" json:"expandable_only,omitempty"` + // If true, only return grants that need expansion processing. + NeedsExpansionOnly bool `protobuf:"varint,7,opt,name=needs_expansion_only,json=needsExpansionOnly,proto3" json:"needs_expansion_only,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *GrantsServiceListGrantsRequest) Reset() { @@ -298,6 +302,20 @@ func (x *GrantsServiceListGrantsRequest) GetActiveSyncId() string { return "" } +func (x *GrantsServiceListGrantsRequest) GetExpandableOnly() bool { + if x != nil { + return x.ExpandableOnly + } + return false +} + +func (x *GrantsServiceListGrantsRequest) GetNeedsExpansionOnly() bool { + if x != nil { + return x.NeedsExpansionOnly + } + return false +} + func (x *GrantsServiceListGrantsRequest) SetResource(v *Resource) { x.Resource = v } @@ -318,6 +336,14 @@ func (x *GrantsServiceListGrantsRequest) SetActiveSyncId(v string) { x.ActiveSyncId = v } +func (x *GrantsServiceListGrantsRequest) SetExpandableOnly(v bool) { + x.ExpandableOnly = v +} + +func (x *GrantsServiceListGrantsRequest) SetNeedsExpansionOnly(v bool) { + x.NeedsExpansionOnly = v +} + func (x *GrantsServiceListGrantsRequest) HasResource() bool { if x == nil { return false @@ -337,6 +363,10 @@ type GrantsServiceListGrantsRequest_builder struct { PageToken string Annotations []*anypb.Any ActiveSyncId string + // If true, only return grants that are expandable (have GrantExpandable annotation). + ExpandableOnly bool + // If true, only return grants that need expansion processing. + NeedsExpansionOnly bool } func (b0 GrantsServiceListGrantsRequest_builder) Build() *GrantsServiceListGrantsRequest { @@ -348,6 +378,8 @@ func (b0 GrantsServiceListGrantsRequest_builder) Build() *GrantsServiceListGrant x.PageToken = b.PageToken x.Annotations = b.Annotations x.ActiveSyncId = b.ActiveSyncId + x.ExpandableOnly = b.ExpandableOnly + x.NeedsExpansionOnly = b.NeedsExpansionOnly return m0 } @@ -813,16 +845,18 @@ const file_c1_connector_v2_grant_proto_rawDesc = "" + "\x02id\x18\x03 \x01(\tB\n" + "\xfaB\ar\x05 \x01(\x80\bR\x02id\x12A\n" + "\asources\x18\x05 \x01(\v2\x1d.c1.connector.v2.GrantSourcesB\b\xfaB\x05\x8a\x01\x02\x10\x00R\asources\x126\n" + - "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\"\xa6\x02\n" + - "\x1eGrantsServiceListGrantsRequest\x12?\n" + - "\bresource\x18\x01 \x01(\v2\x19.c1.connector.v2.ResourceB\b\xfaB\x05\x8a\x01\x02\x10\x01R\bresource\x12'\n" + + "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\"\xf7\x02\n" + + "\x1eGrantsServiceListGrantsRequest\x125\n" + + "\bresource\x18\x01 \x01(\v2\x19.c1.connector.v2.ResourceR\bresource\x12'\n" + "\tpage_size\x18\x02 \x01(\rB\n" + "\xfaB\a*\x05\x18\xfa\x01@\x01R\bpageSize\x12-\n" + "\n" + "page_token\x18\x03 \x01(\tB\x0e\xfaB\vr\t \x01(\x80\x80@\xd0\x01\x01R\tpageToken\x126\n" + "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\x123\n" + "\x0eactive_sync_id\x18\x05 \x01(\tB\r\xfaB\n" + - "r\b \x01(\x80\b\xd0\x01\x01R\factiveSyncId\"\xbd\x01\n" + + "r\b \x01(\x80\b\xd0\x01\x01R\factiveSyncId\x12'\n" + + "\x0fexpandable_only\x18\x06 \x01(\bR\x0eexpandableOnly\x120\n" + + "\x14needs_expansion_only\x18\a \x01(\bR\x12needsExpansionOnly\"\xbd\x01\n" + "\x1fGrantsServiceListGrantsResponse\x12*\n" + "\x04list\x18\x01 \x03(\v2\x16.c1.connector.v2.GrantR\x04list\x126\n" + "\x0fnext_page_token\x18\x02 \x01(\tB\x0e\xfaB\vr\t \x01(\x80\x80@\xd0\x01\x01R\rnextPageToken\x126\n" + diff --git a/pb/c1/connector/v2/grant.pb.validate.go b/pb/c1/connector/v2/grant.pb.validate.go index a66db3e1f..8d62e13c4 100644 --- a/pb/c1/connector/v2/grant.pb.validate.go +++ b/pb/c1/connector/v2/grant.pb.validate.go @@ -454,17 +454,6 @@ func (m *GrantsServiceListGrantsRequest) validate(all bool) error { var errors []error - if m.GetResource() == nil { - err := GrantsServiceListGrantsRequestValidationError{ - field: "Resource", - reason: "value is required", - } - if !all { - return err - } - errors = append(errors, err) - } - if all { switch v := interface{}(m.GetResource()).(type) { case interface{ ValidateAll() error }: @@ -573,6 +562,10 @@ func (m *GrantsServiceListGrantsRequest) validate(all bool) error { } + // no validation rules for ExpandableOnly + + // no validation rules for NeedsExpansionOnly + if len(errors) > 0 { return GrantsServiceListGrantsRequestMultiError(errors) } diff --git a/pb/c1/connector/v2/grant_protoopaque.pb.go b/pb/c1/connector/v2/grant_protoopaque.pb.go index 2aa8d446e..540ba93ec 100644 --- a/pb/c1/connector/v2/grant_protoopaque.pb.go +++ b/pb/c1/connector/v2/grant_protoopaque.pb.go @@ -230,14 +230,16 @@ func (b0 Grant_builder) Build() *Grant { } type GrantsServiceListGrantsRequest struct { - state protoimpl.MessageState `protogen:"opaque.v1"` - xxx_hidden_Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3"` - xxx_hidden_PageSize uint32 `protobuf:"varint,2,opt,name=page_size,json=pageSize,proto3"` - xxx_hidden_PageToken string `protobuf:"bytes,3,opt,name=page_token,json=pageToken,proto3"` - xxx_hidden_Annotations *[]*anypb.Any `protobuf:"bytes,4,rep,name=annotations,proto3"` - xxx_hidden_ActiveSyncId string `protobuf:"bytes,5,opt,name=active_sync_id,json=activeSyncId,proto3"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"opaque.v1"` + xxx_hidden_Resource *Resource `protobuf:"bytes,1,opt,name=resource,proto3"` + xxx_hidden_PageSize uint32 `protobuf:"varint,2,opt,name=page_size,json=pageSize,proto3"` + xxx_hidden_PageToken string `protobuf:"bytes,3,opt,name=page_token,json=pageToken,proto3"` + xxx_hidden_Annotations *[]*anypb.Any `protobuf:"bytes,4,rep,name=annotations,proto3"` + xxx_hidden_ActiveSyncId string `protobuf:"bytes,5,opt,name=active_sync_id,json=activeSyncId,proto3"` + xxx_hidden_ExpandableOnly bool `protobuf:"varint,6,opt,name=expandable_only,json=expandableOnly,proto3"` + xxx_hidden_NeedsExpansionOnly bool `protobuf:"varint,7,opt,name=needs_expansion_only,json=needsExpansionOnly,proto3"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *GrantsServiceListGrantsRequest) Reset() { @@ -302,6 +304,20 @@ func (x *GrantsServiceListGrantsRequest) GetActiveSyncId() string { return "" } +func (x *GrantsServiceListGrantsRequest) GetExpandableOnly() bool { + if x != nil { + return x.xxx_hidden_ExpandableOnly + } + return false +} + +func (x *GrantsServiceListGrantsRequest) GetNeedsExpansionOnly() bool { + if x != nil { + return x.xxx_hidden_NeedsExpansionOnly + } + return false +} + func (x *GrantsServiceListGrantsRequest) SetResource(v *Resource) { x.xxx_hidden_Resource = v } @@ -322,6 +338,14 @@ func (x *GrantsServiceListGrantsRequest) SetActiveSyncId(v string) { x.xxx_hidden_ActiveSyncId = v } +func (x *GrantsServiceListGrantsRequest) SetExpandableOnly(v bool) { + x.xxx_hidden_ExpandableOnly = v +} + +func (x *GrantsServiceListGrantsRequest) SetNeedsExpansionOnly(v bool) { + x.xxx_hidden_NeedsExpansionOnly = v +} + func (x *GrantsServiceListGrantsRequest) HasResource() bool { if x == nil { return false @@ -341,6 +365,10 @@ type GrantsServiceListGrantsRequest_builder struct { PageToken string Annotations []*anypb.Any ActiveSyncId string + // If true, only return grants that are expandable (have GrantExpandable annotation). + ExpandableOnly bool + // If true, only return grants that need expansion processing. + NeedsExpansionOnly bool } func (b0 GrantsServiceListGrantsRequest_builder) Build() *GrantsServiceListGrantsRequest { @@ -352,6 +380,8 @@ func (b0 GrantsServiceListGrantsRequest_builder) Build() *GrantsServiceListGrant x.xxx_hidden_PageToken = b.PageToken x.xxx_hidden_Annotations = &b.Annotations x.xxx_hidden_ActiveSyncId = b.ActiveSyncId + x.xxx_hidden_ExpandableOnly = b.ExpandableOnly + x.xxx_hidden_NeedsExpansionOnly = b.NeedsExpansionOnly return m0 } @@ -831,16 +861,18 @@ const file_c1_connector_v2_grant_proto_rawDesc = "" + "\x02id\x18\x03 \x01(\tB\n" + "\xfaB\ar\x05 \x01(\x80\bR\x02id\x12A\n" + "\asources\x18\x05 \x01(\v2\x1d.c1.connector.v2.GrantSourcesB\b\xfaB\x05\x8a\x01\x02\x10\x00R\asources\x126\n" + - "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\"\xa6\x02\n" + - "\x1eGrantsServiceListGrantsRequest\x12?\n" + - "\bresource\x18\x01 \x01(\v2\x19.c1.connector.v2.ResourceB\b\xfaB\x05\x8a\x01\x02\x10\x01R\bresource\x12'\n" + + "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\"\xf7\x02\n" + + "\x1eGrantsServiceListGrantsRequest\x125\n" + + "\bresource\x18\x01 \x01(\v2\x19.c1.connector.v2.ResourceR\bresource\x12'\n" + "\tpage_size\x18\x02 \x01(\rB\n" + "\xfaB\a*\x05\x18\xfa\x01@\x01R\bpageSize\x12-\n" + "\n" + "page_token\x18\x03 \x01(\tB\x0e\xfaB\vr\t \x01(\x80\x80@\xd0\x01\x01R\tpageToken\x126\n" + "\vannotations\x18\x04 \x03(\v2\x14.google.protobuf.AnyR\vannotations\x123\n" + "\x0eactive_sync_id\x18\x05 \x01(\tB\r\xfaB\n" + - "r\b \x01(\x80\b\xd0\x01\x01R\factiveSyncId\"\xbd\x01\n" + + "r\b \x01(\x80\b\xd0\x01\x01R\factiveSyncId\x12'\n" + + "\x0fexpandable_only\x18\x06 \x01(\bR\x0eexpandableOnly\x120\n" + + "\x14needs_expansion_only\x18\a \x01(\bR\x12needsExpansionOnly\"\xbd\x01\n" + "\x1fGrantsServiceListGrantsResponse\x12*\n" + "\x04list\x18\x01 \x03(\v2\x16.c1.connector.v2.GrantR\x04list\x126\n" + "\x0fnext_page_token\x18\x02 \x01(\tB\x0e\xfaB\vr\t \x01(\x80\x80@\xd0\x01\x01R\rnextPageToken\x126\n" + diff --git a/pkg/connectorbuilder/resource_syncer.go b/pkg/connectorbuilder/resource_syncer.go index 4378730f9..b72796741 100644 --- a/pkg/connectorbuilder/resource_syncer.go +++ b/pkg/connectorbuilder/resource_syncer.go @@ -286,6 +286,13 @@ func (b *builder) ListGrants(ctx context.Context, request *v2.GrantsServiceListG start := b.nowFunc() tt := tasks.ListGrantsType + + if request.GetResource() == nil { + err := fmt.Errorf("error: list grants requires a resource") + b.m.RecordTaskFailure(ctx, tt, b.nowFunc().Sub(start), err) + return nil, err + } + rid := request.GetResource().GetId() rb, ok := b.resourceSyncers[rid.GetResourceType()] if !ok { diff --git a/pkg/connectorstore/connectorstore.go b/pkg/connectorstore/connectorstore.go index 65793949e..0b85801ce 100644 --- a/pkg/connectorstore/connectorstore.go +++ b/pkg/connectorstore/connectorstore.go @@ -70,3 +70,10 @@ type Writer interface { PutEntitlements(ctx context.Context, entitlements ...*v2.Entitlement) error DeleteGrant(ctx context.Context, grantId string) error } + +// ExpansionStore provides methods for grant expansion operations. +// Not all store implementations support expansion; callers should type-assert. +type ExpansionStore interface { + // SetSupportsDiff marks the sync as supporting diff operations. + SetSupportsDiff(ctx context.Context, syncID string) error +} diff --git a/pkg/dotc1z/c1file_attached.go b/pkg/dotc1z/c1file_attached.go index 54e64f4f2..56adccd4a 100644 --- a/pkg/dotc1z/c1file_attached.go +++ b/pkg/dotc1z/c1file_attached.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "strings" "time" reader_v2 "github.com/conductorone/baton-sdk/pb/c1/reader/v2" @@ -227,6 +228,8 @@ func (c *C1FileAttached) GenerateSyncDiffFromFile(ctx context.Context, oldSyncID "sync_type": connectorstore.SyncTypePartialDeletions, "parent_sync_id": oldSyncID, "linked_sync_id": upsertsSyncID, + // This sync is generated by the new SQL-layer diff logic, so it is safe for diff operations. + "supports_diff": 1, }) query, args, err := deletionsInsert.ToSQL() if err != nil { @@ -244,6 +247,8 @@ func (c *C1FileAttached) GenerateSyncDiffFromFile(ctx context.Context, oldSyncID "sync_type": connectorstore.SyncTypePartialUpserts, "parent_sync_id": oldSyncID, "linked_sync_id": deletionsSyncID, + // This sync is generated by the new SQL-layer diff logic, so it is safe for diff operations. + "supports_diff": 1, }) query, args, err = upsertsInsert.ToSQL() if err != nil { @@ -374,6 +379,18 @@ func (c *C1FileAttached) diffTableFromMainTx(ctx context.Context, tx *sql.Tx, ta // 1. Not in attached (OLD) - additions // 2. In attached but with different data - modifications // newSyncID is in main, oldSyncID is in attached + // + // For grants, we also compare the expansion column since GrantExpandable + // annotation is stored separately from data. + var dataCompare string + if strings.Contains(tableName, "grants") { + // For grants: compare both data AND expansion columns. + // Use IFNULL to handle NULL expansion values. + dataCompare = "(a.data != m.data OR IFNULL(a.expansion, X'') != IFNULL(m.expansion, X''))" + } else { + dataCompare = "a.data != m.data" + } + //nolint:gosec // table names are from hardcoded list, not user input query := fmt.Sprintf(` INSERT INTO main.%s (%s) @@ -389,10 +406,10 @@ func (c *C1FileAttached) diffTableFromMainTx(ctx context.Context, tx *sql.Tx, ta SELECT 1 FROM attached.%s AS a WHERE a.external_id = m.external_id AND a.sync_id = ? - AND a.data != m.data + AND %s ) ) - `, tableName, columnList, selectList, tableName, tableName, tableName) + `, tableName, columnList, selectList, tableName, tableName, tableName, dataCompare) _, err = tx.ExecContext(ctx, query, targetSyncID, newSyncID, oldSyncID, oldSyncID) return err diff --git a/pkg/dotc1z/grants.go b/pkg/dotc1z/grants.go index c091d7471..7651ae917 100644 --- a/pkg/dotc1z/grants.go +++ b/pkg/dotc1z/grants.go @@ -3,8 +3,10 @@ package dotc1z import ( "context" "fmt" + "strings" "github.com/doug-martin/goqu/v9" + "google.golang.org/protobuf/proto" v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" reader_v2 "github.com/conductorone/baton-sdk/pb/c1/reader/v2" @@ -22,6 +24,8 @@ create table if not exists %s ( principal_resource_type_id text not null, principal_resource_id text not null, external_id text not null, + expansion blob, -- Serialized GrantExpandable proto; NULL if grant is not expandable. + needs_expansion integer not null default 0, -- 1 if grant should be processed during expansion. data blob not null, sync_id text not null, discovered_at datetime not null @@ -59,15 +63,53 @@ func (r *grantsTable) Schema() (string, []any) { } } +// isAlreadyExistsError returns true if err is a SQLite "duplicate column name" error. +func isAlreadyExistsError(err error) bool { + return err != nil && strings.Contains(err.Error(), "duplicate column name") +} + func (r *grantsTable) Migrations(ctx context.Context, db *goqu.Database) error { - return nil + // Add expansion column if missing (for older files). + if _, err := db.ExecContext(ctx, fmt.Sprintf( + "alter table %s add column expansion blob", r.Name(), + )); err != nil && !isAlreadyExistsError(err) { + return err + } + + // Add needs_expansion column if missing. + if _, err := db.ExecContext(ctx, fmt.Sprintf( + "alter table %s add column needs_expansion integer not null default 0", r.Name(), + )); err != nil && !isAlreadyExistsError(err) { + return err + } + + // Create partial index for efficient queries on expandable grants. + if _, err := db.ExecContext(ctx, fmt.Sprintf( + "create index if not exists %s on %s (sync_id) where expansion is not null", + fmt.Sprintf("idx_grants_sync_expansion_v%s", r.Version()), + r.Name(), + )); err != nil { + return err + } + + // Create index for needs_expansion queries. + if _, err := db.ExecContext(ctx, fmt.Sprintf( + "create index if not exists %s on %s (sync_id, needs_expansion)", + fmt.Sprintf("idx_grants_sync_needs_expansion_v%s", r.Version()), + r.Name(), + )); err != nil { + return err + } + + // Backfill expansion column from stored grant bytes. + return backfillGrantExpansionColumn(ctx, db, r.Name()) } func (c *C1File) ListGrants(ctx context.Context, request *v2.GrantsServiceListGrantsRequest) (*v2.GrantsServiceListGrantsResponse, error) { ctx, span := tracer.Start(ctx, "C1File.ListGrants") defer span.End() - ret, nextPageToken, err := listConnectorObjects(ctx, c, grants.Name(), request, func() *v2.Grant { return &v2.Grant{} }) + ret, nextPageToken, err := listGrantsWithExpansion(ctx, c, grants.Name(), request) if err != nil { return nil, fmt.Errorf("error listing grants: %w", err) } @@ -78,6 +120,154 @@ func (c *C1File) ListGrants(ctx context.Context, request *v2.GrantsServiceListGr }.Build(), nil } +// listGrantsWithExpansion lists grants and re-attaches the expansion annotation from the +// separate column back to the grant's annotations. This allows callers to access the +// GrantExpandable annotation as if it were stored in the data blob. +func listGrantsWithExpansion( + ctx context.Context, + c *C1File, + tableName string, + request *v2.GrantsServiceListGrantsRequest, +) ([]*v2.Grant, string, error) { + // Use generic listing but post-process to attach expansion + grants, nextPageToken, err := listGrantsInternal(ctx, c, tableName, request) + if err != nil { + return nil, "", err + } + return grants, nextPageToken, nil +} + +// listGrantsInternal performs the actual grant listing with expansion column handling. +func listGrantsInternal( + ctx context.Context, + c *C1File, + tableName string, + req *v2.GrantsServiceListGrantsRequest, +) ([]*v2.Grant, string, error) { + annoSyncID, err := annotations.GetSyncIdFromAnnotations(req.GetAnnotations()) + if err != nil { + return nil, "", fmt.Errorf("error getting sync id from annotations for list request: %w", err) + } + + var reqSyncID string + switch { + case annoSyncID != "": + reqSyncID = annoSyncID + case c.currentSyncID != "": + reqSyncID = c.currentSyncID + case c.viewSyncID != "": + reqSyncID = c.viewSyncID + default: + reqSyncID = "" + } + + q := c.db.From(tableName).Prepared(true) + q = q.Select("id", "data", "expansion") + + // Apply resource filters + r := req.GetResource() + if r != nil { + q = q.Where(goqu.C("resource_id").Eq(r.GetId().GetResource())) + q = q.Where(goqu.C("resource_type_id").Eq(r.GetId().GetResourceType())) + } + + // Apply expandable/needs_expansion filters + if req.GetExpandableOnly() { + q = q.Where(goqu.C("expansion").IsNotNull()) + } + if req.GetNeedsExpansionOnly() { + q = q.Where(goqu.C("needs_expansion").Eq(1)) + } + + // Apply sync ID filter + switch { + case reqSyncID != "": + q = q.Where(goqu.C("sync_id").Eq(reqSyncID)) + default: + latestSyncRun, err := c.getCachedViewSyncRun(ctx) + if err != nil { + return nil, "", err + } + if latestSyncRun != nil { + q = q.Where(goqu.C("sync_id").Eq(latestSyncRun.ID)) + } + } + + // Handle pagination + if req.GetPageToken() != "" { + q = q.Where(goqu.C("id").Gte(req.GetPageToken())) + } + + pageSize := req.GetPageSize() + if pageSize > maxPageSize || pageSize == 0 { + pageSize = maxPageSize + } + q = q.Order(goqu.C("id").Asc()) + q = q.Limit(uint(pageSize + 1)) + + query, args, err := q.ToSQL() + if err != nil { + return nil, "", err + } + + rows, err := c.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, "", err + } + defer rows.Close() + + var unmarshalerOptions = proto.UnmarshalOptions{ + Merge: true, + DiscardUnknown: true, + } + + var count uint32 + var lastRow int + var ret []*v2.Grant + + for rows.Next() { + count++ + if count > pageSize { + break + } + + var id int + var data []byte + var expansion []byte + if err := rows.Scan(&id, &data, &expansion); err != nil { + return nil, "", err + } + lastRow = id + + grant := &v2.Grant{} + if err := unmarshalerOptions.Unmarshal(data, grant); err != nil { + return nil, "", err + } + + // Re-attach expansion annotation if present. + if len(expansion) > 0 { + expandable := &v2.GrantExpandable{} + if err := proto.Unmarshal(expansion, expandable); err != nil { + return nil, "", fmt.Errorf("failed to unmarshal expansion: %w", err) + } + annos := annotations.Annotations(grant.GetAnnotations()) + annos.Append(expandable) + grant.SetAnnotations(annos) + } + + ret = append(ret, grant) + } + if err := rows.Err(); err != nil { + return nil, "", err + } + + nextPageToken := "" + if count > pageSize { + nextPageToken = fmt.Sprintf("%d", lastRow+1) + } + return ret, nextPageToken, nil +} + func (c *C1File) GetGrant(ctx context.Context, request *reader_v2.GrantsReaderServiceGetGrantRequest) (*reader_v2.GrantsReaderServiceGetGrantResponse, error) { ctx, span := tracer.Start(ctx, "C1File.GetGrant") defer span.End() @@ -154,14 +344,14 @@ func (c *C1File) PutGrants(ctx context.Context, bulkGrants ...*v2.Grant) error { ctx, span := tracer.Start(ctx, "C1File.PutGrants") defer span.End() - return c.putGrantsInternal(ctx, bulkPutConnectorObject, bulkGrants...) + return c.putGrantsInternal(ctx, bulkPutGrants, bulkGrants...) } func (c *C1File) PutGrantsIfNewer(ctx context.Context, bulkGrants ...*v2.Grant) error { ctx, span := tracer.Start(ctx, "C1File.PutGrantsIfNewer") defer span.End() - return c.putGrantsInternal(ctx, bulkPutConnectorObjectIfNewer, bulkGrants...) + return c.putGrantsInternal(ctx, bulkPutGrantsIfNewer, bulkGrants...) } type grantPutFunc func(context.Context, *C1File, string, func(m *v2.Grant) (goqu.Record, error), ...*v2.Grant) error @@ -173,12 +363,16 @@ func (c *C1File) putGrantsInternal(ctx context.Context, f grantPutFunc, bulkGran err := f(ctx, c, grants.Name(), func(grant *v2.Grant) (goqu.Record, error) { + expansionBytes, needsExpansion := extractAndStripExpansion(grant) + return goqu.Record{ "resource_type_id": grant.GetEntitlement().GetResource().GetId().GetResourceType(), "resource_id": grant.GetEntitlement().GetResource().GetId().GetResource(), "entitlement_id": grant.GetEntitlement().GetId(), "principal_resource_type_id": grant.GetPrincipal().GetId().GetResourceType(), "principal_resource_id": grant.GetPrincipal().GetId().GetResource(), + "expansion": expansionBytes, // nil for non-expandable grants + "needs_expansion": needsExpansion, }, nil }, bulkGrants..., @@ -190,6 +384,239 @@ func (c *C1File) putGrantsInternal(ctx context.Context, f grantPutFunc, bulkGran return nil } +// extractAndStripExpansion extracts the GrantExpandable annotation from the grant, +// removes it from the grant's annotations, and returns the serialized proto bytes. +// Returns (nil, false) if the grant has no valid GrantExpandable annotation. +func extractAndStripExpansion(grant *v2.Grant) ([]byte, bool) { + annos := annotations.Annotations(grant.GetAnnotations()) + expandable := &v2.GrantExpandable{} + ok, err := annos.Pick(expandable) + if err != nil || !ok || len(expandable.GetEntitlementIds()) == 0 { + return nil, false + } + + // Check that there's at least one non-whitespace entitlement ID. + hasValid := false + for _, id := range expandable.GetEntitlementIds() { + if strings.TrimSpace(id) != "" { + hasValid = true + break + } + } + if !hasValid { + return nil, false + } + + // Strip the GrantExpandable annotation from the grant by filtering it out. + filtered := annotations.Annotations{} + for _, a := range annos { + if !a.MessageIs(expandable) { + filtered = append(filtered, a) + } + } + grant.SetAnnotations(filtered) + + // Serialize the expandable annotation. + data, err := proto.Marshal(expandable) + if err != nil { + return nil, false + } + return data, true +} + +func backfillGrantExpansionColumn(ctx context.Context, db *goqu.Database, tableName string) error { + // Only backfill grants from syncs that don't support diff (old syncs created before + // this code change). New syncs set supports_diff=1 at creation and write grants with + // the expansion column populated correctly, so they don't need backfilling. + // + // The LIKE filter skips the 99%+ of rows that don't have expandable annotations, + // making this fast even on large tables. + for { + rows, err := db.QueryContext(ctx, fmt.Sprintf( + `SELECT g.id, g.data FROM %s g + JOIN %s sr ON g.sync_id = sr.sync_id + WHERE g.expansion IS NULL + AND g.data LIKE '%%GrantExpandable%%' + AND sr.supports_diff = 0 + LIMIT 1000`, + tableName, syncRuns.Name(), + )) + if err != nil { + return err + } + + type row struct { + id int64 + data []byte + } + batch := make([]row, 0, 1000) + for rows.Next() { + var r row + if err := rows.Scan(&r.id, &r.data); err != nil { + _ = rows.Close() + return err + } + batch = append(batch, r) + } + if err := rows.Err(); err != nil { + _ = rows.Close() + return err + } + _ = rows.Close() + + if len(batch) == 0 { + return nil + } + + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + stmt, err := tx.PrepareContext(ctx, fmt.Sprintf( + `UPDATE %s SET expansion=?, needs_expansion=?, data=? WHERE id=?`, + tableName, + )) + if err != nil { + _ = tx.Rollback() + return err + } + + for _, r := range batch { + g := &v2.Grant{} + if err := proto.Unmarshal(r.data, g); err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return err + } + + expansionBytes, needsExpansion := extractAndStripExpansion(g) + if expansionBytes == nil { + // Strip GrantExpandable so this row won't be retried forever. + annos := annotations.Annotations(g.GetAnnotations()) + filtered := annotations.Annotations{} + expandable := &v2.GrantExpandable{} + for _, a := range annos { + if !a.MessageIs(expandable) { + filtered = append(filtered, a) + } + } + g.SetAnnotations(filtered) + + newData, err := proto.Marshal(g) + if err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return err + } + if _, err := stmt.ExecContext(ctx, nil, 0, newData, r.id); err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return err + } + continue + } + + // Re-serialize the grant with the annotation stripped. + newData, err := proto.Marshal(g) + if err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return err + } + + if _, err := stmt.ExecContext(ctx, expansionBytes, needsExpansion, newData, r.id); err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return err + } + } + + _ = stmt.Close() + if err := tx.Commit(); err != nil { + return err + } + } +} + +func bulkPutGrants( + ctx context.Context, c *C1File, + tableName string, + extractFields func(m *v2.Grant) (goqu.Record, error), + msgs ...*v2.Grant, +) error { + return bulkPutGrantsInternal(ctx, c, tableName, extractFields, false, msgs...) +} + +func bulkPutGrantsIfNewer( + ctx context.Context, c *C1File, + tableName string, + extractFields func(m *v2.Grant) (goqu.Record, error), + msgs ...*v2.Grant, +) error { + return bulkPutGrantsInternal(ctx, c, tableName, extractFields, true, msgs...) +} + +func bulkPutGrantsInternal( + ctx context.Context, c *C1File, + tableName string, + extractFields func(m *v2.Grant) (goqu.Record, error), + ifNewer bool, + msgs ...*v2.Grant, +) error { + if len(msgs) == 0 { + return nil + } + ctx, span := tracer.Start(ctx, "C1File.bulkPutGrants") + defer span.End() + + if err := c.validateSyncDb(ctx); err != nil { + return err + } + + // Prepare rows. + rows, err := prepareConnectorObjectRows(c, msgs, extractFields) + if err != nil { + return err + } + + // needs_expansion should only flip to 1 when expansion changes from NULL to non-NULL. + // If a grant is no longer expandable (expansion IS NULL), needs_expansion should be forced to 0. + needsExpansionExpr := goqu.L( + `CASE + WHEN EXCLUDED.expansion IS NULL THEN 0 + WHEN ?.expansion IS NULL AND EXCLUDED.expansion IS NOT NULL THEN 1 + WHEN ?.expansion IS NOT NULL AND EXCLUDED.expansion IS NOT NULL AND ?.expansion != EXCLUDED.expansion THEN 1 + ELSE ?.needs_expansion + END`, + goqu.I(tableName), goqu.I(tableName), goqu.I(tableName), goqu.I(tableName), + ) + + buildQueryFn := func(insertDs *goqu.InsertDataset, chunkedRows []*goqu.Record) (*goqu.InsertDataset, error) { + update := goqu.Record{ + "data": goqu.I("EXCLUDED.data"), + "expansion": goqu.I("EXCLUDED.expansion"), + "needs_expansion": needsExpansionExpr, + } + if ifNewer { + update["discovered_at"] = goqu.I("EXCLUDED.discovered_at") + return insertDs. + OnConflict(goqu.DoUpdate("external_id, sync_id", update).Where( + goqu.L("EXCLUDED.discovered_at > ?.discovered_at", goqu.I(tableName)), + )). + Rows(chunkedRows). + Prepared(true), nil + } + + return insertDs. + OnConflict(goqu.DoUpdate("external_id, sync_id", update)). + Rows(chunkedRows). + Prepared(true), nil + } + + return executeChunkedInsert(ctx, c, tableName, rows, buildQueryFn) +} + func (c *C1File) DeleteGrant(ctx context.Context, grantId string) error { ctx, span := tracer.Start(ctx, "C1File.DeleteGrant") defer span.End() diff --git a/pkg/dotc1z/grants_test.go b/pkg/dotc1z/grants_test.go new file mode 100644 index 000000000..1b17b73b8 --- /dev/null +++ b/pkg/dotc1z/grants_test.go @@ -0,0 +1,72 @@ +package dotc1z + +import ( + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/anypb" + + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" +) + +func TestExtractAndStripExpansion_WhitespaceOnlyEntitlementIDs(t *testing.T) { + // Create a GrantExpandable annotation with only whitespace entitlement IDs. + // This should return nil because there are no valid source entitlements. + expandable := v2.GrantExpandable_builder{ + EntitlementIds: []string{" ", "\t", " \n "}, + }.Build() + + expandableAny, err := anypb.New(expandable) + require.NoError(t, err) + + grant := v2.Grant_builder{ + Id: "test-grant", + Entitlement: v2.Entitlement_builder{ + Id: "test-entitlement", + }.Build(), + Principal: v2.Resource_builder{ + Id: v2.ResourceId_builder{ + ResourceType: "user", + Resource: "user1", + }.Build(), + }.Build(), + Annotations: []*anypb.Any{expandableAny}, + }.Build() + + expansionBytes, isExpandable := extractAndStripExpansion(grant) + require.False(t, isExpandable, "grant with only whitespace entitlement IDs should not be expandable") + require.Nil(t, expansionBytes, "expansion bytes should be nil for non-expandable grant") +} + +func TestExtractAndStripExpansion_MixedWhitespaceAndValidIDs(t *testing.T) { + // Create a GrantExpandable annotation with a mix of whitespace and valid IDs. + // The grant should still be expandable because there's at least one valid ID. + expandable := v2.GrantExpandable_builder{ + EntitlementIds: []string{" ", "valid-entitlement-id", "\t"}, + Shallow: true, + }.Build() + + expandableAny, err := anypb.New(expandable) + require.NoError(t, err) + + grant := v2.Grant_builder{ + Id: "test-grant", + Entitlement: v2.Entitlement_builder{ + Id: "test-entitlement", + }.Build(), + Principal: v2.Resource_builder{ + Id: v2.ResourceId_builder{ + ResourceType: "user", + Resource: "user1", + }.Build(), + }.Build(), + Annotations: []*anypb.Any{expandableAny}, + }.Build() + + expansionBytes, isExpandable := extractAndStripExpansion(grant) + require.True(t, isExpandable, "grant with valid entitlement ID should be expandable") + require.NotNil(t, expansionBytes, "expansion bytes should not be nil for expandable grant") + + // Verify that the annotation was stripped from the grant. + require.Len(t, grant.GetAnnotations(), 0, "GrantExpandable annotation should be stripped from grant") +} diff --git a/pkg/dotc1z/sql_helpers.go b/pkg/dotc1z/sql_helpers.go index 8528537b5..e6828b5e4 100644 --- a/pkg/dotc1z/sql_helpers.go +++ b/pkg/dotc1z/sql_helpers.go @@ -33,8 +33,8 @@ var allTableDescriptors = []tableDescriptor{ resourceTypes, resources, entitlements, + syncRuns, // Must be before grants since grants migration joins sync_runs. grants, - syncRuns, assets, sessionStore, } @@ -88,6 +88,16 @@ type hasParentResourceIdListRequest interface { GetParentResourceId() *v2.ResourceId } +type hasExpandableOnlyListRequest interface { + listRequest + GetExpandableOnly() bool +} + +type hasNeedsExpansionOnlyListRequest interface { + listRequest + GetNeedsExpansionOnly() bool +} + type protoHasID interface { proto.Message GetId() string @@ -202,6 +212,14 @@ func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, table } } + if expandableReq, ok := req.(hasExpandableOnlyListRequest); ok && expandableReq.GetExpandableOnly() { + q = q.Where(goqu.C("expansion").IsNotNull()) + } + + if needsExpansionReq, ok := req.(hasNeedsExpansionOnlyListRequest); ok && needsExpansionReq.GetNeedsExpansionOnly() { + q = q.Where(goqu.C("needs_expansion").Eq(1)) + } + // If a sync is running, be sure we only select from the current values switch { case reqSyncID != "": diff --git a/pkg/dotc1z/sync_runs.go b/pkg/dotc1z/sync_runs.go index 4426a3e57..e0b6614ab 100644 --- a/pkg/dotc1z/sync_runs.go +++ b/pkg/dotc1z/sync_runs.go @@ -33,7 +33,8 @@ create table if not exists %s ( sync_token text not null, sync_type text not null default 'full', parent_sync_id text not null default '', - linked_sync_id text not null default '' + linked_sync_id text not null default '', + supports_diff integer not null default 0 ); create unique index if not exists %s on %s (sync_id);` @@ -97,6 +98,11 @@ func (r *syncRunsTable) Migrations(ctx context.Context, db *goqu.Database) error } } + // Add supports_diff column if missing (for older files). + if _, err = db.ExecContext(ctx, fmt.Sprintf("alter table %s add column supports_diff integer not null default 0", r.Name())); err != nil && !isAlreadyExistsError(err) { + return err + } + return nil } @@ -108,6 +114,7 @@ type syncRun struct { Type connectorstore.SyncType ParentSyncID string LinkedSyncID string + SupportsDiff bool } // getCachedViewSyncRun returns the cached sync run for read operations. @@ -159,7 +166,7 @@ func (c *C1File) getLatestUnfinishedSync(ctx context.Context, syncType connector oneWeekAgo := time.Now().AddDate(0, 0, -7) ret := &syncRun{} q := c.db.From(syncRuns.Name()) - q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id") + q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "supports_diff") q = q.Where(goqu.C("ended_at").IsNull()) q = q.Where(goqu.C("started_at").Gte(oneWeekAgo)) q = q.Order(goqu.C("started_at").Desc()) @@ -175,7 +182,7 @@ func (c *C1File) getLatestUnfinishedSync(ctx context.Context, syncType connector row := c.db.QueryRowContext(ctx, query, args...) - err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID) + err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID, &ret.SupportsDiff) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil @@ -202,7 +209,7 @@ func (c *C1File) getFinishedSync(ctx context.Context, offset uint, syncType conn ret := &syncRun{} q := c.db.From(syncRuns.Name()) - q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id") + q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "supports_diff") q = q.Where(goqu.C("ended_at").IsNotNull()) if syncType != connectorstore.SyncTypeAny { q = q.Where(goqu.C("sync_type").Eq(syncType)) @@ -221,7 +228,7 @@ func (c *C1File) getFinishedSync(ctx context.Context, offset uint, syncType conn row := c.db.QueryRowContext(ctx, query, args...) - err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID) + err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID, &ret.SupportsDiff) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil @@ -242,7 +249,7 @@ func (c *C1File) ListSyncRuns(ctx context.Context, pageToken string, pageSize ui } q := c.db.From(syncRuns.Name()).Prepared(true) - q = q.Select("id", "sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id") + q = q.Select("id", "sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "supports_diff") if pageToken != "" { q = q.Where(goqu.C("id").Gte(pageToken)) @@ -277,7 +284,7 @@ func (c *C1File) ListSyncRuns(ctx context.Context, pageToken string, pageSize ui } rowId := 0 data := &syncRun{} - err := rows.Scan(&rowId, &data.ID, &data.StartedAt, &data.EndedAt, &data.SyncToken, &data.Type, &data.ParentSyncID, &data.LinkedSyncID) + err := rows.Scan(&rowId, &data.ID, &data.StartedAt, &data.EndedAt, &data.SyncToken, &data.Type, &data.ParentSyncID, &data.LinkedSyncID, &data.SupportsDiff) if err != nil { return nil, "", err } @@ -366,7 +373,7 @@ func (c *C1File) getSync(ctx context.Context, syncID string) (*syncRun, error) { ret := &syncRun{} q := c.db.From(syncRuns.Name()) - q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id") + q = q.Select("sync_id", "started_at", "ended_at", "sync_token", "sync_type", "parent_sync_id", "linked_sync_id", "supports_diff") q = q.Where(goqu.C("sync_id").Eq(syncID)) query, args, err := q.ToSQL() @@ -374,7 +381,7 @@ func (c *C1File) getSync(ctx context.Context, syncID string) (*syncRun, error) { return nil, err } row := c.db.QueryRowContext(ctx, query, args...) - err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID) + err = row.Scan(&ret.ID, &ret.StartedAt, &ret.EndedAt, &ret.SyncToken, &ret.Type, &ret.ParentSyncID, &ret.LinkedSyncID, &ret.SupportsDiff) if err != nil { return nil, err } @@ -589,6 +596,7 @@ func (c *C1File) insertSyncRunWithLink(ctx context.Context, syncID string, syncT "sync_type": syncType, "parent_sync_id": parentSyncID, "linked_sync_id": linkedSyncID, + "supports_diff": 1, // New code writes grants with expansion column populated correctly. }) query, args, err := q.ToSQL() @@ -658,6 +666,37 @@ func (c *C1File) endSyncRun(ctx context.Context, syncID string) error { return nil } +// SetSupportsDiff marks the given sync as supporting diff operations. +// This indicates the sync has SQL-layer grant metadata (is_expandable) properly populated. +func (c *C1File) SetSupportsDiff(ctx context.Context, syncID string) error { + ctx, span := tracer.Start(ctx, "C1File.SetSupportsDiff") + defer span.End() + + if c.readOnly { + return ErrReadOnly + } + + q := c.db.Update(syncRuns.Name()) + q = q.Set(goqu.Record{ + "supports_diff": 1, + }) + q = q.Where(goqu.C("sync_id").Eq(syncID)) + q = q.Where(goqu.C("supports_diff").Eq(0)) + + query, args, err := q.ToSQL() + if err != nil { + return err + } + + _, err = c.db.ExecContext(ctx, query, args...) + if err != nil { + return err + } + c.dbUpdated = true + + return nil +} + func (c *C1File) Cleanup(ctx context.Context) error { ctx, span := tracer.Start(ctx, "C1File.Cleanup") defer span.End() diff --git a/pkg/ratelimit/grpc.go b/pkg/ratelimit/grpc.go index cd6351cb7..411af3f6f 100644 --- a/pkg/ratelimit/grpc.go +++ b/pkg/ratelimit/grpc.go @@ -63,11 +63,13 @@ func getRatelimitDescriptors(ctx context.Context, method string, in interface{}, // ListEntitlements, ListGrants if req, ok := in.(hasResource); ok { - if resourceType := req.GetResource().GetId().GetResourceType(); resourceType != "" { - ret.SetEntries(append(ret.GetEntries(), ratelimitV1.RateLimitDescriptors_Entry_builder{ - Key: descriptorKeyConnectorResourceType, - Value: resourceType, - }.Build())) + if r := req.GetResource(); r != nil { + if resourceType := r.GetId().GetResourceType(); resourceType != "" { + ret.SetEntries(append(ret.GetEntries(), ratelimitV1.RateLimitDescriptors_Entry_builder{ + Key: descriptorKeyConnectorResourceType, + Value: resourceType, + }.Build())) + } } return ret } diff --git a/pkg/sync/expand/graph.go b/pkg/sync/expand/graph.go index 4dce1bafd..6ff1051f3 100644 --- a/pkg/sync/expand/graph.go +++ b/pkg/sync/expand/graph.go @@ -202,6 +202,29 @@ func (g *EntitlementGraph) AddEntitlement(entitlement *v2.Entitlement) { g.EntitlementsToNodes[entitlement.GetId()] = node.Id } +// AddEntitlementID adds an entitlement ID as an unconnected node in the graph. +// This is a convenience for callers that already have entitlement IDs and do not +// want to fetch/unmarshal full entitlement protos. +func (g *EntitlementGraph) AddEntitlementID(entitlementID string) { + // If the entitlement is already in the graph, fail silently. + found := g.GetNode(entitlementID) + if found != nil { + return + } + g.HasNoCycles = false // Reset this since we're changing the graph. + + // Start at 1 in case we don't initialize something and try to get node 0. + g.NextNodeID++ + + node := Node{ + Id: g.NextNodeID, + EntitlementIDs: []string{entitlementID}, + } + + g.Nodes[node.Id] = node + g.EntitlementsToNodes[entitlementID] = node.Id +} + // GetEntitlements returns a combined list of _all_ entitlements from all nodes. func (g *EntitlementGraph) GetEntitlements() []string { var entitlements []string diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 532cc8e47..e6ab1ed1e 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -704,6 +704,20 @@ func (s *syncer) Sync(ctx context.Context) error { continue case SyncGrantExpansionOp: + // Mark the sync as supporting diff, but only if we're starting fresh. + // If we're resuming (graph has edges or a page token), we may be continuing + // from old code that didn't have this marker, so we must not set it. + entitlementGraph := s.state.EntitlementGraph(ctx) + isResumingExpansion := entitlementGraph.Loaded || len(entitlementGraph.Edges) > 0 || s.state.PageToken(ctx) != "" + if !isResumingExpansion { + if expansionStore, ok := s.store.(connectorstore.ExpansionStore); ok { + if err := expansionStore.SetSupportsDiff(ctx, s.syncID); err != nil { + l.Error("failed to set supports_diff marker", zap.Error(err)) + return err + } + } + } + if s.dontExpandGrants || !s.state.NeedsExpansion() { l.Debug("skipping grant expansion, no grants to expand") s.state.FinishAction(ctx) @@ -1725,131 +1739,87 @@ func (s *syncer) loadEntitlementGraph(ctx context.Context, graph *expand.Entitle s.handleInitialActionForStep(ctx, *s.state.Current()) } - resp, err := s.store.ListGrants(ctx, v2.GrantsServiceListGrantsRequest_builder{PageToken: pageToken}.Build()) + // List only expandable grants that need expansion using SQL-layer filtering. + resp, err := s.store.ListGrants(ctx, v2.GrantsServiceListGrantsRequest_builder{ + PageToken: pageToken, + ExpandableOnly: true, + NeedsExpansionOnly: true, + }.Build()) if err != nil { return err } - // Handle pagination - if resp.GetNextPageToken() != "" { - err = s.state.NextPage(ctx, resp.GetNextPageToken()) - if err != nil { - return err - } - } else { - l.Debug("Finished loading grants to expand") - graph.Loaded = true - } - - // Process grants and add edges to the graph - updatedGrants := make([]*v2.Grant, 0) for _, grant := range resp.GetList() { - err := s.processGrantForGraph(ctx, grant, graph) - if err != nil { - return err - } - - // Remove expandable annotation from descendant grant now that we've added it to the graph. - // That way if this sync is part of a compaction, expanding grants at the end of compaction won't redo work. - newAnnos := make(annotations.Annotations, 0) - updated := false - for _, anno := range grant.GetAnnotations() { - if anno.MessageIs(&v2.GrantExpandable{}) { - updated = true - } else { - newAnnos = append(newAnnos, anno) - } - } - if !updated { + // Extract GrantExpandable annotation from the grant. + annos := annotations.Annotations(grant.GetAnnotations()) + expandable := &v2.GrantExpandable{} + ok, err := annos.Pick(expandable) + if err != nil || !ok { + // This shouldn't happen since we filtered by is_expandable=1, + // but skip gracefully if the annotation is missing. continue } - grant.SetAnnotations(newAnnos) - l.Debug("removed expandable annotation from grant", zap.String("grant_id", grant.GetId())) - updatedGrants = append(updatedGrants, grant) - updatedGrants, err = expand.PutGrantsInChunks(ctx, s.store, updatedGrants, 10000) - if err != nil { - return err - } - } - - _, err = expand.PutGrantsInChunks(ctx, s.store, updatedGrants, 0) - if err != nil { - return err - } - - if graph.Loaded { - l.Info("Finished loading entitlement graph", zap.Int("edges", len(graph.Edges))) - } - return nil -} - -// processGrantForGraph examines a grant for expandable annotations and adds edges to the graph. -func (s *syncer) processGrantForGraph(ctx context.Context, grant *v2.Grant, graph *expand.EntitlementGraph) error { - l := ctxzap.Extract(ctx) + principalID := grant.GetPrincipal().GetId() + dstEntitlementID := grant.GetEntitlement().GetId() - annos := annotations.Annotations(grant.GetAnnotations()) - expandable := &v2.GrantExpandable{} - _, err := annos.Pick(expandable) - if err != nil { - return err - } - if len(expandable.GetEntitlementIds()) == 0 { - return nil - } - - principalID := grant.GetPrincipal().GetId() - if principalID == nil { - return fmt.Errorf("principal id was nil") - } - - for _, srcEntitlementID := range expandable.GetEntitlementIds() { - l.Debug( - "Expandable entitlement found", - zap.String("src_entitlement_id", srcEntitlementID), - zap.String("dst_entitlement_id", grant.GetEntitlement().GetId()), - ) + for _, srcEntitlementID := range expandable.GetEntitlementIds() { + // Validate that the source entitlement's resource matches the grant's principal. + srcEntitlement, err := s.store.GetEntitlement(ctx, reader_v2.EntitlementsReaderServiceGetEntitlementRequest_builder{ + EntitlementId: srcEntitlementID, + }.Build()) + if err != nil { + // Only skip not-found entitlements; propagate other errors + // to avoid silently dropping edges and yielding incorrect expansions. + if errors.Is(err, sql.ErrNoRows) { + l.Debug("source entitlement not found, skipping edge", + zap.String("src_entitlement_id", srcEntitlementID), + zap.String("dst_entitlement_id", dstEntitlementID), + ) + continue + } + l.Error("error fetching source entitlement", + zap.String("src_entitlement_id", srcEntitlementID), + zap.String("dst_entitlement_id", dstEntitlementID), + zap.Error(err), + ) + return err + } - srcEntitlement, err := s.store.GetEntitlement(ctx, reader_v2.EntitlementsReaderServiceGetEntitlementRequest_builder{ - EntitlementId: srcEntitlementID, - }.Build()) - if err != nil { - l.Error("error fetching source entitlement", - zap.String("src_entitlement_id", srcEntitlementID), - zap.String("dst_entitlement_id", grant.GetEntitlement().GetId()), - zap.Error(err), - ) - continue - } + sourceEntitlementResourceID := srcEntitlement.GetEntitlement().GetResource().GetId() + if sourceEntitlementResourceID == nil { + return fmt.Errorf("source entitlement resource id was nil") + } + if principalID.GetResourceType() != sourceEntitlementResourceID.GetResourceType() || + principalID.GetResource() != sourceEntitlementResourceID.GetResource() { + l.Error( + "source entitlement resource id did not match grant principal id", + zap.String("grant_principal_id", principalID.String()), + zap.String("source_entitlement_resource_id", sourceEntitlementResourceID.String())) - // The expand annotation points at entitlements by id. Those entitlements' resource should match - // the current grant's principal, so we don't allow expanding arbitrary entitlements. - sourceEntitlementResourceID := srcEntitlement.GetEntitlement().GetResource().GetId() - if sourceEntitlementResourceID == nil { - return fmt.Errorf("source entitlement resource id was nil") - } - if principalID.GetResourceType() != sourceEntitlementResourceID.GetResourceType() || - principalID.GetResource() != sourceEntitlementResourceID.GetResource() { - l.Error( - "source entitlement resource id did not match grant principal id", - zap.String("grant_principal_id", principalID.String()), - zap.String("source_entitlement_resource_id", sourceEntitlementResourceID.String())) + return fmt.Errorf("source entitlement resource id did not match grant principal id") + } - return fmt.Errorf("source entitlement resource id did not match grant principal id") + graph.AddEntitlementID(dstEntitlementID) + graph.AddEntitlementID(srcEntitlementID) + err = graph.AddEdge(ctx, srcEntitlementID, dstEntitlementID, expandable.GetShallow(), expandable.GetResourceTypeIds()) + if err != nil { + return fmt.Errorf("error adding edge to graph: %w", err) + } } + } - graph.AddEntitlement(grant.GetEntitlement()) - graph.AddEntitlement(srcEntitlement.GetEntitlement()) - err = graph.AddEdge(ctx, - srcEntitlement.GetEntitlement().GetId(), - grant.GetEntitlement().GetId(), - expandable.GetShallow(), - expandable.GetResourceTypeIds(), - ) - if err != nil { - return fmt.Errorf("error adding edge to graph: %w", err) + // Handle pagination + nextPageToken := resp.GetNextPageToken() + if nextPageToken != "" { + if err := s.state.NextPage(ctx, nextPageToken); err != nil { + return err } + } else { + graph.Loaded = true + l.Info("Finished loading entitlement graph", zap.Int("edges", len(graph.Edges))) } + return nil } diff --git a/pkg/sync/syncer_test.go b/pkg/sync/syncer_test.go index 8ea1b85f8..2324c3a25 100644 --- a/pkg/sync/syncer_test.go +++ b/pkg/sync/syncer_test.go @@ -108,13 +108,8 @@ func TestExpandGrants(t *testing.T) { } } require.Len(t, allGrants, expectedGrantCount, "should have %d grants but got %d", expectedGrantCount, len(allGrants)) - for _, grant := range allGrants { - annos := annotations.Annotations(grant.GetAnnotations()) - expandable := &v2.GrantExpandable{} - ok, err := annos.Pick(expandable) - require.NoError(t, err) - require.False(t, ok, "grants are expanded, but grant %s has expandable annotation with entitlement ids %v", grant.GetId(), expandable.GetEntitlementIds()) - } + // Note: We no longer strip GrantExpandable from stored grants during expansion. + // Expansion bookkeeping lives outside the grant proto so diffs can safely compare data bytes. } func TestInvalidResourceTypeFilter(t *testing.T) { diff --git a/proto/c1/connector/v2/grant.proto b/proto/c1/connector/v2/grant.proto index 4cb3a47f4..9af90c153 100644 --- a/proto/c1/connector/v2/grant.proto +++ b/proto/c1/connector/v2/grant.proto @@ -30,7 +30,7 @@ message Grant { } message GrantsServiceListGrantsRequest { - c1.connector.v2.Resource resource = 1 [(validate.rules).message = {required: true}]; + c1.connector.v2.Resource resource = 1; uint32 page_size = 2 [(validate.rules).uint32 = { ignore_empty: true lte: 250 @@ -46,6 +46,10 @@ message GrantsServiceListGrantsRequest { min_bytes: 1 max_bytes: 1024 }]; + // If true, only return grants that are expandable (have GrantExpandable annotation). + bool expandable_only = 6; + // If true, only return grants that need expansion processing. + bool needs_expansion_only = 7; } message GrantsServiceListGrantsResponse {