From 5d8743500d05feb3ce37057e001e70cf2c53b3a3 Mon Sep 17 00:00:00 2001 From: britterm Date: Mon, 23 Sep 2019 08:35:37 -0600 Subject: [PATCH 1/4] add GossiperMaker, GetGossip(), and UTs --- gossip_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++ router.go | 31 +++++++++++++++++++++++++++++- 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/gossip_test.go b/gossip_test.go index dd9ae70..0aab708 100644 --- a/gossip_test.go +++ b/gossip_test.go @@ -196,6 +196,58 @@ func TestGossipSurrogate(t *testing.T) { g3.checkHas(t, 1, 2) } +type testGossiperMaker struct{} + +func (t *testGossiperMaker) MakeGossiper(channelName string, router *Router) (Gossiper, error) { + return newTestGossiper(), nil +} + +func TestGossiperMaker(t *testing.T) { + // create the topology r1 <-> r2 <-> r3 + r1 := newTestRouter(t, "01:00:00:01:00:00") + r2 := newTestRouter(t, "02:00:00:02:00:00") + r3 := newTestRouter(t, "03:00:00:03:00:00") + + // auto-create a gossiper at the far end, but not the middle + r3.GossiperMaker = &testGossiperMaker{} + + routers := []*Router{r1, r2, r3} + addTestGossipConnection(r1, r2) + addTestGossipConnection(r3, r2) + flushAndCheckTopology(t, routers, r1.tp(r2), r2.tp(r1, r3), r3.tp(r2)) + + // create a gossiper at the near end + g1 := newTestGossiper() + s1, err := r1.NewGossip("Test", g1) + require.NoError(t, err) + + // broadcast a message from the near end, check it reaches the far end + broadcast(s1, 1) + sendPendingGossip(r1, r2, r3) + + // ensure g3 has an auto-created gossip + var g3 *testGossiper + s3 := r3.GetGossip("Test") + switch s3.(type) { + case *gossipChannel: + switch s3.(*gossipChannel).gossiper.(type) { + case *testGossiper: + log.Println("test gossiper created!") + g3 = s3.(*gossipChannel).gossiper.(*testGossiper) + g3.checkHas(t, 1) + default: + t.Fatal("r3 did not auto-create a testGossiper using GossiperMaker") + } + default: + t.Fatal("r3 did not create a gossipChannel") + } + + // send it back and check it reaches the near end + broadcast(s3, 2) + sendPendingGossip(r1, r2, r3) + g1.checkHas(t, 2) +} + type testGossiper struct { sync.RWMutex state map[byte]struct{} diff --git a/router.go b/router.go index cfc7987..8e49921 100644 --- a/router.go +++ b/router.go @@ -41,6 +41,11 @@ type Config struct { GossipInterval *time.Duration } +// GossiperMaker is an interface to create a Gossiper instance +type GossiperMaker interface { + MakeGossiper(channelName string, router *Router) (Gossiper, error) +} + // Router manages communication between this peer and the rest of the mesh. // Router implements Gossiper. type Router struct { @@ -50,6 +55,7 @@ type Router struct { Peers *Peers Routes *routes ConnectionMaker *connectionMaker + GossiperMaker GossiperMaker gossipLock sync.RWMutex gossipChannels gossipChannels topologyGossip Gossip @@ -144,6 +150,13 @@ func (router *Router) NewGossip(channelName string, g Gossiper) (Gossip, error) return channel, nil } +// GetGossip returns a GossipChannel from the router, or nil if the channel has not been seen/created +func (router *Router) GetGossip(channelName string) Gossip { + router.gossipLock.Lock() + defer router.gossipLock.Unlock() + return router.gossipChannels[channelName] +} + func (router *Router) gossipChannel(channelName string) *gossipChannel { router.gossipLock.RLock() channel, found := router.gossipChannels[channelName] @@ -156,7 +169,23 @@ func (router *Router) gossipChannel(channelName string) *gossipChannel { if channel, found = router.gossipChannels[channelName]; found { return channel } - channel = newGossipChannel(channelName, router.Ourself, router.Routes, &surrogateGossiper{router: router}, router.logger) + // unknown channel - do we have a GossiperMaker? + var gossiper Gossiper + if router.GossiperMaker != nil { + // use the GossiperMaker to make the surrogate channel + g, err := router.GossiperMaker.MakeGossiper(channelName, router) + if err != nil { + router.logger.Printf("unable to create gossiper, falling back to surrogateGossiper: %s", err.Error()) + } else { + router.logger.Printf("created custom surrogate gossiper") + gossiper = g + } + } + if gossiper == nil { + // default surrogate channel + gossiper = &surrogateGossiper{router: router} + } + channel = newGossipChannel(channelName, router.Ourself, router.Routes, gossiper, router.logger) channel.logf("created surrogate channel") router.gossipChannels[channelName] = channel return channel From 060956d30fbdbfe4d67c1beaac4c18c32a0c6d5d Mon Sep 17 00:00:00 2001 From: britterm Date: Mon, 23 Sep 2019 08:48:20 -0600 Subject: [PATCH 2/4] remove extra print --- gossip_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/gossip_test.go b/gossip_test.go index 0aab708..e074aa7 100644 --- a/gossip_test.go +++ b/gossip_test.go @@ -232,7 +232,6 @@ func TestGossiperMaker(t *testing.T) { case *gossipChannel: switch s3.(*gossipChannel).gossiper.(type) { case *testGossiper: - log.Println("test gossiper created!") g3 = s3.(*gossipChannel).gossiper.(*testGossiper) g3.checkHas(t, 1) default: From 5c353becbc88c645ea42e6b22c5fb23992e3e418 Mon Sep 17 00:00:00 2001 From: britterm Date: Tue, 22 Oct 2019 16:57:55 -0600 Subject: [PATCH 3/4] simplify GossiperMaker --- gossip_test.go | 4 ++-- router.go | 13 +++---------- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/gossip_test.go b/gossip_test.go index e074aa7..bd70aed 100644 --- a/gossip_test.go +++ b/gossip_test.go @@ -198,8 +198,8 @@ func TestGossipSurrogate(t *testing.T) { type testGossiperMaker struct{} -func (t *testGossiperMaker) MakeGossiper(channelName string, router *Router) (Gossiper, error) { - return newTestGossiper(), nil +func (t *testGossiperMaker) MakeGossiper(channelName string, router *Router) Gossiper { + return newTestGossiper() } func TestGossiperMaker(t *testing.T) { diff --git a/router.go b/router.go index 8e49921..c025217 100644 --- a/router.go +++ b/router.go @@ -43,7 +43,7 @@ type Config struct { // GossiperMaker is an interface to create a Gossiper instance type GossiperMaker interface { - MakeGossiper(channelName string, router *Router) (Gossiper, error) + MakeGossiper(channelName string, router *Router) Gossiper } // Router manages communication between this peer and the rest of the mesh. @@ -173,15 +173,8 @@ func (router *Router) gossipChannel(channelName string) *gossipChannel { var gossiper Gossiper if router.GossiperMaker != nil { // use the GossiperMaker to make the surrogate channel - g, err := router.GossiperMaker.MakeGossiper(channelName, router) - if err != nil { - router.logger.Printf("unable to create gossiper, falling back to surrogateGossiper: %s", err.Error()) - } else { - router.logger.Printf("created custom surrogate gossiper") - gossiper = g - } - } - if gossiper == nil { + gossiper = router.GossiperMaker.MakeGossiper(channelName, router) + } else { // default surrogate channel gossiper = &surrogateGossiper{router: router} } From 1dc4b1037ee0c9eb1f5972e3969f603e31ed963d Mon Sep 17 00:00:00 2001 From: britterm Date: Fri, 27 Mar 2020 09:57:09 -0600 Subject: [PATCH 4/4] properly support ipv6 --- connection_maker.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/connection_maker.go b/connection_maker.go index df8d04c..99a28ee 100644 --- a/connection_maker.go +++ b/connection_maker.go @@ -4,6 +4,7 @@ import ( "fmt" "math/rand" "net" + "strconv" "time" "unicode" ) @@ -93,7 +94,7 @@ func (cm *connectionMaker) InitiateConnections(peers []string, replace bool) []e } if host == "" || !isAlnum(port) { errors = append(errors, fmt.Errorf("invalid peer name %q, should be host[:port]", peer)) - } else if addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%s", host, port)); err != nil { + } else if addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port)); err != nil { errors = append(errors, err) } else { addrs[peer] = addr @@ -331,7 +332,7 @@ func (cm *connectionMaker) addPeerTargets(ourConnectedPeers peerNameSet, addTarg // ephemeral) remote port of an inbound connection // that some peer has. Let's try to connect on the // weave port instead. - addTarget(fmt.Sprintf("%s:%d", ip, cm.port)) + addTarget(net.JoinHostPort(ip, strconv.Itoa(cm.port))) } } })