Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 44 additions & 34 deletions pkg/synccompactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ const (
)

type Compactor struct {
compactorType CompactorType
entries []*CompactableSync
compactorType CompactorType
entries []*CompactableSync
shouldExpandGrants bool

tmpDir string
destDir string
Expand Down Expand Up @@ -62,15 +63,22 @@ func WithCompactorType(compactorType CompactorType) Option {
}
}

func WithShouldExpandGrants(shouldExpandGrants bool) Option {
return func(c *Compactor) {
c.shouldExpandGrants = shouldExpandGrants
}
}

func NewCompactor(ctx context.Context, outputDir string, compactableSyncs []*CompactableSync, opts ...Option) (*Compactor, func() error, error) {
if len(compactableSyncs) < 2 {
return nil, nil, ErrNotEnoughFilesToCompact
}

c := &Compactor{
entries: compactableSyncs,
destDir: outputDir,
compactorType: CompactorTypeAttached,
entries: compactableSyncs,
destDir: outputDir,
compactorType: CompactorTypeAttached,
shouldExpandGrants: true,
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -113,37 +121,39 @@ func (c *Compactor) Compact(ctx context.Context) (*CompactableSync, error) {
}
}

l := ctxzap.Extract(ctx)
// Grant expansion doesn't use the connector interface at all, so giving syncer an empty connector is safe... for now.
// If that ever changes, we should implement a file connector that is a wrapper around the reader.
emptyConnector, err := sdk.NewEmptyConnector()
if err != nil {
l.Error("error creating empty connector", zap.Error(err))
return nil, err
}
if c.shouldExpandGrants {
l := ctxzap.Extract(ctx)
// Grant expansion doesn't use the connector interface at all, so giving syncer an empty connector is safe... for now.
// If that ever changes, we should implement a file connector that is a wrapper around the reader.
emptyConnector, err := sdk.NewEmptyConnector()
if err != nil {
l.Error("error creating empty connector", zap.Error(err))
return nil, err
}

// Use syncer to expand grants.
// TODO: Handle external resources.
syncer, err := sync.NewSyncer(
ctx,
emptyConnector,
sync.WithC1ZPath(applied.FilePath),
sync.WithTmpDir(c.tmpDir),
sync.WithSyncID(applied.SyncID),
sync.WithOnlyExpandGrants(),
)
if err != nil {
l.Error("error creating syncer", zap.Error(err))
return nil, err
}
// Use syncer to expand grants.
// TODO: Handle external resources.
syncer, err := sync.NewSyncer(
ctx,
emptyConnector,
sync.WithC1ZPath(applied.FilePath),
sync.WithTmpDir(c.tmpDir),
sync.WithSyncID(applied.SyncID),
sync.WithOnlyExpandGrants(),
)
if err != nil {
l.Error("error creating syncer", zap.Error(err))
return nil, err
}

if err := syncer.Sync(ctx); err != nil {
l.Error("error syncing with grant expansion", zap.Error(err))
return nil, err
}
if err := syncer.Close(ctx); err != nil {
l.Error("error closing syncer", zap.Error(err))
return nil, err
if err := syncer.Sync(ctx); err != nil {
l.Error("error syncing with grant expansion", zap.Error(err))
return nil, err
}
if err := syncer.Close(ctx); err != nil {
l.Error("error closing syncer", zap.Error(err))
return nil, err
}
}

// Move last compacted file to the destination dir
Expand Down
Loading