From 2833120c4ab9ba940f85d07a55737dc2bf96d19d Mon Sep 17 00:00:00 2001 From: wangjunwei Date: Fri, 12 Nov 2021 22:19:58 +0800 Subject: [PATCH 01/24] fix extract term panic --- v2/websocket/channels.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/v2/websocket/channels.go b/v2/websocket/channels.go index 020a60cf2..d856bdb82 100644 --- a/v2/websocket/channels.go +++ b/v2/websocket/channels.go @@ -22,7 +22,7 @@ import ( ) type Heartbeat struct { - //ChannelIDs []int64 + // ChannelIDs []int64 } func (c *Client) handleChannel(socketId SocketId, msg []byte) error { @@ -165,7 +165,13 @@ func (c *Client) handlePrivateChannel(raw []interface{}) error { // authenticated snapshots? if len(raw) > 2 { if arr, ok := raw[2].([]interface{}); ok { - obj, err := c.handlePrivateDataMessage(raw[1].(string), arr) + term, ok := raw[1].(string) + if !ok { + c.log.Warningf("extract term err, raw: %v", raw) + return nil + } + + obj, err := c.handlePrivateDataMessage(term, arr) if err != nil { return err } @@ -425,7 +431,7 @@ func (c *Client) convertRaw(term string, raw []interface{}) interface{} { } flc := fundingloan.Cancel(*o) return &flc - //case "uac": + // case "uac": case "hb": return &Heartbeat{} case "ats": From 70da130bb3f538646ef4c09e3232456866aceab1 Mon Sep 17 00:00:00 2001 From: wangjunwei Date: Wed, 24 Nov 2021 16:50:16 +0800 Subject: [PATCH 02/24] fix notification panic on deposit_new message --- pkg/models/notification/notification.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/models/notification/notification.go b/pkg/models/notification/notification.go index 85f1b1d16..e660b1747 100644 --- a/pkg/models/notification/notification.go +++ b/pkg/models/notification/notification.go @@ -38,9 +38,16 @@ func FromRaw(raw []interface{}) (n *Notification, err error) { return } - nraw := raw[4].([]interface{}) - if len(nraw) == 0 { - return + nraw, ok := raw[4].([]interface{}) + if !ok { + // raw[4] type of deposit_new message is object + if n.Type != "deposit_new" { + return nil, fmt.Errorf("raw[4] of %v message is not array, raw message: %v", n.Type, raw) + } + } else { + if len(nraw) == 0 { + return + } } switch n.Type { From 9b504321304675f8699d3db7374e0ad7446206de Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Fri, 26 Nov 2021 19:34:50 +0800 Subject: [PATCH 03/24] fix panic --- v2/websocket/transport.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/v2/websocket/transport.go b/v2/websocket/transport.go index 09c6841b7..993166464 100644 --- a/v2/websocket/transport.go +++ b/v2/websocket/transport.go @@ -5,21 +5,24 @@ import ( "crypto/tls" "encoding/json" "fmt" - "github.com/op/go-logging" "net" "net/http" "sync" "time" + "github.com/op/go-logging" + "github.com/gorilla/websocket" ) // size of channel that the websocket writer // routine pulls from const WS_WRITE_CAPACITY = 5000 + // size of channel that the websocket reader // routine pushes websocket updates into const WS_READ_CAPACITY = 10 + // seconds to wait in between re-sending // the keep alive ping const KEEP_ALIVE_TIMEOUT = 10 @@ -50,7 +53,7 @@ type ws struct { writeChan chan []byte kill chan interface{} // signal to routines to kill - quit chan error // signal to parent with error, if applicable + quit chan error // signal to parent with error, if applicable } func (w *ws) Connect() error { @@ -58,10 +61,10 @@ func (w *ws) Connect() error { return nil // no op } var d = websocket.Dialer{ - Subprotocols: []string{"p1", "p2"}, - ReadBufferSize: 1024, - WriteBufferSize: 1024, - Proxy: http.ProxyFromEnvironment, + Subprotocols: []string{"p1", "p2"}, + ReadBufferSize: 1024, + WriteBufferSize: 1024, + Proxy: http.ProxyFromEnvironment, HandshakeTimeout: time.Second * 10, } @@ -107,9 +110,9 @@ func (w *ws) Send(ctx context.Context, msg interface{}) error { } select { - case <- ctx.Done(): + case <-ctx.Done(): return ctx.Err() - case <- w.kill: // ws closed + case <-w.kill: // ws closed return fmt.Errorf("websocket connection closed") default: } @@ -126,10 +129,14 @@ func (w *ws) Done() <-chan error { // listen for write requests and perform them func (w *ws) listenWriteChannel() { for { + if w.ws == nil { + return + } + select { case <-w.kill: // ws closed return - case message := <- w.writeChan: + case message := <-w.writeChan: wsWriter, err := w.ws.NextWriter(websocket.TextMessage) if err != nil { w.log.Error("Unable to provision ws connection writer: ", err) From bd039c1220a360b02047bba0dae30c12b8687daf Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Mon, 29 Nov 2021 21:14:23 +0800 Subject: [PATCH 04/24] withdraw add payment_id support --- examples/v2/rest-wallet/main.go | 5 +++-- v2/rest/wallet.go | 5 ++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/examples/v2/rest-wallet/main.go b/examples/v2/rest-wallet/main.go index 2ab3e2ec6..850e5e25e 100644 --- a/examples/v2/rest-wallet/main.go +++ b/examples/v2/rest-wallet/main.go @@ -4,8 +4,9 @@ import ( "log" "os" - "github.com/bitfinexcom/bitfinex-api-go/v2/rest" "github.com/davecgh/go-spew/spew" + + "github.com/bitfinexcom/bitfinex-api-go/v2/rest" ) // Set BFX_API_KEY and BFX_API_SECRET as : @@ -65,7 +66,7 @@ func createDepositAddress(c *rest.Client) { } func withdraw(c *rest.Client) { - notfication, err := c.Wallet.Withdraw("exchange", "ethereum", 0.1, "0x5B4Dbe55dE0B565db6C63405D942886140083cE8") + notfication, err := c.Wallet.Withdraw("exchange", "ethereum", 0.1, "0x5B4Dbe55dE0B565db6C63405D942886140083cE8", "") if err != nil { log.Fatalf("withdraw %s", err) } diff --git a/v2/rest/wallet.go b/v2/rest/wallet.go index a5ed7bab4..4bcabeb9e 100644 --- a/v2/rest/wallet.go +++ b/v2/rest/wallet.go @@ -86,13 +86,16 @@ func (ws *WalletService) CreateDepositAddress(wallet, method string) (*notificat // Submits a request to withdraw funds from the given Bitfinex wallet to the given address // See https://docs.bitfinex.com/reference#withdraw for more info -func (ws *WalletService) Withdraw(wallet, method string, amount float64, address string) (*notification.Notification, error) { +func (ws *WalletService) Withdraw(wallet, method string, amount float64, address string, paymentId string) (*notification.Notification, error) { body := map[string]interface{}{ "wallet": wallet, "method": method, "amount": strconv.FormatFloat(amount, 'f', -1, 64), "address": address, } + if paymentId != "" { + body["payment_id"] = paymentId + } req, err := ws.requestFactory.NewAuthenticatedRequestWithData(common.PermissionWrite, "withdraw", body) if err != nil { return nil, err From 45f820882b115c9851dff9f8dc32efd57723ad9f Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Tue, 30 Nov 2021 18:50:09 +0800 Subject: [PATCH 05/24] fix params dump --- v2/websocket/client.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/v2/websocket/client.go b/v2/websocket/client.go index 88b228bf5..148b0f63a 100644 --- a/v2/websocket/client.go +++ b/v2/websocket/client.go @@ -3,6 +3,9 @@ package websocket import ( "bytes" "context" + "crypto/hmac" + "crypto/sha512" + "encoding/hex" "fmt" "strings" "sync" @@ -14,10 +17,6 @@ import ( "github.com/bitfinexcom/bitfinex-api-go/pkg/models/common" "github.com/bitfinexcom/bitfinex-api-go/pkg/utils" - - "crypto/hmac" - "crypto/sha512" - "encoding/hex" ) var productionBaseURL = "wss://api-pub.bitfinex.com/ws/2" @@ -357,7 +356,7 @@ func (c *Client) reconnect(socket *Socket, err error) error { func (c *Client) dumpParams() { c.log.Debug("----Bitfinex Client Parameters----") c.log.Debugf("AutoReconnect=%t", c.parameters.AutoReconnect) - c.log.Debugf("CapacityPerConnection=%t", c.parameters.CapacityPerConnection) + c.log.Debugf("CapacityPerConnection=%v", c.parameters.CapacityPerConnection) c.log.Debugf("ReconnectInterval=%s", c.parameters.ReconnectInterval) c.log.Debugf("ReconnectAttempts=%d", c.parameters.ReconnectAttempts) c.log.Debugf("ShutdownTimeout=%s", c.parameters.ShutdownTimeout) From ddd7107bb80d4361ffc4a9314c6db5b0c05e9da2 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Sat, 4 Dec 2021 21:01:47 +0800 Subject: [PATCH 06/24] order book add BidsAndAsks method --- v2/websocket/orderbook.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/v2/websocket/orderbook.go b/v2/websocket/orderbook.go index 665b510aa..5b0b4d2f7 100644 --- a/v2/websocket/orderbook.go +++ b/v2/websocket/orderbook.go @@ -44,6 +44,13 @@ func (ob *Orderbook) Bids() []book.Book { return ob.copySide(ob.bids) } +func (ob *Orderbook) BidsAndAsks() ([]book.Book, []book.Book) { + ob.lock.RLock() + defer ob.lock.RUnlock() + + return ob.copySide(ob.bids), ob.copySide(ob.asks) +} + func (ob *Orderbook) SetWithSnapshot(bs *book.Snapshot) { ob.lock.Lock() defer ob.lock.Unlock() From 57f1109f22d6f17ff9fb6c7144923d5864a7c5e0 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Thu, 9 Dec 2021 17:41:49 +0800 Subject: [PATCH 07/24] fix order book update when there is not matched price level --- v2/websocket/orderbook.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/v2/websocket/orderbook.go b/v2/websocket/orderbook.go index 5b0b4d2f7..58de2b4f0 100644 --- a/v2/websocket/orderbook.go +++ b/v2/websocket/orderbook.go @@ -1,6 +1,7 @@ package websocket import ( + "fmt" "hash/crc32" "sort" "strings" @@ -96,6 +97,14 @@ func (ob *Orderbook) UpdateWith(b *book.Book) { *side = append((*side)[:index], (*side)[index+1:]...) } } + + // price may not match at previous step + if b.Count <= 0 { + fmt.Printf("bitfinex matched price level %v not found at local cache, id: %v, symbol: %v", + b.Price, b.ID, b.Symbol) + return + } + *side = append(*side, b) // add to the orderbook and sort lowest to highest sort.Slice(*side, func(i, j int) bool { From 43d5db103be2655a210a7052f4345b6ec7e905d1 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Fri, 10 Dec 2021 18:44:41 +0800 Subject: [PATCH 08/24] fix checksum handling error --- v2/websocket/channels.go | 15 ++-------- v2/websocket/events.go | 37 +++++++++++++++++++---- v2/websocket/orderbook.go | 2 +- v2/websocket/subscriptions.go | 56 +++++++++++++++++------------------ 4 files changed, 63 insertions(+), 47 deletions(-) diff --git a/v2/websocket/channels.go b/v2/websocket/channels.go index d856bdb82..c60d7d900 100644 --- a/v2/websocket/channels.go +++ b/v2/websocket/channels.go @@ -92,23 +92,14 @@ func (c *Client) handleChecksumChannel(sub *subscription, checksum int) error { if bChecksum == oChecksum { c.log.Debugf("Orderbook '%s' checksum verification successful.", symbol) } else { - c.log.Warningf("Orderbook '%s' checksum is invalid got %d bot got %d. Data Out of sync, reconnecting.", + c.log.Warningf("Orderbook '%s' checksum is invalid, want %d, but got %d. Data Out of sync, resubscribing.", symbol, bChecksum, oChecksum) err := c.sendUnsubscribeMessage(context.Background(), sub) if err != nil { return err } - newSub := &SubscriptionRequest{ - SubID: c.nonce.GetNonce(), // generate new subID - Event: sub.Request.Event, - Channel: sub.Request.Channel, - Symbol: sub.Request.Symbol, - } - _, err_sub := c.Subscribe(context.Background(), newSub) - if err_sub != nil { - c.log.Warningf("could not resubscribe: %s", err_sub.Error()) - return err_sub - } + + // NOTE: do not resubscribe here, since we need to wait unsubscribe success first to avoid dup err("msg":"subscribe: dup","code":10301) } } return nil diff --git a/v2/websocket/events.go b/v2/websocket/events.go index 34ec8aab7..e95b3caec 100644 --- a/v2/websocket/events.go +++ b/v2/websocket/events.go @@ -1,6 +1,7 @@ package websocket import ( + "context" "encoding/json" ) @@ -107,7 +108,7 @@ func (c *Client) handleEvent(socketId SocketId, msg []byte) error { if err != nil { return err } - //var e interface{} + // var e interface{} switch event.Event { case "info": i := InfoEvent{} @@ -154,10 +155,34 @@ func (c *Client) handleEvent(socketId SocketId, msg []byte) error { if err != nil { return err } - err_rem := c.subscriptions.removeByChannelID(s.ChanID) - if err_rem != nil { - return err_rem + + sub, errRem := c.subscriptions.removeByChannelID(s.ChanID) + if errRem != nil { + c.log.Warningf("remove local sub, chan_id: %v, err: %v", s.ChanID, errRem) + return errRem } + + c.log.Infof("remove local sub success, chan_id: %v, sub_id: %v, sub: %+v", s.ChanID, sub.SubID(), sub) + + if sub.Request.Channel == ChanBook { + newSubReq := &SubscriptionRequest{ + SubID: c.nonce.GetNonce(), // generate new subID + Event: sub.Request.Event, + Channel: sub.Request.Channel, + Symbol: sub.Request.Symbol, + Precision: sub.Request.Precision, + Len: sub.Request.Len, + Frequency: sub.Request.Frequency, + } + + c.log.Infof("resubscribing, original chan_id: %v, sub_id: %v, new sub_id: %v, sub_req: %v", s.ChanID, sub.SubID(), newSubReq.SubID, newSubReq) + _, errSub := c.Subscribe(context.Background(), newSubReq) + if errSub != nil { + c.log.Warningf("could not resubscribe, new sub_id: %v, err: %s", newSubReq.SubID, errSub.Error()) + return errSub + } + } + c.listener <- &s case "error": er := ErrorEvent{} @@ -177,8 +202,8 @@ func (c *Client) handleEvent(socketId SocketId, msg []byte) error { c.log.Warningf("unknown event: %s", msg) } - //err = json.Unmarshal(msg, &e) - //TODO raw message isn't ever published + // err = json.Unmarshal(msg, &e) + // TODO raw message isn't ever published return err } diff --git a/v2/websocket/orderbook.go b/v2/websocket/orderbook.go index 58de2b4f0..1d2e529c6 100644 --- a/v2/websocket/orderbook.go +++ b/v2/websocket/orderbook.go @@ -100,7 +100,7 @@ func (ob *Orderbook) UpdateWith(b *book.Book) { // price may not match at previous step if b.Count <= 0 { - fmt.Printf("bitfinex matched price level %v not found at local cache, id: %v, symbol: %v", + fmt.Printf("bitfinex matched price level %v not found at local cache, id: %v, symbol: %v.\n", b.Price, b.ID, b.Symbol) return } diff --git a/v2/websocket/subscriptions.go b/v2/websocket/subscriptions.go index 94f05ab69..f5e798aec 100644 --- a/v2/websocket/subscriptions.go +++ b/v2/websocket/subscriptions.go @@ -2,10 +2,11 @@ package websocket import ( "fmt" - "github.com/op/go-logging" "strings" "sync" "time" + + "github.com/op/go-logging" ) type SubscriptionRequest struct { @@ -56,12 +57,12 @@ type UnsubscribeRequest struct { } type subscription struct { - ChanID int64 - SocketId SocketId - pending bool - Public bool + ChanID int64 + SocketId SocketId + pending bool + Public bool - Request *SubscriptionRequest + Request *SubscriptionRequest hbDeadline time.Time } @@ -84,11 +85,11 @@ func isPublic(request *SubscriptionRequest) bool { func newSubscription(socketId SocketId, request *SubscriptionRequest) *subscription { return &subscription{ - ChanID: -1, + ChanID: -1, SocketId: socketId, - Request: request, - pending: true, - Public: isPublic(request), + Request: request, + pending: true, + Public: isPublic(request), } } @@ -102,15 +103,15 @@ func (s subscription) Pending() bool { func newSubscriptions(heartbeatTimeout time.Duration, log *logging.Logger) *subscriptions { subs := &subscriptions{ - subsBySubID: make(map[string]*subscription), - subsByChanID: make(map[int64]*subscription), + subsBySubID: make(map[string]*subscription), + subsByChanID: make(map[int64]*subscription), subsBySocketId: make(map[SocketId]SubscriptionSet), - hbTimeout: heartbeatTimeout, - hbShutdown: make(chan struct{}), - hbDisconnect: make(chan HeartbeatDisconnect), - hbSleep: heartbeatTimeout / time.Duration(4), - log: log, - lock: &sync.RWMutex{}, + hbTimeout: heartbeatTimeout, + hbShutdown: make(chan struct{}), + hbDisconnect: make(chan HeartbeatDisconnect), + hbSleep: heartbeatTimeout / time.Duration(4), + log: log, + lock: &sync.RWMutex{}, } go subs.control() return subs @@ -123,12 +124,12 @@ type heartbeat struct { } type subscriptions struct { - lock *sync.RWMutex - log *logging.Logger + lock *sync.RWMutex + log *logging.Logger subsBySocketId map[SocketId]SubscriptionSet // subscripts map indexed by socket id - subsBySubID map[string]*subscription // subscription map indexed by subscription ID - subsByChanID map[int64]*subscription // subscription map indexed by channel ID + subsBySubID map[string]*subscription // subscription map indexed by subscription ID + subsByChanID map[int64]*subscription // subscription map indexed by channel ID hbActive bool hbDisconnect chan HeartbeatDisconnect // disconnect parent due to heartbeat timeout @@ -198,7 +199,7 @@ func (s *subscriptions) sweep(exp time.Time) { s.hbActive = false hbErr := HeartbeatDisconnect{ Subscription: sub, - Error: fmt.Errorf("heartbeat disconnect on channel %d expired at %s (%s timeout)", sub.ChanID, sub.hbDeadline, s.hbTimeout), + Error: fmt.Errorf("heartbeat disconnect on channel %d expired at %s (%s timeout)", sub.ChanID, sub.hbDeadline, s.hbTimeout), } disconnects = append(disconnects, hbErr) } @@ -227,7 +228,6 @@ func (s *subscriptions) Close() { close(s.hbShutdown) } - // Reset clears all subscriptions assigned to the given socket ID, and returns // a slice of the existing subscriptions prior to reset func (s *subscriptions) ResetSocketSubscriptions(socketId SocketId) []*subscription { @@ -273,13 +273,13 @@ func (s *subscriptions) add(socketId SocketId, sub *SubscriptionRequest) *subscr return subscription } -func (s *subscriptions) removeByChannelID(chanID int64) error { +func (s *subscriptions) removeByChannelID(chanID int64) (*subscription, error) { s.lock.Lock() defer s.lock.Unlock() // remove from socketId map sub, ok := s.subsByChanID[chanID] if !ok { - return fmt.Errorf("could not find channel ID %d", chanID) + return nil, fmt.Errorf("could not find sub by channel ID %d", chanID) } delete(s.subsByChanID, chanID) delete(s.subsBySubID, sub.SubID()) @@ -287,7 +287,7 @@ func (s *subscriptions) removeByChannelID(chanID int64) error { if _, ok := s.subsBySocketId[sub.SocketId]; ok { s.subsBySocketId[sub.SocketId] = s.subsBySocketId[sub.SocketId].RemoveByChannelId(chanID) } - return nil + return sub, nil } // nolint:megacheck @@ -314,7 +314,7 @@ func (s *subscriptions) activate(subID string, chanID int64) error { if sub, ok := s.subsBySubID[subID]; ok { if chanID != 0 { - s.log.Infof("activated subscription %s %s for channel %d", sub.Request.Channel, sub.Request.Symbol, chanID) + s.log.Infof("activated subscription, channel: %s, symbol: %s, sub_id: %v, chan_id: %d", sub.Request.Channel, sub.Request.Symbol, subID, chanID) } sub.pending = false sub.ChanID = chanID From a3bee293ff7da07f6ef01b3c6d72b7af14fe4c94 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Fri, 10 Dec 2021 23:35:46 +0800 Subject: [PATCH 09/24] prevent further msg processing after checksum invalid --- v2/websocket/channels.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/v2/websocket/channels.go b/v2/websocket/channels.go index c60d7d900..529275b4d 100644 --- a/v2/websocket/channels.go +++ b/v2/websocket/channels.go @@ -50,6 +50,11 @@ func (c *Client) handleChannel(socketId SocketId, msg []byte) error { return err } c.subscriptions.heartbeat(chanID) + + if sub.Pending() { + return fmt.Errorf("subscription is pending, may be unsubscribed by invalid checksum, channel id: %v, sub p: %p, sub: %+v", chanID, sub, sub) + } + if sub.Public { switch data := raw[1].(type) { case string: @@ -90,10 +95,13 @@ func (c *Client) handleChecksumChannel(sub *subscription, checksum int) error { oChecksum := orderbook.Checksum() // compare bitfinex checksum with local checksum if bChecksum == oChecksum { - c.log.Debugf("Orderbook '%s' checksum verification successful.", symbol) + c.log.Debugf("Orderbook '%s' checksum verification successful, sub: %p.", symbol, sub) } else { - c.log.Warningf("Orderbook '%s' checksum is invalid, want %d, but got %d. Data Out of sync, resubscribing.", - symbol, bChecksum, oChecksum) + c.log.Warningf("Orderbook '%s' checksum is invalid, sub: %p, want %d, but got %d. Data Out of sync, resubscribing.", + sub, symbol, bChecksum, oChecksum) + + // TODO visibility of sub.pending may have issue + sub.pending = true err := c.sendUnsubscribeMessage(context.Background(), sub) if err != nil { return err From 78538f31f7134484b6b0b244fda0e523d304885b Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Sat, 11 Dec 2021 18:57:23 +0800 Subject: [PATCH 10/24] add IsClosed method to Socket --- v2/websocket/client.go | 7 +++++++ v2/websocket/transport.go | 27 ++++++++++++--------------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/v2/websocket/client.go b/v2/websocket/client.go index 148b0f63a..03d65e9d5 100644 --- a/v2/websocket/client.go +++ b/v2/websocket/client.go @@ -78,6 +78,13 @@ type Socket struct { IsAuthenticated bool } +func (s *Socket) IsClosed() bool { + if v, ok := s.Asynchronous.(*ws); ok { + return v.IsClosed() + } + return false +} + // AsynchronousFactory provides an interface to re-create asynchronous transports during reconnect events. type AsynchronousFactory interface { Create() Asynchronous diff --git a/v2/websocket/transport.go b/v2/websocket/transport.go index 993166464..404ff0dcb 100644 --- a/v2/websocket/transport.go +++ b/v2/websocket/transport.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "sync" + "sync/atomic" "time" "github.com/op/go-logging" @@ -38,6 +39,7 @@ func newWs(baseURL string, logTransport bool, log *logging.Logger) *ws { lock: &sync.RWMutex{}, createTime: time.Now(), writeChan: make(chan []byte, WS_WRITE_CAPACITY), + isClosed: 0, } } @@ -54,6 +56,8 @@ type ws struct { kill chan interface{} // signal to routines to kill quit chan error // signal to parent with error, if applicable + + isClosed uint32 } func (w *ws) Connect() error { @@ -137,23 +141,12 @@ func (w *ws) listenWriteChannel() { case <-w.kill: // ws closed return case message := <-w.writeChan: - wsWriter, err := w.ws.NextWriter(websocket.TextMessage) - if err != nil { - w.log.Error("Unable to provision ws connection writer: ", err) - w.stop(err) - return - } - _, err = wsWriter.Write(message) + err := w.ws.WriteMessage(websocket.TextMessage, message) if err != nil { w.log.Error("Unable to write to ws: ", err) w.stop(err) return } - if err := wsWriter.Close(); err != nil { - w.log.Error("Unable to close ws connection writer: ", err) - w.stop(err) - return - } } } } @@ -170,9 +163,7 @@ func (w *ws) listenWs() { default: _, msg, err := w.ws.ReadMessage() if err != nil { - if cl, ok := err.(*websocket.CloseError); ok { - w.log.Errorf("close error code: %d", cl.Code) - } + w.log.Errorf("ws read err: %s", err.Error()) // a read during normal shutdown results in an OpError: op on closed connection if _, ok := err.(*net.OpError); ok { // general read error on a closed network connection, OK @@ -202,6 +193,8 @@ func (w *ws) stop(err error) { w.lock.Lock() defer w.lock.Unlock() if w.ws != nil { + atomic.StoreUint32(&w.isClosed, 1) + close(w.kill) w.quit <- err // pass error back close(w.quit) // signal to parent listeners @@ -218,3 +211,7 @@ func (w *ws) stop(err error) { func (w *ws) Close() { w.stop(fmt.Errorf("transport connection Close called")) } + +func (w *ws) IsClosed() bool { + return atomic.LoadUint32(&w.isClosed) == 1 +} From af9eb18080cdd729a18da4994c211329741a23a3 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Thu, 16 Dec 2021 22:03:06 +0800 Subject: [PATCH 11/24] fix log print --- v2/websocket/channels.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/v2/websocket/channels.go b/v2/websocket/channels.go index 529275b4d..1982d3648 100644 --- a/v2/websocket/channels.go +++ b/v2/websocket/channels.go @@ -95,10 +95,10 @@ func (c *Client) handleChecksumChannel(sub *subscription, checksum int) error { oChecksum := orderbook.Checksum() // compare bitfinex checksum with local checksum if bChecksum == oChecksum { - c.log.Debugf("Orderbook '%s' checksum verification successful, sub: %p.", symbol, sub) + c.log.Debugf("Orderbook '%s' checksum verification successful, chan_id: %v, sub: %p.", symbol, sub.ChanID, sub) } else { - c.log.Warningf("Orderbook '%s' checksum is invalid, sub: %p, want %d, but got %d. Data Out of sync, resubscribing.", - sub, symbol, bChecksum, oChecksum) + c.log.Warningf("Orderbook '%s' checksum is invalid, sub: %p, chan_id: %v, want %d, but got %d. Data Out of sync, resubscribing.", + symbol, sub, sub.ChanID, bChecksum, oChecksum) // TODO visibility of sub.pending may have issue sub.pending = true From 119afa72f9034bf76e07a7823e05f3a3e6c9d892 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Sun, 26 Dec 2021 20:32:58 +0800 Subject: [PATCH 12/24] restv2 add movements support --- pkg/models/common/common.go | 5 ++ pkg/models/wallet/wallet.go | 110 ++++++++++++++++++++++++++++++++++++ v2/rest/wallet.go | 28 +++++++++ 3 files changed, 143 insertions(+) diff --git a/pkg/models/common/common.go b/pkg/models/common/common.go index 84cd62b2a..eb5790166 100644 --- a/pkg/models/common/common.go +++ b/pkg/models/common/common.go @@ -64,6 +64,9 @@ const ( FrequencyTwoPerSecond BookFrequency = "F1" // PriceLevelDefault provides a constant default price level for book subscriptions. PriceLevelDefault int = 25 + + Withdraw MovementType = "WITHDRAW" + Deposit MovementType = "DEPOSIT" ) var ( @@ -96,6 +99,8 @@ type StatusType string type OrderType string +type MovementType string + func CandleResolutionFromString(str string) (CandleResolution, error) { switch str { case string(OneMinute): diff --git a/pkg/models/wallet/wallet.go b/pkg/models/wallet/wallet.go index 0e5aa05bd..9ad08637a 100644 --- a/pkg/models/wallet/wallet.go +++ b/pkg/models/wallet/wallet.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/bitfinexcom/bitfinex-api-go/pkg/convert" + "github.com/bitfinexcom/bitfinex-api-go/pkg/models/common" ) type Wallet struct { @@ -79,3 +80,112 @@ func SnapshotFromRaw(raw []interface{}) (s *Snapshot, err error) { return } + +type Movement struct { + ID int64 + common.MovementType + Currency string + CurrencyName string + StartedAt int64 + LastUpdatedAt int64 + Status string + Amount float64 + Fee float64 + DestinationAddress string + AddrTag string + TransactionId string + WithdrawTransactionNote string +} + +type MovementSnapshot struct { + Snapshot []*Movement +} + +// MovementFromRaw ... +// [ +// ID, +// CURRENCY, +// CURRENCY_NAME, +// null, +// null, +// MTS_STARTED, +// MTS_UPDATED, +// null, +// null, +// STATUS, +// null, +// null, +// AMOUNT, +// FEES, +// null, +// null, +// DESTINATION_ADDRESS, +// null, +// null, +// null, +// TRANSACTION_ID, +// WITHDRAW_TRANSACTION_NOTE +// ] +// +func MovementFromRaw(raw []interface{}) (w *Movement, err error) { + defer func() { + if err1 := recover(); err1 != nil { + err = fmt.Errorf("parse movement err: %v", err1) + } + }() + + if len(raw) < 22 { + err = fmt.Errorf("data slice too short for movement: %#v", raw) + return + } + + w = &Movement{ + ID: convert.I64ValOrZero(raw[0]), + Currency: convert.SValOrEmpty(raw[1]), + CurrencyName: convert.SValOrEmpty(raw[2]), + StartedAt: convert.I64ValOrZero(raw[5]), + LastUpdatedAt: convert.I64ValOrZero(raw[6]), + Status: convert.SValOrEmpty(raw[9]), + Fee: -convert.F64ValOrZero(raw[13]), + DestinationAddress: convert.SValOrEmpty(raw[16]), + AddrTag: convert.SValOrEmpty(raw[17]), + TransactionId: convert.SValOrEmpty(raw[20]), + WithdrawTransactionNote: convert.SValOrEmpty(raw[21]), + } + + amount := convert.F64ValOrZero(raw[12]) + if amount > 0 { + w.MovementType = common.Deposit + w.Amount = amount + } else { + w.MovementType = common.Withdraw + w.Amount = -amount + } + + return +} + +func MovementSnapshotFromRaw(raw []interface{}) (s *MovementSnapshot, err error) { + if len(raw) == 0 { + return s, fmt.Errorf("data slice too short for movements: %#v", raw) + } + + movements := make([]*Movement, 0) + switch raw[0].(type) { + case []interface{}: + for _, v := range raw { + if l, ok := v.([]interface{}); ok { + w, err := MovementFromRaw(l) + if err != nil { + return s, err + } + movements = append(movements, w) + } + } + default: + return s, fmt.Errorf("not an movements snapshot") + } + + s = &MovementSnapshot{Snapshot: movements} + return +} diff --git a/v2/rest/wallet.go b/v2/rest/wallet.go index 4bcabeb9e..fa8c4fc77 100644 --- a/v2/rest/wallet.go +++ b/v2/rest/wallet.go @@ -1,6 +1,8 @@ package rest import ( + "context" + "fmt" "strconv" "github.com/bitfinexcom/bitfinex-api-go/pkg/models/common" @@ -106,3 +108,29 @@ func (ws *WalletService) Withdraw(wallet, method string, amount float64, address } return notification.FromRaw(raw) } + +// Movements view your past deposits/withdrawals. Currency can be specified to retrieve movements specific to that currency. +// See https://docs.bitfinex.com/reference#rest-auth-movements for more info +func (ws *WalletService) Movements(ctx context.Context, currency string, start, end int64, limit int) (*wallet.MovementSnapshot, error) { + body := map[string]interface{}{} + if start != 0 { + body["start"] = start + } + if end != 0 { + body["end"] = end + } + if limit != 0 { + body["limit"] = limit + } + + req, err := ws.requestFactory.NewAuthenticatedRequestWithData(common.PermissionRead, fmt.Sprintf("movements/%s/hist", currency), body) + if err != nil { + return nil, err + } + raw, err := ws.Request(req) + if err != nil { + return nil, err + } + + return wallet.MovementSnapshotFromRaw(raw) +} From 3b020a47262ca4f079bcf851fc08a078a16ed281 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Mon, 3 Jan 2022 18:04:41 +0800 Subject: [PATCH 13/24] fix ineffective heartbeat when resubscrption error on reconnect --- v2/websocket/subscriptions.go | 6 ++++-- v2/websocket/transport.go | 16 ++++++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/v2/websocket/subscriptions.go b/v2/websocket/subscriptions.go index f5e798aec..66c2d7a99 100644 --- a/v2/websocket/subscriptions.go +++ b/v2/websocket/subscriptions.go @@ -194,12 +194,14 @@ func (s *subscriptions) sweep(exp time.Time) { return } disconnects := make([]HeartbeatDisconnect, 0) - for _, sub := range s.subsByChanID { + // use subsBySubID instead of subsByChanID to avoid ineffective heartbeat when re sub err on reconnect + // since subsByChanID is empty when subscription err + for _, sub := range s.subsBySubID { if exp.After(sub.hbDeadline) { s.hbActive = false hbErr := HeartbeatDisconnect{ Subscription: sub, - Error: fmt.Errorf("heartbeat disconnect on channel %d expired at %s (%s timeout)", sub.ChanID, sub.hbDeadline, s.hbTimeout), + Error: fmt.Errorf("sub %v heartbeat disconnect on channel %d expired at %s (%s timeout)", sub.SubID(), sub.ChanID, sub.hbDeadline, s.hbTimeout), } disconnects = append(disconnects, hbErr) } diff --git a/v2/websocket/transport.go b/v2/websocket/transport.go index 404ff0dcb..bbde98294 100644 --- a/v2/websocket/transport.go +++ b/v2/websocket/transport.go @@ -58,6 +58,8 @@ type ws struct { quit chan error // signal to parent with error, if applicable isClosed uint32 + + connStr string } func (w *ws) Connect() error { @@ -89,6 +91,8 @@ func (w *ws) Connect() error { // so we need to keep sending a message down the channel to stop // tcp killing the connection go w.keepAlivePinger() + + w.connStr = w.getConnStr() return nil } @@ -120,7 +124,7 @@ func (w *ws) Send(ctx context.Context, msg interface{}) error { return fmt.Errorf("websocket connection closed") default: } - w.log.Debug("ws->srv: %s", string(bs)) + w.log.Debug("%s ws->srv: %s", w.connStr, string(bs)) // push request into writer channel w.writeChan <- bs return nil @@ -143,7 +147,7 @@ func (w *ws) listenWriteChannel() { case message := <-w.writeChan: err := w.ws.WriteMessage(websocket.TextMessage, message) if err != nil { - w.log.Error("Unable to write to ws: ", err) + w.log.Error("%s Unable to write to ws: ", w.connStr, err) w.stop(err) return } @@ -163,7 +167,7 @@ func (w *ws) listenWs() { default: _, msg, err := w.ws.ReadMessage() if err != nil { - w.log.Errorf("ws read err: %s", err.Error()) + w.log.Errorf("%s ws read err: %s", w.connStr, err.Error()) // a read during normal shutdown results in an OpError: op on closed connection if _, ok := err.(*net.OpError); ok { // general read error on a closed network connection, OK @@ -173,7 +177,7 @@ func (w *ws) listenWs() { w.stop(err) return } - w.log.Debugf("srv->ws: %s", string(msg)) + w.log.Debugf("%s srv->ws: %s", w.connStr, string(msg)) w.lock.RLock() if w.downstream == nil { w.lock.RUnlock() @@ -215,3 +219,7 @@ func (w *ws) Close() { func (w *ws) IsClosed() bool { return atomic.LoadUint32(&w.isClosed) == 1 } + +func (w *ws) getConnStr() string { + return fmt.Sprintf("%s->%s", w.ws.LocalAddr().String(), w.ws.RemoteAddr().String()) +} From e5695951c5b1b77c90f082957932bfcd6bce2e51 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Mon, 3 Jan 2022 18:19:23 +0800 Subject: [PATCH 14/24] fix log print --- v2/websocket/transport.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v2/websocket/transport.go b/v2/websocket/transport.go index bbde98294..79159a168 100644 --- a/v2/websocket/transport.go +++ b/v2/websocket/transport.go @@ -124,7 +124,7 @@ func (w *ws) Send(ctx context.Context, msg interface{}) error { return fmt.Errorf("websocket connection closed") default: } - w.log.Debug("%s ws->srv: %s", w.connStr, string(bs)) + w.log.Debugf("%s ws->srv: %s", w.connStr, string(bs)) // push request into writer channel w.writeChan <- bs return nil @@ -147,7 +147,7 @@ func (w *ws) listenWriteChannel() { case message := <-w.writeChan: err := w.ws.WriteMessage(websocket.TextMessage, message) if err != nil { - w.log.Error("%s Unable to write to ws: ", w.connStr, err) + w.log.Errorf("%s Unable to write to ws: ", w.connStr, err) w.stop(err) return } From 5aca7264c7938ae00699ea4be29f78cfd1ba9634 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Tue, 4 Jan 2022 19:26:46 +0800 Subject: [PATCH 15/24] support GetOrderById --- v2/rest/orders.go | 63 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/v2/rest/orders.go b/v2/rest/orders.go index 5e66ed0fa..428da3775 100644 --- a/v2/rest/orders.go +++ b/v2/rest/orders.go @@ -51,14 +51,15 @@ func (s *OrderService) GetBySymbol(symbol string) (*order.Snapshot, error) { // Retrieve an active order by the given ID // See https://docs.bitfinex.com/reference#rest-auth-orders for more info -func (s *OrderService) GetByOrderId(orderID int64) (o *order.Order, err error) { - os, err := s.All() +func (s *OrderService) GetByOrderId(symbol string, orderID int64) (o *order.Order, err error) { + os, err := s.getActiveOrders(symbol, orderID) if err != nil { return nil, err } - for _, order := range os.Snapshot { - if order.ID == orderID { - return order, nil + + for _, od := range os.Snapshot { + if od.ID == orderID { + return od, nil } } return nil, common.ErrNotFound @@ -68,26 +69,26 @@ func (s *OrderService) GetByOrderId(orderID int64) (o *order.Order, err error) { // See https://docs.bitfinex.com/reference#orders-history for more info func (s *OrderService) AllHistory() (*order.Snapshot, error) { // use no symbol, this will get all orders - return s.getHistoricalOrders("") + return s.getHistoricalOrders("", 0, 0, 0) } // Retrieves all past orders with the given symbol // See https://docs.bitfinex.com/reference#orders-history for more info func (s *OrderService) GetHistoryBySymbol(symbol string) (*order.Snapshot, error) { // use no symbol, this will get all orders - return s.getHistoricalOrders(symbol) + return s.getHistoricalOrders(symbol, 0, 0, 0) } // Retrieve a single order in history with the given id // See https://docs.bitfinex.com/reference#orders-history for more info -func (s *OrderService) GetHistoryByOrderId(orderID int64) (o *order.Order, err error) { - os, err := s.AllHistory() +func (s *OrderService) GetHistoryByOrderId(symbol string, orderID int64) (o *order.Order, err error) { + os, err := s.getHistoricalOrders(symbol, 0, 0, 0, orderID) if err != nil { return nil, err } - for _, order := range os.Snapshot { - if order.ID == orderID { - return order, nil + for _, od := range os.Snapshot { + if od.ID == orderID { + return od, nil } } return nil, common.ErrNotFound @@ -108,8 +109,18 @@ func (s *OrderService) OrderTrades(symbol string, orderID int64) (*tradeexecutio return tradeexecutionupdate.SnapshotFromRaw(raw) } -func (s *OrderService) getActiveOrders(symbol string) (*order.Snapshot, error) { - req, err := s.requestFactory.NewAuthenticatedRequest(common.PermissionRead, path.Join("orders", symbol)) +func (s *OrderService) getActiveOrders(symbol string, ids ...int64) (*order.Snapshot, error) { + var req Request + var err error + if len(ids) > 0 { + data := map[string]interface{}{ + "id": ids, + } + req, err = s.requestFactory.NewAuthenticatedRequestWithData(common.PermissionRead, path.Join("orders", symbol), data) + } else { + req, err = s.requestFactory.NewAuthenticatedRequest(common.PermissionRead, path.Join("orders", symbol)) + } + if err != nil { return nil, err } @@ -127,8 +138,28 @@ func (s *OrderService) getActiveOrders(symbol string) (*order.Snapshot, error) { return os, nil } -func (s *OrderService) getHistoricalOrders(symbol string) (*order.Snapshot, error) { - req, err := s.requestFactory.NewAuthenticatedRequest(common.PermissionRead, path.Join("orders", symbol, "hist")) +func (s *OrderService) getHistoricalOrders(symbol string, start, end int64, limit int, ids ...int64) (*order.Snapshot, error) { + data := map[string]interface{}{} + if start != 0 { + data["start"] = start + } + if end != 0 { + data["end"] = end + } + if limit != 0 { + data["limit"] = limit + } + if len(ids) > 0 { + data["id"] = ids + } + + var req Request + var err error + if len(data) > 0 { + req, err = s.requestFactory.NewAuthenticatedRequestWithData(common.PermissionRead, path.Join("orders", symbol, "hist"), data) + } else { + req, err = s.requestFactory.NewAuthenticatedRequest(common.PermissionRead, path.Join("orders", symbol, "hist")) + } if err != nil { return nil, err } From 482c87fb260f29969fef0c45f11ae7f5bb2a57aa Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Fri, 7 Jan 2022 11:43:52 +0800 Subject: [PATCH 16/24] print unwritten msg when ws conn broken --- v2/websocket/transport.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/v2/websocket/transport.go b/v2/websocket/transport.go index 79159a168..ce31dc678 100644 --- a/v2/websocket/transport.go +++ b/v2/websocket/transport.go @@ -121,6 +121,7 @@ func (w *ws) Send(ctx context.Context, msg interface{}) error { case <-ctx.Done(): return ctx.Err() case <-w.kill: // ws closed + w.log.Debugf("%s want write to closed ws: %s", w.connStr, string(bs)) return fmt.Errorf("websocket connection closed") default: } @@ -143,11 +144,18 @@ func (w *ws) listenWriteChannel() { select { case <-w.kill: // ws closed + unWriteNum := len(w.writeChan) + if unWriteNum > 0 { + for i := 0; i < unWriteNum; i++ { + m := <-w.writeChan + w.log.Errorf("%s ws killed with unwritten msg: %v", w.connStr, string(m)) + } + } return case message := <-w.writeChan: err := w.ws.WriteMessage(websocket.TextMessage, message) if err != nil { - w.log.Errorf("%s Unable to write to ws: ", w.connStr, err) + w.log.Errorf("%s Unable to write to ws: %v", w.connStr, err) w.stop(err) return } From 3796664c742ea9abd93ef8eb9b2297f2a6f33e47 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Fri, 7 Jan 2022 11:55:57 +0800 Subject: [PATCH 17/24] print unwritten msg when ws conn broken --- v2/websocket/transport.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/v2/websocket/transport.go b/v2/websocket/transport.go index ce31dc678..614947407 100644 --- a/v2/websocket/transport.go +++ b/v2/websocket/transport.go @@ -144,25 +144,31 @@ func (w *ws) listenWriteChannel() { select { case <-w.kill: // ws closed - unWriteNum := len(w.writeChan) - if unWriteNum > 0 { - for i := 0; i < unWriteNum; i++ { - m := <-w.writeChan - w.log.Errorf("%s ws killed with unwritten msg: %v", w.connStr, string(m)) - } - } + w.printUnWrittenMsg() return case message := <-w.writeChan: err := w.ws.WriteMessage(websocket.TextMessage, message) if err != nil { w.log.Errorf("%s Unable to write to ws: %v", w.connStr, err) w.stop(err) + w.printUnWrittenMsg() return } } } } +func (w *ws) printUnWrittenMsg() { + unWriteNum := len(w.writeChan) + if unWriteNum == 0 { + return + } + for i := 0; i < unWriteNum; i++ { + m := <-w.writeChan + w.log.Errorf("%s ws killed with %d of %d unwritten msg: %v", w.connStr, i, unWriteNum, string(m)) + } +} + // listen on ws & fwd to listen() func (w *ws) listenWs() { for { From f336e5fd30172d468a37571337102cd144adf7d0 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Fri, 7 Jan 2022 21:20:16 +0800 Subject: [PATCH 18/24] GetHistoryBySymbol add more parameter support --- v2/rest/orders.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v2/rest/orders.go b/v2/rest/orders.go index 428da3775..1811999bc 100644 --- a/v2/rest/orders.go +++ b/v2/rest/orders.go @@ -74,9 +74,9 @@ func (s *OrderService) AllHistory() (*order.Snapshot, error) { // Retrieves all past orders with the given symbol // See https://docs.bitfinex.com/reference#orders-history for more info -func (s *OrderService) GetHistoryBySymbol(symbol string) (*order.Snapshot, error) { +func (s *OrderService) GetHistoryBySymbol(symbol string, startMs, endMs int64, limit int) (*order.Snapshot, error) { // use no symbol, this will get all orders - return s.getHistoricalOrders(symbol, 0, 0, 0) + return s.getHistoricalOrders(symbol, startMs, endMs, limit) } // Retrieve a single order in history with the given id From 49c67d1c8eb3b17e9fc23bf72acd90f538ca5e2e Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Thu, 13 Jan 2022 18:08:27 +0800 Subject: [PATCH 19/24] fix some bug --- v2/websocket/api.go | 23 ----------------------- v2/websocket/client.go | 15 ++++++++++----- v2/websocket/subscriptions.go | 4 +++- v2/websocket/transport.go | 8 +------- 4 files changed, 14 insertions(+), 36 deletions(-) diff --git a/v2/websocket/api.go b/v2/websocket/api.go index 253094913..e977349e8 100644 --- a/v2/websocket/api.go +++ b/v2/websocket/api.go @@ -28,29 +28,6 @@ func (c *Client) Send(ctx context.Context, msg interface{}) error { return socket.Asynchronous.Send(ctx, msg) } -// Submit a request to enable the given flag -func (c *Client) EnableFlag(ctx context.Context, flag int) (string, error) { - req := &FlagRequest{ - Event: "conf", - Flags: flag, - } - // TODO enable flag on reconnect? - // create sublist to stop concurrent map read - socks := make([]*Socket, len(c.sockets)) - c.mtx.RLock() - for i, socket := range c.sockets { - socks[i] = socket - } - c.mtx.RUnlock() - for _, socket := range socks { - err := socket.Asynchronous.Send(ctx, req) - if err != nil { - return "", err - } - } - return "", nil -} - // Gen the count of currently active websocket connections func (c *Client) ConnectionCount() int { c.mtx.RLock() diff --git a/v2/websocket/client.go b/v2/websocket/client.go index 03d65e9d5..0fc7de424 100644 --- a/v2/websocket/client.go +++ b/v2/websocket/client.go @@ -493,24 +493,29 @@ func (c *Client) checkResubscription(socketId SocketId) { if c.parameters.ManageOrderbook { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - _, err_flag := c.EnableFlag(ctx, common.Checksum) - if err_flag != nil { - c.log.Errorf("could not enable checksum flag %s ", err_flag) + req := &FlagRequest{ + Event: "conf", + Flags: common.Checksum, + } + if err := socket.Asynchronous.Send(ctx, req); err != nil { + c.log.Errorf("socket(%d) could not enable checksum flag %s ", socket.Id, err) } } + if c.parameters.ResubscribeOnReconnect && socket.ResetSubscriptions != nil { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() for _, sub := range socket.ResetSubscriptions { if sub.Request.Event == "auth" { continue } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() sub.Request.SubID = c.nonce.GetNonce() // new nonce c.log.Infof("socket (id=%d) resubscribing to %s with nonce %s", socket.Id, sub.Request.String(), sub.Request.SubID) _, err := c.subscribeBySocket(ctx, socket, sub.Request) if err != nil { c.log.Errorf("could not resubscribe: %s", err.Error()) } + time.Sleep(50 * time.Millisecond) } socket.ResetSubscriptions = nil } diff --git a/v2/websocket/subscriptions.go b/v2/websocket/subscriptions.go index 66c2d7a99..e5e38f65f 100644 --- a/v2/websocket/subscriptions.go +++ b/v2/websocket/subscriptions.go @@ -193,12 +193,14 @@ func (s *subscriptions) sweep(exp time.Time) { s.lock.RUnlock() return } + disconnects := make([]HeartbeatDisconnect, 0) // use subsBySubID instead of subsByChanID to avoid ineffective heartbeat when re sub err on reconnect // since subsByChanID is empty when subscription err for _, sub := range s.subsBySubID { if exp.After(sub.hbDeadline) { - s.hbActive = false + // 22-01-13, do not change hbActive to false on heartbeat timeout, so we always heartbeat after first successful conn + // s.hbActive = false hbErr := HeartbeatDisconnect{ Subscription: sub, Error: fmt.Errorf("sub %v heartbeat disconnect on channel %d expired at %s (%s timeout)", sub.SubID(), sub.ChanID, sub.hbDeadline, s.hbTimeout), diff --git a/v2/websocket/transport.go b/v2/websocket/transport.go index 614947407..f91d097eb 100644 --- a/v2/websocket/transport.go +++ b/v2/websocket/transport.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "encoding/json" "fmt" - "net" "net/http" "sync" "sync/atomic" @@ -182,16 +181,11 @@ func (w *ws) listenWs() { _, msg, err := w.ws.ReadMessage() if err != nil { w.log.Errorf("%s ws read err: %s", w.connStr, err.Error()) - // a read during normal shutdown results in an OpError: op on closed connection - if _, ok := err.(*net.OpError); ok { - // general read error on a closed network connection, OK - return - } - w.stop(err) return } w.log.Debugf("%s srv->ws: %s", w.connStr, string(msg)) + w.lock.RLock() if w.downstream == nil { w.lock.RUnlock() From 250a90131a428e1650328fa70624625c79690b3e Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Thu, 13 Jan 2022 20:19:05 +0800 Subject: [PATCH 20/24] add some log --- v2/websocket/subscriptions.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/v2/websocket/subscriptions.go b/v2/websocket/subscriptions.go index e5e38f65f..dfe52cee8 100644 --- a/v2/websocket/subscriptions.go +++ b/v2/websocket/subscriptions.go @@ -194,6 +194,8 @@ func (s *subscriptions) sweep(exp time.Time) { return } + s.log.Debugf("begin to sweep, subs len: %v", len(s.subsBySubID)) + disconnects := make([]HeartbeatDisconnect, 0) // use subsBySubID instead of subsByChanID to avoid ineffective heartbeat when re sub err on reconnect // since subsByChanID is empty when subscription err @@ -269,6 +271,7 @@ func (s *subscriptions) add(socketId SocketId, sub *SubscriptionRequest) *subscr s.lock.Lock() defer s.lock.Unlock() subscription := newSubscription(socketId, sub) + subscription.hbDeadline = time.Now().Add(s.hbTimeout) s.subsBySubID[sub.SubID] = subscription if _, ok := s.subsBySocketId[socketId]; !ok { s.subsBySocketId[socketId] = make(SubscriptionSet, 0) From bfa7d26c1b4b55cacb38478f2f30ff4a6d17ffc1 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Sat, 12 Feb 2022 17:26:17 +0800 Subject: [PATCH 21/24] add dirty data log when checksum is valid --- v2/websocket/channels.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/v2/websocket/channels.go b/v2/websocket/channels.go index 1982d3648..daabed0e3 100644 --- a/v2/websocket/channels.go +++ b/v2/websocket/channels.go @@ -96,6 +96,10 @@ func (c *Client) handleChecksumChannel(sub *subscription, checksum int) error { // compare bitfinex checksum with local checksum if bChecksum == oChecksum { c.log.Debugf("Orderbook '%s' checksum verification successful, chan_id: %v, sub: %p.", symbol, sub.ChanID, sub) + bids, asks := orderbook.BidsAndAsks() + if len(bids) > 0 && len(asks) > 0 && bids[0].Price >= asks[0].Price { + c.log.Errorf("Orderbook '%s' checksum verification successful with dirty data, chan_id: %v, sub: %p.", symbol, sub.ChanID, sub) + } } else { c.log.Warningf("Orderbook '%s' checksum is invalid, sub: %p, chan_id: %v, want %d, but got %d. Data Out of sync, resubscribing.", symbol, sub, sub.ChanID, bChecksum, oChecksum) From 571200e375cd89fdcf978d68d19b2b587e9a0df7 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Thu, 17 Feb 2022 16:48:33 +0800 Subject: [PATCH 22/24] tiny change --- v2/websocket/transport.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/v2/websocket/transport.go b/v2/websocket/transport.go index f91d097eb..979c58286 100644 --- a/v2/websocket/transport.go +++ b/v2/websocket/transport.go @@ -84,6 +84,8 @@ func (w *ws) Connect() error { return err } w.ws = ws + w.connStr = w.getConnStr() + go w.listenWriteChannel() go w.listenWs() // Gorilla/go dont natively support keep alive pinging @@ -91,7 +93,6 @@ func (w *ws) Connect() error { // tcp killing the connection go w.keepAlivePinger() - w.connStr = w.getConnStr() return nil } From e6460a450ffae7bb4725c126c44ba3ecbb5b01f7 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Thu, 17 Feb 2022 23:37:46 +0800 Subject: [PATCH 23/24] catche all panic --- pkg/utils/routine.go | 31 +++++++++++++++++++++++++++++++ v2/websocket/client.go | 20 ++++++++++++-------- v2/websocket/subscriptions.go | 4 +++- v2/websocket/transport.go | 9 ++++++--- 4 files changed, 52 insertions(+), 12 deletions(-) create mode 100644 pkg/utils/routine.go diff --git a/pkg/utils/routine.go b/pkg/utils/routine.go new file mode 100644 index 000000000..66a132719 --- /dev/null +++ b/pkg/utils/routine.go @@ -0,0 +1,31 @@ +package utils + +import ( + "fmt" + "os" + "runtime/debug" + "syscall" +) + +var PanicMessage string + +func GoWithRecover(f func()) { + go func() { + defer func() { + if err := recover(); err != nil { + PanicMessage = fmt.Sprintf("panic: %v\n%v\n", err, string(debug.Stack())) + _, _ = fmt.Fprint(os.Stderr, PanicMessage) + + if err := os.Stderr.Sync(); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "sync stdout err: %v\n", err.Error()) + } + + if err := syscall.Kill(os.Getpid(), syscall.SIGUSR1); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "send signal err: %v", err) + } + } + }() + + f() + }() +} diff --git a/v2/websocket/client.go b/v2/websocket/client.go index 0fc7de424..62ff5125b 100644 --- a/v2/websocket/client.go +++ b/v2/websocket/client.go @@ -223,7 +223,7 @@ func NewWithParamsAsyncFactoryNonce(params *Parameters, async AsynchronousFactor func (c *Client) Connect() error { c.dumpParams() c.terminal = false - go c.listenDisconnect() + utils.GoWithRecover(c.listenDisconnect) return c.connectSocket(SocketId(len(c.sockets))) } @@ -257,10 +257,12 @@ func (c *Client) Close() { if socket.IsConnected { wg.Add(1) socket.IsConnected = false - go func(s *Socket) { - c.closeAsyncAndWait(s, c.parameters.ShutdownTimeout) - wg.Done() - }(socket) + utils.GoWithRecover(func() { + func(s *Socket) { + c.closeAsyncAndWait(s, c.parameters.ShutdownTimeout) + wg.Done() + }(socket) + }) } } wg.Wait() @@ -292,14 +294,14 @@ func (c *Client) listenDisconnect() { c.log.Infof("restarting socket (id=%d) connection", socket.Id) socket.IsConnected = false // reconnect to the socket - go func() { + utils.GoWithRecover(func() { c.closeAsyncAndWait(socket, c.parameters.ShutdownTimeout) err := c.reconnect(socket, hbErr.Error) if err != nil { c.log.Warningf("socket disconnect: %s", err.Error()) return } - }() + }) } } c.mtx.Unlock() @@ -400,7 +402,9 @@ func (c *Client) connectSocket(socketId SocketId) error { return err } socket.IsConnected = true - go c.listenUpstream(socket) + utils.GoWithRecover(func() { + c.listenUpstream(socket) + }) return nil } diff --git a/v2/websocket/subscriptions.go b/v2/websocket/subscriptions.go index dfe52cee8..727bbec52 100644 --- a/v2/websocket/subscriptions.go +++ b/v2/websocket/subscriptions.go @@ -7,6 +7,8 @@ import ( "time" "github.com/op/go-logging" + + "github.com/bitfinexcom/bitfinex-api-go/pkg/utils" ) type SubscriptionRequest struct { @@ -113,7 +115,7 @@ func newSubscriptions(heartbeatTimeout time.Duration, log *logging.Logger) *subs log: log, lock: &sync.RWMutex{}, } - go subs.control() + utils.GoWithRecover(subs.control) return subs } diff --git a/v2/websocket/transport.go b/v2/websocket/transport.go index 979c58286..01712ff2c 100644 --- a/v2/websocket/transport.go +++ b/v2/websocket/transport.go @@ -13,6 +13,8 @@ import ( "github.com/op/go-logging" "github.com/gorilla/websocket" + + "github.com/bitfinexcom/bitfinex-api-go/pkg/utils" ) // size of channel that the websocket writer @@ -86,12 +88,12 @@ func (w *ws) Connect() error { w.ws = ws w.connStr = w.getConnStr() - go w.listenWriteChannel() - go w.listenWs() + utils.GoWithRecover(w.listenWriteChannel) + utils.GoWithRecover(w.listenWs) // Gorilla/go dont natively support keep alive pinging // so we need to keep sending a message down the channel to stop // tcp killing the connection - go w.keepAlivePinger() + utils.GoWithRecover(w.keepAlivePinger) return nil } @@ -137,6 +139,7 @@ func (w *ws) Done() <-chan error { // listen for write requests and perform them func (w *ws) listenWriteChannel() { + for { if w.ws == nil { return From d7133b828ff91311a5cb57c4fc6b1e83f69128b1 Mon Sep 17 00:00:00 2001 From: mm2175 <30251661+mm2175@users.noreply.github.com> Date: Wed, 9 Mar 2022 21:49:18 +0800 Subject: [PATCH 24/24] add support for channel filter --- v2/websocket/client.go | 5 +++-- v2/websocket/parameters.go | 29 ++++++++++++++++------------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/v2/websocket/client.go b/v2/websocket/client.go index 62ff5125b..9a3beb542 100644 --- a/v2/websocket/client.go +++ b/v2/websocket/client.go @@ -373,6 +373,7 @@ func (c *Client) dumpParams() { c.log.Debugf("HeartbeatTimeout=%s", c.parameters.HeartbeatTimeout) c.log.Debugf("URL=%s", c.parameters.URL) c.log.Debugf("ManageOrderbook=%t", c.parameters.ManageOrderbook) + c.log.Debugf("ChannelFilter=%t", c.parameters.ChannelFilter) } func (c *Client) connectSocket(socketId SocketId) error { @@ -576,7 +577,7 @@ func (c *Client) hasCredentials() bool { // Authenticate creates the payload for the authentication request and sends it // to the API. The filters will be applied to the authenticated channel, i.e. // only subscribe to the filtered messages. -func (c *Client) authenticate(ctx context.Context, socketId SocketId, filter ...string) error { +func (c *Client) authenticate(ctx context.Context, socketId SocketId) error { nonce := c.nonce.GetNonce() payload := "AUTH" + nonce sig, err := c.sign(payload) @@ -589,7 +590,7 @@ func (c *Client) authenticate(ctx context.Context, socketId SocketId, filter ... AuthSig: sig, AuthPayload: payload, AuthNonce: nonce, - Filter: filter, + Filter: c.parameters.ChannelFilter, SubID: nonce, } if c.cancelOnDisconnect { diff --git a/v2/websocket/parameters.go b/v2/websocket/parameters.go index 20ee969fd..c9ab274f6 100644 --- a/v2/websocket/parameters.go +++ b/v2/websocket/parameters.go @@ -1,27 +1,30 @@ package websocket import ( - "github.com/op/go-logging" "time" + + "github.com/op/go-logging" ) // Parameters defines adapter behavior. type Parameters struct { - AutoReconnect bool - ReconnectInterval time.Duration - ReconnectAttempts int - reconnectTry int - ShutdownTimeout time.Duration - CapacityPerConnection int - Logger *logging.Logger + AutoReconnect bool + ReconnectInterval time.Duration + ReconnectAttempts int + reconnectTry int + ShutdownTimeout time.Duration + CapacityPerConnection int + Logger *logging.Logger ResubscribeOnReconnect bool - HeartbeatTimeout time.Duration - LogTransport bool + HeartbeatTimeout time.Duration + LogTransport bool + + URL string + ManageOrderbook bool - URL string - ManageOrderbook bool + ChannelFilter []string } func NewDefaultParameters() *Parameters { @@ -36,7 +39,7 @@ func NewDefaultParameters() *Parameters { ShutdownTimeout: time.Second * 5, ResubscribeOnReconnect: true, HeartbeatTimeout: time.Second * 30, - LogTransport: false, // log transport send/recv + LogTransport: false, // log transport send/recv Logger: logging.MustGetLogger("bitfinex-ws"), } }