Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 36 additions & 27 deletions event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type EventBus struct {

type eventHandler struct {
callBack reflect.Value
flagOnce bool
once *sync.Once
async bool
transactional bool
sync.Mutex // lock for an event handler - useful for running async callbacks serially
Expand Down Expand Up @@ -73,7 +73,7 @@ func (bus *EventBus) doSubscribe(topic string, fn interface{}, handler *eventHan
// Returns error if `fn` is not a function.
func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), false, false, false, sync.Mutex{},
reflect.ValueOf(fn), nil, false, false, sync.Mutex{},
})
}

Expand All @@ -83,15 +83,15 @@ func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
// Returns error if `fn` is not a function.
func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), false, true, transactional, sync.Mutex{},
reflect.ValueOf(fn), nil, true, transactional, sync.Mutex{},
})
}

// SubscribeOnce subscribes to a topic once. Handler will be removed after executing.
// Returns error if `fn` is not a function.
func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), true, false, false, sync.Mutex{},
reflect.ValueOf(fn), new(sync.Once), false, false, sync.Mutex{},
})
}

Expand All @@ -100,7 +100,7 @@ func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error {
// Returns error if `fn` is not a function.
func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), true, true, false, sync.Mutex{},
reflect.ValueOf(fn), new(sync.Once), true, false, sync.Mutex{},
})
}

Expand Down Expand Up @@ -129,35 +129,44 @@ func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error {

// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.
func (bus *EventBus) Publish(topic string, args ...interface{}) {
bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
defer bus.lock.Unlock()
if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) {
// Handlers slice may be changed by removeHandler and Unsubscribe during iteration,
// so make a copy and iterate the copied slice.
copyHandlers := make([]*eventHandler, len(handlers))
copy(copyHandlers, handlers)
for i, handler := range copyHandlers {
if handler.flagOnce {
bus.removeHandler(topic, i)
}
if !handler.async {
bus.doPublish(handler, topic, args...)
} else {
bus.wg.Add(1)
if handler.transactional {
bus.lock.Unlock()
handler.Lock()
bus.lock.Lock()
}
go bus.doPublishAsync(handler, topic, args...)
// Handlers slice may be changed by removeHandler and Unsubscribe during iteration,
// so make a copy and iterate the copied slice.
bus.lock.Lock()
handlers := bus.handlers[topic]
copyHandlers := make([]*eventHandler, len(handlers))
copy(copyHandlers, handlers)
bus.lock.Unlock()
for _, handler := range copyHandlers {
if !handler.async {
bus.doPublish(handler, topic, args...)
} else {
bus.wg.Add(1)
if handler.transactional {
handler.Lock()
}
go bus.doPublishAsync(handler, topic, args...)
}
}
}

func (bus *EventBus) doPublish(handler *eventHandler, topic string, args ...interface{}) {
passedArguments := bus.setUpPublish(handler, args...)
handler.callBack.Call(passedArguments)
if handler.once == nil {
handler.callBack.Call(passedArguments)
} else {
handler.once.Do(func() {
bus.lock.Lock()
for idx, h := range bus.handlers[topic] {
// compare pointers since pointers are unique for all members of slice
if h.once == handler.once {
bus.removeHandler(topic, idx)
break
}
}
bus.lock.Unlock()
handler.callBack.Call(passedArguments)
})
}
}

func (bus *EventBus) doPublishAsync(handler *eventHandler, topic string, args ...interface{}) {
Expand Down