Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
}

type syncerConf struct {
WorkerCount int `mapstructure:"worker_count" default:"1"`
SyncInterval time.Duration `mapstructure:"sync_interval" default:"1s"`
RefreshInterval time.Duration `mapstructure:"refresh_interval" default:"3s"`
ExtendLockBy time.Duration `mapstructure:"extend_lock_by" default:"5s"`
Expand Down
13 changes: 8 additions & 5 deletions cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func cmdServe() *cobra.Command {
newrelic.ConfigAppName(cfg.Telemetry.ServiceName),
newrelic.ConfigLicense(cfg.Telemetry.NewRelicAPIKey),
)
if err != nil {
return err
}

store := setupStorage(cfg.PGConnStr, cfg.Syncer)
moduleService := module.NewService(setupRegistry(), store)
Expand All @@ -61,11 +64,11 @@ func cmdServe() *cobra.Command {
}

if spawnWorker {
go func() {
if runErr := resourceService.RunSyncer(cmd.Context(), cfg.Syncer.SyncInterval); runErr != nil {
zap.L().Error("syncer exited with error", zap.Error(err))
}
}()
go resourceService.RunSyncer(
cmd.Context(),
cfg.Syncer.WorkerCount,
cfg.Syncer.SyncInterval,
)
}

return entropyserver.Serve(cmd.Context(),
Expand Down
21 changes: 20 additions & 1 deletion core/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"sync"
"time"

"go.uber.org/zap"
Expand All @@ -12,7 +13,24 @@ import (

// RunSyncer runs the syncer thread that keeps performing resource-sync at
// regular intervals.
func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error {
func (svc *Service) RunSyncer(ctx context.Context, workerCount int, interval time.Duration) {
wg := &sync.WaitGroup{}
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

if err := svc.runWorker(ctx, id, interval); err != nil {
zap.L().Error("worker-%d failed", zap.Error(err))
}
}(i)
}
wg.Wait()

zap.L().Info("all syncer workers exited")
}

func (svc *Service) runWorker(ctx context.Context, _ int, interval time.Duration) error {
tick := time.NewTimer(interval)
defer tick.Stop()

Expand All @@ -27,6 +45,7 @@ func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error
err := svc.store.SyncOne(ctx, svc.handleSync)
if err != nil {
zap.L().Warn("SyncOne() failed", zap.Error(err))
continue
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions modules/firehose/driver_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ import (

const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"

var (
suffixRegex = regexp.MustCompile(`^([A-Za-z0-9-]+)-([0-9]+)$`)
)
var suffixRegex = regexp.MustCompile(`^([A-Za-z0-9-]+)-([0-9]+)$`)

var errCauseInvalidNamespaceUpdate = "cannot update kube namespace of a running firehose"

Expand Down