diff --git a/Makefile b/Makefile index 91c9ca9cc..cb56a0893 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,8 @@ else SHELL := /bin/bash endif -.PHONY : check lint install-linters dep test test-e2e test-e2e-build test-e2e-run test-e2e-test test-e2e-stop test-e2e-clean build +.PHONY : check lint install-linters dep tidy test test-e2e test-e2e-build test-e2e-run test-e2e-test test-e2e-stop test-e2e-clean build +.PHONY : update-dep update-skywire update-skycoin push-deps sync-upstream-develop VERSION := $(shell git describe --always) @@ -101,7 +102,7 @@ install-linters-windows: ## Install linters on windows ${OPTS} go install golang.org/x/tools/cmd/goimports@latest ${OPTS} go install github.com/incu6us/goimports-reviser@latest -format: ## Formats the code. Must have goimports and goimports-reviser installed (use make install-linters). +format: tidy ## Formats the code. Must have goimports and goimports-reviser installed (use make install-linters). ${OPTS} goimports -w -local ${DMSG_REPO} ./pkg ./cmd ./internal ./examples find . -type f -name '*.go' -not -path "./.git/*" -not -path "./vendor/*" -exec goimports-reviser -project-name ${DMSG_REPO} {} \; @@ -109,9 +110,78 @@ format: ## Formats the code. Must have goimports and goimports-reviser installed format-windows: ## Formats the code. Must have goimports and goimports-reviser installed (use make install-linters-windows). powershell -Command .\scripts\format-windows.ps1 -dep: ## Sorts dependencies +tidy: ## Tidies dependencies + ${OPTS} go mod tidy -v + +dep: tidy ## Sorts and vendors dependencies + ${OPTS} go mod vendor -v + +update-dep: ## Update all dependencies to latest versions, vendor, and commit + ${OPTS} go get -v -u ./... + ${OPTS} go mod tidy -v ${OPTS} go mod vendor -v + git add go.mod go.sum vendor + git diff --cached --quiet || git commit -m "update deps" + +update-skywire: ## Update skywire to latest develop branch + @echo "Updating skywire to latest develop..." + ${OPTS} go get -v github.com/skycoin/skywire@develop ${OPTS} go mod tidy -v + ${OPTS} go mod vendor -v + @echo "skywire updated successfully" + +update-skycoin: ## Update skycoin to latest develop branch + @echo "Updating skycoin to latest develop..." + ${OPTS} go get -v github.com/skycoin/skycoin@develop + ${OPTS} go mod tidy -v + ${OPTS} go mod vendor -v + @echo "skycoin updated successfully" + +push-deps: ## Commit and push dependency updates + @echo "Committing dependency updates..." + git add go.mod go.sum vendor + git diff --cached --quiet || git commit -m "update deps" + git push + @echo "Dependencies pushed successfully" + +sync-upstream-develop: ## Sync local develop branch with upstream skycoin/dmsg develop + @normalize() { \ + echo "$$1" | sed \ + -e 's|git@github.com:|https://github.com/|' \ + -e 's|ssh://github.com/|https://github.com/|' \ + -e 's|\.git$$||' \ + -e 's|https://github.com/||' \ + | tr '[:upper:]' '[:lower:]'; \ + }; \ + UPSTREAM_URL=$$(git remote get-url upstream 2>/dev/null); \ + if [ -z "$$UPSTREAM_URL" ]; then \ + echo "[error] no 'upstream' remote found. Add it with:"; \ + echo " git remote add upstream https://github.com/skycoin/dmsg.git"; \ + exit 1; \ + fi; \ + UPSTREAM_NORM=$$(normalize "$$UPSTREAM_URL"); \ + if [ "$$UPSTREAM_NORM" != "skycoin/dmsg" ]; then \ + echo "[error] upstream remote does not point to skycoin/dmsg."; \ + echo " Found: $$UPSTREAM_URL"; \ + exit 1; \ + fi; \ + ORIGIN_URL=$$(git remote get-url origin 2>/dev/null); \ + if [ -z "$$ORIGIN_URL" ]; then \ + echo "[error] no 'origin' remote found."; \ + exit 1; \ + fi; \ + ORIGIN_NORM=$$(normalize "$$ORIGIN_URL"); \ + if [ "$$ORIGIN_NORM" = "skycoin/dmsg" ]; then \ + echo "[error] origin points to skycoin/dmsg directly."; \ + echo " This target must be run from a fork, not the canonical repo."; \ + exit 1; \ + fi; \ + echo "[ok] origin is a fork ($$ORIGIN_NORM), upstream is skycoin/dmsg — syncing develop..."; \ + git checkout develop && \ + git pull && \ + git fetch upstream && \ + git merge upstream/develop && \ + git push install: ## Install `dmsg-discovery`, `dmsg-server`, `dmsgcurl`,`dmsgpty-cli`, `dmsgpty-host`, `dmsgpty-ui` ${OPTS} go install ${BUILD_OPTS} ./cmd/* diff --git a/cmd/dmsg-server/commands/start/root.go b/cmd/dmsg-server/commands/start/root.go index 605ddd80b..627af2eb6 100644 --- a/cmd/dmsg-server/commands/start/root.go +++ b/cmd/dmsg-server/commands/start/root.go @@ -97,10 +97,17 @@ var RootCmd = &cobra.Command{ srvAPI := dmsgserver.NewServerAPI(r, log, m) + // Convert peer config to dmsg.PeerEntry. + var peers []dmsg.PeerEntry + for _, p := range conf.Peers { + peers = append(peers, dmsg.PeerEntry{PK: p.PubKey, Addr: p.Address}) + } + srvConf := dmsg.ServerConfig{ MaxSessions: conf.MaxSessions, UpdateInterval: conf.UpdateInterval, AuthPassphrase: authPassphrase, + Peers: peers, } srv := dmsg.NewServer(conf.PubKey, conf.SecKey, disc.NewHTTP(conf.Discovery, &http.Client{}, log), &srvConf, m) srv.SetLogger(log) diff --git a/cmd/dmsg-socks5/commands/dmsg-socks5.go b/cmd/dmsg-socks5/commands/dmsg-socks5.go index 0ced2af61..b8ee17629 100644 --- a/cmd/dmsg-socks5/commands/dmsg-socks5.go +++ b/cmd/dmsg-socks5/commands/dmsg-socks5.go @@ -4,6 +4,7 @@ package commands import ( "context" "fmt" + "net" "net/http" "os" "strings" @@ -138,7 +139,6 @@ var serveCmd = &cobra.Command{ ctx, cancel := cmdutil.SignalContext(context.Background(), dlog) defer cancel() - //TODO: implement whitelist logic dmsgC, closeDmsg, err := dmsgclient.InitDmsgWithFlags(ctx, dlog, pk, sk, httpClient, pk.String()) @@ -182,6 +182,28 @@ var serveCmd = &cobra.Command{ } dlog.Infof("Accepted connection from: %s", respConn.RemoteAddr()) + // Enforce whitelist: extract remote PK from the dmsg address. + if len(wlkeys) > 0 { + remotePK, _, splitErr := net.SplitHostPort(respConn.RemoteAddr().String()) + if splitErr != nil { + dlog.WithError(splitErr).Warn("Failed to parse remote address, rejecting connection.") + respConn.Close() //nolint:errcheck,gosec + continue + } + allowed := false + for _, key := range wlkeys { + if remotePK == key.String() { + allowed = true + break + } + } + if !allowed { + dlog.WithField("remote_pk", remotePK).Warn("Connection rejected: not in whitelist.") + respConn.Close() //nolint:errcheck,gosec + continue + } + } + conf := &socks5.Config{} server, err := socks5.New(conf) if err != nil { diff --git a/cmd/dmsg/commands/kill.go b/cmd/dmsg/commands/kill.go deleted file mode 100644 index 7ee118cb0..000000000 --- a/cmd/dmsg/commands/kill.go +++ /dev/null @@ -1,27 +0,0 @@ -// Package commands cmd/dmsg/commands/kill.go -package commands - -import ( - "os" - "os/signal" - "syscall" -) - -func init() { - // TEMPORARY WORKAROUND: Force exit on Ctrl+C after 3 attempts - // This can be removed once the proper signal handling fixes are verified: - // - dmsgC.Serve() now uses signal-aware context (not context.Background()) - // - Accept loops now check for context cancellation - // - HTTP servers now shutdown gracefully - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - sigCount := 0 - for range c { - sigCount++ - if sigCount >= 3 { - os.Exit(1) - } - } - }() -} diff --git a/go.mod b/go.mod index e0a71614f..a81da9a60 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/bytedance/gopkg v0.1.4 // indirect github.com/bytedance/sonic v1.15.0 // indirect - github.com/bytedance/sonic/loader v0.5.0 // indirect + github.com/bytedance/sonic/loader v0.5.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.6 // indirect github.com/containerd/errdefs v1.0.0 // indirect @@ -53,7 +53,7 @@ require ( github.com/fatih/color v1.19.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gabriel-vasile/mimetype v1.4.13 // indirect - github.com/gin-contrib/sse v1.1.0 // indirect + github.com/gin-contrib/sse v1.1.1 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-playground/locales v0.14.1 // indirect diff --git a/go.sum b/go.sum index b8b28e230..b6c7ad951 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,8 @@ github.com/bytedance/gopkg v0.1.4 h1:oZnQwnX82KAIWb7033bEwtxvTqXcYMxDBaQxo5JJHWM github.com/bytedance/gopkg v0.1.4/go.mod h1:v1zWfPm21Fb+OsyXN2VAHdL6TBb2L88anLQgdyje6R4= github.com/bytedance/sonic v1.15.0 h1:/PXeWFaR5ElNcVE84U0dOHjiMHQOwNIx3K4ymzh/uSE= github.com/bytedance/sonic v1.15.0/go.mod h1:tFkWrPz0/CUCLEF4ri4UkHekCIcdnkqXw9VduqpJh0k= -github.com/bytedance/sonic/loader v0.5.0 h1:gXH3KVnatgY7loH5/TkeVyXPfESoqSBSBEiDd5VjlgE= -github.com/bytedance/sonic/loader v0.5.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= +github.com/bytedance/sonic/loader v0.5.1 h1:Ygpfa9zwRCCKSlrp5bBP/b/Xzc3VxsAW+5NIYXrOOpI= +github.com/bytedance/sonic/loader v0.5.1/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= @@ -61,8 +61,8 @@ github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/gabriel-vasile/mimetype v1.4.13 h1:46nXokslUBsAJE/wMsp5gtO500a4F3Nkz9Ufpk2AcUM= github.com/gabriel-vasile/mimetype v1.4.13/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= -github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w= -github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM= +github.com/gin-contrib/sse v1.1.1 h1:uGYpNwTacv5R68bSGMapo62iLTRa9l5zxGCps4hK6ko= +github.com/gin-contrib/sse v1.1.1/go.mod h1:QXzuVkA0YO7o/gun03UI1Q+FTI8ZV/n5t03kIQAI89s= github.com/gin-gonic/gin v1.12.0 h1:b3YAbrZtnf8N//yjKeU2+MQsh2mY5htkZidOM7O0wG8= github.com/gin-gonic/gin v1.12.0/go.mod h1:VxccKfsSllpKshkBWgVgRniFFAzFb9csfngsqANjnLc= github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= diff --git a/pkg/dmsg/client.go b/pkg/dmsg/client.go index ec4e53471..5e5a0f44d 100644 --- a/pkg/dmsg/client.go +++ b/pkg/dmsg/client.go @@ -82,6 +82,7 @@ type Client struct { errCh chan error done chan struct{} once sync.Once + wg sync.WaitGroup // tracks background goroutines for clean shutdown sesMx sync.Mutex } @@ -330,8 +331,7 @@ func (ce *Client) discoverServers(ctx context.Context, all bool) (entries []*dis return entries, err } -// Close closes the dmsg client entity. -// TODO(evanlinjin): Have waitgroup. +// Close closes the dmsg client entity and waits for background goroutines to finish. func (ce *Client) Close() error { if ce == nil { return nil @@ -354,6 +354,7 @@ func (ce *Client) Close() error { ce.log.Debug("All sessions closed.") ce.sessionsMx.Unlock() ce.porter.CloseAll(ce.log) + ce.wg.Wait() err = ce.EntityCommon.delEntry(context.Background()) }) return err diff --git a/pkg/dmsg/client_dial.go b/pkg/dmsg/client_dial.go index 947b5c1e2..cf8fa6082 100644 --- a/pkg/dmsg/client_dial.go +++ b/pkg/dmsg/client_dial.go @@ -33,8 +33,7 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) { return nil, err } - // Range client's delegated servers. - // Try existing sessions first, falling back to next server on failure. + // 1. Try existing sessions to the target's delegated servers (direct path, cheapest). for _, srvPK := range entry.Client.DelegatedServers { if dSes, ok := ce.clientSession(ce.porter, srvPK); ok { stream, err := dSes.DialStream(addr) @@ -47,8 +46,22 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) { } } - // Range client's delegated servers. - // Attempt to connect to a delegated server. + // 2. Try all other existing sessions (mesh path — already connected, no new handshake). + // If servers are meshed, our server forwards the request to the target's server. + for _, ses := range ce.allClientSessions(ce.porter) { + if hasPK(entry.Client.DelegatedServers, ses.RemotePK()) { + continue // already tried above + } + stream, err := ses.DialStream(addr) + if err != nil { + ce.log.WithError(err).WithField("server", ses.RemotePK()). + Debug("DialStream failed via mesh, trying next server") + continue + } + return stream, nil + } + + // 3. Last resort: establish new sessions to the target's delegated servers. for _, srvPK := range entry.Client.DelegatedServers { dSes, err := ce.EnsureAndObtainSession(ctx, srvPK) if err != nil { diff --git a/pkg/dmsg/client_sessions.go b/pkg/dmsg/client_sessions.go index de5534385..1c8ae3587 100644 --- a/pkg/dmsg/client_sessions.go +++ b/pkg/dmsg/client_sessions.go @@ -113,7 +113,9 @@ func (ce *Client) dialSession(ctx context.Context, entry *disc.Entry) (cs Client return ClientSession{}, errors.New("session already exists") } + ce.wg.Add(1) go func() { + defer ce.wg.Done() defer func() { if r := recover(); r != nil { ce.log.Warnf("recovered panic in session serve goroutine: %v", r) diff --git a/pkg/dmsg/const.go b/pkg/dmsg/const.go index 331818eb6..03362b355 100644 --- a/pkg/dmsg/const.go +++ b/pkg/dmsg/const.go @@ -21,7 +21,7 @@ const ( DefaultUpdateInterval = time.Minute - DefaultMaxSessions = 100 + DefaultMaxSessions = 2048 DefaultDmsgHTTPPort = uint16(80) diff --git a/pkg/dmsg/entity_common.go b/pkg/dmsg/entity_common.go index 1b700d3e2..a55f3ae8f 100644 --- a/pkg/dmsg/entity_common.go +++ b/pkg/dmsg/entity_common.go @@ -36,6 +36,10 @@ type EntityCommon struct { setSessionCallback func(ctx context.Context) error delSessionCallback func(ctx context.Context) error + + // peerSessionsFunc returns peer server sessions for mesh forwarding. + // Only set on Server entities; nil for clients. + peerSessionsFunc func() []*SessionCommon } func (c *EntityCommon) init(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, log logrus.FieldLogger, updateInterval time.Duration) { @@ -84,6 +88,19 @@ func (c *EntityCommon) serverSession(pk cipher.PubKey) (ServerSession, bool) { return ServerSession{SessionCommon: ses}, ok } +// peerServerSessions returns all peer server sessions for mesh forwarding. +func (c *EntityCommon) peerServerSessions() []ServerSession { + if c.peerSessionsFunc == nil { + return nil + } + raw := c.peerSessionsFunc() + sessions := make([]ServerSession, len(raw)) + for i, ses := range raw { + sessions[i] = ServerSession{SessionCommon: ses} + } + return sessions +} + // clientSession obtains a session as a client. func (c *EntityCommon) clientSession(porter *netutil.Porter, pk cipher.PubKey) (ClientSession, bool) { ses, ok := c.session(pk) diff --git a/pkg/dmsg/server.go b/pkg/dmsg/server.go index 8ed37950d..642b0713e 100644 --- a/pkg/dmsg/server.go +++ b/pkg/dmsg/server.go @@ -17,11 +17,18 @@ import ( "github.com/skycoin/dmsg/pkg/dmsg/metrics" ) +// PeerEntry represents a peer dmsg server to connect to. +type PeerEntry struct { + PK cipher.PubKey + Addr string +} + // ServerConfig configues the Server type ServerConfig struct { MaxSessions int UpdateInterval time.Duration AuthPassphrase string + Peers []PeerEntry } // DefaultServerConfig returns the default server config. @@ -53,6 +60,12 @@ type Server struct { maxSessions int authPassphrase string + + // Peer server mesh support. + peers []PeerEntry + peerPKs map[cipher.PubKey]struct{} // set of known peer server PKs + peerSessions map[cipher.PubKey]*SessionCommon + peerSessionsMx sync.Mutex } // NewServer creates a new dmsg server entity. @@ -83,6 +96,24 @@ func NewServer(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, conf *Serv return s.updateServerEntry(ctx, s.AdvertisedAddr(), s.maxSessions, conf.AuthPassphrase) } s.authPassphrase = conf.AuthPassphrase + + // Initialize peer mesh. + s.peers = conf.Peers + s.peerPKs = make(map[cipher.PubKey]struct{}, len(conf.Peers)) + for _, p := range conf.Peers { + s.peerPKs[p.PK] = struct{}{} + } + s.peerSessions = make(map[cipher.PubKey]*SessionCommon) + s.peerSessionsFunc = func() []*SessionCommon { + s.peerSessionsMx.Lock() + defer s.peerSessionsMx.Unlock() + sessions := make([]*SessionCommon, 0, len(s.peerSessions)) + for _, ses := range s.peerSessions { + sessions = append(sessions, ses) + } + return sessions + } + return s } @@ -142,6 +173,8 @@ func (s *Server) Serve(lis net.Listener, addr string) error { return err } + s.connectToPeers(ctx) + log.Info("Accepting sessions...") s.readyOnce.Do(func() { close(s.ready) }) for { @@ -211,6 +244,180 @@ func (s *Server) Ready() <-chan struct{} { return s.ready } +func (s *Server) connectToPeers(ctx context.Context) { + // Connect to statically configured peers. + for _, peer := range s.peers { + s.wg.Add(1) + go func(peer PeerEntry) { + defer s.wg.Done() + s.maintainPeerConnection(ctx, peer) + }(peer) + } + + // Periodically discover other servers from discovery and peer with them. + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.discoverAndConnectPeers(ctx) + }() +} + +// discoverAndConnectPeers periodically queries discovery for all servers +// and establishes peer connections to any that aren't already connected. +func (s *Server) discoverAndConnectPeers(ctx context.Context) { + // activePeers tracks goroutines managing discovered peer connections. + activePeers := make(map[cipher.PubKey]context.CancelFunc) + + // Initial delay to let the server register itself first. + select { + case <-time.After(10 * time.Second): + case <-ctx.Done(): + return + } + + ticker := time.NewTicker(s.updateInterval) + defer ticker.Stop() + + for { + entries, err := s.dc.AllServers(ctx) + if err != nil { + s.log.WithError(err).Debug("Failed to discover peer servers.") + } else { + for _, entry := range entries { + pk := entry.Static + // Skip self and already-connected peers. + if pk == s.pk { + continue + } + if _, ok := activePeers[pk]; ok { + continue + } + // Skip if already a static peer (handled by connectToPeers). + s.peerSessionsMx.Lock() + _, alreadyPeer := s.peerPKs[pk] + if !alreadyPeer && entry.Server != nil && entry.Server.Address != "" { + s.peerPKs[pk] = struct{}{} + } + s.peerSessionsMx.Unlock() + if alreadyPeer { + continue + } + if entry.Server == nil || entry.Server.Address == "" { + continue + } + + peerCtx, peerCancel := context.WithCancel(ctx) //nolint:gosec + activePeers[pk] = peerCancel + + peer := PeerEntry{PK: pk, Addr: entry.Server.Address} + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.maintainPeerConnection(peerCtx, peer) + }() + } + } + + select { + case <-ticker.C: + case <-ctx.Done(): + // Cancel all discovered peer connections. + for _, cancel := range activePeers { + cancel() + } + return + } + } +} + +func (s *Server) maintainPeerConnection(ctx context.Context, peer PeerEntry) { + log := s.log.WithField("peer_pk", peer.PK).WithField("peer_addr", peer.Addr) + bo := 5 * time.Second + + for { + select { + case <-ctx.Done(): + return + case <-s.done: + return + default: + } + + log.Info("Dialing peer server...") + conn, err := net.DialTimeout("tcp", peer.Addr, 10*time.Second) + if err != nil { + log.WithError(err).Warn("Failed to dial peer server.") + select { + case <-time.After(bo): + case <-ctx.Done(): + return + } + if bo < time.Minute { + bo = time.Duration(float64(bo) * 1.5) + } + continue + } + + ses := new(SessionCommon) + if err := ses.initClient(&s.EntityCommon, conn, peer.PK); err != nil { + log.WithError(err).Warn("Peer noise handshake failed.") + conn.Close() //nolint:errcheck,gosec + select { + case <-time.After(bo): + case <-ctx.Done(): + return + } + continue + } + + ses.sm.mutx.Lock() + ses.sm.yamux, err = yamux.Client(conn, yamux.DefaultConfig()) + if err != nil { + ses.sm.mutx.Unlock() + log.WithError(err).Warn("Peer yamux setup failed.") + conn.Close() //nolint:errcheck,gosec + select { + case <-time.After(bo): + case <-ctx.Done(): + return + } + continue + } + ses.sm.addr = ses.sm.yamux.RemoteAddr() + ses.isPeer = true + ses.sm.mutx.Unlock() + + s.peerSessionsMx.Lock() + s.peerSessions[peer.PK] = ses + s.peerSessionsMx.Unlock() + + log.Info("Connected to peer server.") + bo = 5 * time.Second // reset backoff on success + + // Block until the yamux session closes or context is done. + select { + case <-ctx.Done(): + case <-s.done: + } + + // Clean up. + s.peerSessionsMx.Lock() + delete(s.peerSessions, peer.PK) + s.peerSessionsMx.Unlock() + ses.Close() //nolint:errcheck,gosec + + log.Info("Peer session closed, will reconnect.") + } +} + +// isPeerPK returns true if the given PK is a known peer server. +func (s *Server) isPeerPK(pk cipher.PubKey) bool { + s.peerSessionsMx.Lock() + _, ok := s.peerPKs[pk] + s.peerSessionsMx.Unlock() + return ok +} + func (s *Server) handleSession(conn net.Conn) { defer func() { if r := recover(); r != nil { @@ -234,7 +441,14 @@ func (s *Server) handleSession(conn net.Conn) { return } log = log.WithField("remote_pk", dSes.RemotePK()) - log.Info("Started session.") + + // Mark session as peer if remote PK is a known peer server. + if s.isPeerPK(dSes.RemotePK()) { + dSes.isPeer = true + log.Info("Started peer server session.") + } else { + log.Info("Started session.") + } ctx, cancel := context.WithCancel(context.Background()) go func() { diff --git a/pkg/dmsg/server_session.go b/pkg/dmsg/server_session.go index 800e8cdfe..b8ff15864 100644 --- a/pkg/dmsg/server_session.go +++ b/pkg/dmsg/server_session.go @@ -155,11 +155,17 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCl if err != nil { return StreamRequest{}, err } - // TODO(evanlinjin): Implement timestamp tracker. + // Timestamp validation: we pass 0 because concurrent streams from + // the same client can have timestamps that arrive out of order at + // the server. Strict monotonic enforcement would reject valid + // concurrent requests. The noise encryption layer already prevents + // replay at the session level via nonce tracking. if err := req.Verify(0); err != nil { return StreamRequest{}, err } - if req.SrcAddr.PK != ss.rPK { + // For peer sessions, the SrcAddr.PK is the original client, not the + // peer server. The request signature is still verified above. + if !ss.isPeer && req.SrcAddr.PK != ss.rPK { return StreamRequest{}, ErrReqInvalidSrcPK } return req, nil @@ -205,25 +211,34 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCl return nil } - // Obtain next session. + // Obtain next session (local client). ss2, ok := ss.entity.serverSession(req.DstAddr.PK) if !ok { - ss.m.RecordStream(metrics.DeltaFailed) // record failed stream - return ErrReqNoNextSession + // Destination not connected locally. If this request came from a + // client (not a peer), try forwarding through peer servers (1-hop). + // If this request already came from a peer, do NOT forward further. + if ss.isPeer { + ss.m.RecordStream(metrics.DeltaFailed) + return ErrReqNoNextSession + } + return ss.forwardViaPeer(log, yStr, req) } log.Debug("Obtained next session.") - // Forward request and obtain/check response. - yStr2, resp, err := ss2.forwardRequest(req) + return ss.bridgeStream(log, yStr, ss2, req) +} + +// bridgeStream forwards a request to a destination session and bridges the two streams. +func (ss *ServerSession) bridgeStream(log logrus.FieldLogger, yStr io.ReadWriteCloser, dst ServerSession, req StreamRequest) error { + yStr2, resp, err := dst.forwardRequest(req) if err != nil { - ss.m.RecordStream(metrics.DeltaFailed) // record failed stream + ss.m.RecordStream(metrics.DeltaFailed) return err } log.Debug("Forwarded stream request.") - // Forward response. if err := ss.writeObject(yStr, resp); err != nil { - ss.m.RecordStream(metrics.DeltaFailed) // record failed stream + ss.m.RecordStream(metrics.DeltaFailed) return err } log.Debug("Forwarded stream response.") @@ -233,13 +248,41 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCl conn.SetReadDeadline(time.Time{}) //nolint:errcheck,gosec } - // Serve stream. log.Info("Serving stream.") - ss.m.RecordStream(metrics.DeltaConnect) // record successful stream - defer ss.m.RecordStream(metrics.DeltaDisconnect) // record disconnection + ss.m.RecordStream(metrics.DeltaConnect) + defer ss.m.RecordStream(metrics.DeltaDisconnect) return netutil.CopyReadWriteCloser(yStr, yStr2) } +// forwardViaPeer tries to forward a stream request through peer server sessions. +// This is only called for client-originated requests (not peer-originated, enforcing 1-hop max). +func (ss *ServerSession) forwardViaPeer(log logrus.FieldLogger, yStr io.ReadWriteCloser, req StreamRequest) error { + peers := ss.entity.peerServerSessions() + if len(peers) == 0 { + ss.m.RecordStream(metrics.DeltaFailed) + return ErrReqNoNextSession + } + + for _, peer := range peers { + // Don't forward back to the session the request came from. + if peer.RemotePK() == ss.rPK { + continue + } + + log := log.WithField("peer", peer.RemotePK()) + log.Debug("Trying peer server for forwarding.") + + err := ss.bridgeStream(log, yStr, peer, req) + if err == nil { + return nil + } + log.WithError(err).Debug("Peer forward failed, trying next.") + } + + ss.m.RecordStream(metrics.DeltaFailed) + return ErrReqNoNextSession +} + func addrToIP(addr net.Addr) (net.IP, error) { switch a := addr.(type) { case *net.TCPAddr: diff --git a/pkg/dmsg/session_common.go b/pkg/dmsg/session_common.go index fd2c59ade..1c47a5069 100644 --- a/pkg/dmsg/session_common.go +++ b/pkg/dmsg/session_common.go @@ -23,6 +23,7 @@ import ( type SessionCommon struct { entity *EntityCommon // back reference rPK cipher.PubKey // remote pk + isPeer bool // true if this session is with a peer server netConn net.Conn // underlying net.Conn (TCP connection to the dmsg server) // ys *yamux.Session @@ -75,7 +76,7 @@ func (sc *SessionCommon) initClient(entity *EntityCommon, conn net.Conn, rPK cip } rw := noise.NewReadWriter(conn, ns) - if err := rw.Handshake(time.Second * 5); err != nil { + if err := rw.Handshake(HandshakeTimeout); err != nil { return err } if rw.Buffered() > 0 { @@ -101,7 +102,7 @@ func (sc *SessionCommon) initServer(entity *EntityCommon, conn net.Conn) error { } rw := noise.NewReadWriter(conn, ns) - if err := rw.Handshake(time.Second * 5); err != nil { + if err := rw.Handshake(HandshakeTimeout); err != nil { return err } if rw.Buffered() > 0 { diff --git a/pkg/dmsgctrl/control_test.go b/pkg/dmsgctrl/control_test.go index d86e4ffa6..016a9ff2d 100644 --- a/pkg/dmsgctrl/control_test.go +++ b/pkg/dmsgctrl/control_test.go @@ -19,21 +19,24 @@ func TestControl_Ping(t *testing.T) { ctrlA := ControlStream(connA) ctrlB := ControlStream(connB) - t.Cleanup(func() { - assert.NoError(t, ctrlA.Close()) - assert.NoError(t, ctrlB.Close()) - }) + defer func() { + // Close in order: B first (the responder side), then A. + // This avoids EOF races where A's close kills the pipe + // while B's serve goroutine is mid-read. + _ = ctrlB.Close() //nolint:errcheck + _ = ctrlA.Close() //nolint:errcheck + }() for i := 0; i < times; i++ { - // act + // Ping A → B (ctrlA sends ping, ctrlB's serve goroutine responds with pong). durA, errA := ctrlA.Ping(context.TODO()) - durB, errB := ctrlB.Ping(context.TODO()) - t.Log(durA) - t.Log(durB) + require.NoError(t, errA, "ping A failed on iteration %d", i) + t.Logf("A: %v", durA) - // assert - assert.NoError(t, errA) - assert.NoError(t, errB) + // Ping B → A. + durB, errB := ctrlB.Ping(context.TODO()) + require.NoError(t, errB, "ping B failed on iteration %d", i) + t.Logf("B: %v", durB) } } diff --git a/pkg/dmsghttp/http_test.go b/pkg/dmsghttp/http_test.go index c07e2a7ce..03795709b 100644 --- a/pkg/dmsghttp/http_test.go +++ b/pkg/dmsghttp/http_test.go @@ -3,6 +3,7 @@ package dmsghttp import ( "bytes" + "context" "io" "net" "net/http" @@ -12,8 +13,6 @@ import ( "github.com/go-chi/chi/v5" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" "github.com/stretchr/testify/assert" - - dmsg "github.com/skycoin/dmsg/pkg/dmsg" ) const ( @@ -126,22 +125,27 @@ func startHTTPServer(t *testing.T, results chan httpServerResult, lis net.Listen results <- result }) + srv := &http.Server{ + ReadTimeout: 3 * time.Second, + WriteTimeout: 3 * time.Second, + IdleTimeout: 30 * time.Second, + ReadHeaderTimeout: 3 * time.Second, + Handler: r, + } + errCh := make(chan error, 1) go func() { - srv := &http.Server{ - ReadTimeout: 3 * time.Second, - WriteTimeout: 3 * time.Second, - IdleTimeout: 30 * time.Second, - ReadHeaderTimeout: 3 * time.Second, - Handler: r, - } errCh <- srv.Serve(lis) close(errCh) }() t.Cleanup(func() { - assert.NoError(t, lis.Close()) - assert.EqualError(t, <-errCh, dmsg.ErrEntityClosed.Error()) + // Graceful shutdown: let in-flight requests finish before closing. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = srv.Shutdown(ctx) //nolint:errcheck + _ = lis.Close() //nolint:errcheck + <-errCh }) } diff --git a/pkg/dmsgpty/pty_client.go b/pkg/dmsgpty/pty_client.go index 64da62197..4706c9d3d 100644 --- a/pkg/dmsgpty/pty_client.go +++ b/pkg/dmsgpty/pty_client.go @@ -98,7 +98,7 @@ func (sc *PtyClient) call(method string, args, reply interface{}) error { call := sc.rpcC.Go(sc.rpcMethod(method), args, reply, nil) select { case <-sc.done: - return io.ErrClosedPipe // TODO(evanlinjin): Is there a better error to use? + return io.ErrClosedPipe case <-call.Done: return call.Error } diff --git a/pkg/dmsgserver/config.go b/pkg/dmsgserver/config.go index e6daff4bf..6869f8776 100644 --- a/pkg/dmsgserver/config.go +++ b/pkg/dmsgserver/config.go @@ -26,6 +26,12 @@ var defaultDiscoveryURL = dmsg.DiscAddr(false) // DefaultDiscoverURLTest default URL for discovery in test env var DefaultDiscoverURLTest = dmsg.DiscAddr(true) +// PeerConfig represents a peer dmsg server to connect to for server-to-server mesh. +type PeerConfig struct { + PubKey cipher.PubKey `json:"public_key"` + Address string `json:"address"` +} + // Config is structure of config file type Config struct { Path string `json:"-"` @@ -39,6 +45,7 @@ type Config struct { LogLevel string `json:"log_level"` UpdateInterval time.Duration `json:"update_interval"` MaxSessions int `json:"max_sessions"` + Peers []PeerConfig `json:"peers,omitempty"` } // GenerateDefaultConfig generate default config for dmsg-server @@ -53,7 +60,7 @@ func GenerateDefaultConfig(c *Config) { c.LocalAddress = defaultLocalAddress c.HTTPAddress = defaultHTTPAddress c.LogLevel = "info" - c.MaxSessions = 2048 + c.MaxSessions = dmsg.DefaultMaxSessions } // Flush trying to save config file diff --git a/pkg/dmsgtest/mesh_test.go b/pkg/dmsgtest/mesh_test.go new file mode 100644 index 000000000..95a1b5c42 --- /dev/null +++ b/pkg/dmsgtest/mesh_test.go @@ -0,0 +1,190 @@ +// Package dmsgtest pkg/dmsgtest/mesh_test.go +// +//nolint:errcheck,gosec +package dmsgtest + +import ( + "bytes" + "context" + "io" + "sync" + "testing" + "time" + + "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/nettest" + + "github.com/skycoin/dmsg/pkg/disc" + dmsg "github.com/skycoin/dmsg/pkg/dmsg" +) + +// TestServerMesh_CrossServerDial verifies that a client connected to server A +// can dial a client connected to server B when the two servers are peered. +// Each client uses a separate filtered discovery so it only sees one server. +func TestServerMesh_CrossServerDial(t *testing.T) { + const timeout = 30 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Shared discovery for servers to register in. + sharedDC := disc.NewMock(0) + + // --- Server A --- + pkA, skA := cipher.GenerateKeyPair() + lisA, err := nettest.NewLocalListener("tcp") + require.NoError(t, err) + + // --- Server B --- + pkB, skB := cipher.GenerateKeyPair() + lisB, err := nettest.NewLocalListener("tcp") + require.NoError(t, err) + + // Configure servers as static peers of each other. + confA := &dmsg.ServerConfig{ + MaxSessions: 10, + UpdateInterval: dmsg.DefaultUpdateInterval, + Peers: []dmsg.PeerEntry{ + {PK: pkB, Addr: lisB.Addr().String()}, + }, + } + confB := &dmsg.ServerConfig{ + MaxSessions: 10, + UpdateInterval: dmsg.DefaultUpdateInterval, + Peers: []dmsg.PeerEntry{ + {PK: pkA, Addr: lisA.Addr().String()}, + }, + } + + srvA := dmsg.NewServer(pkA, skA, sharedDC, confA, nil) + srvB := dmsg.NewServer(pkB, skB, sharedDC, confB, nil) + + // Start servers. + var srvWg sync.WaitGroup + srvWg.Add(2) + go func() { + defer srvWg.Done() + if err := srvA.Serve(lisA, ""); err != nil { + t.Logf("Server A stopped: %v", err) + } + }() + go func() { + defer srvWg.Done() + if err := srvB.Serve(lisB, ""); err != nil { + t.Logf("Server B stopped: %v", err) + } + }() + + select { + case <-srvA.Ready(): + case <-ctx.Done(): + t.Fatal("Server A not ready") + } + select { + case <-srvB.Ready(): + case <-ctx.Done(): + t.Fatal("Server B not ready") + } + + // Give peer connections time to establish. + time.Sleep(2 * time.Second) + + // --- Client 1: only sees Server A --- + dcA := disc.NewMock(0) + entryA, err := sharedDC.Entry(ctx, pkA) + require.NoError(t, err) + require.NoError(t, dcA.PostEntry(ctx, entryA)) + + pk1, sk1 := cipher.GenerateKeyPair() + client1 := dmsg.NewClient(pk1, sk1, dcA, &dmsg.Config{MinSessions: 1}) + go client1.Serve(ctx) + select { + case <-client1.Ready(): + case <-ctx.Done(): + t.Fatal("Client 1 not ready") + } + + // Register client1 in shared discovery so server B can look it up if needed. + entry1, _ := dcA.Entry(ctx, pk1) + sharedDC.PostEntry(ctx, entry1) + + // --- Client 2: only sees Server B --- + dcB := disc.NewMock(0) + entryB, err := sharedDC.Entry(ctx, pkB) + require.NoError(t, err) + require.NoError(t, dcB.PostEntry(ctx, entryB)) + + pk2, sk2 := cipher.GenerateKeyPair() + client2 := dmsg.NewClient(pk2, sk2, dcB, &dmsg.Config{MinSessions: 1}) + go client2.Serve(ctx) + select { + case <-client2.Ready(): + case <-ctx.Done(): + t.Fatal("Client 2 not ready") + } + + // Register client2 in shared discovery so server A can route to it. + entry2, _ := dcB.Entry(ctx, pk2) + sharedDC.PostEntry(ctx, entry2) + + // Also register client2's entry in client1's discovery so client1 can + // look up client2's delegated servers for dialing. + dcA.PostEntry(ctx, entry2) + + // --- Cross-server dial: Client 1 (on A) dials Client 2 (on B) --- + const port = uint16(200) + + lis2, err := client2.Listen(port) + require.NoError(t, err) + defer lis2.Close() + + stream1, err := client1.DialStream(ctx, dmsg.Addr{PK: pk2, Port: port}) + require.NoError(t, err, "Cross-server dial should succeed via peer mesh") + defer stream1.Close() + + conn2, err := lis2.AcceptStream() + require.NoError(t, err) + defer conn2.Close() + + // --- Verify bidirectional data transfer --- + payload := cipher.RandByte(1024) + + // Client 1 -> Client 2 + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, wErr := stream1.Write(payload) + assert.NoError(t, wErr) + }() + + recv := make([]byte, len(payload)) + _, err = io.ReadFull(conn2, recv) + require.NoError(t, err) + wg.Wait() + require.True(t, bytes.Equal(payload, recv), "data mismatch: client1 -> client2") + + // Client 2 -> Client 1 + wg.Add(1) + go func() { + defer wg.Done() + _, wErr := conn2.Write(payload) + assert.NoError(t, wErr) + }() + + recv2 := make([]byte, len(payload)) + _, err = io.ReadFull(stream1, recv2) + require.NoError(t, err) + wg.Wait() + require.True(t, bytes.Equal(payload, recv2), "data mismatch: client2 -> client1") + + t.Log("Cross-server bidirectional stream test passed.") + + // Cleanup. + client1.Close() + client2.Close() + srvA.Close() + srvB.Close() + srvWg.Wait() +} diff --git a/vendor/github.com/bytedance/sonic/loader/loader_latest.go b/vendor/github.com/bytedance/sonic/loader/loader_latest.go index 22de274b5..4b55a0dc6 100644 --- a/vendor/github.com/bytedance/sonic/loader/loader_latest.go +++ b/vendor/github.com/bytedance/sonic/loader/loader_latest.go @@ -1,4 +1,3 @@ - /* * Copyright 2021 ByteDance Inc. * @@ -18,92 +17,146 @@ package loader import ( - `github.com/bytedance/sonic/loader/internal/rt` + "strconv" + "sync/atomic" + + "github.com/bytedance/sonic/loader/internal/rt" ) +var loadBatchSeq uint64 + +type LoadOneItem struct { + Text []byte + FuncName string + FrameSize int + ArgSize int + ArgPtrs []bool + LocalPtrs []bool + Pcdata Pcdata +} + +func buildLoadFunc(noPreempt bool, item LoadOneItem, textSize uint32, entryOff uint32) Func { + fn := Func{ + Name: item.FuncName, + TextSize: textSize, + ArgsSize: int32(item.ArgSize), + EntryOff: entryOff, + } + + fn.Pcsp = &item.Pcdata + + if noPreempt { + fn.PcUnsafePoint = &Pcdata{ + {PC: textSize, Val: PCDATA_UnsafePointUnsafe}, + } + } else { + fn.PcUnsafePoint = &Pcdata{ + {PC: textSize, Val: PCDATA_UnsafePointSafe}, + } + } + + // NOTICE: suppose the function has only one stack map at index 0 + fn.PcStackMapIndex = &Pcdata{ + {PC: textSize, Val: 0}, + } + + if item.ArgPtrs != nil { + args := rt.StackMapBuilder{} + for _, b := range item.ArgPtrs { + args.AddField(b) + } + fn.ArgsPointerMaps = args.Build() + } + + if item.LocalPtrs != nil { + locals := rt.StackMapBuilder{} + for _, b := range item.LocalPtrs { + locals.AddField(b) + } + fn.LocalsPointerMaps = locals.Build() + } + + return fn +} + // LoadFuncs loads only one function as module, and returns the function pointer // - text: machine code // - funcName: function name -// - frameSize: stack frame size. +// - frameSize: stack frame size. // - argSize: argument total size (in bytes) // - argPtrs: indicates if a slot (8 Bytes) of arguments memory stores pointer, from low to high // - localPtrs: indicates if a slot (8 Bytes) of local variants memory stores pointer, from low to high -// -// WARN: +// +// WARN: // - the function MUST has fixed SP offset equaling to this, otherwise it go.gentraceback will fail // - the function MUST has only one stack map for all arguments and local variants func (self Loader) LoadOne(text []byte, funcName string, frameSize int, argSize int, argPtrs []bool, localPtrs []bool, pcdata Pcdata) Function { - size := uint32(len(text)) - - fn := Func{ - Name: funcName, - TextSize: size, - ArgsSize: int32(argSize), - } - - - fn.Pcsp = &pcdata - - if self.NoPreempt { - fn.PcUnsafePoint = &Pcdata{ - {PC: size, Val: PCDATA_UnsafePointUnsafe}, - } - } else { - fn.PcUnsafePoint = &Pcdata{ - {PC: size, Val: PCDATA_UnsafePointSafe}, - } - } - - // NOTICE: suppose the function has only one stack map at index 0 - fn.PcStackMapIndex = &Pcdata{ - {PC: size, Val: 0}, - } - - if argPtrs != nil { - args := rt.StackMapBuilder{} - for _, b := range argPtrs { - args.AddField(b) - } - fn.ArgsPointerMaps = args.Build() - } - - if localPtrs != nil { - locals := rt.StackMapBuilder{} - for _, b := range localPtrs { - locals.AddField(b) - } - fn.LocalsPointerMaps = locals.Build() - } - - out := Load(text, []Func{fn}, self.Name + funcName, []string{self.File}) - return out[0] + _ = frameSize + + fn := buildLoadFunc(self.NoPreempt, LoadOneItem{ + Text: text, + FuncName: funcName, + FrameSize: frameSize, + ArgSize: argSize, + ArgPtrs: argPtrs, + LocalPtrs: localPtrs, + Pcdata: pcdata, + }, uint32(len(text)), 0) + + out := Load(text, []Func{fn}, self.Name+funcName, []string{self.File}) + return out[0] +} + +func (self Loader) LoadMany(items []LoadOneItem) (out []Function) { + if len(items) == 0 { + return nil + } + + total := 0 + funcs := make([]Func, 0, len(items)) + text := make([]byte, 0) + + for _, item := range items { + _ = item.FrameSize + size := uint32(len(item.Text)) + funcs = append(funcs, buildLoadFunc(self.NoPreempt, item, size, uint32(total))) + total += len(item.Text) + } + + text = make([]byte, 0, total) + for _, item := range items { + text = append(text, item.Text...) + } + + moduleName := self.Name + "batch." + strconv.FormatUint(atomic.AddUint64(&loadBatchSeq, 1), 10) + return Load(text, funcs, moduleName, []string{self.File}) } // Load loads given machine codes and corresponding function information into go moduledata // and returns runnable function pointer // WARN: this API is experimental, use it carefully func Load(text []byte, funcs []Func, modulename string, filenames []string) (out []Function) { - ids := make([]string, len(funcs)) - for i, f := range funcs { - ids[i] = f.Name - } - // generate module data and allocate memory address - mod := makeModuledata(modulename, filenames, &funcs, text) - - // verify and register the new module - moduledataverify1(mod) - registerModule(mod) - - // - // encapsulate function address - out = make([]Function, len(funcs)) - for i, s := range ids { - for _, f := range funcs { - if f.Name == s { - m := uintptr(mod.text + uintptr(f.EntryOff)) - out[i] = Function(&m) - } - } - } - return + ids := make([]string, len(funcs)) + for i, f := range funcs { + ids[i] = f.Name + } + // generate module data and allocate memory address + mod := makeModuledata(modulename, filenames, &funcs, text) + + // verify and register the new module + moduledataverify1(mod) + registerModule(mod) + + // + // encapsulate function address + out = make([]Function, len(funcs)) + for i, s := range ids { + for _, f := range funcs { + if f.Name == s { + m := uintptr(mod.text + uintptr(f.EntryOff)) + out[i] = Function(&m) + } + } + } + return } diff --git a/vendor/github.com/gin-contrib/sse/README.md b/vendor/github.com/gin-contrib/sse/README.md index cfe2c820b..a63ac2037 100644 --- a/vendor/github.com/gin-contrib/sse/README.md +++ b/vendor/github.com/gin-contrib/sse/README.md @@ -2,6 +2,7 @@ [![Go Reference](https://pkg.go.dev/badge/github.com/gin-contrib/sse.svg)](https://pkg.go.dev/github.com/gin-contrib/sse) [![Run Tests](https://github.com/gin-contrib/sse/actions/workflows/go.yml/badge.svg)](https://github.com/gin-contrib/sse/actions/workflows/go.yml) +[![Trivy Security Scan](https://github.com/gin-contrib/sse/actions/workflows/trivy-scan.yml/badge.svg)](https://github.com/gin-contrib/sse/actions/workflows/trivy-scan.yml) [![codecov](https://codecov.io/gh/gin-contrib/sse/branch/master/graph/badge.svg)](https://codecov.io/gh/gin-contrib/sse) [![Go Report Card](https://goreportcard.com/badge/github.com/gin-contrib/sse)](https://goreportcard.com/report/github.com/gin-contrib/sse) diff --git a/vendor/modules.txt b/vendor/modules.txt index 99577ad97..22092302d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -56,7 +56,7 @@ github.com/bytedance/sonic/internal/utils github.com/bytedance/sonic/option github.com/bytedance/sonic/unquote github.com/bytedance/sonic/utf8 -# github.com/bytedance/sonic/loader v0.5.0 +# github.com/bytedance/sonic/loader v0.5.1 ## explicit; go 1.16 github.com/bytedance/sonic/loader github.com/bytedance/sonic/loader/internal/abi @@ -155,7 +155,7 @@ github.com/gabriel-vasile/mimetype/internal/json github.com/gabriel-vasile/mimetype/internal/magic github.com/gabriel-vasile/mimetype/internal/markup github.com/gabriel-vasile/mimetype/internal/scan -# github.com/gin-contrib/sse v1.1.0 +# github.com/gin-contrib/sse v1.1.1 ## explicit; go 1.23 github.com/gin-contrib/sse # github.com/gin-gonic/gin v1.12.0