diff --git a/pkg/dotc1z/c1file.go b/pkg/dotc1z/c1file.go index 87fe02700..ee6c148fb 100644 --- a/pkg/dotc1z/c1file.go +++ b/pkg/dotc1z/c1file.go @@ -3,7 +3,9 @@ package dotc1z import ( "context" "database/sql" + "errors" "fmt" + "math" "os" "path/filepath" "sync" @@ -21,6 +23,7 @@ import ( v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" "github.com/conductorone/baton-sdk/pkg/connectorstore" + "github.com/conductorone/baton-sdk/pkg/progress" ) type pragma struct { @@ -209,6 +212,102 @@ func (c *C1File) init(ctx context.Context) error { return nil } +func (c *C1File) ProgressCounts(ctx context.Context, syncID string) (*progress.ProgressCounts, error) { + ctx, span := tracer.Start(ctx, "C1File.ProgressCounts") + defer span.End() + + // Check if sync ID exists + _, err := c.getSync(ctx, syncID) + if err != nil { + return nil, err + } + + counts := progress.NewProgressCounts() + + var rtStats []*v2.ResourceType + pageToken := "" + for { + resp, err := c.ListResourceTypes(ctx, &v2.ResourceTypesServiceListResourceTypesRequest{PageToken: pageToken}) + if err != nil { + return nil, err + } + + rtStats = append(rtStats, resp.List...) + + if resp.NextPageToken == "" { + break + } + + pageToken = resp.NextPageToken + } + counts.ResourceTypes = len(rtStats) + for _, rt := range rtStats { + resourceCount, err := c.db.From(resources.Name()). + Where(goqu.C("resource_type_id").Eq(rt.Id)). + Where(goqu.C("sync_id").Eq(syncID)). + CountContext(ctx) + if err != nil { + return nil, err + } + if resourceCount > math.MaxInt { + return nil, fmt.Errorf("resource count for %s is too large", rt.Id) + } + counts.Resources[rt.Id] = int(resourceCount) + + query, args, err := c.db.From(entitlements.Name()). + Select(goqu.COUNT(goqu.DISTINCT(goqu.C("resource_id")))). + Where(goqu.C("resource_type_id").Eq(rt.Id)). + Where(goqu.C("sync_id").Eq(syncID)). + ToSQL() + if err != nil { + return nil, err + } + + row := c.db.QueryRowContext(ctx, query, args...) + if row == nil { + continue + } + var entitlementsCount int64 + err = row.Scan(&entitlementsCount) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + continue + } + return nil, err + } + counts.EntitlementsProgress[rt.Id] = int(entitlementsCount) + + query, args, err = c.db.From(grants.Name()). + Select(goqu.COUNT(goqu.DISTINCT(goqu.C("resource_id")))). + Where(goqu.C("resource_type_id").Eq(rt.Id)). + Where(goqu.C("sync_id").Eq(syncID)). + ToSQL() + if err != nil { + return nil, err + } + + row = c.db.QueryRowContext(ctx, query, args...) + if row == nil { + continue + } + var grantsCount int64 + err = row.Scan(&grantsCount) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + continue + } + return nil, err + } + + if grantsCount > math.MaxInt { + return nil, fmt.Errorf("grants count for %s is too large", rt.Id) + } + counts.GrantsProgress[rt.Id] = int(grantsCount) + } + + return counts, nil +} + // Stats introspects the database and returns the count of objects for the given sync run. func (c *C1File) Stats(ctx context.Context) (map[string]int64, error) { ctx, span := tracer.Start(ctx, "C1File.Stats") diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go new file mode 100644 index 000000000..a03c46ede --- /dev/null +++ b/pkg/progress/progress.go @@ -0,0 +1,144 @@ +package progress + +import ( + "context" + "time" + + "github.com/conductorone/baton-sdk/pkg/sync/expand" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "go.uber.org/zap" +) + +type ProgressCounts struct { + ResourceTypes int // count of resource types + Resources map[string]int // map of resource type id to resource count + EntitlementsProgress map[string]int // map of resource type id to entitlement count + LastEntitlementLog map[string]time.Time + GrantsProgress map[string]int // map of resource type id to grant count + LastGrantLog map[string]time.Time + LastActionLog time.Time +} + +const maxLogFrequency = 10 * time.Second + +// TODO: use a mutex or a syncmap for when syncer code becomes parallel +func NewProgressCounts() *ProgressCounts { + return &ProgressCounts{ + Resources: make(map[string]int), + EntitlementsProgress: make(map[string]int), + LastEntitlementLog: make(map[string]time.Time), + GrantsProgress: make(map[string]int), + LastGrantLog: make(map[string]time.Time), + LastActionLog: time.Time{}, + } +} + +func (p *ProgressCounts) LogResourceTypesProgress(ctx context.Context) { + l := ctxzap.Extract(ctx) + l.Info("Synced resource types", zap.Int("count", p.ResourceTypes)) +} + +func (p *ProgressCounts) LogResourcesProgress(ctx context.Context, resourceType string) { + l := ctxzap.Extract(ctx) + resources := p.Resources[resourceType] + l.Info("Synced resources", zap.String("resource_type_id", resourceType), zap.Int("count", resources)) +} + +func (p *ProgressCounts) LogEntitlementsProgress(ctx context.Context, resourceType string) { + entitlementsProgress := p.EntitlementsProgress[resourceType] + resources := p.Resources[resourceType] + + l := ctxzap.Extract(ctx) + if resources == 0 { + // if resuming sync, resource counts will be zero, so don't calculate percentage. just log every 10 seconds. + if time.Since(p.LastEntitlementLog[resourceType]) > maxLogFrequency { + l.Info("Syncing entitlements", + zap.String("resource_type_id", resourceType), + zap.Int("synced", entitlementsProgress), + ) + p.LastEntitlementLog[resourceType] = time.Now() + } + return + } + + percentComplete := (entitlementsProgress * 100) / resources + + switch { + case entitlementsProgress > resources: + l.Error("more entitlement resources than resources", + zap.String("resource_type_id", resourceType), + zap.Int("synced", entitlementsProgress), + zap.Int("total", resources), + ) + case percentComplete == 100: + l.Info("Synced entitlements", + zap.String("resource_type_id", resourceType), + zap.Int("count", entitlementsProgress), + zap.Int("total", resources), + ) + p.LastEntitlementLog[resourceType] = time.Time{} + case time.Since(p.LastEntitlementLog[resourceType]) > maxLogFrequency: + l.Info("Syncing entitlements", + zap.String("resource_type_id", resourceType), + zap.Int("synced", entitlementsProgress), + zap.Int("total", resources), + zap.Int("percent_complete", percentComplete), + ) + p.LastEntitlementLog[resourceType] = time.Now() + } +} + +func (p *ProgressCounts) LogGrantsProgress(ctx context.Context, resourceType string) { + grantsProgress := p.GrantsProgress[resourceType] + resources := p.Resources[resourceType] + + l := ctxzap.Extract(ctx) + if resources == 0 { + // if resuming sync, resource counts will be zero, so don't calculate percentage. just log every 10 seconds. + if time.Since(p.LastGrantLog[resourceType]) > maxLogFrequency { + l.Info("Syncing grants", + zap.String("resource_type_id", resourceType), + zap.Int("synced", grantsProgress), + ) + p.LastGrantLog[resourceType] = time.Now() + } + return + } + + percentComplete := (grantsProgress * 100) / resources + + switch { + case grantsProgress > resources: + l.Error("more grant resources than resources", + zap.String("resource_type_id", resourceType), + zap.Int("synced", grantsProgress), + zap.Int("total", resources), + ) + case percentComplete == 100: + l.Info("Synced grants", + zap.String("resource_type_id", resourceType), + zap.Int("count", grantsProgress), + zap.Int("total", resources), + ) + p.LastGrantLog[resourceType] = time.Time{} + case time.Since(p.LastGrantLog[resourceType]) > maxLogFrequency: + l.Info("Syncing grants", + zap.String("resource_type_id", resourceType), + zap.Int("synced", grantsProgress), + zap.Int("total", resources), + zap.Int("percent_complete", percentComplete), + ) + p.LastGrantLog[resourceType] = time.Now() + } +} + +func (p *ProgressCounts) LogExpandProgress(ctx context.Context, actions []*expand.EntitlementGraphAction) { + actionsLen := len(actions) + if time.Since(p.LastActionLog) < maxLogFrequency { + return + } + p.LastActionLog = time.Now() + + l := ctxzap.Extract(ctx) + l.Info("Expanding grants", zap.Int("actions_remaining", actionsLen)) +} diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 58bef7e53..c2a58d225 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -33,6 +33,7 @@ import ( "github.com/conductorone/baton-sdk/pkg/annotations" "github.com/conductorone/baton-sdk/pkg/connectorstore" "github.com/conductorone/baton-sdk/pkg/dotc1z/manager" + "github.com/conductorone/baton-sdk/pkg/progress" "github.com/conductorone/baton-sdk/pkg/types" ) @@ -50,140 +51,6 @@ type Syncer interface { Close(context.Context) error } -type ProgressCounts struct { - ResourceTypes int - Resources map[string]int - EntitlementsProgress map[string]int - LastEntitlementLog map[string]time.Time - GrantsProgress map[string]int - LastGrantLog map[string]time.Time - LastActionLog time.Time -} - -const maxLogFrequency = 10 * time.Second - -// TODO: use a mutex or a syncmap for when this code becomes parallel -func NewProgressCounts() *ProgressCounts { - return &ProgressCounts{ - Resources: make(map[string]int), - EntitlementsProgress: make(map[string]int), - LastEntitlementLog: make(map[string]time.Time), - GrantsProgress: make(map[string]int), - LastGrantLog: make(map[string]time.Time), - LastActionLog: time.Time{}, - } -} - -func (p *ProgressCounts) LogResourceTypesProgress(ctx context.Context) { - l := ctxzap.Extract(ctx) - l.Info("Synced resource types", zap.Int("count", p.ResourceTypes)) -} - -func (p *ProgressCounts) LogResourcesProgress(ctx context.Context, resourceType string) { - l := ctxzap.Extract(ctx) - resources := p.Resources[resourceType] - l.Info("Synced resources", zap.String("resource_type_id", resourceType), zap.Int("count", resources)) -} - -func (p *ProgressCounts) LogEntitlementsProgress(ctx context.Context, resourceType string) { - entitlementsProgress := p.EntitlementsProgress[resourceType] - resources := p.Resources[resourceType] - - l := ctxzap.Extract(ctx) - if resources == 0 { - // if resuming sync, resource counts will be zero, so don't calculate percentage. just log every 10 seconds. - if time.Since(p.LastEntitlementLog[resourceType]) > maxLogFrequency { - l.Info("Syncing entitlements", - zap.String("resource_type_id", resourceType), - zap.Int("synced", entitlementsProgress), - ) - p.LastEntitlementLog[resourceType] = time.Now() - } - return - } - - percentComplete := (entitlementsProgress * 100) / resources - - switch { - case entitlementsProgress > resources: - l.Error("more entitlement resources than resources", - zap.String("resource_type_id", resourceType), - zap.Int("synced", entitlementsProgress), - zap.Int("total", resources), - ) - case percentComplete == 100: - l.Info("Synced entitlements", - zap.String("resource_type_id", resourceType), - zap.Int("count", entitlementsProgress), - zap.Int("total", resources), - ) - p.LastEntitlementLog[resourceType] = time.Time{} - case time.Since(p.LastEntitlementLog[resourceType]) > maxLogFrequency: - l.Info("Syncing entitlements", - zap.String("resource_type_id", resourceType), - zap.Int("synced", entitlementsProgress), - zap.Int("total", resources), - zap.Int("percent_complete", percentComplete), - ) - p.LastEntitlementLog[resourceType] = time.Now() - } -} - -func (p *ProgressCounts) LogGrantsProgress(ctx context.Context, resourceType string) { - grantsProgress := p.GrantsProgress[resourceType] - resources := p.Resources[resourceType] - - l := ctxzap.Extract(ctx) - if resources == 0 { - // if resuming sync, resource counts will be zero, so don't calculate percentage. just log every 10 seconds. - if time.Since(p.LastGrantLog[resourceType]) > maxLogFrequency { - l.Info("Syncing grants", - zap.String("resource_type_id", resourceType), - zap.Int("synced", grantsProgress), - ) - p.LastGrantLog[resourceType] = time.Now() - } - return - } - - percentComplete := (grantsProgress * 100) / resources - - switch { - case grantsProgress > resources: - l.Error("more grant resources than resources", - zap.String("resource_type_id", resourceType), - zap.Int("synced", grantsProgress), - zap.Int("total", resources), - ) - case percentComplete == 100: - l.Info("Synced grants", - zap.String("resource_type_id", resourceType), - zap.Int("count", grantsProgress), - zap.Int("total", resources), - ) - p.LastGrantLog[resourceType] = time.Time{} - case time.Since(p.LastGrantLog[resourceType]) > maxLogFrequency: - l.Info("Syncing grants", - zap.String("resource_type_id", resourceType), - zap.Int("synced", grantsProgress), - zap.Int("total", resources), - zap.Int("percent_complete", percentComplete), - ) - p.LastGrantLog[resourceType] = time.Now() - } -} - -func (p *ProgressCounts) LogExpandProgress(ctx context.Context, actions []*expand.EntitlementGraphAction) { - actionsLen := len(actions) - if time.Since(p.LastActionLog) < maxLogFrequency { - return - } - p.LastActionLog = time.Now() - - l := ctxzap.Extract(ctx) - l.Info("Expanding grants", zap.Int("actions_remaining", actionsLen)) -} - // syncer orchestrates a connector sync and stores the results using the provided datasource.Writer. type syncer struct { c1zManager manager.Manager @@ -200,7 +67,7 @@ type syncer struct { tmpDir string skipFullSync bool lastCheckPointTime time.Time - counts *ProgressCounts + counts *progress.ProgressCounts skipEGForResourceType map[string]bool } @@ -354,6 +221,18 @@ func (s *syncer) Sync(ctx context.Context) error { l.Debug("beginning new sync", zap.String("sync_id", syncID)) } else { l.Debug("resuming previous sync", zap.String("sync_id", syncID)) + // If connectorwriter is a c1file, get resource counts for logging progress + if c1z, ok := s.store.(*dotc1z.C1File); ok { + counts, err := c1z.ProgressCounts(ctx, syncID) + if err != nil { + return err + } + l.Info("progress counts", zap.Any("counts", counts)) + s.counts.ResourceTypes = counts.ResourceTypes + s.counts.EntitlementsProgress = counts.EntitlementsProgress + s.counts.GrantsProgress = counts.GrantsProgress + s.counts.Resources = counts.Resources + } } currentStep, err := s.store.CurrentSyncStep(ctx) @@ -2395,7 +2274,7 @@ func NewSyncer(ctx context.Context, c types.ConnectorClient, opts ...SyncOpt) (S s := &syncer{ connector: c, skipEGForResourceType: make(map[string]bool), - counts: NewProgressCounts(), + counts: progress.NewProgressCounts(), } for _, o := range opts {