From 84469ec2b11103b96efd3a558f5e1faaf19c1227 Mon Sep 17 00:00:00 2001 From: Vladislav Yarmak Date: Fri, 15 Apr 2022 00:49:00 +0300 Subject: [PATCH] do not hold lock on whole publish --- event_bus.go | 63 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/event_bus.go b/event_bus.go index dedc7fd..3044612 100644 --- a/event_bus.go +++ b/event_bus.go @@ -42,7 +42,7 @@ type EventBus struct { type eventHandler struct { callBack reflect.Value - flagOnce bool + once *sync.Once async bool transactional bool sync.Mutex // lock for an event handler - useful for running async callbacks serially @@ -73,7 +73,7 @@ func (bus *EventBus) doSubscribe(topic string, fn interface{}, handler *eventHan // Returns error if `fn` is not a function. func (bus *EventBus) Subscribe(topic string, fn interface{}) error { return bus.doSubscribe(topic, fn, &eventHandler{ - reflect.ValueOf(fn), false, false, false, sync.Mutex{}, + reflect.ValueOf(fn), nil, false, false, sync.Mutex{}, }) } @@ -83,7 +83,7 @@ func (bus *EventBus) Subscribe(topic string, fn interface{}) error { // Returns error if `fn` is not a function. func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error { return bus.doSubscribe(topic, fn, &eventHandler{ - reflect.ValueOf(fn), false, true, transactional, sync.Mutex{}, + reflect.ValueOf(fn), nil, true, transactional, sync.Mutex{}, }) } @@ -91,7 +91,7 @@ func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional // Returns error if `fn` is not a function. func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error { return bus.doSubscribe(topic, fn, &eventHandler{ - reflect.ValueOf(fn), true, false, false, sync.Mutex{}, + reflect.ValueOf(fn), new(sync.Once), false, false, sync.Mutex{}, }) } @@ -100,7 +100,7 @@ func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error { // Returns error if `fn` is not a function. func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error { return bus.doSubscribe(topic, fn, &eventHandler{ - reflect.ValueOf(fn), true, true, false, sync.Mutex{}, + reflect.ValueOf(fn), new(sync.Once), true, false, sync.Mutex{}, }) } @@ -129,35 +129,44 @@ func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error { // Publish executes callback defined for a topic. Any additional argument will be transferred to the callback. func (bus *EventBus) Publish(topic string, args ...interface{}) { - bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish - defer bus.lock.Unlock() - if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) { - // Handlers slice may be changed by removeHandler and Unsubscribe during iteration, - // so make a copy and iterate the copied slice. - copyHandlers := make([]*eventHandler, len(handlers)) - copy(copyHandlers, handlers) - for i, handler := range copyHandlers { - if handler.flagOnce { - bus.removeHandler(topic, i) - } - if !handler.async { - bus.doPublish(handler, topic, args...) - } else { - bus.wg.Add(1) - if handler.transactional { - bus.lock.Unlock() - handler.Lock() - bus.lock.Lock() - } - go bus.doPublishAsync(handler, topic, args...) + // Handlers slice may be changed by removeHandler and Unsubscribe during iteration, + // so make a copy and iterate the copied slice. + bus.lock.Lock() + handlers := bus.handlers[topic] + copyHandlers := make([]*eventHandler, len(handlers)) + copy(copyHandlers, handlers) + bus.lock.Unlock() + for _, handler := range copyHandlers { + if !handler.async { + bus.doPublish(handler, topic, args...) + } else { + bus.wg.Add(1) + if handler.transactional { + handler.Lock() } + go bus.doPublishAsync(handler, topic, args...) } } } func (bus *EventBus) doPublish(handler *eventHandler, topic string, args ...interface{}) { passedArguments := bus.setUpPublish(handler, args...) - handler.callBack.Call(passedArguments) + if handler.once == nil { + handler.callBack.Call(passedArguments) + } else { + handler.once.Do(func() { + bus.lock.Lock() + for idx, h := range bus.handlers[topic] { + // compare pointers since pointers are unique for all members of slice + if h.once == handler.once { + bus.removeHandler(topic, idx) + break + } + } + bus.lock.Unlock() + handler.callBack.Call(passedArguments) + }) + } } func (bus *EventBus) doPublishAsync(handler *eventHandler, topic string, args ...interface{}) {