diff --git a/pkg/connector/cache.go b/pkg/connector/cache.go
index ac45d9c9..b0a4c557 100644
--- a/pkg/connector/cache.go
+++ b/pkg/connector/cache.go
@@ -3,6 +3,7 @@ package connector
 import (
 	"context"
 	"fmt"
+	"sort"
 	"strings"
 	"time"
 
@@ -317,3 +318,88 @@ func buildEntitlementCache(ctx context.Context, entitlements []EntitlementData,
 	l.Info("Built entitlement cache", zap.Int("count", len(cache)))
 	return cache, nil
 }
+
+// buildChildTypesIndex maps each parent resource ID to the set of resource type IDs
+// found among its children, so List can look them up instead of rescanning all resources.
+// Records with no parent, or whose parent is absent from resourceCache, are skipped.
+func buildChildTypesIndex(ctx context.Context, resourceData []ResourceData, resourceCache map[string]*v2.Resource) map[string]map[string]struct{} {
+	byParent := make(map[string]map[string]struct{})
+
+	for _, rec := range resourceData {
+		parentID := rec.ParentResource
+		if parentID == "" {
+			continue // top-level record: nothing to index it under
+		}
+		if _, known := resourceCache[parentID]; !known {
+			continue // parent is not in the resource cache
+		}
+
+		// Fetch the parent's set, creating it on first use.
+		childTypes, ok := byParent[parentID]
+		if !ok {
+			childTypes = make(map[string]struct{})
+			byParent[parentID] = childTypes
+		}
+
+		// Child type IDs are stored lower-cased.
+		childTypes[strings.ToLower(rec.ResourceType)] = struct{}{}
+	}
+
+	logger := ctxzap.Extract(ctx)
+	logger.Debug("Built child types index", zap.Int("parents_with_children", len(byParent)))
+	return byParent
+}
+
+// buildSortedResourcesByType creates pre-sorted lists of resources grouped by type.
+// This eliminates the need to filter and sort on every pagination request.
+func buildSortedResourcesByType(ctx context.Context, resourceCache map[string]*v2.Resource) map[string][]*v2.Resource {
+	byType := make(map[string][]*v2.Resource)
+
+	// Bucket every cached resource under its resource type ID.
+	for _, r := range resourceCache {
+		typeID := r.Id.ResourceType
+		byType[typeID] = append(byType[typeID], r)
+	}
+
+	// Order each bucket by resource ID. The sort is in place, so the slices
+	// already stored in the map are the ones that end up ordered.
+	total := 0
+	for _, bucket := range byType {
+		sort.SliceStable(bucket, func(a, b int) bool {
+			return bucket[a].Id.Resource < bucket[b].Id.Resource
+		})
+		total += len(bucket)
+	}
+
+	ctxzap.Extract(ctx).Debug("Built sorted resources by type index",
+		zap.Int("resource_types", len(byType)),
+		zap.Int("total_resources", total))
+	return byType
+}
+
+// buildSortedEntitlementsByResource groups the cached entitlements by the ID of the resource they
+// belong to and sorts each group by slug. The key is the bare resource ID; the resource type is not part of it.
+func buildSortedEntitlementsByResource(ctx context.Context, entitlementCache map[string]*v2.Entitlement) map[string][]*v2.Entitlement {
+	byResource := make(map[string][]*v2.Entitlement)
+
+	// Bucket every cached entitlement under its owning resource's ID.
+	for _, e := range entitlementCache {
+		resourceID := e.Resource.Id.Resource
+		byResource[resourceID] = append(byResource[resourceID], e)
+	}
+
+	// Order each bucket by slug. The sort is in place, so the slices
+	// already stored in the map are the ones that end up ordered.
+	total := 0
+	for _, bucket := range byResource {
+		sort.SliceStable(bucket, func(a, b int) bool {
+			return bucket[a].Slug < bucket[b].Slug
+		})
+		total += len(bucket)
+	}
+
+	ctxzap.Extract(ctx).Debug("Built sorted entitlements by resource index",
+		zap.Int("resources_with_entitlements", len(byResource)),
+		zap.Int("total_entitlements", total))
+	return byResource
+}
diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go
index 64be45df..9e153649 100644
--- a/pkg/connector/connector.go
+++ b/pkg/connector/connector.go
@@ -34,28 +34,100 @@ func (fc *FileConnector) Validate(ctx
context.Context) (annotations.Annotations,
 	return nil, nil
 }
 
+// getCachedData returns the loaded file data together with the resource type, resource, entitlement and child-type caches.
+// The first successful call loads fc.inputFilePath and builds every cache under cacheMutex; later calls return the stored values.
+// NOTE(review): nothing in this method clears the cache, so later edits to the file are not picked up by this FileConnector — confirm that matches the intended sync lifecycle.
+func (fc *FileConnector) getCachedData(ctx context.Context) (*LoadedData, map[string]*v2.ResourceType, map[string]*v2.Resource, map[string]*v2.Entitlement, map[string]map[string]struct{}, error) {
+	l := ctxzap.Extract(ctx)
+
+	// Fast path: under the read lock, return the stored values if the cache is already populated
+	fc.cacheMutex.RLock()
+	if fc.cachedData != nil {
+		loadedData := fc.cachedData
+		resourceTypes := fc.cachedResourceTypes
+		resources := fc.cachedResources
+		entitlements := fc.cachedEntitlements
+		childTypes := fc.cachedChildTypes
+		fc.cacheMutex.RUnlock()
+		return loadedData, resourceTypes, resources, entitlements, childTypes, nil
+	}
+	fc.cacheMutex.RUnlock()
+
+	// Slow path: take the write lock to load the file and build the caches
+	fc.cacheMutex.Lock()
+	defer fc.cacheMutex.Unlock()
+
+	// Re-check after acquiring the write lock: another goroutine may have populated the cache in between
+	if fc.cachedData != nil {
+		return fc.cachedData, fc.cachedResourceTypes, fc.cachedResources, fc.cachedEntitlements, fc.cachedChildTypes, nil
+	}
+
+	l.Info("Loading and caching file data", zap.String("input_file_path", fc.inputFilePath))
+
+	// Load file data; on any error below nothing is stored, so a later call tries again
+	loadedData, err := LoadFileData(fc.inputFilePath)
+	if err != nil {
+		return nil, nil, nil, nil, nil, fmt.Errorf("failed to load data file: %w", err)
+	}
+
+	// Build the caches in dependency order: resource types, then resources, then entitlements
+	resourceTypesCache, err := buildResourceTypeCache(ctx, loadedData.Resources, loadedData.Users)
+	if err != nil {
+		return nil, nil, nil, nil, nil, fmt.Errorf("failed to build resource type cache: %w", err)
+	}
+
+	resourceCache, err := buildResourceCache(ctx, loadedData.Users, loadedData.Resources, resourceTypesCache)
+	if err != nil {
+		return nil, nil, nil, nil, nil, fmt.Errorf("failed to build resource cache: %w", err)
+	}
+
+	entitlementCache, err := buildEntitlementCache(ctx, loadedData.Entitlements, resourceCache)
+	if err != nil {
+		return nil, nil, nil, nil, nil, fmt.Errorf("failed to build entitlement cache: %w", err)
+	}
+
+	// Build the parent ID -> child types index used by List
+	childTypesIndex := buildChildTypesIndex(ctx, loadedData.Resources, resourceCache)
+
+	// Build the sorted indexes so List and Entitlements do not have to sort on each request
+	sortedResourcesByType := buildSortedResourcesByType(ctx, resourceCache)
+	sortedEntitlementsByResource := buildSortedEntitlementsByResource(ctx, entitlementCache)
+
+	// Store everything on fc; the two sorted indexes are stored but are not part of the return values
+	fc.cachedData = loadedData
+	fc.cachedResourceTypes = resourceTypesCache
+	fc.cachedResources = resourceCache
+	fc.cachedEntitlements = entitlementCache
+	fc.cachedChildTypes = childTypesIndex
+	fc.cachedSortedResourcesByType = sortedResourcesByType
+	fc.cachedSortedEntitlementsByRes = sortedEntitlementsByResource
+
+	l.Info("Successfully cached file data",
+		zap.Int("users", len(loadedData.Users)),
+		zap.Int("resources", len(loadedData.Resources)),
+		zap.Int("entitlements", len(loadedData.Entitlements)),
+		zap.Int("grants", len(loadedData.Grants)))
+
+	return loadedData, resourceTypesCache, resourceCache, entitlementCache, childTypesIndex, nil
+}
+
 // ResourceSyncers returns a list of syncers for the connector.
 // function is required by the connectorbuilder.Connector interface.
 // It determines resource types from the input file and creates a syncer instance for each type, enabling the SDK to sync them.
-// implementation loads minimal data to find resource types, builds the type cache, and creates simple syncers passing only the file path for per-sync loading.
+// implementation obtains the resource type cache from getCachedData and creates one syncer per type, passing the connector reference.
func (fc *FileConnector) ResourceSyncers(ctx context.Context) []connectorbuilder.ResourceSyncer { l := ctxzap.Extract(ctx) l.Info("ResourceSyncers method called", zap.String("input_file_path", fc.inputFilePath)) - loadedData, err := LoadFileData(fc.inputFilePath) - if err != nil { - l.Error("Failed to load input data file to determine resource types", zap.Error(err)) - return nil - } - resourceTypesCache, err := buildResourceTypeCache(ctx, loadedData.Resources, loadedData.Users) + _, resourceTypesCache, _, _, _, err := fc.getCachedData(ctx) if err != nil { - l.Error("Failed to build resource type cache", zap.Error(err)) + l.Error("Failed to load and cache data", zap.Error(err)) return nil } rv := make([]connectorbuilder.ResourceSyncer, 0, len(resourceTypesCache)) for _, rt := range resourceTypesCache { - rv = append(rv, newFileSyncer(rt, fc.inputFilePath)) + rv = append(rv, newFileSyncer(rt, fc)) } l.Info("Created resource syncers", zap.Int("count", len(rv))) diff --git a/pkg/connector/models.go b/pkg/connector/models.go index 605892a1..8492eb50 100644 --- a/pkg/connector/models.go +++ b/pkg/connector/models.go @@ -5,15 +5,29 @@ package connector import ( "context" "fmt" + "sync" + + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" ) // FileConnector struct is the main implementation of the Baton connector for file processing. // It is required by the connectorbuilder.Connector interface for defining connector behavior. -// It holds the path to the input data file. -// structure provides the context (file path) needed for loading data during sync operations. +// It holds the path to the input data file and caches loaded data to avoid redundant file reads. +// The file is treated as static during a sync operation - cache is built once and reused. // Instances are created by NewFileConnector. 
type FileConnector struct {
 	inputFilePath string
+
+	// Cache fields: getCachedData assigns them while holding cacheMutex; later calls reuse the stored values
+	cacheMutex          sync.RWMutex
+	cachedData          *LoadedData
+	cachedResourceTypes map[string]*v2.ResourceType
+	cachedResources     map[string]*v2.Resource
+	cachedEntitlements  map[string]*v2.Entitlement
+	cachedChildTypes    map[string]map[string]struct{} // parent resource ID -> set of lower-cased child resource type IDs
+	// Pre-sorted indexes, built once with the caches so page requests do not sort
+	cachedSortedResourcesByType   map[string][]*v2.Resource    // resource type ID -> resources sorted by resource ID
+	cachedSortedEntitlementsByRes map[string][]*v2.Entitlement // resource ID -> entitlements sorted by slug
 }
 
 // LoadedData holds all the data parsed from the input file.
diff --git a/pkg/connector/syncers.go b/pkg/connector/syncers.go
index 03a57a46..6fb192ae 100644
--- a/pkg/connector/syncers.go
+++ b/pkg/connector/syncers.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"sort"
 	"strconv"
-	"strings"
 
 	v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2"
 	"github.com/conductorone/baton-sdk/pkg/annotations"
@@ -15,19 +14,26 @@ import (
 	"go.uber.org/zap"
 )
 
+const (
+	// defaultPageSize is the page size used by the List, Entitlements and Grants pagination code.
+	// Increased from 50 to 200 for better performance since all data is in-memory
+	// and already pre-sorted/filtered during cache building.
+	defaultPageSize = 200
+)
+
 // fileSyncer implements the ResourceSyncer interface for a specific resource type.
-// It holds a reference to the resource type it handles and the path to the data file.
-// Data loading and caching is performed within each interface method call (List, Entitlements, Grants).
+// It holds a reference to the resource type it handles and the parent FileConnector.
+// Data loading and caching is performed once by the connector and reused across all syncer calls.
type fileSyncer struct {
-	resourceType  *v2.ResourceType
-	inputFilePath string
+	resourceType *v2.ResourceType
+	connector    *FileConnector
 }
 
 // newFileSyncer creates a new fileSyncer instance.
-func newFileSyncer(rt *v2.ResourceType, filePath string) *fileSyncer {
+func newFileSyncer(rt *v2.ResourceType, connector *FileConnector) *fileSyncer {
 	return &fileSyncer{
-		resourceType:  rt,
-		inputFilePath: filePath,
+		resourceType: rt,
+		connector:    connector,
 	}
 }
 
@@ -42,44 +48,42 @@ func (fs *fileSyncer) ResourceType(ctx context.Context) *v2.ResourceType {
 
 // List method retrieves a paginated list of resources for the syncer's type.
 // It implements the List method, required by the connectorbuilder.ResourceSyncer interface.
-// It loads data, builds resource/type caches, filters for the relevant type, and returns paginated results.
+// It reads the connector's pre-sorted list for this resource type, filters it by parent, and returns paginated results.
 func (fs *fileSyncer) List(ctx context.Context, parentResourceID *v2.ResourceId, pToken *pagination.Token) ([]*v2.Resource, string, annotations.Annotations, error) {
-	loadedData, err := LoadFileData(fs.inputFilePath)
-	if err != nil {
-		return nil, "", nil, fmt.Errorf("List: failed to load data file: %w", err)
-	}
-
-	resourceTypesCache, err := buildResourceTypeCache(ctx, loadedData.Resources, loadedData.Users)
+	_, _, _, _, childTypesIndex, err := fs.connector.getCachedData(ctx)
 	if err != nil {
-		return nil, "", nil, fmt.Errorf("List: failed to build resource type cache: %w", err)
-	}
-	resourceCache, err := buildResourceCache(ctx, loadedData.Users, loadedData.Resources, resourceTypesCache)
-	if err != nil {
-		return nil, "", nil, fmt.Errorf("List: failed to build resource cache: %w", err)
-	}
-
-	matchingResources := make([]*v2.Resource, 0)
-	for _, res := range resourceCache {
-		if res.Id.ResourceType != fs.resourceType.Id {
-			continue
-		}
-		if parentResourceID != nil {
-			if res.ParentResourceId == nil || res.ParentResourceId.ResourceType !=
parentResourceID.ResourceType || res.ParentResourceId.Resource != parentResourceID.Resource {
-				continue
+		return nil, "", nil, fmt.Errorf("List: %w", err)
+	}
+
+	// Get pre-sorted resources for this type (already filtered and sorted during cache building)
+	fs.connector.cacheMutex.RLock()
+	sortedResources := fs.connector.cachedSortedResourcesByType[fs.resourceType.Id]
+	fs.connector.cacheMutex.RUnlock()
+
+	// Filter by parent if needed
+	var matchingResources []*v2.Resource
+	if parentResourceID != nil {
+		// Need to filter by parent
+		matchingResources = make([]*v2.Resource, 0)
+		for _, res := range sortedResources {
+			if res.ParentResourceId != nil &&
+				res.ParentResourceId.ResourceType == parentResourceID.ResourceType &&
+				res.ParentResourceId.Resource == parentResourceID.Resource {
+				matchingResources = append(matchingResources, res)
 			}
-		} else {
-			if res.ParentResourceId != nil {
-				continue
+		}
+	} else {
+		// Only top-level resources (no parent)
+		matchingResources = make([]*v2.Resource, 0)
+		for _, res := range sortedResources {
+			if res.ParentResourceId == nil {
+				matchingResources = append(matchingResources, res)
 			}
 		}
-		matchingResources = append(matchingResources, res)
 	}
+	// No sorting needed - already sorted in the cached index
 
-	sort.SliceStable(matchingResources, func(i, j int) bool {
-		return matchingResources[i].Id.Resource < matchingResources[j].Id.Resource
-	})
-
-	pageSize := 50
+	pageSize := defaultPageSize
 	bag := &pagination.Bag{}
 	err = bag.Unmarshal(pToken.Token)
 	if err != nil {
@@ -106,16 +110,11 @@ func (fs *fileSyncer) List(ctx context.Context, parentResourceID *v2.ResourceId,
 
 	rv := matchingResources[start:end]
 
+	// Use prebuilt child types index for O(1) lookups instead of scanning all resources
+	// NOTE(review): rv aliases the *v2.Resource values held in the connector cache, so any write to
+	// resource.Annotations below changes shared state on every call — confirm it is idempotent and race-free.
 	for _, resource := range rv {
-		childTypes := make(map[string]struct{})
-		for _, possibleChild := range loadedData.Resources {
-			if possibleChild.ParentResource == resource.Id.Resource {
-				childTypeId := strings.ToLower(possibleChild.ResourceType)
-				childTypes[childTypeId] = struct{}{}
-			}
-		}
-
-		if len(childTypes) > 0 {
+		if childTypes, exists := childTypesIndex[resource.Id.Resource]; exists && len(childTypes) > 0 {
 			annos := annotations.Annotations(resource.Annotations)
 			for childTypeId := range childTypes {
 				childAnno := &v2.ChildResourceType{ResourceTypeId: childTypeId}
@@ -138,38 +137,28 @@ func (fs *fileSyncer) List(ctx context.Context, parentResourceID *v2.ResourceId,
 
 // Entitlements method retrieves a paginated list of entitlements for the syncer's type.
 // It implements the Entitlements method, required by the connectorbuilder.ResourceSyncer interface.
-// It loads data, builds resource/entitlement caches, filters for the relevant resource, and returns paginated results.
+// It reads the connector's pre-sorted entitlement index, keeps the entries matching the resource's type, and returns paginated results.
 func (fs *fileSyncer) Entitlements(ctx context.Context, resource *v2.Resource, pToken *pagination.Token) ([]*v2.Entitlement, string, annotations.Annotations, error) {
-	loadedData, err := LoadFileData(fs.inputFilePath)
+	_, _, _, _, _, err := fs.connector.getCachedData(ctx)
 	if err != nil {
-		return nil, "", nil, fmt.Errorf("Entitlements: failed to load data file: %w", err)
+		return nil, "", nil, fmt.Errorf("Entitlements: %w", err)
 	}
 
-	resourceTypesCache, err := buildResourceTypeCache(ctx, loadedData.Resources, loadedData.Users)
-	if err != nil {
-		return nil, "", nil, fmt.Errorf("Entitlements: failed to build resource type cache: %w", err)
-	}
-	resourceCache, err := buildResourceCache(ctx, loadedData.Users, loadedData.Resources, resourceTypesCache)
-	if err != nil {
-		return nil, "", nil, fmt.Errorf("Entitlements: failed to build resource cache: %w", err)
-	}
-	entitlementCache, err := buildEntitlementCache(ctx, loadedData.Entitlements, resourceCache)
-	if err != nil {
-		return nil, "", nil, fmt.Errorf("Entitlements: failed to build entitlement cache: %w", err)
-	}
+	// Get the pre-sorted entitlements indexed under this resource ID
+	fs.connector.cacheMutex.RLock()
+	candidates := fs.connector.cachedSortedEntitlementsByRes[resource.Id.Resource]
+	fs.connector.cacheMutex.RUnlock()
 
-	matchingEntitlements := make([]*v2.Entitlement, 0)
-	for _, ent := range entitlementCache {
-		if ent.Resource.Id.ResourceType == resource.Id.ResourceType && ent.Resource.Id.Resource == resource.Id.Resource {
-			matchingEntitlements = append(matchingEntitlements, ent)
-		}
+	// The index key is the resource ID alone, so re-apply the resource type check the previous
+	// implementation performed. Filtering keeps the pre-sorted order and yields a non-nil slice.
+	matchingEntitlements := make([]*v2.Entitlement, 0, len(candidates))
+	for _, ent := range candidates {
+		if ent.Resource.Id.ResourceType == resource.Id.ResourceType {
+			matchingEntitlements = append(matchingEntitlements, ent)
+		}
 	}
 
-	sort.SliceStable(matchingEntitlements, func(i, j int) bool {
-		return matchingEntitlements[i].Slug < matchingEntitlements[j].Slug
-	})
-
-	pageSize := 50
+	pageSize := defaultPageSize
 	bag := &pagination.Bag{}
 	err = bag.Unmarshal(pToken.Token)
 	if err != nil {
@@ -209,26 +198,13 @@ func (fs *fileSyncer) Entitlements(ctx context.Context, resource *v2.Resource, p
 
 // Grants method retrieves a paginated list of grants for the syncer's type.
 // It implements the Grants method, required by the connectorbuilder.ResourceSyncer interface.
-// It loads data, builds all caches, filters grants based on the resource context, and returns paginated results.
+// It uses cached data from the connector, filters grants based on the resource context, and returns paginated results.
func (fs *fileSyncer) Grants(ctx context.Context, resource *v2.Resource, pToken *pagination.Token) ([]*v2.Grant, string, annotations.Annotations, error) { l := ctxzap.Extract(ctx) - loadedData, err := LoadFileData(fs.inputFilePath) - if err != nil { - return nil, "", nil, fmt.Errorf("Grants: failed to load data file: %w", err) - } - - resourceTypesCache, err := buildResourceTypeCache(ctx, loadedData.Resources, loadedData.Users) - if err != nil { - return nil, "", nil, fmt.Errorf("Grants: failed to build resource type cache: %w", err) - } - resourceCache, err := buildResourceCache(ctx, loadedData.Users, loadedData.Resources, resourceTypesCache) - if err != nil { - return nil, "", nil, fmt.Errorf("Grants: failed to build resource cache: %w", err) - } - entitlementCache, err := buildEntitlementCache(ctx, loadedData.Entitlements, resourceCache) + loadedData, resourceTypesCache, resourceCache, entitlementCache, _, err := fs.connector.getCachedData(ctx) if err != nil { - return nil, "", nil, fmt.Errorf("Grants: failed to build entitlement cache: %w", err) + return nil, "", nil, fmt.Errorf("Grants: %w", err) } matchingGrants := make([]*v2.Grant, 0) @@ -292,7 +262,7 @@ func (fs *fileSyncer) Grants(ctx context.Context, resource *v2.Resource, pToken return matchingGrants[i].Entitlement.Id < matchingGrants[j].Entitlement.Id }) - pageSize := 50 + pageSize := defaultPageSize bag := &pagination.Bag{} err = bag.Unmarshal(pToken.Token) if err != nil {