From e40a8371f8243230fab665939b06c051ff95b608 Mon Sep 17 00:00:00 2001 From: appleboy Date: Sun, 13 Apr 2025 10:48:56 +0800 Subject: [PATCH] feat: implement listener for asynchronous notifications in queue - Add `notify` channel to `Queue` - Notify worker in the `queue` function without blocking the main thread - Add case to listen for `notify` channel in the `start` function Signed-off-by: appleboy --- queue.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/queue.go b/queue.go index 11f59e1..836de8a 100644 --- a/queue.go +++ b/queue.go @@ -26,6 +26,7 @@ type ( routineGroup *routineGroup quit chan struct{} ready chan struct{} + notify chan struct{} worker core.Worker stopOnce sync.Once stopFlag int32 @@ -44,6 +45,7 @@ func NewQueue(opts ...Option) (*Queue, error) { routineGroup: newRoutineGroup(), quit: make(chan struct{}), ready: make(chan struct{}, 1), + notify: make(chan struct{}, 1), workerCount: o.workerCount, logger: o.logger, worker: o.worker, @@ -149,6 +151,13 @@ func (q *Queue) queue(m *job.Message) error { } q.metric.IncSubmittedTask() + // notify worker + // if the channel is full, it means that the worker is busy + // and we don't want to block the main thread + select { + case q.notify <- struct{}{}: + default: + } return nil } @@ -325,6 +334,7 @@ func (q *Queue) start() { return } case <-ticker.C: + case <-q.notify: } } }