From 7cf4f3aee7444c907eebb9ff85af0685e0aca085 Mon Sep 17 00:00:00 2001 From: Shivaprasad Date: Fri, 3 Nov 2023 13:51:02 +0530 Subject: [PATCH] feat: add worker-count config and launch multiple workers --- cli/config.go | 1 + cli/serve.go | 13 ++++++++----- core/sync.go | 21 ++++++++++++++++++++- modules/firehose/driver_plan.go | 4 +--- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/cli/config.go b/cli/config.go index 52b320e9..65e7eafc 100644 --- a/cli/config.go +++ b/cli/config.go @@ -27,6 +27,7 @@ type Config struct { } type syncerConf struct { + WorkerCount int `mapstructure:"worker_count" default:"1"` SyncInterval time.Duration `mapstructure:"sync_interval" default:"1s"` RefreshInterval time.Duration `mapstructure:"refresh_interval" default:"3s"` ExtendLockBy time.Duration `mapstructure:"extend_lock_by" default:"5s"` diff --git a/cli/serve.go b/cli/serve.go index 0cc100a0..986c6b6d 100644 --- a/cli/serve.go +++ b/cli/serve.go @@ -49,6 +49,9 @@ func cmdServe() *cobra.Command { newrelic.ConfigAppName(cfg.Telemetry.ServiceName), newrelic.ConfigLicense(cfg.Telemetry.NewRelicAPIKey), ) + if err != nil { + return err + } store := setupStorage(cfg.PGConnStr, cfg.Syncer) moduleService := module.NewService(setupRegistry(), store) @@ -61,11 +64,11 @@ func cmdServe() *cobra.Command { } if spawnWorker { - go func() { - if runErr := resourceService.RunSyncer(cmd.Context(), cfg.Syncer.SyncInterval); runErr != nil { - zap.L().Error("syncer exited with error", zap.Error(err)) - } - }() + go resourceService.RunSyncer( + cmd.Context(), + cfg.Syncer.WorkerCount, + cfg.Syncer.SyncInterval, + ) } return entropyserver.Serve(cmd.Context(), diff --git a/core/sync.go b/core/sync.go index d4d17795..ac928e50 100644 --- a/core/sync.go +++ b/core/sync.go @@ -2,6 +2,7 @@ package core import ( "context" + "sync" "time" "go.uber.org/zap" @@ -12,7 +13,24 @@ import ( // RunSyncer runs the syncer thread that keeps performing resource-sync at // regular intervals. -func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error { +func (svc *Service) RunSyncer(ctx context.Context, workerCount int, interval time.Duration) { + wg := &sync.WaitGroup{} + for i := 0; i < workerCount; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + + if err := svc.runWorker(ctx, id, interval); err != nil { + zap.L().Error("worker-%d failed", zap.Error(err)) + } + }(i) + } + wg.Wait() + + zap.L().Info("all syncer workers exited") +} + +func (svc *Service) runWorker(ctx context.Context, _ int, interval time.Duration) error { tick := time.NewTimer(interval) defer tick.Stop() @@ -27,6 +45,7 @@ func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error err := svc.store.SyncOne(ctx, svc.handleSync) if err != nil { zap.L().Warn("SyncOne() failed", zap.Error(err)) + continue } } } diff --git a/modules/firehose/driver_plan.go b/modules/firehose/driver_plan.go index aa2601d4..0546de20 100644 --- a/modules/firehose/driver_plan.go +++ b/modules/firehose/driver_plan.go @@ -16,9 +16,7 @@ import ( const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET" -var ( - suffixRegex = regexp.MustCompile(`^([A-Za-z0-9-]+)-([0-9]+)$`) -) +var suffixRegex = regexp.MustCompile(`^([A-Za-z0-9-]+)-([0-9]+)$`) var errCauseInvalidNamespaceUpdate = "cannot update kube namespace of a running firehose"