Skip to content
Draft
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ members = [
"id",
"init",
"interface-manager",
"pkt-io",
"mgmt",
"nat",
"net",
Expand Down Expand Up @@ -53,6 +54,7 @@ mgmt = { path = "./mgmt", package = "dataplane-mgmt" }
nat = { path = "./nat", package = "dataplane-nat" }
net = { path = "./net", package = "dataplane-net", features = ["test_buffer"] }
pipeline = { path = "./pipeline", package = "dataplane-pipeline" }
pkt-io = { path = "./pkt-io", package = "dataplane-pkt-io" }
pkt-meta = { path = "./pkt-meta", package = "dataplane-pkt-meta" }
rekon = { path = "./rekon", package = "dataplane-rekon" }
routing = { path = "./routing", package = "dataplane-routing" }
Expand All @@ -79,6 +81,7 @@ caps = { version = "0.5.6", default-features = false, features = [] }
chrono = { version = "0.4.42", default-features = false, features = ["clock"] }
clap = { version = "4.5.49", default-features = true, features = [] }
crossbeam-channel = { version = "0.5.15", default-features = false, features = [] }
crossbeam = { version = "0.8.4", features = ["crossbeam-channel", "crossbeam-deque", "crossbeam-queue"] }
ctrlc = { version = "3.5.0", default-features = false, features = [] }
dashmap = { version = "6.1.0", default-features = false, features = [] }
derive_builder = { version = "0.20.2", default-features = false, features = ["default", "std"] }
Expand Down
1 change: 1 addition & 0 deletions dataplane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ once_cell = { workspace = true }
ordermap = { workspace = true, features = ["std"] }
parking_lot = { workspace = true }
pipeline = { workspace = true }
pkt-io = { workspace = true }
pkt-meta = { workspace = true }
routing = { workspace = true }
serde = { workspace = true, features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion dataplane/src/packet_processor/ipforward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl IpForwarder {
We can't re-inject packet on ingress, so let's disable this to avoid churn
packet.get_meta_mut().oif = Some(packet.get_meta().iif);
*/
packet.done(DoneReason::Local);
packet.done(DoneReason::Unhandled);
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions dataplane/src/packet_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use super::packet_processor::ipforward::IpForwarder;

use concurrency::sync::Arc;

use pkt_io::PktIo;
use pkt_meta::dst_vpcd_lookup::{DstVpcdLookup, VpcDiscTablesWriter};
use pkt_meta::flow_table::{ExpirationsNF, FlowTable, LookupNF};

Expand Down Expand Up @@ -63,6 +64,7 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
let pipeline_builder = move || {
// Build network functions
let stage_ingress = Ingress::new("Ingress", iftr_factory.handle());
let pktio1 = PktIo::new(10_000usize, 100).set_name("pkt-io");
let stage_egress = Egress::new("Egress", iftr_factory.handle(), atabler_factory.handle());
let dst_vpcd_lookup = DstVpcdLookup::new("dst-vni-lookup", vpcdtablesr_factory.handle());
let iprouter1 = IpForwarder::new("IP-Forward-1", fibtr_factory.handle());
Expand All @@ -87,6 +89,7 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
.add_stage(stateless_nat)
.add_stage(stateful_nat)
.add_stage(iprouter2)
.add_stage(pktio1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is my understanding correct that this new stage has no effect until we set the queues calling set_injectq() and set_puntq()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can have the behavior that you mention if we create the pkt-io without buffers.
In this commit, I added 10K buffer queue for injection and 100 pkts for punting.
Since some packets may be marked as local, they will be punted. But since no one checks the punt queue, they will be cached there forever. So this PR is not to be merged (I set 100 pkts so that we'd not cache megabytes) as is. The expectation was to add more commits from @daniel-noland to do the actual consumption of those packets).
If we'd want to merge this PR, we'd just need to set zero buffering to the pkt-io, in which case it would be transparent.

.add_stage(stage_egress)
.add_stage(dumper2)
.add_stage(flow_expirations_nf)
Expand Down
15 changes: 13 additions & 2 deletions dpdk/src/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use dpdk_sys::{
rte_pktmbuf_tailroom, rte_pktmbuf_trim,
};
// unfortunately, we need the standard library to swap allocators
use net::buffer::{Append, Headroom, Prepend, Tailroom, TrimFromEnd, TrimFromStart};
use net::buffer::{Append, Create, Headroom, Prepend, Tailroom, TrimFromEnd, TrimFromStart};
use std::alloc::System;
use std::ffi::CString;

Expand Down Expand Up @@ -422,7 +422,18 @@ impl AsMut<[u8]> for Mbuf {
self.raw_data_mut()
}
}

impl Create for Mbuf {
type PoolType = Pool;
fn create(pool: &mut Self::PoolType) -> Result<Self, ()> {
unsafe {
let raw = dpdk_sys::rte_pktmbuf_alloc(pool.inner().as_mut_ptr());
Ok(Self {
raw: NonNull::new(raw).ok_or_else(|| error!("Mbuf allocation failure"))?,
marker: PhantomData,
})
}
}
}
impl Headroom for Mbuf {
fn headroom(&self) -> u16 {
unsafe { rte_pktmbuf_headroom(self.raw.as_ptr()) }
Expand Down
21 changes: 20 additions & 1 deletion net/src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@ impl<T> PacketBuffer for T where T: AsRef<[u8]> + Headroom + Debug + 'static {}

/// Super trait representing the abstract operations which may be performed on mutable a packet buffer.
pub trait PacketBufferMut:
PacketBuffer + AsMut<[u8]> + Prepend + Send + TrimFromStart + TrimFromEnd + Headroom + Tailroom
PacketBuffer
+ AsMut<[u8]>
+ Prepend
+ Send
+ TrimFromStart
+ TrimFromEnd
+ Headroom
+ Tailroom
+ Create
{
}
impl<T> PacketBufferMut for T where
Expand All @@ -31,9 +39,20 @@ impl<T> PacketBufferMut for T where
+ TrimFromEnd
+ Headroom
+ Tailroom
+ Create
{
}

/// Trait representing the ability to create a packet buffer
pub trait Create {
/// Type of auxiliary object to allocate buffers from
type PoolType;
/// Allocate a packet buffer
fn create(pool: &mut Self::PoolType) -> Result<Self, ()>
where
Self: Sized;
}

/// Trait representing the ability to get the unused headroom in a packet buffer.
pub trait Headroom {
/// Get the (unused) headroom in a packet buffer.
Expand Down
11 changes: 9 additions & 2 deletions net/src/buffer/test_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
pub use contract::*;

use crate::buffer::{
Append, Headroom, MemoryBufferNotLongEnough, NotEnoughHeadRoom, NotEnoughTailRoom, Prepend,
Tailroom, TrimFromEnd, TrimFromStart,
Append, Create, Headroom, MemoryBufferNotLongEnough, NotEnoughHeadRoom, NotEnoughTailRoom,
Prepend, Tailroom, TrimFromEnd, TrimFromStart,
};
use tracing::trace;

Expand Down Expand Up @@ -103,6 +103,13 @@ impl AsMut<[u8]> for TestBuffer {
}
}

impl Create for TestBuffer {
type PoolType = ();
fn create(_: &mut Self::PoolType) -> Result<Self, ()> {
Ok(Self::new())
}
}

impl Headroom for TestBuffer {
fn headroom(&self) -> u16 {
self.headroom
Expand Down
26 changes: 22 additions & 4 deletions net/src/packet/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,27 @@ impl Display for BridgeDomain {
write!(f, "{}", self.get_id())
}
}

#[inline]
fn fmt_metadata_flags(meta: &PacketMeta, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, " Flags:")?;
if meta.is_l2bcast() {
write!(f, " bcast")?;
}
if meta.nat() {
write!(f, " do-nat")?;
}
if meta.local() {
write!(f, " local")?;
}
if meta.keep() {
write!(f, " keep")?;
}
if meta.checksum_refresh() {
write!(f, " refresh-chksum")?;
}
writeln!(f)
}
impl Display for PacketMeta {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
writeln!(f, " metadata:")?;
Expand All @@ -246,16 +267,13 @@ impl Display for PacketMeta {
}
}
fmt_opt(f, " oif", self.oif, true)?;

writeln!(f, " bcast: {}", self.is_l2bcast())?;
fmt_opt(f, " src-vpcd", self.src_vpcd, false)?;
fmt_opt(f, " dst-vpcd", self.dst_vpcd, true)?;
writeln!(f, " do-nat: {}", self.nat())?;
fmt_opt(f, " vrf", self.vrf, false)?;
fmt_opt(f, " bd", self.bridge, true)?;
fmt_opt(f, " next-hop", self.nh_addr, true)?;
fmt_opt(f, " done", self.done, true)?;
writeln!(f, " keep: {}", self.keep())
fmt_metadata_flags(self, f)
}
}

Expand Down
16 changes: 14 additions & 2 deletions net/src/packet/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ pub enum DoneReason {
MissingEtherType, /* can't determine ethertype to use */
Unroutable, /* we don't have state to forward the packet */
NatFailure, /* It was not possible to NAT the packet */
Local, /* the packet has to be locally consumed by kernel */
Delivered, /* the packet buffer was delivered by the NF - e.g. for xmit */
InternalDrop, /* the packet is bydropped by dataplane (e.g. due to lack of resources like queue space) */
Delivered, /* the packet buffer was delivered by the NF - e.g. for xmit */
}

bitflags! {
Expand All @@ -125,6 +125,7 @@ bitflags! {
const NAT = 0b0000_0100; /* if true, NAT stage should attempt to NAT the packet */
const REFR_CHKSUM = 0b0000_1000; /* if true, an indication that packet checksums need to be refreshed */
const KEEP = 0b0001_0000; /* Keep the Packet even if it should be dropped */
const LOCAL = 0b0010_0000; /* Packet is to be locally consumed */
}
}

Expand Down Expand Up @@ -201,6 +202,17 @@ impl PacketMeta {
self.flags.remove(MetaFlags::KEEP);
}
}
#[must_use]
pub fn local(&self) -> bool {
self.flags.contains(MetaFlags::LOCAL)
}
pub fn set_local(&mut self, value: bool) {
if value {
self.flags.insert(MetaFlags::LOCAL);
} else {
self.flags.remove(MetaFlags::LOCAL);
}
}
}
impl Drop for PacketMeta {
fn drop(&mut self) {
Expand Down
20 changes: 20 additions & 0 deletions pkt-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "dataplane-pkt-io"
version = "0.1.0"
edition = "2024"
publish = false
license = "Apache-2.0"

[dependencies]
# internal
net = { workspace = true }
pipeline = { workspace = true }
tracectl = { workspace = true }

# external
crossbeam = { workspace = true }
linkme = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
tracing-test = { workspace = true }
10 changes: 10 additions & 0 deletions pkt-io/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Open Network Fabric Authors

//! Local packet I/O for gateway

mod nf;
mod tests;

pub use nf::PktIo;
pub use nf::PktQueue;
Loading
Loading