From 4d41c032055406d5e97054f6eee25590486aeae3 Mon Sep 17 00:00:00 2001 From: ehsan Date: Mon, 29 Jan 2024 22:29:25 +0330 Subject: [PATCH 1/2] Add goroutine pool option --- worker/worker.go | 48 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index 8db5777..3eb6e58 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -2,16 +2,25 @@ package worker import "sync" +const ( + WithoutGoroutinePool = -1 +) + +type Handler[I any, O any] func(I) O + type Worker[I any, O any] struct { - wg sync.WaitGroup - jobChan chan I - resultChan chan O + wg sync.WaitGroup + jobChan chan I + resultChan chan O + handler Handler[I, O] + maxGoroutines int } -func NewWorker[I any, O any]() *Worker[I, O] { +func NewWorker[I any, O any](maxGoroutines int) *Worker[I, O] { return &Worker[I, O]{ - jobChan: make(chan I), - resultChan: make(chan O), + jobChan: make(chan I), + resultChan: make(chan O), + maxGoroutines: maxGoroutines, } } @@ -30,11 +39,28 @@ func (w *Worker[I, O]) SubmitJob(job I) { w.jobChan <- job } -func (w *Worker[I, O]) HandleJobs(function func(I) O) { +func (w *Worker[I, O]) SetHandler(handler Handler[I, O]) { + w.handler = handler +} + +func (w *Worker[I, O]) workerGoroutine(id int) { for job := range w.jobChan { - go func(job I) { - w.resultChan <- function(job) - w.wg.Done() - }(job) + w.resultChan <- w.handler(job) + w.wg.Done() + } +} + +func (w *Worker[I, O]) HandleJobs() { + if w.maxGoroutines == WithoutGoroutinePool { + for job := range w.jobChan { + go func(job I) { + w.resultChan <- w.handler(job) + w.wg.Done() + }(job) + } + } else { + for i := 1; i <= w.maxGoroutines; i++ { + go w.workerGoroutine(i) + } } } From 1e0087de3e59a96d7f0b5e80f21eb350c35a930b Mon Sep 17 00:00:00 2001 From: ehsan Date: Tue, 6 Feb 2024 23:21:45 +0330 Subject: [PATCH 2/2] Adding and doing waitgroup --- worker/worker.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/worker/worker.go b/worker/worker.go index 3eb6e58..aba0b3a 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -35,7 +35,6 @@ func (w *Worker[I, O]) Wait() { } func (w *Worker[I, O]) SubmitJob(job I) { - w.wg.Add(1) w.jobChan <- job } @@ -44,6 +43,7 @@ func (w *Worker[I, O]) SetHandler(handler Handler[I, O]) { } func (w *Worker[I, O]) workerGoroutine(id int) { + defer w.wg.Done() for job := range w.jobChan { w.resultChan <- w.handler(job) w.wg.Done() @@ -60,6 +60,7 @@ func (w *Worker[I, O]) HandleJobs() { } } else { for i := 1; i <= w.maxGoroutines; i++ { + w.wg.Add(1) go w.workerGoroutine(i) } }