From e80cea792fca966ed0564c957649457ebcdf8184 Mon Sep 17 00:00:00 2001 From: "tomasz.ziolkowski" Date: Fri, 28 Feb 2025 08:58:48 +0100 Subject: [PATCH 1/2] Simple bus implementation Signed-off-by: tomasz.ziolkowski --- README.md | 38 ++++++++ event_bus_test.go | 35 +++++++ generic_bus.go | 170 ++++++++++++++++++++++++++++++++++ generic_bus_test.go | 220 ++++++++++++++++++++++++++++++++++++++++++++ go.mod | 3 + 5 files changed, 466 insertions(+) create mode 100644 generic_bus.go create mode 100644 generic_bus_test.go create mode 100644 go.mod diff --git a/README.md b/README.md index b3bd42c..9a75353 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,12 @@ New returns new EventBus with empty handlers. bus := EventBus.New(); ``` +You can alternatively use ~50% faster implementation if your listener accepts only one argument of a particular type. +The below example creates a new EventBus with handlers accepting only string arguments: +```go +bus := EventBus.NewSimpleBus[string](); +```` + #### Subscribe(topic string, fn interface{}) error Subscribe to a topic. Returns error if `fn` is not a function. ```go @@ -91,6 +97,8 @@ bus.Subscribe("topic:handler", Handler) bus.Publish("topic:handler", "Hello, World!"); ``` +In the case of simplified bus, there is only one argument accepted of predefined type. + #### SubscribeAsync(topic string, fn interface{}, transactional bool) Subscribe to a topic with an asynchronous callback. Returns error if `fn` is not a function. ```go @@ -147,6 +155,36 @@ func main() { } ``` +### Benchmarks + +```shell +% GOMAXPROCS=1 go test -bench=. +2 +goos: darwin +goarch: amd64 +pkg: github.com/ziollek/EventBus +cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +BenchmarkSynchronousPublishing 2877297 406.9 ns/op +BenchmarkAsynchronousPublishing 548457 2101 ns/op +BenchmarkSimpleSynchronousPublishing 3939492 305.5 ns/op +BenchmarkSimpleAsynchronousPublishing 1000000 1141 ns/op +PASS +ok github.com/ziollek/EventBus 9.449s + +% GOMAXPROCS=2 go test -bench=. +2 +goos: darwin +goarch: amd64 +pkg: github.com/ziollek/EventBus +cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +BenchmarkSynchronousPublishing-2 3146671 379.3 ns/op +BenchmarkAsynchronousPublishing-2 1050337 1148 ns/op +BenchmarkSimpleSynchronousPublishing-2 5079916 246.0 ns/op +BenchmarkSimpleAsynchronousPublishing-2 2124786 548.2 ns/op +PASS +ok github.com/ziollek/EventBus 9.677s +``` + #### Notes Documentation is available here: [godoc.org](https://godoc.org/github.com/asaskevich/EventBus). Full information about code coverage is also available here: [EventBus on gocover.io](http://gocover.io/github.com/asaskevich/EventBus). diff --git a/event_bus_test.go b/event_bus_test.go index 0cdb579..27c9019 100644 --- a/event_bus_test.go +++ b/event_bus_test.go @@ -188,3 +188,38 @@ func TestSubscribeAsync(t *testing.T) { // t.Fail() //} } + +func BenchmarkSynchronousPublishing(b *testing.B) { + type input struct { + number int + slice []int + name string + } + list := []int{1, 2, 3, 4, 5} + + bus := New() + _ = bus.Subscribe("topic", func(i input) {}) + + b.ResetTimer() + for range b.N { + bus.Publish("topic", input{1, list, "test"}) + } +} + +func BenchmarkAsynchronousPublishing(b *testing.B) { + type input struct { + number int + slice []int + name string + } + list := []int{1, 2, 3, 4, 5} + + bus := New() + _ = bus.SubscribeAsync("topic", func(i input) {}, false) + + b.ResetTimer() + for range b.N { + bus.Publish("topic", input{1, list, "test"}) + } + bus.WaitAsync() +} diff --git a/generic_bus.go b/generic_bus.go new file mode 100644 index 0000000..8ef8a1b --- /dev/null +++ b/generic_bus.go @@ -0,0 +1,170 @@ +package EventBus + +import ( + "reflect" + "sync" +) + +// SimpleBusSubscriber defines subscription-related bus behavior for event of specific type T +type SimpleBusSubscriber[T any] interface { + Subscribe(topic string, fn func(T)) + SubscribeAsync(topic string, fn func(T), transactional bool) + SubscribeOnce(topic string, fn func(T)) + SubscribeOnceAsync(topic string, fn func(T)) + Unsubscribe(topic string, handler func(T)) +} + +// SimpleBusPublisher defines publishing-related bus behavior for event of specific type T +type SimpleBusPublisher[T any] interface { + Publish(topic string, arg T) +} + +// SimpleBus includes global (subscribe, publish, control) bus behavior +type SimpleBus[T any] interface { + BusController + SimpleBusSubscriber[T] + SimpleBusPublisher[T] +} + +type callBack[T any] struct { + Call func(T) + pointer uintptr +} + +type genericHandler[T any] struct { + callBack callBack[T] + once bool + async bool + transactional bool + sync.Mutex // lock for an event handler - useful for running async callbacks serially +} + +func (h *genericHandler[T]) Call(arg T, wg *sync.WaitGroup) { + if h.async { + wg.Add(1) + go func() { + defer wg.Done() + if h.transactional { + h.Lock() + defer h.Unlock() + } + h.callBack.Call(arg) + }() + return + } else { + h.callBack.Call(arg) + } +} + +type listeners[T any] []*genericHandler[T] + +func (l *listeners[T]) add(handler *genericHandler[T]) { + for _, h := range *l { + if h.callBack.pointer == handler.callBack.pointer { + return + } + } + *l = append(*l, handler) +} + +func (l *listeners[T]) delete(handler *genericHandler[T]) { + for i, h := range *l { + if h.callBack.pointer == handler.callBack.pointer { + *l = append((*l)[:i], (*l)[i+1:]...) + return + } + } +} + +func newGenericHandler[T any](fn func(T), async, transactional, once bool) *genericHandler[T] { + return &genericHandler[T]{ + callBack: callBack[T]{Call: fn, pointer: reflect.ValueOf(fn).Pointer()}, + async: async, + transactional: transactional, + once: once, + } +} + +type GenericBus[T any] struct { + handlers map[string]*listeners[T] + sync.RWMutex + wg sync.WaitGroup +} + +func NewSimpleBus[T any]() SimpleBus[T] { + return &GenericBus[T]{ + handlers: make(map[string]*listeners[T]), + } +} + +func (bus *GenericBus[_]) HasCallback(topic string) bool { + bus.RLock() + defer bus.RUnlock() + _, ok := bus.handlers[topic] + return ok && len(*bus.handlers[topic]) > 0 +} + +func (bus *GenericBus[T]) Subscribe(topic string, fn func(T)) { + bus.subscribe(topic, fn, false, false, false) +} + +func (bus *GenericBus[T]) SubscribeOnce(topic string, fn func(T)) { + bus.subscribe(topic, fn, false, false, true) +} + +func (bus *GenericBus[T]) SubscribeOnceAsync(topic string, fn func(T)) { + bus.subscribe(topic, fn, true, false, true) +} + +func (bus *GenericBus[T]) SubscribeAsync(topic string, fn func(T), transactional bool) { + bus.subscribe(topic, fn, true, transactional, false) +} + +func (bus *GenericBus[T]) Unsubscribe(topic string, fn func(T)) { + bus.Lock() + defer bus.Unlock() + if _, ok := bus.handlers[topic]; ok { + bus.handlers[topic].delete(newGenericHandler(fn, false, false, false)) + } +} + +func (bus *GenericBus[T]) Publish(topic string, arg T) { + if subscribers, ok := bus.fetchSubscribers(topic); ok { + fire := make(chan *genericHandler[T], len(*subscribers)) + bus.Lock() + for _, subscriber := range *subscribers { + fire <- subscriber + if subscriber.once { + bus.handlers[topic].delete(subscriber) + } + } + bus.Unlock() + close(fire) + // calling the callbacks will not block the whole bus + for subscriber := range fire { + subscriber.Call(arg, &bus.wg) + } + } +} + +func (bus *GenericBus[T]) WaitAsync() { + bus.wg.Wait() +} + +func (bus *GenericBus[T]) subscribe(topic string, fn func(T), async, transactional, once bool) { + bus.Lock() + defer bus.Unlock() + if _, ok := bus.handlers[topic]; !ok { + bus.handlers[topic] = &listeners[T]{} + } + bus.handlers[topic].add(newGenericHandler(fn, async, transactional, once)) +} + +func (bus *GenericBus[T]) fetchSubscribers(topic string) (*listeners[T], bool) { + bus.RLock() + defer bus.RUnlock() + if handlers, ok := bus.handlers[topic]; ok && len(*handlers) > 0 { + return handlers, true + } + return nil, false +} diff --git a/generic_bus_test.go b/generic_bus_test.go new file mode 100644 index 0000000..4139856 --- /dev/null +++ b/generic_bus_test.go @@ -0,0 +1,220 @@ +package EventBus + +import ( + "testing" + "time" +) + +func TestNewGeneric(t *testing.T) { + bus := NewSimpleBus[any]() + if bus == nil { + t.Log("New GenericBus not created!") + t.Fail() + } +} + +func TestSimpleHasCallback(t *testing.T) { + bus := NewSimpleBus[any]() + bus.Subscribe("topic", func(_ any) {}) + if bus.HasCallback("topic_topic") { + t.Fail() + } + if !bus.HasCallback("topic") { + t.Fail() + } +} + +func TestSimpleOnceAndManySubscribe(t *testing.T) { + bus := NewSimpleBus[any]() + topic := "topic" + flag := 0 + // artificially define different functions with the same logic + // subscription logic prevents from subscribe the same callback for the same topic multiple times + fn1 := func(_ any) { flag += 1 } + fn2 := func(_ any) { flag += 1 } + fn3 := func(_ any) { flag += 1 } + bus.SubscribeOnce(topic, fn1) + bus.Subscribe(topic, fn2) + bus.Subscribe(topic, fn3) + bus.Publish(topic, nil) + + if flag != 3 { + t.Fail() + } +} + +func TestSimpleUnsubscribe(t *testing.T) { + bus := NewSimpleBus[any]() + fn := func(_ any) {} + bus.Subscribe("topic", fn) + if !bus.HasCallback("topic") { + t.Logf("Expected to have callback for topic but it is not present") + t.Fail() + } + bus.Unsubscribe("topic", fn) + if bus.HasCallback("topic") { + t.Logf("Expected to have no callback for topic after unsubscribe but it is present") + t.Fail() + } +} + +type accumulator struct { + val int +} + +func (a *accumulator) Handle(_ any) { + a.val++ +} + +func TestSimpleUnsubscribeMethod(t *testing.T) { + bus := NewSimpleBus[any]() + h := &accumulator{val: 0} + + bus.Subscribe("topic", h.Handle) + bus.Publish("topic", nil) + bus.Unsubscribe("topic", h.Handle) + bus.Publish("topic", nil) + + if h.val != 1 { + t.Fail() + } +} + +func TestSimplePublish(t *testing.T) { + bus := NewSimpleBus[int]() + bus.Subscribe("topic", func(a int) { + if a != 10 { + t.Fail() + } + }) + bus.Publish("topic", 10) +} + +func TestSimpleSubscribeOnceAsync(t *testing.T) { + value := 0 + + bus := NewSimpleBus[*int]() + bus.SubscribeOnceAsync("topic", func(a *int) { + *a++ + }) + + bus.Publish("topic", &value) + bus.Publish("topic", &value) + bus.WaitAsync() + + if value != 1 { + t.Fail() + } + + if bus.HasCallback("topic") { + t.Fail() + } +} + +func TestSimpleSubscribeAsyncTransactional(t *testing.T) { + started := make(chan struct{}) + defer close(started) + results := make([]int, 0) + type input struct { + a int + out *[]int + dur string + } + + bus := NewSimpleBus[input]() + bus.SubscribeAsync("topic", func(i input) { + started <- struct{}{} + sleep, _ := time.ParseDuration(i.dur) + time.Sleep(sleep) + *i.out = append(*i.out, i.a) + }, true) + + bus.Publish("topic", input{a: 1, out: &results, dur: "1s"}) + // started is used to enforce that the first message is started being processed before the second is sent + <-started + bus.Publish("topic", input{a: 2, out: &results, dur: "0s"}) + <-started + + bus.WaitAsync() + + if len(results) != 2 { + t.Logf("Expected 2 results, got %d", len(results)) + t.Fail() + } + + if results[0] != 1 || results[1] != 2 { + t.Logf("Expected [1 2] results, got %+v", results) + t.Fail() + } +} + +func TestSimpleSubscribeAsync(t *testing.T) { + ready := make(chan struct{}) + results := make(chan int) + + type input struct { + a int + out chan<- int + } + + bus := NewSimpleBus[input]() + bus.SubscribeAsync("topic", func(i input) { + i.out <- i.a + }, false) + + bus.Publish("topic", input{a: 1, out: results}) + bus.Publish("topic", input{a: 2, out: results}) + + numResults := 0 + + go func() { + for _ = range results { + numResults++ + } + close(ready) + }() + + bus.WaitAsync() + close(results) + <-ready + + if numResults != 2 { + t.Logf("Expected 2 results, got %d", numResults) + t.Fail() + } +} + +func BenchmarkSimpleSynchronousPublishing(b *testing.B) { + type input struct { + number int + slice []int + name string + } + list := []int{1, 2, 3, 4, 5} + + bus := NewSimpleBus[input]() + bus.Subscribe("topic", func(i input) {}) + + b.ResetTimer() + for range b.N { + bus.Publish("topic", input{1, list, "test"}) + } +} + +func BenchmarkSimpleAsynchronousPublishing(b *testing.B) { + type input struct { + number int + slice []int + name string + } + list := []int{1, 2, 3, 4, 5} + + bus := NewSimpleBus[input]() + bus.SubscribeAsync("topic", func(i input) {}, false) + + b.ResetTimer() + for range b.N { + bus.Publish("topic", input{1, list, "test"}) + } + bus.WaitAsync() +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..42789cb --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/ziollek/EventBus + +go 1.23.6 From 13bb31a45242948a4b149af7d6b8d1220c80f34f Mon Sep 17 00:00:00 2001 From: "tomasz.ziolkowski" Date: Sat, 1 Mar 2025 16:43:23 +0100 Subject: [PATCH 2/2] introduce SubscriptionRef, add example to README Signed-off-by: tomasz.ziolkowski --- README.md | 40 ++++- generic_bus.go | 170 ---------------------- go.mod | 3 - simple_bus.go | 162 +++++++++++++++++++++ generic_bus_test.go => simple_bus_test.go | 30 ++-- 5 files changed, 217 insertions(+), 188 deletions(-) delete mode 100644 generic_bus.go delete mode 100644 go.mod create mode 100644 simple_bus.go rename generic_bus_test.go => simple_bus_test.go (87%) diff --git a/README.md b/README.md index 9a75353..5b962e7 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,9 @@ import ( ``` #### Example + +General purpose event bus (unlimited number of parameters of any type): + ```go func calculator(a int, b int) { fmt.Printf("%d\n", a + b) @@ -39,6 +42,27 @@ func main() { } ``` +You can alternatively use ~50% faster implementation if your listener accepts only one argument of a particular type. +Simple event bus (only one parameter of a chosen type): + +```go +type sumComponents struct { + a int + b int +} + +func calculator(args sumComponents) { + fmt.Printf("%d\n", args.a + args.b) +} + +func main() { + bus := EventBus.NewSimpleBus[sumComponents]() + ref := bus.Subscribe("main:calculator", calculator) + bus.Publish("main:calculator", sumComponents{20, 40}) + bus.Unsubscribe("main:calculator", ref) +} +``` + #### Implemented methods * **New()** * **Subscribe()** @@ -56,10 +80,10 @@ New returns new EventBus with empty handlers. bus := EventBus.New(); ``` -You can alternatively use ~50% faster implementation if your listener accepts only one argument of a particular type. +#### NewSimpleBus[T]() The below example creates a new EventBus with handlers accepting only string arguments: ```go -bus := EventBus.NewSimpleBus[string](); +bus := EventBus.NewSimpleBus[string]() ```` #### Subscribe(topic string, fn interface{}) error @@ -70,6 +94,18 @@ func Handler() { ... } bus.Subscribe("topic:handler", Handler) ``` +#### Subscribe(topic string, fn func(T)) SubscriptionRef + +Simplified version of bus returns `SubscriptionRef` as a result of all subscription methods. It can be used to unsubscribe from the topic. +```go +func Handler(param string) { ... } +... +bus := EventBus.NewSimpleBus[string]() +ref := bus.Subscribe("topic:handler", Handler) +... +bus.Unsubscribe(ref) +``` + #### SubscribeOnce(topic string, fn interface{}) error Subscribe to a topic once. Handler will be removed after executing. Returns error if `fn` is not a function. ```go diff --git a/generic_bus.go b/generic_bus.go deleted file mode 100644 index 8ef8a1b..0000000 --- a/generic_bus.go +++ /dev/null @@ -1,170 +0,0 @@ -package EventBus - -import ( - "reflect" - "sync" -) - -// SimpleBusSubscriber defines subscription-related bus behavior for event of specific type T -type SimpleBusSubscriber[T any] interface { - Subscribe(topic string, fn func(T)) - SubscribeAsync(topic string, fn func(T), transactional bool) - SubscribeOnce(topic string, fn func(T)) - SubscribeOnceAsync(topic string, fn func(T)) - Unsubscribe(topic string, handler func(T)) -} - -// SimpleBusPublisher defines publishing-related bus behavior for event of specific type T -type SimpleBusPublisher[T any] interface { - Publish(topic string, arg T) -} - -// SimpleBus includes global (subscribe, publish, control) bus behavior -type SimpleBus[T any] interface { - BusController - SimpleBusSubscriber[T] - SimpleBusPublisher[T] -} - -type callBack[T any] struct { - Call func(T) - pointer uintptr -} - -type genericHandler[T any] struct { - callBack callBack[T] - once bool - async bool - transactional bool - sync.Mutex // lock for an event handler - useful for running async callbacks serially -} - -func (h *genericHandler[T]) Call(arg T, wg *sync.WaitGroup) { - if h.async { - wg.Add(1) - go func() { - defer wg.Done() - if h.transactional { - h.Lock() - defer h.Unlock() - } - h.callBack.Call(arg) - }() - return - } else { - h.callBack.Call(arg) - } -} - -type listeners[T any] []*genericHandler[T] - -func (l *listeners[T]) add(handler *genericHandler[T]) { - for _, h := range *l { - if h.callBack.pointer == handler.callBack.pointer { - return - } - } - *l = append(*l, handler) -} - -func (l *listeners[T]) delete(handler *genericHandler[T]) { - for i, h := range *l { - if h.callBack.pointer == handler.callBack.pointer { - *l = append((*l)[:i], (*l)[i+1:]...) - return - } - } -} - -func newGenericHandler[T any](fn func(T), async, transactional, once bool) *genericHandler[T] { - return &genericHandler[T]{ - callBack: callBack[T]{Call: fn, pointer: reflect.ValueOf(fn).Pointer()}, - async: async, - transactional: transactional, - once: once, - } -} - -type GenericBus[T any] struct { - handlers map[string]*listeners[T] - sync.RWMutex - wg sync.WaitGroup -} - -func NewSimpleBus[T any]() SimpleBus[T] { - return &GenericBus[T]{ - handlers: make(map[string]*listeners[T]), - } -} - -func (bus *GenericBus[_]) HasCallback(topic string) bool { - bus.RLock() - defer bus.RUnlock() - _, ok := bus.handlers[topic] - return ok && len(*bus.handlers[topic]) > 0 -} - -func (bus *GenericBus[T]) Subscribe(topic string, fn func(T)) { - bus.subscribe(topic, fn, false, false, false) -} - -func (bus *GenericBus[T]) SubscribeOnce(topic string, fn func(T)) { - bus.subscribe(topic, fn, false, false, true) -} - -func (bus *GenericBus[T]) SubscribeOnceAsync(topic string, fn func(T)) { - bus.subscribe(topic, fn, true, false, true) -} - -func (bus *GenericBus[T]) SubscribeAsync(topic string, fn func(T), transactional bool) { - bus.subscribe(topic, fn, true, transactional, false) -} - -func (bus *GenericBus[T]) Unsubscribe(topic string, fn func(T)) { - bus.Lock() - defer bus.Unlock() - if _, ok := bus.handlers[topic]; ok { - bus.handlers[topic].delete(newGenericHandler(fn, false, false, false)) - } -} - -func (bus *GenericBus[T]) Publish(topic string, arg T) { - if subscribers, ok := bus.fetchSubscribers(topic); ok { - fire := make(chan *genericHandler[T], len(*subscribers)) - bus.Lock() - for _, subscriber := range *subscribers { - fire <- subscriber - if subscriber.once { - bus.handlers[topic].delete(subscriber) - } - } - bus.Unlock() - close(fire) - // calling the callbacks will not block the whole bus - for subscriber := range fire { - subscriber.Call(arg, &bus.wg) - } - } -} - -func (bus *GenericBus[T]) WaitAsync() { - bus.wg.Wait() -} - -func (bus *GenericBus[T]) subscribe(topic string, fn func(T), async, transactional, once bool) { - bus.Lock() - defer bus.Unlock() - if _, ok := bus.handlers[topic]; !ok { - bus.handlers[topic] = &listeners[T]{} - } - bus.handlers[topic].add(newGenericHandler(fn, async, transactional, once)) -} - -func (bus *GenericBus[T]) fetchSubscribers(topic string) (*listeners[T], bool) { - bus.RLock() - defer bus.RUnlock() - if handlers, ok := bus.handlers[topic]; ok && len(*handlers) > 0 { - return handlers, true - } - return nil, false -} diff --git a/go.mod b/go.mod deleted file mode 100644 index 42789cb..0000000 --- a/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module github.com/ziollek/EventBus - -go 1.23.6 diff --git a/simple_bus.go b/simple_bus.go new file mode 100644 index 0000000..7b9521e --- /dev/null +++ b/simple_bus.go @@ -0,0 +1,162 @@ +package EventBus + +import ( + "sync" +) + +type SubscriptionRef uint64 + +// SimpleBusSubscriber defines subscription-related bus behavior for event of specific type T +type SimpleBusSubscriber[T any] interface { + Subscribe(topic string, fn func(T)) SubscriptionRef + SubscribeAsync(topic string, fn func(T), transactional bool) SubscriptionRef + SubscribeOnce(topic string, fn func(T)) SubscriptionRef + SubscribeOnceAsync(topic string, fn func(T)) SubscriptionRef + Unsubscribe(topic string, ref SubscriptionRef) +} + +// SimpleBusPublisher defines publishing-related bus behavior for event of specific type T +type SimpleBusPublisher[T any] interface { + Publish(topic string, arg T) +} + +// SimpleBus includes global (subscribe, publish, control) bus behavior +type SimpleBus[T any] interface { + BusController + SimpleBusSubscriber[T] + SimpleBusPublisher[T] +} + +type simpleHandler[T any] struct { + callBack func(T) + once bool + async bool + transactional bool + reference SubscriptionRef + sync.Mutex // lock for an event handler - useful for running async callbacks serially +} + +func (h *simpleHandler[T]) Call(arg T, wg *sync.WaitGroup) { + if h.async { + wg.Add(1) + go func() { + defer wg.Done() + if h.transactional { + h.Lock() + defer h.Unlock() + } + h.callBack(arg) + }() + return + } else { + h.callBack(arg) + } +} + +type listeners[T any] map[SubscriptionRef]*simpleHandler[T] + +func (l listeners[T]) add(handler *simpleHandler[T]) { + l[handler.reference] = handler +} + +func (l listeners[_]) delete(ref SubscriptionRef) { + delete(l, ref) +} + +func newSimpleHandler[T any](fn func(T), ref SubscriptionRef, async, transactional, once bool) *simpleHandler[T] { + return &simpleHandler[T]{ + callBack: fn, + reference: ref, + async: async, + transactional: transactional, + once: once, + } +} + +type TypedBus[T any] struct { + handlers map[string]*listeners[T] + lastRef SubscriptionRef + sync.RWMutex + wg sync.WaitGroup +} + +func NewSimpleBus[T any]() SimpleBus[T] { + return &TypedBus[T]{ + handlers: make(map[string]*listeners[T]), + lastRef: SubscriptionRef(0), + } +} + +func (bus *TypedBus[_]) HasCallback(topic string) bool { + bus.RLock() + defer bus.RUnlock() + _, ok := bus.handlers[topic] + return ok && len(*bus.handlers[topic]) > 0 +} + +func (bus *TypedBus[T]) Subscribe(topic string, fn func(T)) SubscriptionRef { + return bus.subscribe(topic, fn, false, false, false) +} + +func (bus *TypedBus[T]) SubscribeOnce(topic string, fn func(T)) SubscriptionRef { + return bus.subscribe(topic, fn, false, false, true) +} + +func (bus *TypedBus[T]) SubscribeOnceAsync(topic string, fn func(T)) SubscriptionRef { + return bus.subscribe(topic, fn, true, false, true) +} + +func (bus *TypedBus[T]) SubscribeAsync(topic string, fn func(T), transactional bool) SubscriptionRef { + return bus.subscribe(topic, fn, true, transactional, false) +} + +func (bus *TypedBus[T]) Unsubscribe(topic string, ref SubscriptionRef) { + bus.Lock() + defer bus.Unlock() + if _, ok := bus.handlers[topic]; ok { + bus.handlers[topic].delete(ref) + } +} + +func (bus *TypedBus[T]) Publish(topic string, arg T) { + if subscribers, ok := bus.fetchSubscribers(topic); ok { + fire := make(chan *simpleHandler[T], len(*subscribers)) + bus.Lock() + for _, subscriber := range *subscribers { + fire <- subscriber + if subscriber.once { + bus.handlers[topic].delete(subscriber.reference) + } + } + bus.Unlock() + close(fire) + // calling the callbacks will not block the whole bus + for subscriber := range fire { + subscriber.Call(arg, &bus.wg) + } + } +} + +func (bus *TypedBus[T]) WaitAsync() { + bus.wg.Wait() +} + +func (bus *TypedBus[T]) subscribe(topic string, fn func(T), async, transactional, once bool) SubscriptionRef { + bus.Lock() + defer bus.Unlock() + bus.lastRef++ + if _, ok := bus.handlers[topic]; !ok { + bus.handlers[topic] = &listeners[T]{} + } + bus.handlers[topic].add(newSimpleHandler(fn, bus.lastRef, async, transactional, once)) + return bus.lastRef +} + +func (bus *TypedBus[T]) fetchSubscribers(topic string) (*listeners[T], bool) { + bus.RLock() + defer bus.RUnlock() + if handlers, ok := bus.handlers[topic]; ok && len(*handlers) > 0 { + return handlers, true + } + return nil, false +} diff --git a/generic_bus_test.go b/simple_bus_test.go similarity index 87% rename from generic_bus_test.go rename to simple_bus_test.go index 4139856..70e8063 100644 --- a/generic_bus_test.go +++ b/simple_bus_test.go @@ -1,6 +1,7 @@ package EventBus import ( + "fmt" "testing" "time" ) @@ -8,7 +9,7 @@ import ( func TestNewGeneric(t *testing.T) { bus := NewSimpleBus[any]() if bus == nil { - t.Log("New GenericBus not created!") + t.Log("New TypedBus not created!") t.Fail() } } @@ -28,14 +29,17 @@ func TestSimpleOnceAndManySubscribe(t *testing.T) { bus := NewSimpleBus[any]() topic := "topic" flag := 0 - // artificially define different functions with the same logic - // subscription logic prevents from subscribe the same callback for the same topic multiple times - fn1 := func(_ any) { flag += 1 } - fn2 := func(_ any) { flag += 1 } - fn3 := func(_ any) { flag += 1 } - bus.SubscribeOnce(topic, fn1) - bus.Subscribe(topic, fn2) - bus.Subscribe(topic, fn3) + + fn := func(_ any) { flag += 1 } + + refFirst := bus.SubscribeOnce(topic, fn) + refSecond := bus.Subscribe(topic, fn) + refThird := bus.Subscribe(topic, fn) + + if refFirst == refSecond || refFirst == refThird || refSecond == refThird { + t.Fail() + } + bus.Publish(topic, nil) if flag != 3 { @@ -46,12 +50,12 @@ func TestSimpleOnceAndManySubscribe(t *testing.T) { func TestSimpleUnsubscribe(t *testing.T) { bus := NewSimpleBus[any]() fn := func(_ any) {} - bus.Subscribe("topic", fn) + ref := bus.Subscribe("topic", fn) if !bus.HasCallback("topic") { t.Logf("Expected to have callback for topic but it is not present") t.Fail() } - bus.Unsubscribe("topic", fn) + bus.Unsubscribe("topic", ref) if bus.HasCallback("topic") { t.Logf("Expected to have no callback for topic after unsubscribe but it is present") t.Fail() @@ -70,9 +74,9 @@ func TestSimpleUnsubscribeMethod(t *testing.T) { bus := NewSimpleBus[any]() h := &accumulator{val: 0} - bus.Subscribe("topic", h.Handle) + ref := bus.Subscribe("topic", h.Handle) bus.Publish("topic", nil) - bus.Unsubscribe("topic", h.Handle) + bus.Unsubscribe("topic", ref) bus.Publish("topic", nil) if h.val != 1 {