diff --git a/kernel/src/driver/net/mod.rs b/kernel/src/driver/net/mod.rs index 98187cd86..67a9cd1b6 100644 --- a/kernel/src/driver/net/mod.rs +++ b/kernel/src/driver/net/mod.rs @@ -203,30 +203,38 @@ impl IfaceCommon { D: smoltcp::phy::Device + ?Sized, { let timestamp = crate::time::Instant::now().into(); - let mut sockets = self.sockets.lock_irqsave(); let mut interface = self.smol_iface.lock_irqsave(); - let (has_events, poll_at) = { - ( - matches!( - interface.poll(timestamp, device, &mut sockets), - smoltcp::iface::PollResult::SocketStateChanged - ), - loop { - let poll_at = interface.poll_at(timestamp, &sockets); - let Some(instant) = poll_at else { - break poll_at; - }; - if instant > timestamp { - break poll_at; - } - }, - ) + // 逐包处理并通知 + loop { + let mut sockets = self.sockets.lock_irqsave(); + let result = interface.poll_ingress_single(timestamp, device, &mut sockets); + drop(sockets); + match result { + smoltcp::iface::PollIngressSingleResult::None => break, + smoltcp::iface::PollIngressSingleResult::PacketProcessed => {}, + smoltcp::iface::PollIngressSingleResult::SocketStateChanged => { + self.bounds.read_irqsave().iter().for_each(|bound_socket| { + bound_socket.notify(); + let _woke = bound_socket + .wait_queue() + .wakeup(Some(ProcessState::Blocked(true))); + }); + } + } + } + let mut sockets = self.sockets.lock_irqsave(); + let _ = interface.poll_egress(timestamp, device, &mut sockets); + let poll_at = loop { + let poll_at = interface.poll_at(timestamp, &sockets); + let Some(instant) = poll_at else { + break poll_at; + }; + if instant > timestamp { + break poll_at; + } }; - - // drop sockets here to avoid deadlock drop(interface); - drop(sockets); use core::sync::atomic::Ordering; if let Some(instant) = poll_at { @@ -242,16 +250,6 @@ impl IfaceCommon { self.poll_at_ms.store(0, Ordering::Relaxed); } - self.bounds.read_irqsave().iter().for_each(|bound_socket| { - // incase our inet socket missed the event, we manually notify it each time we poll - if has_events { - bound_socket.notify(); - let _woke = bound_socket - .wait_queue() - .wakeup(Some(ProcessState::Blocked(true))); - } - }); - // TODO: remove closed sockets // let closed_sockets = self // .closing_sockets diff --git a/kernel/src/net/socket/inet/stream/inner.rs b/kernel/src/net/socket/inet/stream/inner.rs index 3577df352..2a254ca7e 100644 --- a/kernel/src/net/socket/inet/stream/inner.rs +++ b/kernel/src/net/socket/inet/stream/inner.rs @@ -4,10 +4,12 @@ use crate::filesystem::epoll::EPollEventType; use crate::libs::rwlock::RwLock; use crate::net::socket::{self, inet::Types}; use alloc::boxed::Box; -use alloc::vec::Vec; +use alloc::sync::Arc; +use alloc::collections::VecDeque; use smoltcp; use smoltcp::socket::tcp; use system_error::SystemError; +use log::debug; // pub const DEFAULT_METADATA_BUF_SIZE: usize = 1024; pub const DEFAULT_RX_BUF_SIZE: usize = 128 * 1024; @@ -134,25 +136,7 @@ impl Init { smoltcp::wire::IpListenEndpoint::from(local) }; log::debug!("listen at {:?}", listen_addr); - let mut inners = Vec::new(); - if let Err(err) = || -> Result<(), SystemError> { - for _ in 0..(backlog - 1) { - // -1 because the first one is already bound - let new_listen = socket::inet::BoundInner::bind( - new_listen_smoltcp_socket(listen_addr), - listen_addr - .addr - .as_ref() - .unwrap_or(&smoltcp::wire::IpAddress::from( - smoltcp::wire::Ipv4Address::UNSPECIFIED, - )), - )?; - inners.push(new_listen); - } - Ok(()) - }() { - return Err((Init::Bound((inner, local)), err)); - } + if let Err(err) = inner.with_mut::(|socket| { socket @@ -162,11 +146,12 @@ impl Init { return Err((Init::Bound((inner, local)), err)); } - inners.push(inner); + let mut inners = VecDeque::new(); + inners.push_back(Arc::new(inner)); return Ok(Listening { - inners, - connect: AtomicUsize::new(0), + inners: RwLock::new(inners), listen_addr, + backlog, }); } @@ -289,21 +274,45 @@ impl Connecting { #[derive(Debug)] pub struct Listening { - inners: Vec, - connect: AtomicUsize, + inners: RwLock>>, + backlog: usize, listen_addr: smoltcp::wire::IpListenEndpoint, } impl Listening { pub fn accept(&mut self) -> Result<(Established, smoltcp::wire::IpEndpoint), SystemError> { - let connected: &mut socket::inet::BoundInner = self - .inners - .get_mut(self.connect.load(core::sync::atomic::Ordering::Relaxed)) - .unwrap(); + let inners_guard = self.inners.read(); + let inner = match inners_guard.front() { + Some(v) => v, + None => return Err(SystemError::EAGAIN_OR_EWOULDBLOCK), + }; - if connected.with::(|socket| !socket.is_active()) { + if inner.with::(|socket| !socket.is_active()) { return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); } + drop(inners_guard); + + let mut inners_writer = self.inners.write(); + let connected = match inners_writer.pop_front() { + Some(v) => v, + None => return Err(SystemError::EAGAIN_OR_EWOULDBLOCK), + }; + + if inners_writer.len() == 0 { + let new_listen = match socket::inet::BoundInner::bind( + new_listen_smoltcp_socket(self.listen_addr), + self.listen_addr + .addr + .as_ref() + .unwrap_or(&smoltcp::wire::IpAddress::from( + smoltcp::wire::Ipv4Address::UNSPECIFIED, + )), + ) { + Ok(inner) => inner, + Err(_) => return Err(SystemError::EAGAIN_OR_EWOULDBLOCK), + }; + inners_writer.push_back(Arc::new(new_listen)); + } let remote_endpoint = connected.with::(|socket| { socket @@ -311,33 +320,32 @@ impl Listening { .expect("A Connected Tcp With No Remote Endpoint") }); - // log::debug!("local at {:?}", local_endpoint); - - let mut new_listen = socket::inet::BoundInner::bind( - new_listen_smoltcp_socket(self.listen_addr), - self.listen_addr - .addr - .as_ref() - .unwrap_or(&smoltcp::wire::IpAddress::from( - smoltcp::wire::Ipv4Address::UNSPECIFIED, - )), - )?; - - // swap the connected socket with the new_listen socket - // TODO is smoltcp socket swappable? - core::mem::swap(&mut new_listen, connected); - - return Ok((Established { inner: new_listen }, remote_endpoint)); + return Ok((Established { inner: Arc::try_unwrap(connected).unwrap() }, remote_endpoint)); } pub fn update_io_events(&self, pollee: &AtomicUsize) { - let position = self.inners.iter().position(|inner| { - inner.with::(|socket| socket.is_active()) - }); + let mut inners_guard = self.inners.write(); + let inner = match inners_guard.back() { + Some(inner) => inner, + None => return debug!("the tcp socket inners is empty"), + }; - if let Some(position) = position { - self.connect - .store(position, core::sync::atomic::Ordering::Relaxed); + if inner.with::(|socket| socket.is_active()) { + if inners_guard.len() < self.backlog { + let new_listen = match socket::inet::BoundInner::bind( + new_listen_smoltcp_socket(self.listen_addr), + self.listen_addr + .addr + .as_ref() + .unwrap_or(&smoltcp::wire::IpAddress::from( + smoltcp::wire::Ipv4Address::UNSPECIFIED, + )), + ) { + Ok(inner) => inner, + Err(e) => return debug!("bind err: {:#?}", e), + }; + inners_guard.push_back(Arc::new(new_listen)); + } pollee.fetch_or( EPollEventType::EPOLL_LISTEN_CAN_ACCEPT.bits() as usize, core::sync::atomic::Ordering::Relaxed, @@ -364,18 +372,16 @@ impl Listening { pub fn close(&self) { // log::debug!("Close Listening Socket"); let port = self.get_name().port; - for inner in self.inners.iter() { + let inners_guard = self.inners.read(); + for inner in inners_guard.iter() { inner.with_mut::(|socket| socket.close()); + inner.port_manager().unbind_port(Types::Tcp, port); } - self.inners[0] - .iface() - .port_manager() - .unbind_port(Types::Tcp, port); } pub fn release(&self) { // log::debug!("Release Listening Socket"); - for inner in self.inners.iter() { + for inner in self.inners.read().iter() { inner.release(); } } @@ -489,8 +495,14 @@ impl Inner { Inner::Init(_) => DEFAULT_TX_BUF_SIZE, Inner::Connecting(conn) => conn.with_mut(|socket| socket.send_capacity()), // only the first socket in the list is used for sending - Inner::Listening(listen) => listen.inners[0] - .with_mut::(|socket| socket.send_capacity()), + Inner::Listening(listen) => { + listen + .inners + .read() + .front() + .unwrap() + .with_mut::(|socket| socket.send_capacity()) + } Inner::Established(est) => est.with_mut(|socket| socket.send_capacity()), } } @@ -500,18 +512,24 @@ impl Inner { Inner::Init(_) => DEFAULT_RX_BUF_SIZE, Inner::Connecting(conn) => conn.with_mut(|socket| socket.recv_capacity()), // only the first socket in the list is used for receiving - Inner::Listening(listen) => listen.inners[0] - .with_mut::(|socket| socket.recv_capacity()), + Inner::Listening(listen) => { + listen + .inners + .read() + .front() + .unwrap() + .with_mut::(|socket| socket.recv_capacity()) + } Inner::Established(est) => est.with_mut(|socket| socket.recv_capacity()), } } - pub fn iface(&self) -> Option<&alloc::sync::Arc> { + pub fn iface(&self) -> Option> { match self { Inner::Init(_) => None, - Inner::Connecting(conn) => Some(conn.inner.iface()), - Inner::Listening(listen) => Some(listen.inners[0].iface()), - Inner::Established(est) => Some(est.inner.iface()), + Inner::Connecting(conn) => Some(conn.inner.iface().clone()), + Inner::Listening(listen) => Some(listen.inners.read().front().unwrap().iface().clone()), + Inner::Established(est) => Some(est.inner.iface().clone()), } } } diff --git a/user/apps/c_unitest/test_backlogc.c b/user/apps/c_unitest/test_backlogc.c new file mode 100644 index 000000000..7bf14bcda --- /dev/null +++ b/user/apps/c_unitest/test_backlogc.c @@ -0,0 +1,87 @@ +#include +#include +#include +#include +#include +#include +#include +#include + + + +int main() +{ + int sockfd; + struct sockaddr_in server_addr; + + // 创建套接字 + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) + { + perror("socket creation failed"); + return EXIT_FAILURE; + } + + // 设置 SO_REUSEADDR 选项 + // int optval = 1; + // if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) { + // perror("setsockopt(SO_REUSEADDR) failed"); + // close(sockfd); + // exit(EXIT_FAILURE); + // } + + // 配置服务器地址 + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = INADDR_ANY; + server_addr.sin_port = htons(12580); + + // 绑定套接字 + if (bind(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) + { + perror("bind failed"); + close(sockfd); + return EXIT_FAILURE; + } + + // 调用 listen 系统调用 + if (listen(sockfd, 10) < 0) + { + perror("listen failed"); + close(sockfd); + return EXIT_FAILURE; + } + + printf("Listening on port 12580......\n"); + + struct sockaddr_in client_addr; + socklen_t client_len = sizeof(client_addr); + char buffer[1024]; + int cnt = 0; + + while (cnt < 100) + { + int *client_sockfd = malloc(sizeof(int)); + if (!client_sockfd) + { + perror("malloc failed"); + continue; + } + + // 接受客户端连接 + *client_sockfd = accept(sockfd, (struct sockaddr *)&client_addr, &client_len); + if (*client_sockfd < 0) + { + perror("accept failed"); + free(client_sockfd); + continue; + } + printf("the %dth connection\n", ++cnt); + close(*client_sockfd); + + } + + // 关闭套接字 + close(sockfd); + return EXIT_SUCCESS; +}