Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 115 additions & 6 deletions applications/awkernel_services/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use alloc::{collections::BTreeMap, format};
use awkernel_async_lib::{
future::FutureExt, pubsub, scheduler::SchedulerType, select_biased, session_types::*,
};
use awkernel_lib::net::NetManagerError;

const NETWORK_SERVICE_RENDEZVOUS: &str = "/awkernel/network_service";

Expand All @@ -25,6 +26,7 @@ pub async fn run() {
let mut ch_irq_handlers = BTreeMap::new();
let mut ch_poll_handlers = BTreeMap::new();
let mut ch_tick_handlers = BTreeMap::new();
let mut ch_transmit_handlers = BTreeMap::new();

for if_status in awkernel_lib::net::get_all_interface() {
log::info!("Waking {} up.", if_status.device_name);
Expand All @@ -34,6 +36,7 @@ pub async fn run() {
&mut ch_irq_handlers,
&mut ch_poll_handlers,
&mut ch_tick_handlers,
&mut ch_transmit_handlers,
)
.await;
}
Expand Down Expand Up @@ -68,6 +71,7 @@ pub async fn run() {
&mut ch_irq_handlers,
&mut ch_poll_handlers,
&mut ch_tick_handlers,
&mut ch_transmit_handlers,
)
.await;
}
Expand Down Expand Up @@ -95,6 +99,22 @@ pub async fn run() {
let (ch, _) = ch.recv().await;
ch.close();
}

// Close tick handlers.
if let Some(ch) = ch_tick_handlers.remove(&if_status.interface_id) {
let ch = ch.send(()).await;
let (ch, _) = ch.recv().await;
ch.close();
}

// Close transmit handlers.
if let Some(chs) = ch_transmit_handlers.remove(&if_status.interface_id) {
for (_, ch) in chs {
let ch = ch.send(()).await;
let (ch, _) = ch.recv().await;
ch.close();
}
}
}
_ => (),
}
Expand All @@ -106,6 +126,7 @@ async fn spawn_handlers(
ch_irq_handlers: &mut BTreeMap<u16, ChanProtoInterruptHandlerDual>,
ch_poll_handlers: &mut BTreeMap<u64, ChanProtoInterruptHandlerDual>,
ch_tick_handlers: &mut BTreeMap<u64, ChanProtoInterruptHandlerDual>,
ch_transmit_handlers: &mut BTreeMap<u64, BTreeMap<usize, ChanProtoInterruptHandlerDual>>,
) {
for irq in if_status.irqs {
let (server, client) = session_channel::<ProtoInterruptHandler>();
Expand Down Expand Up @@ -143,19 +164,41 @@ async fn spawn_handlers(
.await;
}

if let Some(tick_msec) = if_status.tick_msec {
let (server, client) = session_channel::<ProtoInterruptHandler>();
ch_tick_handlers.insert(if_status.interface_id, client);

//if let Some(tick_msec) = if_status.tick_msec {
//let (server, client) = session_channel::<ProtoInterruptHandler>();
//ch_tick_handlers.insert(if_status.interface_id, client);

//let name = format!(
//"{}:{}: tick = {tick_msec} [msec]",
//crate::NETWORK_SERVICE_NAME,
//if_status.device_name,
//);

//awkernel_async_lib::spawn(
//name.into(),
//tick_handler(if_status.interface_id, tick_msec, server),
//SchedulerType::FIFO,
//)
//.await;
//}

for que_id in 0..if_status.num_queues {
let name = format!(
"{}:{}: tick = {tick_msec} [msec]",
"{}:{}: transmitter, que_id = {que_id}",
crate::NETWORK_SERVICE_NAME,
if_status.device_name,
);

let (server, client) = session_channel::<ProtoInterruptHandler>();

ch_transmit_handlers
.entry(if_status.interface_id)
.or_insert_with(BTreeMap::new)
.insert(que_id, client);

awkernel_async_lib::spawn(
name.into(),
tick_handler(if_status.interface_id, tick_msec, server),
transmitter(if_status.interface_id, que_id, server),
SchedulerType::FIFO,
)
.await;
Expand Down Expand Up @@ -329,3 +372,69 @@ async fn tick_handler(interface_id: u64, tick: u64, ch: Chan<(), ProtoInterruptH
}
}
}

// Transmit

struct NetworkTransmit {
interface_id: u64,
que_id: usize,
wait: bool,
}

impl Future for NetworkTransmit {
type Output = Result<(), NetManagerError>;

fn poll(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
let m = self.get_mut();

if !m.wait {
return Poll::Ready(Ok(()));
}

m.wait = false;

match awkernel_lib::net::register_waker_for_transmit(
m.interface_id,
m.que_id,
cx.waker().clone(),
) {
Ok(true) => Poll::Pending,
Ok(false) => Poll::Ready(Ok(())),
Err(e) => Poll::Ready(Err(e)),
}
}
}

async fn transmitter(interface_id: u64, que_id: usize, ch: Chan<(), ProtoInterruptHandler>) {
let mut ch = ch.recv().boxed().fuse();
let dur: Duration = Duration::from_micros(50);

loop {
// Register the waker.
let mut wait_transmit = NetworkTransmit {
interface_id,
que_id,
wait: true,
}
.fuse();

select_biased! {
(ch, _) = ch => {
let ch = ch.send(()).await;
ch.close();
return;
},
_ = wait_transmit => {},
}

awkernel_async_lib::sleep(dur).await;

// Transmit the packet.
awkernel_lib::net::transmit(interface_id, que_id);

// awkernel_async_lib::r#yield().await;
}
}
20 changes: 15 additions & 5 deletions applications/load_test_udp/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
#![no_std]

extern crate alloc;
use alloc::format;

use core::{net::Ipv4Addr, time::Duration};

use awkernel_async_lib::net::{udp::UdpConfig, IpAddr};
use awkernel_lib::delay::uptime_nano;
use core::sync::atomic::{AtomicBool, Ordering};

const INTERFACE_ADDR: Ipv4Addr = Ipv4Addr::new(192, 168, 0, 3);

const BASE_PORT: u16 = 20000;

pub async fn run() {
const NUM_TASKS: [usize; 11] = [1000, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000];
//const NUM_TASKS: [usize; 11] = [1000, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000];
const NUM_TASKS: [usize; 1] = [100];
awkernel_lib::net::add_ipv4_addr(1, INTERFACE_ADDR, 24);

for num_task in NUM_TASKS {
let mut join = alloc::vec::Vec::new();
for task_id in 0..num_task {
let port = BASE_PORT + task_id as u16;

let name = format!("udp_server:{}", port);
let hdl = awkernel_async_lib::spawn(
"test udp".into(),
name.into(),
udp_server(port),
awkernel_async_lib::scheduler::SchedulerType::RR,
)
Expand Down Expand Up @@ -49,13 +54,18 @@ async fn udp_server(port: u16) {

loop {
match socket.recv(&mut buf).await {
Ok((read_bytes, client_addr, port)) => {
Ok((read_bytes, client_addr, client_port)) => {
if read_bytes == 1 {
break;
}
let received_data = &buf[..read_bytes];
let received_data = &mut buf[..read_bytes];
//if port == 20048 {
//let t = uptime_nano();
//let bytes = t.to_le_bytes();
//received_data[50..66].copy_from_slice(&bytes);
//}

if let Err(e) = socket.send(received_data, &client_addr, port).await {
if let Err(e) = socket.send(received_data, &client_addr, client_port).await {
log::error!("Failed to send a UDP packet: {:?}", e);
awkernel_async_lib::sleep(Duration::from_secs(1)).await;
continue;
Expand Down
4 changes: 3 additions & 1 deletion awkernel_async_lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ default-features = false
path = "../awkernel_async_lib_verified"

[dependencies.smoltcp]
version = "0.11.0"
path = "../smoltcp"
default-features = false
features = [
"alloc",
Expand All @@ -37,6 +37,8 @@ features = [
"medium-ethernet",
"proto-igmp",
]
#version = "0.11.0"
#path = "/home/awkernel/.cargo/registry/src/index.crates.io-6f17d22bba15001f/smoltcp-0.11.0"

[features]
default = []
Expand Down
5 changes: 4 additions & 1 deletion awkernel_drivers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ version = "0.11"
optional = true

[dependencies.smoltcp]
version = "0.11"
path = "../smoltcp" # プロジェクトルートからの相対パス
default-features = false
features = [
"alloc",
Expand All @@ -65,9 +65,12 @@ features = [
"socket-tcp",
"proto-ipv4",
"proto-ipv6",
"socket-icmp",
"medium-ethernet",
"proto-igmp",
]
#version = "0.11"
#path = "/home/awkernel/.cargo/registry/src/index.crates.io-6f17d22bba15001f/smoltcp-0.11.0"

[features]
default = []
Expand Down
2 changes: 2 additions & 0 deletions awkernel_drivers/src/pcie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,7 @@ fn read_bar(config_space: &ConfigSpace, offset: usize) -> BaseAddress {
let size = {
let high_bar = config_space.read_u32(high_offset);

config_space.write_u32(4, 0x04);
config_space.write_u32(!0, offset);
config_space.write_u32(!0, high_offset);

Expand All @@ -1121,6 +1122,7 @@ fn read_bar(config_space: &ConfigSpace, offset: usize) -> BaseAddress {

config_space.write_u32(bar, offset);
config_space.write_u32(high_bar, high_offset);
config_space.write_u32(6, 0x04);

(!((high_size as u64) << 32 | ((low_size & BAR_MEM_ADDR_MASK) as u64)) + 1) as usize
};
Expand Down
10 changes: 7 additions & 3 deletions awkernel_drivers/src/pcie/intel/e1000e_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,17 @@ impl NetDevice for E1000eExample {
fn recv(
&self,
_que_id: usize,
) -> Result<Option<net_device::EtherFrameBuf>, net_device::NetDevError> {
) -> Result<Option<net_device::EtherFrameDMA>, net_device::NetDevError> {
todo!("recv");
}

fn send(
fn send(&self, _que_id: usize) -> Result<(), net_device::NetDevError> {
todo!("send");
}

fn push(
&self,
_data: net_device::EtherFrameRef,
_data: net_device::EtherFrameDMAcsum,
_que_id: usize,
) -> Result<(), net_device::NetDevError> {
todo!("send");
Expand Down
52 changes: 30 additions & 22 deletions awkernel_drivers/src/pcie/intel/igb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use awkernel_lib::{
ipv6::Ip6Hdr,
multicast::MulticastAddrs,
net_device::{
EtherFrameBuf, EtherFrameRef, LinkStatus, NetCapabilities, NetDevError, NetDevice,
NetFlags, PacketHeaderFlags,
EtherFrameBuf, EtherFrameDMA, EtherFrameDMAcsum, EtherFrameRef, LinkStatus,
NetCapabilities, NetDevError, NetDevice, NetFlags, PacketHeaderFlags,
},
tcp::TCPHdr,
udp::UDPHdr,
Expand Down Expand Up @@ -2061,26 +2061,27 @@ impl NetDevice for Igb {
inner.hw.get_mac_addr()
}

fn recv(&self, que_id: usize) -> Result<Option<EtherFrameBuf>, NetDevError> {
{
let mut node = MCSNode::new();
let mut rx = self.que[que_id].rx.lock(&mut node);
fn recv(&self, que_id: usize) -> Result<Option<EtherFrameDMA>, NetDevError> {
//{
//let mut node = MCSNode::new();
//let mut rx = self.que[que_id].rx.lock(&mut node);

let data = rx.read_queue.pop();
if data.is_some() {
return Ok(data);
}
}
//let data = rx.read_queue.pop();
//if data.is_some() {
//return Ok(data);
//}
//}

self.rx_recv(que_id).or(Err(NetDevError::DeviceError))?;
//self.rx_recv(que_id).or(Err(NetDevError::DeviceError))?;

let mut node = MCSNode::new();
let mut rx = self.que[que_id].rx.lock(&mut node);
if let Some(data) = rx.read_queue.pop() {
Ok(Some(data))
} else {
Ok(None)
}
//let mut node = MCSNode::new();
//let mut rx = self.que[que_id].rx.lock(&mut node);
//if let Some(data) = rx.read_queue.pop() {
//Ok(Some(data))
//} else {
//Ok(None)
//}
Err(NetDevError::DeviceError)
}

fn can_send(&self) -> bool {
Expand All @@ -2096,9 +2097,16 @@ impl NetDevice for Igb {
true
}

fn send(&self, data: EtherFrameRef, que_id: usize) -> Result<(), NetDevError> {
let frames = [data];
self.send(que_id, &frames).or(Err(NetDevError::DeviceError))
fn send(&self, _que_id: usize) -> Result<(), NetDevError> {
//let frames = [data];
//self.send(que_id, &frames).or(Err(NetDevError::DeviceError))
Err(NetDevError::DeviceError)
}

fn push(&self, _data: EtherFrameDMAcsum, _que_id: usize) -> Result<(), NetDevError> {
//let frames = [data];
//self.send(que_id, &frames).or(Err(NetDevError::DeviceError))
Err(NetDevError::DeviceError)
}

fn up(&self) -> Result<(), NetDevError> {
Expand Down
Loading