diff --git a/worker/worker.go b/worker/worker.go index 8db5777..aba0b3a 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -2,16 +2,25 @@ package worker import "sync" +const ( + WithoutGoroutinePool = -1 +) + +type Handler[I any, O any] func(I) O + type Worker[I any, O any] struct { - wg sync.WaitGroup - jobChan chan I - resultChan chan O + wg sync.WaitGroup + jobChan chan I + resultChan chan O + handler Handler[I, O] + maxGoroutines int } -func NewWorker[I any, O any]() *Worker[I, O] { +func NewWorker[I any, O any](maxGoroutines int) *Worker[I, O] { return &Worker[I, O]{ - jobChan: make(chan I), - resultChan: make(chan O), + jobChan: make(chan I), + resultChan: make(chan O), + maxGoroutines: maxGoroutines, } } @@ -26,15 +35,33 @@ func (w *Worker[I, O]) Wait() { } func (w *Worker[I, O]) SubmitJob(job I) { - w.wg.Add(1) w.jobChan <- job } -func (w *Worker[I, O]) HandleJobs(function func(I) O) { +func (w *Worker[I, O]) SetHandler(handler Handler[I, O]) { + w.handler = handler +} + +func (w *Worker[I, O]) workerGoroutine(id int) { + defer w.wg.Done() for job := range w.jobChan { - go func(job I) { - w.resultChan <- function(job) - w.wg.Done() - }(job) + w.resultChan <- w.handler(job) + w.wg.Done() + } +} + +func (w *Worker[I, O]) HandleJobs() { + if w.maxGoroutines == WithoutGoroutinePool { + for job := range w.jobChan { + go func(job I) { + w.resultChan <- w.handler(job) + w.wg.Done() + }(job) + } + } else { + for i := 1; i <= w.maxGoroutines; i++ { + w.wg.Add(1) + go w.workerGoroutine(i) + } } }