Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 98 additions & 60 deletions shinkai-bin/shinkai-node/src/network/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,73 +716,111 @@ impl Node {
&format!("{} > TCP: Starting listen and reconnect loop.", self.listen_address),
);

let mut retry_count = 0;
let identity_manager = self.identity_manager.clone();
let network_job_manager = self.network_job_manager.clone();
let node_name = self.node_name.clone();
let identity_secret_key = self.identity_secret_key.clone();
let proxy_connection_info_clone = proxy_connection_info.clone();

loop {
// let listen_address = self.listen_address;
let identity_manager = self.identity_manager.clone();
let network_job_manager = self.network_job_manager.clone();
// let conn_limiter = self.conn_limiter.clone();
let node_name = self.node_name.clone();
let identity_secret_key = self.identity_secret_key.clone();

let proxy_info = {
let proxy_info_lock = proxy_connection_info.lock().await;
proxy_info_lock.clone()
};
// Spawn proxy connection attempts so startup can continue in parallel
tokio::spawn(async move {
let mut retry_count = 0;
loop {
let proxy_info = {
let proxy_info_lock = proxy_connection_info_clone.lock().await;
proxy_info_lock.clone()
};

if let Some(proxy_info) = proxy_info {
let connection_result = Node::establish_proxy_connection(
identity_manager.clone(),
&proxy_info,
node_name,
identity_secret_key,
)
.await;
if let Some(proxy_info) = proxy_info {
let connection_result = Node::establish_proxy_connection(
identity_manager.clone(),
&proxy_info,
node_name.clone(),
identity_secret_key.clone(),
)
.await;

match connection_result {
Ok(Some((reader, writer))) => {
let _ = Self::handle_proxy_listen_connection(
reader,
writer,
proxy_info.proxy_identity.clone(),
proxy_connection_info.clone(),
network_job_manager.clone(),
identity_manager.clone(),
)
.await;
}
Ok(None) | Err(_) => {
// Increment retry count and determine sleep duration
retry_count += 1;
let sleep_duration = match retry_count {
1 => Duration::from_secs(5),
2 => Duration::from_secs(10),
3 => Duration::from_secs(30),
_ => Duration::from_secs(300), // 5 minutes
};

tokio::time::sleep(sleep_duration).await;
match connection_result {
Ok(Some((reader, writer))) => {
let _ = Self::handle_proxy_listen_connection(
reader,
writer,
proxy_info.proxy_identity.clone(),
proxy_connection_info_clone.clone(),
network_job_manager.clone(),
identity_manager.clone(),
)
.await;
}
Ok(None) => {
shinkai_log(
ShinkaiLogOption::Node,
ShinkaiLogLevel::Error,
"Failed to establish proxy connection: returned None",
);
{
let mut info = proxy_connection_info_clone.lock().await;
*info = None;
}
break;
}
Err(e) => {
shinkai_log(
ShinkaiLogOption::Node,
ShinkaiLogLevel::Error,
&format!("Failed to establish proxy connection: {}", e),
);
{
let mut info = proxy_connection_info_clone.lock().await;
*info = None;
}
break;
}
}
} else {
break;
}

// Increment retry count and determine sleep duration
retry_count += 1;
let sleep_duration = match retry_count {
1 => Duration::from_secs(5),
2 => Duration::from_secs(10),
3 => Duration::from_secs(30),
_ => Duration::from_secs(300), // 5 minutes
};
} else {
break;

tokio::time::sleep(sleep_duration).await;
}
}
});

// Execute direct listening if no proxy was ever connected
let result = Self::handle_listen_connection(
self.listen_address,
self.network_job_manager.clone(),
self.conn_limiter.clone(),
self.node_name.clone(),
)
.await;
shinkai_log(
ShinkaiLogOption::Node,
ShinkaiLogLevel::Error,
&format!("{} > TCP: Listening error {:?}", self.listen_address, result),
);
// Execute direct listening in a separate task while the proxy task runs
// in the background so that startup isn't blocked if either fails.
let listen_address = self.listen_address;
let network_job_manager = self.network_job_manager.clone();
let conn_limiter = self.conn_limiter.clone();
let node_name = self.node_name.clone();

tokio::spawn(async move {
let result = Self::handle_listen_connection(
listen_address,
network_job_manager,
conn_limiter,
node_name,
)
.await;

shinkai_log(
ShinkaiLogOption::Node,
ShinkaiLogLevel::Error,
&format!("{} > TCP: Listening error {:?}", listen_address, result),
);
});

// Keep this future alive indefinitely. The listening task above will
// run until cancelled, ensuring the select! loop in `start` does not
// terminate early.
futures::future::pending::<()>().await;
}

async fn establish_proxy_connection(
Expand Down