@@ -3,10 +3,14 @@ use crate::localstorage::MutinyBrowserStorage;
33use crate :: wallet:: { esplora_from_network, MutinyWallet } ;
44use bdk:: blockchain:: EsploraBlockchain ;
55use bitcoin:: Network ;
6+ use futures:: StreamExt ;
7+ use gloo_net:: websocket:: Message ;
68use lightning:: chain:: { chainmonitor, Filter } ;
9+ use lightning:: ln:: msgs:: NetAddress ;
710use std:: net:: { SocketAddr , ToSocketAddrs } ;
811use std:: str:: FromStr ;
912use std:: sync:: Arc ;
13+ use wasm_bindgen_futures:: spawn_local;
1014
1115use crate :: tcpproxy:: { SocketDescriptor , TcpProxy } ;
1216use crate :: {
@@ -21,10 +25,10 @@ use bitcoin::secp256k1::PublicKey;
2125use lightning:: chain:: keysinterface:: { InMemorySigner , KeysInterface , KeysManager , Recipient } ;
2226use lightning:: ln:: peer_handler:: {
2327 ErroringMessageHandler , IgnoringMessageHandler , MessageHandler as LdkMessageHandler ,
24- PeerManager as LdkPeerManager ,
28+ PeerManager as LdkPeerManager , SocketDescriptor as LdkSocketDescriptor ,
2529} ;
2630use lightning:: routing:: gossip;
27- use log:: info;
31+ use log:: { debug , error , info, warn } ;
2832
2933pub ( crate ) type NetworkGraph = gossip:: NetworkGraph < Arc < MutinyLogger > > ;
3034
@@ -138,9 +142,14 @@ impl Node {
138142 }
139143 } ;
140144
141- if connect_peer_if_necessary ( websocket_proxy_addr, pubkey, peer_addr)
142- . await
143- . is_err ( )
145+ if connect_peer_if_necessary (
146+ websocket_proxy_addr,
147+ pubkey,
148+ peer_addr,
149+ self . peer_manager . clone ( ) ,
150+ )
151+ . await
152+ . is_err ( )
144153 {
145154 Err ( MutinyError :: PeerInfoParseFailed )
146155 . with_context ( || format ! ( "could not connect to peer: {pubkey}" ) ) ?
@@ -152,10 +161,10 @@ impl Node {
152161
153162pub ( crate ) async fn connect_peer_if_necessary (
154163 websocket_proxy_addr : String ,
155- _pubkey : PublicKey ,
164+ pubkey : PublicKey ,
156165 peer_addr : SocketAddr ,
157- // peer_manager: Arc<PeerManager>,
158- ) -> Result < ( ) , ( ) > {
166+ peer_manager : Arc < PeerManager > ,
167+ ) -> Result < ( ) , MutinyError > {
159168 // TODO add this when the peer manager is ready
160169 /*
161170 for node_pubkey in peer_manager.get_peer_node_ids() {
@@ -166,18 +175,68 @@ pub(crate) async fn connect_peer_if_necessary(
166175 */
167176
168177 // first make a connection to the node
169- let tcp_proxy = TcpProxy :: new ( websocket_proxy_addr, peer_addr) . await ;
170-
171- // TODO remove the test send
172- tcp_proxy. send ( String :: from ( "test\n " ) . into_bytes ( ) . to_vec ( ) ) ;
173-
174- // TODO then give that connection to the peer manager
178+ let tcp_proxy = Arc :: new ( TcpProxy :: new ( websocket_proxy_addr, peer_addr) . await ) ;
179+ let mut descriptor = SocketDescriptor :: new ( tcp_proxy) ;
180+
181+ // then give that connection to the peer manager
182+ let initial_bytes = peer_manager. new_outbound_connection (
183+ pubkey,
184+ descriptor. clone ( ) ,
185+ Some ( get_net_addr_from_socket ( peer_addr) ) ,
186+ ) ?;
187+
188+ let sent_bytes = descriptor. send_data ( & initial_bytes, true ) ;
189+ debug ! ( "sent {sent_bytes} to node: {pubkey}" ) ;
190+
191+ // schedule a reader on the connection
192+ let mut new_descriptor = descriptor. clone ( ) ;
193+ spawn_local ( async move {
194+ while let Some ( msg) = descriptor. conn . read . lock ( ) . await . next ( ) . await {
195+ if let Ok ( msg_contents) = msg {
196+ match msg_contents {
197+ Message :: Text ( t) => {
198+ warn ! (
199+ "received text from websocket when we should only receive binary: {}" ,
200+ t
201+ )
202+ }
203+ Message :: Bytes ( b) => {
204+ debug ! ( "received binary data from websocket" ) ;
205+
206+ let read_res = peer_manager. read_event ( & mut new_descriptor, & b) ;
207+ match read_res {
208+ // TODO handle read boolean event
209+ Ok ( _read_bool) => {
210+ debug ! ( "read event from the node" ) ;
211+ peer_manager. process_events ( ) ;
212+ }
213+ Err ( e) => error ! ( "got an error reading event: {}" , e) ,
214+ }
215+ }
216+ } ;
217+ }
218+ }
175219
176- // TODO then schedule a reader on the connection
220+ // TODO when we detect an error, lock the writes and close connection.
221+ debug ! ( "WebSocket Closed" )
222+ } ) ;
177223
178224 Ok ( ( ) )
179225}
180226
227+ fn get_net_addr_from_socket ( socket_addr : SocketAddr ) -> NetAddress {
228+ match socket_addr {
229+ SocketAddr :: V4 ( sockaddr) => NetAddress :: IPv4 {
230+ addr : sockaddr. ip ( ) . octets ( ) ,
231+ port : sockaddr. port ( ) ,
232+ } ,
233+ SocketAddr :: V6 ( sockaddr) => NetAddress :: IPv6 {
234+ addr : sockaddr. ip ( ) . octets ( ) ,
235+ port : sockaddr. port ( ) ,
236+ } ,
237+ }
238+ }
239+
181240pub ( crate ) fn create_peer_manager (
182241 km : Arc < KeysManager > ,
183242 lightning_msg_handler : MessageHandler ,
0 commit comments