Skip to content

Commit 5cfa511

Browse files
authored
Keep event history in circular buffer (gammazero#316)
Storing event history in a circular buffer avoids memory allocation after the storage limit is reached. - Minor fixes to storage history logic - Order message type switch to check for most common messages first - Minor code improvements
1 parent dd076ea commit 5cfa511

File tree

6 files changed

+83
-85
lines changed

6 files changed

+83
-85
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
module github.com/gammazero/nexus/v3
22

3-
go 1.20
3+
go 1.21
44

55
require (
6+
github.com/gammazero/deque v0.2.1
67
github.com/gorilla/websocket v1.5.0
78
github.com/stretchr/testify v1.8.4
89
github.com/ugorji/go/codec v1.2.11

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
22
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
33
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4+
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
5+
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
46
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
57
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
68
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
9+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
710
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
811
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
912
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -18,5 +21,6 @@ golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
1821
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
1922
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2023
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
24+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2125
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
2226
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

router/broker.go

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"time"
66

7+
"github.com/gammazero/deque"
78
"github.com/gammazero/nexus/v3/stdlog"
89
"github.com/gammazero/nexus/v3/wamp"
910
)
@@ -52,10 +53,13 @@ type historyEntry struct {
5253
}
5354

5455
type historyStore struct {
55-
entries []historyEntry
56-
matchPolicy string
57-
limit int
58-
isLimitReached bool
56+
entries deque.Deque[historyEntry]
57+
matchPolicy string
58+
limit int
59+
}
60+
61+
func (h *historyStore) atLimit() bool {
62+
return h.entries.Len() >= h.limit
5963
}
6064

6165
type broker struct {
@@ -149,10 +153,8 @@ func (b *broker) PreInitEventHistoryTopics(evntCfgs []*TopicEventHistoryConfig)
149153
sub, _ := b.syncInitSubscription(topicCfg.Topic, topicCfg.MatchPolicy, nil)
150154

151155
b.eventHistoryStore[sub] = &historyStore{
152-
entries: []historyEntry{},
153-
matchPolicy: topicCfg.MatchPolicy,
154-
limit: topicCfg.Limit,
155-
isLimitReached: false,
156+
matchPolicy: topicCfg.MatchPolicy,
157+
limit: topicCfg.Limit,
156158
}
157159

158160
}
@@ -383,12 +385,8 @@ func newSubscription(id wamp.ID, subscriber *wamp.Session, topic wamp.URI, match
383385
}
384386

385387
func (b *broker) syncSaveEvent(eventStore *historyStore, pub *wamp.Publish, event *wamp.Event) {
386-
387-
if eventStore.isLimitReached {
388-
eventStore.entries = eventStore.entries[1:]
389-
} else if len(eventStore.entries) >= eventStore.limit {
390-
eventStore.isLimitReached = true
391-
eventStore.entries = eventStore.entries[1:]
388+
if eventStore.atLimit() {
389+
eventStore.entries.PopFront()
392390
}
393391

394392
item := historyEntry{
@@ -403,7 +401,7 @@ func (b *broker) syncSaveEvent(eventStore *historyStore, pub *wamp.Publish, even
403401
},
404402
}
405403

406-
eventStore.entries = append(eventStore.entries, item)
404+
eventStore.entries.PushBack(item)
407405
}
408406

409407
func (b *broker) syncInitSubscription(topic wamp.URI, match string, subscriber *wamp.Session) (sub *subscription, existingSub bool) {
@@ -1138,7 +1136,7 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {
11381136

11391137
fromPubOp, ok := msg.ArgumentsKw["from_publication"]
11401138
if ok {
1141-
fromPub = fromPubOp.(wamp.ID)
1139+
fromPub, ok = fromPubOp.(wamp.ID)
11421140
if !ok || fromPub < 1 {
11431141
return &wamp.Error{
11441142
Type: msg.MessageType(),
@@ -1152,7 +1150,7 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {
11521150

11531151
afterPubOp, ok := msg.ArgumentsKw["after_publication"]
11541152
if ok {
1155-
afterPub = afterPubOp.(wamp.ID)
1153+
afterPub, ok = afterPubOp.(wamp.ID)
11561154
if !ok || afterPub < 1 {
11571155
return &wamp.Error{
11581156
Type: msg.MessageType(),
@@ -1165,7 +1163,7 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {
11651163

11661164
beforePubOp, ok := msg.ArgumentsKw["before_publication"]
11671165
if ok {
1168-
beforePub = beforePubOp.(wamp.ID)
1166+
beforePub, ok = beforePubOp.(wamp.ID)
11691167
if !ok || beforePub < 1 {
11701168
return &wamp.Error{
11711169
Type: msg.MessageType(),
@@ -1178,7 +1176,7 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {
11781176

11791177
untilPubOp, ok := msg.ArgumentsKw["until_publication"]
11801178
if ok {
1181-
untilPub = untilPubOp.(wamp.ID)
1179+
untilPub, ok = untilPubOp.(wamp.ID)
11821180
if !ok || untilPub < 1 {
11831181
return &wamp.Error{
11841182
Type: msg.MessageType(),
@@ -1189,19 +1187,20 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {
11891187
}
11901188
}
11911189

1192-
ch := make(chan struct{})
1190+
done := make(chan struct{})
11931191
b.actionChan <- func() {
1192+
defer close(done)
1193+
11941194
var filteredEvents []storedEvent
11951195

11961196
if subscription, ok := b.subscriptions[subId]; ok {
11971197
if storeItem, ok := b.eventHistoryStore[subscription]; ok {
1198-
isLimitReached = storeItem.isLimitReached
1198+
isLimitReached = storeItem.atLimit()
11991199

1200-
fromPubReached := false
1201-
afterPubReached := false
1202-
untilPubReached := false
1200+
var untilPubReached bool
12031201

1204-
for _, entry := range storeItem.entries {
1202+
for i := 0; i < storeItem.entries.Len(); i++ {
1203+
entry := storeItem.entries.At(i)
12051204
if !fromDate.IsZero() && entry.event.timestamp.Before(fromDate) {
12061205
continue
12071206
}
@@ -1214,20 +1213,17 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {
12141213
if !untilDate.IsZero() && entry.event.timestamp.After(untilDate) {
12151214
continue
12161215
}
1217-
if fromPub > 0 && !fromPubReached {
1216+
if fromPub != 0 {
12181217
if entry.event.Publication != fromPub {
12191218
continue
1220-
} else {
1221-
fromPubReached = true
12221219
}
1220+
fromPub = 0
12231221
}
1224-
if afterPub > 0 && !afterPubReached {
1225-
if entry.event.Publication != afterPub {
1226-
continue
1227-
} else {
1228-
afterPubReached = true
1229-
continue
1222+
if afterPub != 0 {
1223+
if entry.event.Publication == afterPub {
1224+
afterPub = 0
12301225
}
1226+
continue
12311227
}
12321228
if beforePub > 0 && entry.event.Publication == beforePub {
12331229
break
@@ -1270,9 +1266,8 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message {
12701266
}
12711267

12721268
events, _ = wamp.AsList(filteredEvents)
1273-
close(ch)
12741269
}
1275-
<-ch
1270+
<-done
12761271

12771272
return &wamp.Yield{
12781273
Request: msg.Request,

router/broker_test.go

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -699,48 +699,48 @@ func TestEventHistory(t *testing.T) {
699699
topic := wamp.URI("nexus.test.exact.topic")
700700
subscription := broker.topicSubscription[topic]
701701
subEvents := broker.eventHistoryStore[subscription].entries
702-
require.Equalf(t, 3, len(subEvents), "Store for topic %s should hold 3 records", topic)
703-
require.Truef(t, broker.eventHistoryStore[subscription].isLimitReached, "Limit for the store for topic %s should be reached", topic)
704-
require.Equalf(t, "nexus.test.exact.topic", subEvents[0].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
705-
require.Equalf(t, 25509, subEvents[0].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
706-
require.Equalf(t, "nexus.test.exact.topic", subEvents[1].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
707-
require.Equalf(t, 25513, subEvents[1].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
708-
require.Equalf(t, "nexus.test.exact.topic", subEvents[2].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
709-
require.Equalf(t, 25517, subEvents[2].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
702+
require.Equalf(t, 3, subEvents.Len(), "Store for topic %s should hold 3 records", topic)
703+
require.Truef(t, broker.eventHistoryStore[subscription].atLimit(), "Limit for the store for topic %s should be reached", topic)
704+
require.Equalf(t, "nexus.test.exact.topic", subEvents.At(0).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
705+
require.Equalf(t, 25509, subEvents.At(0).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
706+
require.Equalf(t, "nexus.test.exact.topic", subEvents.At(1).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
707+
require.Equalf(t, 25513, subEvents.At(1).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
708+
require.Equalf(t, "nexus.test.exact.topic", subEvents.At(2).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
709+
require.Equalf(t, 25517, subEvents.At(2).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
710710

711711
topic = wamp.URI("nexus.test")
712712
subscription = broker.pfxTopicSubscription[topic]
713713
subEvents = broker.eventHistoryStore[subscription].entries
714-
require.Equalf(t, 4, len(subEvents), "Store for topic %s should hold 3 records", topic)
715-
require.Truef(t, broker.eventHistoryStore[subscription].isLimitReached, "Limit for the store for topic %s should be reached", topic)
716-
require.Equalf(t, "nexus.test.exact.topic", subEvents[0].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
717-
require.Equalf(t, 25517, subEvents[0].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
718-
require.Equalf(t, "nexus.test.prefix.catch", subEvents[1].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
719-
require.Equalf(t, 25518, subEvents[1].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
720-
require.Equalf(t, "nexus.test.wildcard.topic", subEvents[2].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
721-
require.Equalf(t, 25519, subEvents[2].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
722-
require.Equalf(t, "nexus.test.wildcard.miss", subEvents[3].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
723-
require.Equalf(t, 25520, subEvents[3].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
714+
require.Equalf(t, 4, subEvents.Len(), "Store for topic %s should hold 3 records", topic)
715+
require.Truef(t, broker.eventHistoryStore[subscription].atLimit(), "Limit for the store for topic %s should be reached", topic)
716+
require.Equalf(t, "nexus.test.exact.topic", subEvents.At(0).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
717+
require.Equalf(t, 25517, subEvents.At(0).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
718+
require.Equalf(t, "nexus.test.prefix.catch", subEvents.At(1).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
719+
require.Equalf(t, 25518, subEvents.At(1).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
720+
require.Equalf(t, "nexus.test.wildcard.topic", subEvents.At(2).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
721+
require.Equalf(t, 25519, subEvents.At(2).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
722+
require.Equalf(t, "nexus.test.wildcard.miss", subEvents.At(3).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
723+
require.Equalf(t, 25520, subEvents.At(3).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
724724

725725
topic = wamp.URI("nexus.test..topic")
726726
subscription = broker.wcTopicSubscription[topic]
727727
subEvents = broker.eventHistoryStore[subscription].entries
728-
require.Equalf(t, 4, len(subEvents), "Store for topic %s should hold 3 records", topic)
729-
require.Truef(t, broker.eventHistoryStore[subscription].isLimitReached, "Limit for the store for topic %s should be reached", topic)
730-
require.Equalf(t, "nexus.test.exact.topic", subEvents[0].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
731-
require.Equalf(t, 25513, subEvents[0].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
732-
require.Equalf(t, "nexus.test.wildcard.topic", subEvents[1].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
733-
require.Equalf(t, 25515, subEvents[1].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
734-
require.Equalf(t, "nexus.test.exact.topic", subEvents[2].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
735-
require.Equalf(t, 25517, subEvents[2].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
736-
require.Equalf(t, "nexus.test.wildcard.topic", subEvents[3].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
737-
require.Equalf(t, 25519, subEvents[3].event.Arguments[0], "Event store for topic %s holds invalid event", topic)
728+
require.Equalf(t, 4, subEvents.Len(), "Store for topic %s should hold 3 records", topic)
729+
require.Truef(t, broker.eventHistoryStore[subscription].atLimit(), "Limit for the store for topic %s should be reached", topic)
730+
require.Equalf(t, "nexus.test.exact.topic", subEvents.At(0).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
731+
require.Equalf(t, 25513, subEvents.At(0).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
732+
require.Equalf(t, "nexus.test.wildcard.topic", subEvents.At(1).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
733+
require.Equalf(t, 25515, subEvents.At(1).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
734+
require.Equalf(t, "nexus.test.exact.topic", subEvents.At(2).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
735+
require.Equalf(t, 25517, subEvents.At(2).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
736+
require.Equalf(t, "nexus.test.wildcard.topic", subEvents.At(3).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic)
737+
require.Equalf(t, 25519, subEvents.At(3).event.Arguments[0], "Event store for topic %s holds invalid event", topic)
738738

739739
topic = wamp.URI("nexus")
740740
subscription = broker.pfxTopicSubscription[topic]
741741
subEvents = broker.eventHistoryStore[subscription].entries
742-
require.Equalf(t, 20, len(subEvents), "Store for topic %s should hold 20 records", topic)
743-
require.Falsef(t, broker.eventHistoryStore[subscription].isLimitReached, "Limit for the store for topic %s should not be reached", topic)
742+
require.Equalf(t, 20, subEvents.Len(), "Store for topic %s should hold 20 records", topic)
743+
require.Falsef(t, broker.eventHistoryStore[subscription].atLimit(), "Limit for the store for topic %s should not be reached", topic)
744744

745745
//Now let's test Event History MetaRPCs
746746
topic = wamp.URI("nexus.test.exact.topic")
@@ -885,7 +885,7 @@ func TestEventHistory(t *testing.T) {
885885
// Let's test filtering based on publication ID
886886
topic = wamp.URI("nexus")
887887
subscription = broker.pfxTopicSubscription[topic]
888-
pubId := broker.eventHistoryStore[subscription].entries[4].event.Publication
888+
pubId := broker.eventHistoryStore[subscription].entries.At(4).event.Publication
889889
inv = wamp.Invocation{
890890
Request: wamp.ID(reqId),
891891
Registration: 0,

router/realm.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -472,21 +472,20 @@ func (r *realm) handleInboundMessages(sess *wamp.Session) (bool, bool, error) {
472472
switch msg := msg.(type) {
473473
case *wamp.Publish:
474474
r.broker.publish(sess, msg)
475+
case *wamp.Yield:
476+
r.dealer.yield(sess, msg)
477+
case *wamp.Call:
478+
r.dealer.call(sess, msg)
479+
case *wamp.Cancel:
480+
r.dealer.cancel(sess, msg)
475481
case *wamp.Subscribe:
476482
r.broker.subscribe(sess, msg)
477-
case *wamp.Unsubscribe:
478-
r.broker.unsubscribe(sess, msg)
479-
480483
case *wamp.Register:
481484
r.dealer.register(sess, msg)
485+
case *wamp.Unsubscribe:
486+
r.broker.unsubscribe(sess, msg)
482487
case *wamp.Unregister:
483488
r.dealer.unregister(sess, msg)
484-
case *wamp.Call:
485-
r.dealer.call(sess, msg)
486-
case *wamp.Yield:
487-
r.dealer.yield(sess, msg)
488-
case *wamp.Cancel:
489-
r.dealer.cancel(sess, msg)
490489

491490
case *wamp.Error:
492491
// An INVOCATION error is the only type of ERROR message the

wamp/session.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,22 +71,21 @@ func (s *Session) HasFeature(role, feature string) bool {
7171
// by calling EndRecv.
7272
func (s *Session) RecvDone() <-chan struct{} {
7373
s.mu.Lock()
74+
defer s.mu.Unlock()
75+
7476
if s.done == nil {
7577
s.done = make(chan struct{})
7678
}
77-
d := s.done
78-
s.mu.Unlock()
79-
return d
79+
return s.done
8080
}
8181

8282
// If RecvDone is not yet closed, Goodbye returns nil.
8383
// If RecvDone is closed, Goodbye returns the GOODBYE message that was supplied
8484
// when RecvEnd was called.
8585
func (s *Session) Goodbye() *Goodbye {
8686
s.mu.Lock()
87-
g := s.goodbye
88-
s.mu.Unlock()
89-
return g
87+
defer s.mu.Unlock()
88+
return s.goodbye
9089
}
9190

9291
// EndRecv tells the session to signal messages handlers to stop receiving
@@ -96,8 +95,9 @@ func (s *Session) Goodbye() *Goodbye {
9695
// with exiting the message handler for other reasons.
9796
func (s *Session) EndRecv(goodbye *Goodbye) bool {
9897
s.mu.Lock()
98+
defer s.mu.Unlock()
99+
99100
if s.goodbye != nil {
100-
s.mu.Unlock()
101101
return false // already ended
102102
}
103103

@@ -112,7 +112,6 @@ func (s *Session) EndRecv(goodbye *Goodbye) bool {
112112
}
113113
close(s.done)
114114

115-
s.mu.Unlock()
116115
return true
117116
}
118117

0 commit comments

Comments
 (0)