diff --git a/README.md b/README.md index b3bd42c..5b962e7 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,9 @@ import ( ``` #### Example + +General purpose event bus (unlimited number of parameters of any type): + ```go func calculator(a int, b int) { fmt.Printf("%d\n", a + b) @@ -39,6 +42,27 @@ func main() { } ``` +You can alternatively use ~50% faster implementation if your listener accepts only one argument of a particular type. +Simple event bus (only one parameter of a chosen type): + +```go +type sumComponents struct { + a int + b int +} + +func calculator(args sumComponents) { + fmt.Printf("%d\n", args.a + args.b) +} + +func main() { + bus := EventBus.NewSimpleBus[sumComponents]() + ref := bus.Subscribe("main:calculator", calculator) + bus.Publish("main:calculator", sumComponents{20, 40}) + bus.Unsubscribe("main:calculator", ref) +} +``` + #### Implemented methods * **New()** * **Subscribe()** @@ -56,6 +80,12 @@ New returns new EventBus with empty handlers. bus := EventBus.New(); ``` +#### NewSimpleBus[T]() +The below example creates a new EventBus with handlers accepting only string arguments: +```go +bus := EventBus.NewSimpleBus[string]() +```` + #### Subscribe(topic string, fn interface{}) error Subscribe to a topic. Returns error if `fn` is not a function. ```go @@ -64,6 +94,18 @@ func Handler() { ... } bus.Subscribe("topic:handler", Handler) ``` +#### Subscribe(topic string, fn func(T)) SubscriptionRef + +Simplified version of bus returns `SubscriptionRef` as a result of all subscription methods. It can be used to unsubscribe from the topic. +```go +func Handler(param string) { ... } +... +bus := EventBus.NewSimpleBus[string]() +ref := bus.Subscribe("topic:handler", Handler) +... +bus.Unsubscribe(ref) +``` + #### SubscribeOnce(topic string, fn interface{}) error Subscribe to a topic once. Handler will be removed after executing. Returns error if `fn` is not a function. ```go @@ -91,6 +133,8 @@ bus.Subscribe("topic:handler", Handler) bus.Publish("topic:handler", "Hello, World!"); ``` +In the case of simplified bus, there is only one argument accepted of predefined type. + #### SubscribeAsync(topic string, fn interface{}, transactional bool) Subscribe to a topic with an asynchronous callback. Returns error if `fn` is not a function. ```go @@ -147,6 +191,36 @@ func main() { } ``` +### Benchmarks + +```shell +% GOMAXPROCS=1 go test -bench=. +2 +goos: darwin +goarch: amd64 +pkg: github.com/ziollek/EventBus +cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +BenchmarkSynchronousPublishing 2877297 406.9 ns/op +BenchmarkAsynchronousPublishing 548457 2101 ns/op +BenchmarkSimpleSynchronousPublishing 3939492 305.5 ns/op +BenchmarkSimpleAsynchronousPublishing 1000000 1141 ns/op +PASS +ok github.com/ziollek/EventBus 9.449s + +% GOMAXPROCS=2 go test -bench=. +2 +goos: darwin +goarch: amd64 +pkg: github.com/ziollek/EventBus +cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz +BenchmarkSynchronousPublishing-2 3146671 379.3 ns/op +BenchmarkAsynchronousPublishing-2 1050337 1148 ns/op +BenchmarkSimpleSynchronousPublishing-2 5079916 246.0 ns/op +BenchmarkSimpleAsynchronousPublishing-2 2124786 548.2 ns/op +PASS +ok github.com/ziollek/EventBus 9.677s +``` + #### Notes Documentation is available here: [godoc.org](https://godoc.org/github.com/asaskevich/EventBus). Full information about code coverage is also available here: [EventBus on gocover.io](http://gocover.io/github.com/asaskevich/EventBus). diff --git a/event_bus_test.go b/event_bus_test.go index 0cdb579..27c9019 100644 --- a/event_bus_test.go +++ b/event_bus_test.go @@ -188,3 +188,38 @@ func TestSubscribeAsync(t *testing.T) { // t.Fail() //} } + +func BenchmarkSynchronousPublishing(b *testing.B) { + type input struct { + number int + slice []int + name string + } + list := []int{1, 2, 3, 4, 5} + + bus := New() + _ = bus.Subscribe("topic", func(i input) {}) + + b.ResetTimer() + for range b.N { + bus.Publish("topic", input{1, list, "test"}) + } +} + +func BenchmarkAsynchronousPublishing(b *testing.B) { + type input struct { + number int + slice []int + name string + } + list := []int{1, 2, 3, 4, 5} + + bus := New() + _ = bus.SubscribeAsync("topic", func(i input) {}, false) + + b.ResetTimer() + for range b.N { + bus.Publish("topic", input{1, list, "test"}) + } + bus.WaitAsync() +} diff --git a/simple_bus.go b/simple_bus.go new file mode 100644 index 0000000..7b9521e --- /dev/null +++ b/simple_bus.go @@ -0,0 +1,162 @@ +package EventBus + +import ( + "sync" +) + +type SubscriptionRef uint64 + +// SimpleBusSubscriber defines subscription-related bus behavior for event of specific type T +type SimpleBusSubscriber[T any] interface { + Subscribe(topic string, fn func(T)) SubscriptionRef + SubscribeAsync(topic string, fn func(T), transactional bool) SubscriptionRef + SubscribeOnce(topic string, fn func(T)) SubscriptionRef + SubscribeOnceAsync(topic string, fn func(T)) SubscriptionRef + Unsubscribe(topic string, ref SubscriptionRef) +} + +// SimpleBusPublisher defines publishing-related bus behavior for event of specific type T +type SimpleBusPublisher[T any] interface { + Publish(topic string, arg T) +} + +// SimpleBus includes global (subscribe, publish, control) bus behavior +type SimpleBus[T any] interface { + BusController + SimpleBusSubscriber[T] + SimpleBusPublisher[T] +} + +type simpleHandler[T any] struct { + callBack func(T) + once bool + async bool + transactional bool + reference SubscriptionRef + sync.Mutex // lock for an event handler - useful for running async callbacks serially +} + +func (h *simpleHandler[T]) Call(arg T, wg *sync.WaitGroup) { + if h.async { + wg.Add(1) + go func() { + defer wg.Done() + if h.transactional { + h.Lock() + defer h.Unlock() + } + h.callBack(arg) + }() + return + } else { + h.callBack(arg) + } +} + +type listeners[T any] map[SubscriptionRef]*simpleHandler[T] + +func (l listeners[T]) add(handler *simpleHandler[T]) { + l[handler.reference] = handler +} + +func (l listeners[_]) delete(ref SubscriptionRef) { + delete(l, ref) +} + +func newSimpleHandler[T any](fn func(T), ref SubscriptionRef, async, transactional, once bool) *simpleHandler[T] { + return &simpleHandler[T]{ + callBack: fn, + reference: ref, + async: async, + transactional: transactional, + once: once, + } +} + +type TypedBus[T any] struct { + handlers map[string]*listeners[T] + lastRef SubscriptionRef + sync.RWMutex + wg sync.WaitGroup +} + +func NewSimpleBus[T any]() SimpleBus[T] { + return &TypedBus[T]{ + handlers: make(map[string]*listeners[T]), + lastRef: SubscriptionRef(0), + } +} + +func (bus *TypedBus[_]) HasCallback(topic string) bool { + bus.RLock() + defer bus.RUnlock() + _, ok := bus.handlers[topic] + return ok && len(*bus.handlers[topic]) > 0 +} + +func (bus *TypedBus[T]) Subscribe(topic string, fn func(T)) SubscriptionRef { + return bus.subscribe(topic, fn, false, false, false) +} + +func (bus *TypedBus[T]) SubscribeOnce(topic string, fn func(T)) SubscriptionRef { + return bus.subscribe(topic, fn, false, false, true) +} + +func (bus *TypedBus[T]) SubscribeOnceAsync(topic string, fn func(T)) SubscriptionRef { + return bus.subscribe(topic, fn, true, false, true) +} + +func (bus *TypedBus[T]) SubscribeAsync(topic string, fn func(T), transactional bool) SubscriptionRef { + return bus.subscribe(topic, fn, true, transactional, false) +} + +func (bus *TypedBus[T]) Unsubscribe(topic string, ref SubscriptionRef) { + bus.Lock() + defer bus.Unlock() + if _, ok := bus.handlers[topic]; ok { + bus.handlers[topic].delete(ref) + } +} + +func (bus *TypedBus[T]) Publish(topic string, arg T) { + if subscribers, ok := bus.fetchSubscribers(topic); ok { + fire := make(chan *simpleHandler[T], len(*subscribers)) + bus.Lock() + for _, subscriber := range *subscribers { + fire <- subscriber + if subscriber.once { + bus.handlers[topic].delete(subscriber.reference) + } + } + bus.Unlock() + close(fire) + // calling the callbacks will not block the whole bus + for subscriber := range fire { + subscriber.Call(arg, &bus.wg) + } + } +} + +func (bus *TypedBus[T]) WaitAsync() { + bus.wg.Wait() +} + +func (bus *TypedBus[T]) subscribe(topic string, fn func(T), async, transactional, once bool) SubscriptionRef { + bus.Lock() + defer bus.Unlock() + bus.lastRef++ + if _, ok := bus.handlers[topic]; !ok { + bus.handlers[topic] = &listeners[T]{} + } + bus.handlers[topic].add(newSimpleHandler(fn, bus.lastRef, async, transactional, once)) + return bus.lastRef +} + +func (bus *TypedBus[T]) fetchSubscribers(topic string) (*listeners[T], bool) { + bus.RLock() + defer bus.RUnlock() + if handlers, ok := bus.handlers[topic]; ok && len(*handlers) > 0 { + return handlers, true + } + return nil, false +} diff --git a/simple_bus_test.go b/simple_bus_test.go new file mode 100644 index 0000000..70e8063 --- /dev/null +++ b/simple_bus_test.go @@ -0,0 +1,224 @@ +package EventBus + +import ( + "fmt" + "testing" + "time" +) + +func TestNewGeneric(t *testing.T) { + bus := NewSimpleBus[any]() + if bus == nil { + t.Log("New TypedBus not created!") + t.Fail() + } +} + +func TestSimpleHasCallback(t *testing.T) { + bus := NewSimpleBus[any]() + bus.Subscribe("topic", func(_ any) {}) + if bus.HasCallback("topic_topic") { + t.Fail() + } + if !bus.HasCallback("topic") { + t.Fail() + } +} + +func TestSimpleOnceAndManySubscribe(t *testing.T) { + bus := NewSimpleBus[any]() + topic := "topic" + flag := 0 + + fn := func(_ any) { flag += 1 } + + refFirst := bus.SubscribeOnce(topic, fn) + refSecond := bus.Subscribe(topic, fn) + refThird := bus.Subscribe(topic, fn) + + if refFirst == refSecond || refFirst == refThird || refSecond == refThird { + t.Fail() + } + + bus.Publish(topic, nil) + + if flag != 3 { + t.Fail() + } +} + +func TestSimpleUnsubscribe(t *testing.T) { + bus := NewSimpleBus[any]() + fn := func(_ any) {} + ref := bus.Subscribe("topic", fn) + if !bus.HasCallback("topic") { + t.Logf("Expected to have callback for topic but it is not present") + t.Fail() + } + bus.Unsubscribe("topic", ref) + if bus.HasCallback("topic") { + t.Logf("Expected to have no callback for topic after unsubscribe but it is present") + t.Fail() + } +} + +type accumulator struct { + val int +} + +func (a *accumulator) Handle(_ any) { + a.val++ +} + +func TestSimpleUnsubscribeMethod(t *testing.T) { + bus := NewSimpleBus[any]() + h := &accumulator{val: 0} + + ref := bus.Subscribe("topic", h.Handle) + bus.Publish("topic", nil) + bus.Unsubscribe("topic", ref) + bus.Publish("topic", nil) + + if h.val != 1 { + t.Fail() + } +} + +func TestSimplePublish(t *testing.T) { + bus := NewSimpleBus[int]() + bus.Subscribe("topic", func(a int) { + if a != 10 { + t.Fail() + } + }) + bus.Publish("topic", 10) +} + +func TestSimpleSubscribeOnceAsync(t *testing.T) { + value := 0 + + bus := NewSimpleBus[*int]() + bus.SubscribeOnceAsync("topic", func(a *int) { + *a++ + }) + + bus.Publish("topic", &value) + bus.Publish("topic", &value) + bus.WaitAsync() + + if value != 1 { + t.Fail() + } + + if bus.HasCallback("topic") { + t.Fail() + } +} + +func TestSimpleSubscribeAsyncTransactional(t *testing.T) { + started := make(chan struct{}) + defer close(started) + results := make([]int, 0) + type input struct { + a int + out *[]int + dur string + } + + bus := NewSimpleBus[input]() + bus.SubscribeAsync("topic", func(i input) { + started <- struct{}{} + sleep, _ := time.ParseDuration(i.dur) + time.Sleep(sleep) + *i.out = append(*i.out, i.a) + }, true) + + bus.Publish("topic", input{a: 1, out: &results, dur: "1s"}) + // started is used to enforce that the first message is started being processed before the second is sent + <-started + bus.Publish("topic", input{a: 2, out: &results, dur: "0s"}) + <-started + + bus.WaitAsync() + + if len(results) != 2 { + t.Logf("Expected 2 results, got %d", len(results)) + t.Fail() + } + + if results[0] != 1 || results[1] != 2 { + t.Logf("Expected [1 2] results, got %+v", results) + t.Fail() + } +} + +func TestSimpleSubscribeAsync(t *testing.T) { + ready := make(chan struct{}) + results := make(chan int) + + type input struct { + a int + out chan<- int + } + + bus := NewSimpleBus[input]() + bus.SubscribeAsync("topic", func(i input) { + i.out <- i.a + }, false) + + bus.Publish("topic", input{a: 1, out: results}) + bus.Publish("topic", input{a: 2, out: results}) + + numResults := 0 + + go func() { + for _ = range results { + numResults++ + } + close(ready) + }() + + bus.WaitAsync() + close(results) + <-ready + + if numResults != 2 { + t.Logf("Expected 2 results, got %d", numResults) + t.Fail() + } +} + +func BenchmarkSimpleSynchronousPublishing(b *testing.B) { + type input struct { + number int + slice []int + name string + } + list := []int{1, 2, 3, 4, 5} + + bus := NewSimpleBus[input]() + bus.Subscribe("topic", func(i input) {}) + + b.ResetTimer() + for range b.N { + bus.Publish("topic", input{1, list, "test"}) + } +} + +func BenchmarkSimpleAsynchronousPublishing(b *testing.B) { + type input struct { + number int + slice []int + name string + } + list := []int{1, 2, 3, 4, 5} + + bus := NewSimpleBus[input]() + bus.SubscribeAsync("topic", func(i input) {}, false) + + b.ResetTimer() + for range b.N { + bus.Publish("topic", input{1, list, "test"}) + } + bus.WaitAsync() +}