diff --git a/shinkai-bin/shinkai-node/src/network/node.rs b/shinkai-bin/shinkai-node/src/network/node.rs index edf9f5c3b..06372a819 100644 --- a/shinkai-bin/shinkai-node/src/network/node.rs +++ b/shinkai-bin/shinkai-node/src/network/node.rs @@ -716,73 +716,111 @@ impl Node { &format!("{} > TCP: Starting listen and reconnect loop.", self.listen_address), ); - let mut retry_count = 0; + let identity_manager = self.identity_manager.clone(); + let network_job_manager = self.network_job_manager.clone(); + let node_name = self.node_name.clone(); + let identity_secret_key = self.identity_secret_key.clone(); + let proxy_connection_info_clone = proxy_connection_info.clone(); - loop { - // let listen_address = self.listen_address; - let identity_manager = self.identity_manager.clone(); - let network_job_manager = self.network_job_manager.clone(); - // let conn_limiter = self.conn_limiter.clone(); - let node_name = self.node_name.clone(); - let identity_secret_key = self.identity_secret_key.clone(); - - let proxy_info = { - let proxy_info_lock = proxy_connection_info.lock().await; - proxy_info_lock.clone() - }; + // Spawn proxy connection attempts so startup can continue in parallel + tokio::spawn(async move { + let mut retry_count = 0; + loop { + let proxy_info = { + let proxy_info_lock = proxy_connection_info_clone.lock().await; + proxy_info_lock.clone() + }; - if let Some(proxy_info) = proxy_info { - let connection_result = Node::establish_proxy_connection( - identity_manager.clone(), - &proxy_info, - node_name, - identity_secret_key, - ) - .await; + if let Some(proxy_info) = proxy_info { + let connection_result = Node::establish_proxy_connection( + identity_manager.clone(), + &proxy_info, + node_name.clone(), + identity_secret_key.clone(), + ) + .await; - match connection_result { - Ok(Some((reader, writer))) => { - let _ = Self::handle_proxy_listen_connection( - reader, - writer, - proxy_info.proxy_identity.clone(), - proxy_connection_info.clone(), - network_job_manager.clone(), - identity_manager.clone(), - ) - .await; - } - Ok(None) | Err(_) => { - // Increment retry count and determine sleep duration - retry_count += 1; - let sleep_duration = match retry_count { - 1 => Duration::from_secs(5), - 2 => Duration::from_secs(10), - 3 => Duration::from_secs(30), - _ => Duration::from_secs(300), // 5 minutes - }; - - tokio::time::sleep(sleep_duration).await; + match connection_result { + Ok(Some((reader, writer))) => { + let _ = Self::handle_proxy_listen_connection( + reader, + writer, + proxy_info.proxy_identity.clone(), + proxy_connection_info_clone.clone(), + network_job_manager.clone(), + identity_manager.clone(), + ) + .await; + } + Ok(None) => { + shinkai_log( + ShinkaiLogOption::Node, + ShinkaiLogLevel::Error, + "Failed to establish proxy connection: returned None", + ); + { + let mut info = proxy_connection_info_clone.lock().await; + *info = None; + } + break; + } + Err(e) => { + shinkai_log( + ShinkaiLogOption::Node, + ShinkaiLogLevel::Error, + &format!("Failed to establish proxy connection: {}", e), + ); + { + let mut info = proxy_connection_info_clone.lock().await; + *info = None; + } + break; + } } + } else { + break; + } + + // Increment retry count and determine sleep duration + retry_count += 1; + let sleep_duration = match retry_count { + 1 => Duration::from_secs(5), + 2 => Duration::from_secs(10), + 3 => Duration::from_secs(30), + _ => Duration::from_secs(300), // 5 minutes }; - } else { - break; + + tokio::time::sleep(sleep_duration).await; } - } + }); - // Execute direct listening if no proxy was ever connected - let result = Self::handle_listen_connection( - self.listen_address, - self.network_job_manager.clone(), - self.conn_limiter.clone(), - self.node_name.clone(), - ) - .await; - shinkai_log( - ShinkaiLogOption::Node, - ShinkaiLogLevel::Error, - &format!("{} > TCP: Listening error {:?}", self.listen_address, result), - ); + // Execute direct listening in a separate task while the proxy task runs + // in the background so that startup isn't blocked if either fails. + let listen_address = self.listen_address; + let network_job_manager = self.network_job_manager.clone(); + let conn_limiter = self.conn_limiter.clone(); + let node_name = self.node_name.clone(); + + tokio::spawn(async move { + let result = Self::handle_listen_connection( + listen_address, + network_job_manager, + conn_limiter, + node_name, + ) + .await; + + shinkai_log( + ShinkaiLogOption::Node, + ShinkaiLogLevel::Error, + &format!("{} > TCP: Listening error {:?}", listen_address, result), + ); + }); + + // Keep this future alive indefinitely. The listening task above will + // run until cancelled, ensuring the select! loop in `start` does not + // terminate early. + futures::future::pending::<()>().await; } async fn establish_proxy_connection(