Skip to content

Commit 6c791f7

Browse files
committed
derp: include src IPs in mesh watch messages
Updates tailscale/corp#13945 Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
1 parent 7ed3681 commit 6c791f7

File tree

6 files changed

+73
-38
lines changed

6 files changed

+73
-38
lines changed

cmd/derper/mesh.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"log"
1111
"net"
12+
"net/netip"
1213
"strings"
1314
"time"
1415

@@ -67,7 +68,7 @@ func startMeshWithHost(s *derp.Server, host string) error {
6768
return d.DialContext(ctx, network, addr)
6869
})
6970

70-
add := func(k key.NodePublic) { s.AddPacketForwarder(k, c) }
71+
add := func(k key.NodePublic, _ netip.AddrPort) { s.AddPacketForwarder(k, c) }
7172
remove := func(k key.NodePublic) { s.RemovePacketForwarder(k, c) }
7273
go c.RunWatchConnectionLoop(context.Background(), s.PublicKey(), logf, add, remove)
7374
return nil

derp/derp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ const (
8585

8686
// framePeerPresent is like framePeerGone, but for other
8787
// members of the DERP region when they're meshed up together.
88-
framePeerPresent = frameType(0x09) // 32B pub key of peer that's connected
88+
framePeerPresent = frameType(0x09) // 32B pub key of peer that's connected + optional 18B ip:port (16 byte IP + 2 byte BE uint16 port)
8989

9090
// frameWatchConns is how one DERP node in a regional mesh
9191
// subscribes to the others in the region.

derp/derp_client.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,12 @@ func (PeerGoneMessage) msg() {}
363363

364364
// PeerPresentMessage is a ReceivedMessage that indicates that the client
365365
// is connected to the server. (Only used by trusted mesh clients)
366-
type PeerPresentMessage key.NodePublic
366+
type PeerPresentMessage struct {
367+
// Key is the public key of the client.
368+
Key key.NodePublic
369+
// IPPort is the remote IP and port of the client.
370+
IPPort netip.AddrPort
371+
}
367372

368373
func (PeerPresentMessage) msg() {}
369374

@@ -546,8 +551,15 @@ func (c *Client) recvTimeout(timeout time.Duration) (m ReceivedMessage, err erro
546551
c.logf("[unexpected] dropping short peerPresent frame from DERP server")
547552
continue
548553
}
549-
pg := PeerPresentMessage(key.NodePublicFromRaw32(mem.B(b[:keyLen])))
550-
return pg, nil
554+
var msg PeerPresentMessage
555+
msg.Key = key.NodePublicFromRaw32(mem.B(b[:keyLen]))
556+
if n >= keyLen+16+2 {
557+
msg.IPPort = netip.AddrPortFrom(
558+
netip.AddrFrom16([16]byte(b[keyLen:keyLen+16])).Unmap(),
559+
binary.BigEndian.Uint16(b[keyLen+16:keyLen+16+2]),
560+
)
561+
}
562+
return msg, nil
551563

552564
case frameRecvPacket:
553565
var rp ReceivedPacket

derp/derp_server.go

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
crand "crypto/rand"
1313
"crypto/x509"
1414
"crypto/x509/pkix"
15+
"encoding/binary"
1516
"encoding/json"
1617
"errors"
1718
"expvar"
@@ -43,6 +44,7 @@ import (
4344
"tailscale.com/tstime/rate"
4445
"tailscale.com/types/key"
4546
"tailscale.com/types/logger"
47+
"tailscale.com/util/set"
4648
"tailscale.com/version"
4749
)
4850

@@ -150,7 +152,7 @@ type Server struct {
150152
closed bool
151153
netConns map[Conn]chan struct{} // chan is closed when conn closes
152154
clients map[key.NodePublic]clientSet
153-
watchers map[*sclient]bool // mesh peer -> true
155+
watchers set.Set[*sclient] // mesh peers
154156
// clientsMesh tracks all clients in the cluster, both locally
155157
// and to mesh peers. If the value is nil, that means the
156158
// peer is only local (and thus in the clients Map, but not
@@ -219,8 +221,7 @@ func (s singleClient) ForeachClient(f func(*sclient)) { f(s.c) }
219221
// All fields are guarded by Server.mu.
220222
type dupClientSet struct {
221223
// set is the set of connected clients for sclient.key.
222-
// The values are all true.
223-
set map[*sclient]bool
224+
set set.Set[*sclient]
224225

225226
// last is the most recent addition to set, or nil if the most
226227
// recent one has since disconnected and nobody else has send
@@ -261,7 +262,7 @@ func (s *dupClientSet) removeClient(c *sclient) bool {
261262

262263
trim := s.sendHistory[:0]
263264
for _, v := range s.sendHistory {
264-
if s.set[v] && (len(trim) == 0 || trim[len(trim)-1] != v) {
265+
if s.set.Contains(v) && (len(trim) == 0 || trim[len(trim)-1] != v) {
265266
trim = append(trim, v)
266267
}
267268
}
@@ -316,7 +317,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
316317
clientsMesh: map[key.NodePublic]PacketForwarder{},
317318
netConns: map[Conn]chan struct{}{},
318319
memSys0: ms.Sys,
319-
watchers: map[*sclient]bool{},
320+
watchers: set.Set[*sclient]{},
320321
sentTo: map[key.NodePublic]map[key.NodePublic]int64{},
321322
avgQueueDuration: new(uint64),
322323
tcpRtt: metrics.LabelMap{Label: "le"},
@@ -498,23 +499,23 @@ func (s *Server) registerClient(c *sclient) {
498499
s.mu.Lock()
499500
defer s.mu.Unlock()
500501

501-
set := s.clients[c.key]
502-
switch set := set.(type) {
502+
curSet := s.clients[c.key]
503+
switch curSet := curSet.(type) {
503504
case nil:
504505
s.clients[c.key] = singleClient{c}
505506
c.debugLogf("register single client")
506507
case singleClient:
507508
s.dupClientKeys.Add(1)
508509
s.dupClientConns.Add(2) // both old and new count
509510
s.dupClientConnTotal.Add(1)
510-
old := set.ActiveClient()
511+
old := curSet.ActiveClient()
511512
old.isDup.Store(true)
512513
c.isDup.Store(true)
513514
s.clients[c.key] = &dupClientSet{
514515
last: c,
515-
set: map[*sclient]bool{
516-
old: true,
517-
c: true,
516+
set: set.Set[*sclient]{
517+
old: struct{}{},
518+
c: struct{}{},
518519
},
519520
sendHistory: []*sclient{old},
520521
}
@@ -523,9 +524,9 @@ func (s *Server) registerClient(c *sclient) {
523524
s.dupClientConns.Add(1) // the gauge
524525
s.dupClientConnTotal.Add(1) // the counter
525526
c.isDup.Store(true)
526-
set.set[c] = true
527-
set.last = c
528-
set.sendHistory = append(set.sendHistory, c)
527+
curSet.set.Add(c)
528+
curSet.last = c
529+
curSet.sendHistory = append(curSet.sendHistory, c)
529530
c.debugLogf("register another duplicate client")
530531
}
531532

@@ -534,17 +535,21 @@ func (s *Server) registerClient(c *sclient) {
534535
}
535536
s.keyOfAddr[c.remoteIPPort] = c.key
536537
s.curClients.Add(1)
537-
s.broadcastPeerStateChangeLocked(c.key, true)
538+
s.broadcastPeerStateChangeLocked(c.key, c.remoteIPPort, true)
538539
}
539540

540541
// broadcastPeerStateChangeLocked enqueues a message to all watchers
541542
// (other DERP nodes in the region, or trusted clients) that peer's
542543
// presence changed.
543544
//
544545
// s.mu must be held.
545-
func (s *Server) broadcastPeerStateChangeLocked(peer key.NodePublic, present bool) {
546+
func (s *Server) broadcastPeerStateChangeLocked(peer key.NodePublic, ipPort netip.AddrPort, present bool) {
546547
for w := range s.watchers {
547-
w.peerStateChange = append(w.peerStateChange, peerConnState{peer: peer, present: present})
548+
w.peerStateChange = append(w.peerStateChange, peerConnState{
549+
peer: peer,
550+
present: present,
551+
ipPort: ipPort,
552+
})
548553
go w.requestMeshUpdate()
549554
}
550555
}
@@ -565,7 +570,7 @@ func (s *Server) unregisterClient(c *sclient) {
565570
delete(s.clientsMesh, c.key)
566571
s.notePeerGoneFromRegionLocked(c.key)
567572
}
568-
s.broadcastPeerStateChangeLocked(c.key, false)
573+
s.broadcastPeerStateChangeLocked(c.key, netip.AddrPort{}, false)
569574
case *dupClientSet:
570575
c.debugLogf("removed duplicate client")
571576
if set.removeClient(c) {
@@ -655,13 +660,21 @@ func (s *Server) addWatcher(c *sclient) {
655660
defer s.mu.Unlock()
656661

657662
// Queue messages for each already-connected client.
658-
for peer := range s.clients {
659-
c.peerStateChange = append(c.peerStateChange, peerConnState{peer: peer, present: true})
663+
for peer, clientSet := range s.clients {
664+
ac := clientSet.ActiveClient()
665+
if ac == nil {
666+
continue
667+
}
668+
c.peerStateChange = append(c.peerStateChange, peerConnState{
669+
peer: peer,
670+
present: true,
671+
ipPort: ac.remoteIPPort,
672+
})
660673
}
661674

662675
// And enroll the watcher in future updates (of both
663676
// connections & disconnections).
664-
s.watchers[c] = true
677+
s.watchers.Add(c)
665678

666679
go c.requestMeshUpdate()
667680
}
@@ -1349,6 +1362,7 @@ type sclient struct {
13491362
type peerConnState struct {
13501363
peer key.NodePublic
13511364
present bool
1365+
ipPort netip.AddrPort // if present, the peer's IP:port
13521366
}
13531367

13541368
// pkt is a request to write a data frame to an sclient.
@@ -1542,12 +1556,18 @@ func (c *sclient) sendPeerGone(peer key.NodePublic, reason PeerGoneReasonType) e
15421556
}
15431557

15441558
// sendPeerPresent sends a peerPresent frame, without flushing.
1545-
func (c *sclient) sendPeerPresent(peer key.NodePublic) error {
1559+
func (c *sclient) sendPeerPresent(peer key.NodePublic, ipPort netip.AddrPort) error {
15461560
c.setWriteDeadline()
1547-
if err := writeFrameHeader(c.bw.bw(), framePeerPresent, keyLen); err != nil {
1561+
const frameLen = keyLen + 16 + 2
1562+
if err := writeFrameHeader(c.bw.bw(), framePeerPresent, frameLen); err != nil {
15481563
return err
15491564
}
1550-
_, err := c.bw.Write(peer.AppendTo(nil))
1565+
payload := make([]byte, frameLen)
1566+
_ = peer.AppendTo(payload[:0])
1567+
a16 := ipPort.Addr().As16()
1568+
copy(payload[keyLen:], a16[:])
1569+
binary.BigEndian.PutUint16(payload[keyLen+16:], ipPort.Port())
1570+
_, err := c.bw.Write(payload)
15511571
return err
15521572
}
15531573

@@ -1566,7 +1586,7 @@ func (c *sclient) sendMeshUpdates() error {
15661586
}
15671587
var err error
15681588
if pcs.present {
1569-
err = c.sendPeerPresent(pcs.peer)
1589+
err = c.sendPeerPresent(pcs.peer, pcs.ipPort)
15701590
} else {
15711591
err = c.sendPeerGone(pcs.peer, PeerGoneReasonDisconnected)
15721592
}

derp/derp_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func TestSendRecv(t *testing.T) {
9292
defer cancel()
9393

9494
brwServer := bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin))
95-
go s.Accept(ctx, cin, brwServer, fmt.Sprintf("test-client-%d", i))
95+
go s.Accept(ctx, cin, brwServer, fmt.Sprintf("[abc::def]:%v", i))
9696

9797
key := clientPrivateKeys[i]
9898
brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout))
@@ -528,7 +528,7 @@ func newTestServer(t *testing.T, ctx context.Context) *testServer {
528528
// TODO: register c in ts so Close also closes it?
529529
go func(i int) {
530530
brwServer := bufio.NewReadWriter(bufio.NewReader(c), bufio.NewWriter(c))
531-
go s.Accept(ctx, c, brwServer, fmt.Sprintf("test-client-%d", i))
531+
go s.Accept(ctx, c, brwServer, c.RemoteAddr().String())
532532
}(i)
533533
}
534534
}()
@@ -615,14 +615,15 @@ func (tc *testClient) wantPresent(t *testing.T, peers ...key.NodePublic) {
615615
}
616616
switch m := m.(type) {
617617
case PeerPresentMessage:
618-
got := key.NodePublic(m)
618+
got := m.Key
619619
if !want[got] {
620620
t.Fatalf("got peer present for %v; want present for %v", tc.ts.keyName(got), logger.ArgWriter(func(bw *bufio.Writer) {
621621
for _, pub := range peers {
622622
fmt.Fprintf(bw, "%s ", tc.ts.keyName(pub))
623623
}
624624
}))
625625
}
626+
t.Logf("got present with IP %v", m.IPPort)
626627
delete(want, got)
627628
if len(want) == 0 {
628629
return

derp/derphttp/mesh_client.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package derphttp
55

66
import (
77
"context"
8+
"net/netip"
89
"sync"
910
"time"
1011

@@ -26,7 +27,7 @@ import (
2627
//
2728
// To force RunWatchConnectionLoop to return quickly, its ctx needs to
2829
// be closed, and c itself needs to be closed.
29-
func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.NodePublic, infoLogf logger.Logf, add, remove func(key.NodePublic)) {
30+
func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.NodePublic, infoLogf logger.Logf, add func(key.NodePublic, netip.AddrPort), remove func(key.NodePublic)) {
3031
if infoLogf == nil {
3132
infoLogf = logger.Discard
3233
}
@@ -68,9 +69,9 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
6869
})
6970
defer timer.Stop()
7071

71-
updatePeer := func(k key.NodePublic, isPresent bool) {
72+
updatePeer := func(k key.NodePublic, ipPort netip.AddrPort, isPresent bool) {
7273
if isPresent {
73-
add(k)
74+
add(k, ipPort)
7475
} else {
7576
remove(k)
7677
}
@@ -126,7 +127,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
126127
}
127128
switch m := m.(type) {
128129
case derp.PeerPresentMessage:
129-
updatePeer(key.NodePublic(m), true)
130+
updatePeer(m.Key, m.IPPort, true)
130131
case derp.PeerGoneMessage:
131132
switch m.Reason {
132133
case derp.PeerGoneReasonDisconnected:
@@ -138,7 +139,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
138139
logf("Recv: peer %s not at server %s for unknown reason %v",
139140
key.NodePublic(m.Peer).ShortString(), c.ServerPublicKey().ShortString(), m.Reason)
140141
}
141-
updatePeer(key.NodePublic(m.Peer), false)
142+
updatePeer(key.NodePublic(m.Peer), netip.AddrPort{}, false)
142143
default:
143144
continue
144145
}

0 commit comments

Comments
 (0)