Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cmd/dmsg-discovery/commands/dmsg-discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ Example:
go a.RunBackgroundTasks(ctx, log)
log.WithField("addr", addr).Info("Serving discovery API...")
go func() {
if err = listenAndServe(addr, a); err != nil {
log.Errorf("ListenAndServe: %v", err)
if listenErr := listenAndServe(addr, a); listenErr != nil {
log.Errorf("ListenAndServe: %v", listenErr)
cancel()
}
}()
Expand Down Expand Up @@ -408,8 +408,8 @@ Example:
go updateServers(ctx, a, dClient, dmsgDC, dmsgServerType, log)

go func() {
if err = dmsghttp.ListenAndServe(ctx, sk, a, dClient, dmsg.DefaultDmsgHTTPPort, dmsgDC, log); err != nil {
log.Errorf("dmsghttp.ListenAndServe: %v", err)
if dmsgErr := dmsghttp.ListenAndServe(ctx, sk, a, dClient, dmsg.DefaultDmsgHTTPPort, dmsgDC, log); dmsgErr != nil {
log.Errorf("dmsghttp.ListenAndServe: %v", dmsgErr)
cancel()
}
}()
Expand Down Expand Up @@ -467,7 +467,7 @@ func getServers(ctx context.Context, a *api.API, dmsgServerType string, log logr
case <-ctx.Done():
return []*disc.Entry{}
case <-ticker.C:
getServers(ctx, a, dmsgServerType, log)
return getServers(ctx, a, dmsgServerType, log)
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions cmd/dmsgcurl/commands/dmsgcurl.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ var RootCmd = &cobra.Command{
httpClient = &http.Client{
Transport: transport,
}
ctx = context.WithValue(context.Background(), "socks5_proxy", proxyAddr) //nolint
ctx = context.WithValue(ctx, "socks5_proxy", proxyAddr) //nolint
}

cErr = handleRequest(ctx, pk, sk, httpClient, parsedURL, dmsgcurlData)
Expand All @@ -166,7 +166,7 @@ func handleRequest(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey, http
Code: errorCode["WRITE_INIT"],
}
}
defer closeAndCleanFile(file, err)
defer func() { closeAndCleanFile(file, err) }()
var httpC http.Client

if flags.UseDC {
Expand Down Expand Up @@ -256,9 +256,8 @@ func handleRequest(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey, http
dlog.WithError(err).Debug("Failed to perform HTTP request after maximum retries")
continue // Retry outer attempt
}
defer closeResponseBody(resp)

n, err := cancellableCopy(ctx, file, resp.Body, resp.ContentLength)
closeResponseBody(resp)
if err != nil {
dlog.WithError(err).Errorf("Download failed at %d/%dB", n, resp.ContentLength)
select {
Expand Down Expand Up @@ -373,7 +372,12 @@ func (pw *progressWriter) Write(p []byte) (int, error) {
n := len(p)
current := atomic.AddInt64(&pw.Current, int64(n))
total := atomic.LoadInt64(&pw.Total)
pc := fmt.Sprintf("%d%%", current*100/total)
var pc string
if total > 0 {
pc = fmt.Sprintf("%d%%", current*100/total)
} else {
pc = "unknown"
}
if dmsgcurlOutput != "" {
fmt.Printf("Downloading: %d/%dB (%s)", current, total, pc)
if current != total {
Expand Down
4 changes: 2 additions & 2 deletions cmd/dmsgweb/commands/dmsgweb.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ dmsgweb conf file detected: ` + dwcfg
httpClient = &http.Client{
Transport: transport,
}
ctx = context.WithValue(context.Background(), "socks5_proxy", proxyAddr) //nolint
ctx = context.WithValue(ctx, "socks5_proxy", proxyAddr) //nolint
}

dmsgC, closeDmsg, err = cli.InitDmsgWithFlags(ctx, dlog, pk, sk, httpClient, "")
Expand Down Expand Up @@ -390,7 +390,7 @@ func proxyHTTPConn(n int) {
} else {
dmsgp = "80"
}
urlStr = fmt.Sprintf("dmsg://%s:%s%s", strings.TrimRight(hostParts[0], filterDomainSuffix), dmsgp, c.Param("path"))
urlStr = fmt.Sprintf("dmsg://%s:%s%s", strings.TrimSuffix(hostParts[0], filterDomainSuffix), dmsgp, c.Param("path"))
if c.Request.URL.RawQuery != "" {
urlStr = fmt.Sprintf("%s?%s", urlStr, c.Request.URL.RawQuery)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/dmsgweb/commands/dmsgwebsrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func server() {
httpClient = &http.Client{
Transport: transport,
}
ctx = context.WithValue(context.Background(), "socks5_proxy", proxyAddr) //nolint
ctx = context.WithValue(ctx, "socks5_proxy", proxyAddr) //nolint
}

dmsgC, closeDmsg, err = cli.InitDmsgWithFlags(ctx, dlog, pk, sk, httpClient, "")
Expand Down
21 changes: 21 additions & 0 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package cli

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -519,8 +520,28 @@ func NewFallbackRoundTripper(ctx context.Context, clients []*dmsg.Client) http.R

// RoundTrip tries each DMSG client in order until a successful response is received.
func (f *FallbackRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
// Buffer the request body so it can be replayed on retry.
// Without this, the first failed transport consumes the body
// and subsequent transports receive an empty body.
var bodyBytes []byte
if req.Body != nil {
var err error
bodyBytes, err = io.ReadAll(req.Body)
if err != nil {
return nil, fmt.Errorf("failed to read request body for retry: %w", err)
}
req.Body.Close() //nolint:errcheck,gosec
}

var lastErr error
for _, client := range f.clients {
// Reset the body for each attempt
if bodyBytes != nil {
req.Body = io.NopCloser(bytes.NewReader(bodyBytes))
} else {
req.Body = nil
}

rt := dmsghttp.MakeHTTPTransport(f.ctx, client)
resp, err := rt.RoundTrip(req)
if err != nil {
Expand Down
9 changes: 7 additions & 2 deletions internal/dmsg-discovery/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type API struct {

// New returns a new API object, which can be started as a server
func New(log logrus.FieldLogger, db store.Storer, m discmetrics.Metrics, testMode, enableLoadTesting, enableMetrics bool, dmsgAddr, authPassphrase string) *API {
if log != nil {
if log == nil {
log = logging.MustGetLogger("dmsg_disc")
}

Expand Down Expand Up @@ -358,6 +358,7 @@ func (a *API) setEntry() func(w http.ResponseWriter, r *http.Request) {
// json serialized entry object
func (a *API) delEntry() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close() //nolint:errcheck
entry := new(disc.Entry)
if err := json.NewDecoder(r.Body).Decode(entry); err != nil {
a.handleError(w, r, disc.ErrUnexpected)
Expand Down Expand Up @@ -526,7 +527,11 @@ func isLoopbackAddr(addr string) (bool, error) {
return true, nil
}

return net.ParseIP(host).IsLoopback(), nil
ip := net.ParseIP(host)
if ip == nil {
return false, nil
}
return ip.IsLoopback(), nil
}

// writeJSON writes a json object on a http.ResponseWriter with the given code.
Expand Down
10 changes: 7 additions & 3 deletions internal/dmsg-discovery/api/error_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package api

import (
"errors"
"net/http"

"github.com/skycoin/dmsg/pkg/disc"
Expand Down Expand Up @@ -36,9 +37,12 @@ func (a *API) handleError(w http.ResponseWriter, r *http.Request, e error) {
code = http.StatusUnprocessableEntity
msg = e.Error()
} else {
f, ok := apiErrors[e]
if !ok {
f = func() (int, string) { return http.StatusInternalServerError, disc.ErrUnexpected.Error() }
f := func() (int, string) { return http.StatusInternalServerError, disc.ErrUnexpected.Error() }
for target, handler := range apiErrors {
if errors.Is(e, target) {
f = handler
break
}
}
code, msg = f()
}
Expand Down
20 changes: 9 additions & 11 deletions internal/dmsg-server/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,26 +82,24 @@ func (a *API) SetDmsgServer(srv *dmsg.Server) {

// ListenAndServe runs dmsg Serve function alongside health endpoint
func (a *API) ListenAndServe(lAddr, pAddr, httpAddr string) error {
errCh := make(chan error)
errCh := make(chan error, 2)

dmsgLn, err := net.Listen("tcp", lAddr)
if err != nil {
return err
}
dmsgLis := &proxyproto.Listener{Listener: dmsgLn}
defer dmsgLis.Close() // nolint:errcheck
go func(l net.Listener, address string) {
if err := a.dmsgServer.Serve(l, address); err != nil {
errCh <- err
}
errCh <- a.dmsgServer.Serve(l, address)
l.Close() //nolint:errcheck,gosec
}(dmsgLis, pAddr)

ln, err := net.Listen("tcp", httpAddr)
if err != nil {
dmsgLis.Close() //nolint:errcheck,gosec
return err
}
lis := &proxyproto.Listener{Listener: ln}
defer lis.Close() // nolint:errcheck
srv := &http.Server{
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
Expand All @@ -110,9 +108,8 @@ func (a *API) ListenAndServe(lAddr, pAddr, httpAddr string) error {
//Addr: lis,
Handler: a.router,
}
if err := srv.Serve(lis); err != nil {
errCh <- err
}
errCh <- srv.Serve(lis)
lis.Close() //nolint:errcheck,gosec

return <-errCh
}
Expand Down Expand Up @@ -161,6 +158,9 @@ func (a *API) updateInternalState() {
// UpdateAverageNumberOfPacketsPerMinute is function which needs to called every minute.
func (a *API) updateAverageNumberOfPacketsPerMinute() {
if a.dmsgServer != nil {
a.sMu.Lock()
defer a.sMu.Unlock()

newDecValues, newEncValues, average := calculateThroughput(
a.dmsgServer.GetSessions(),
a.minuteDecValues,
Expand All @@ -169,8 +169,6 @@ func (a *API) updateAverageNumberOfPacketsPerMinute() {

a.metrics.SetPacketsPerMinute(average)

a.sMu.Lock()
defer a.sMu.Unlock()
a.minuteDecValues = newDecValues
a.minuteEncValues = newEncValues
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/direct/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@ func NewClient(entries []*disc.Entry, log *logging.Logger) disc.APIClient {
func (c *directClient) Entry(_ context.Context, pubKey cipher.PubKey) (*disc.Entry, error) {
c.mx.RLock()
defer c.mx.RUnlock()
for _, entry := range c.entries {
if entry.Static == pubKey {
return entry, nil
}
if entry, ok := c.entries[pubKey]; ok {
return entry, nil
}
return nil, disc.ErrKeyNotFound
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/disc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (c *httpClient) PostEntry(ctx context.Context, entry *Entry) error {
Error()
return errFromString(httpResponse.Message)
}
_, _ = io.Copy(io.Discard, resp.Body) //nolint:errcheck
return nil
}

Expand Down Expand Up @@ -198,6 +199,7 @@ func (c *httpClient) DelEntry(ctx context.Context, entry *Entry) error {
Error()
return errFromString(httpResponse.Message)
}
_, _ = io.Copy(io.Discard, resp.Body) //nolint:errcheck
return nil
}

Expand All @@ -206,10 +208,11 @@ func (c *httpClient) PutEntry(ctx context.Context, sk cipher.SecKey, entry *Entr
c.updateMux.Lock()
defer c.updateMux.Unlock()

entry.Sequence++
sequence := entry.Sequence + 1
entry.Timestamp = time.Now().UnixNano()

for {
entry.Sequence = sequence
err := entry.Sign(sk)
if err != nil {
return err
Expand All @@ -219,18 +222,17 @@ func (c *httpClient) PutEntry(ctx context.Context, sk cipher.SecKey, entry *Entr
return nil
}
if err != ErrValidationWrongSequence {
entry.Sequence--
return err
}
rE, entryErr := c.Entry(ctx, entry.Static)
if entryErr != nil {
return err
return entryErr
}
if rE.Timestamp > entry.Timestamp { // If there is a more up to date entry drop update
entry.Sequence = rE.Sequence
return nil
}
entry.Sequence = rE.Sequence + 1
sequence = rE.Sequence + 1
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/disc/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ func Copy(dst, src *Entry) {
dst.Client = nil
} else {
*dst.Client = *src.Client
if src.Client.DelegatedServers != nil {
dst.Client.DelegatedServers = make([]cipher.PubKey, len(src.Client.DelegatedServers))
copy(dst.Client.DelegatedServers, src.Client.DelegatedServers)
}
}

dst.Static = src.Static
Expand Down
26 changes: 22 additions & 4 deletions pkg/dmsg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ type Client struct {
conf *Config
porter *netutil.Porter

bo time.Duration // initial backoff duration
initBO time.Duration // initial backoff duration (constant)
bo time.Duration // current backoff duration
maxBO time.Duration // maximum backoff duration
factor float64 // multiplier for the backoff duration that is applied on every retry

Expand All @@ -106,6 +107,7 @@ func NewClient(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, conf *Conf
errCh: make(chan error, 10),
done: make(chan struct{}),
conf: conf,
initBO: time.Second * 5,
bo: time.Second * 5,
maxBO: time.Minute,
factor: netutil.DefaultFactor,
Expand Down Expand Up @@ -210,6 +212,7 @@ func (ce *Client) Serve(ctx context.Context) {
if len(entries) == 0 {
ce.log.Warnf("No entries found. Retrying after %s...", ce.bo.String())
ce.serveWait()
continue
}
// randomize dmsg servers list using crypto/rand seed for true randomization
// This ensures each client connects to servers in a different order,
Expand Down Expand Up @@ -280,6 +283,9 @@ func (ce *Client) Serve(ctx context.Context) {
ce.log.WithField("remote_pk", entry.Static).WithError(err).WithField("current_backoff", ce.bo.String()).
Warn("Failed to establish session.")
ce.serveWait()
} else {
// Reset backoff on successful session establishment.
ce.bo = ce.initBO
}
}

Expand Down Expand Up @@ -373,10 +379,16 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) {
}

// Range client's delegated servers.
// See if we are already connected to a delegated server.
// Try existing sessions first, falling back to next server on failure.
for _, srvPK := range entry.Client.DelegatedServers {
if dSes, ok := ce.clientSession(ce.porter, srvPK); ok {
return dSes.DialStream(addr)
stream, err := dSes.DialStream(addr)
if err != nil {
ce.log.WithError(err).WithField("server", srvPK).
Debug("DialStream failed via existing session, trying next server")
continue
}
return stream, nil
}
}

Expand All @@ -387,7 +399,13 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) {
if err != nil {
continue
}
return dSes.DialStream(addr)
stream, err := dSes.DialStream(addr)
if err != nil {
ce.log.WithError(err).WithField("server", srvPK).
Debug("DialStream failed via new session, trying next server")
continue
}
return stream, nil
}

return nil, ErrCannotConnectToDelegated
Expand Down
Loading
Loading