Skip to content

Overhaul BeeSerde and msg buffers #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ publish = false

[workspace.dependencies]
anyhow = "1"
bytes = "1"
clap = { version = "4", features = ["derive"] }
env_logger = "0"
itertools = "0"
Expand Down
2 changes: 1 addition & 1 deletion mgmtd/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use crate::bee_msg::dispatch_request;
use crate::license::LicenseVerifier;
use crate::{ClientPulledStateNotification, StaticInfo};
use anyhow::Result;
use shared::conn::Pool;
use shared::conn::msg_dispatch::*;
use shared::conn::outgoing::Pool;
use shared::run_state::WeakRunStateHandle;
use shared::types::{NodeId, NodeType};
use std::ops::Deref;
Expand Down
10 changes: 5 additions & 5 deletions mgmtd/src/db/import_v7.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ targetStates=1
fn check_target_states(f: &Path) -> Result<()> {
let s = std::fs::read(f)?;

let mut des = Deserializer::new(&s, 0);
let mut des = Deserializer::new(&s);
let states = des.map(
false,
|des| TargetId::deserialize(des),
Expand Down Expand Up @@ -184,7 +184,7 @@ struct ReadNodesResult {
fn read_nodes(f: &Path) -> Result<ReadNodesResult> {
let s = std::fs::read(f)?;

let mut des = Deserializer::new(&s, 0);
let mut des = Deserializer::new(&s);
let version = des.u32()?;
let root_id = des.u32()?;
let root_mirrored = des.u8()?;
Expand Down Expand Up @@ -286,7 +286,7 @@ fn storage_targets(tx: &Transaction, targets_path: &Path) -> Result<()> {
fn storage_pools(tx: &Transaction, f: &Path) -> Result<()> {
let s = std::fs::read(f)?;

let mut des = Deserializer::new(&s, 0);
let mut des = Deserializer::new(&s);
// Serialized as size_t, which should usually be 64 bit.
let count = des.i64()?;
let mut used_aliases = vec![];
Expand Down Expand Up @@ -415,7 +415,7 @@ fn quota_default_limits(tx: &Transaction, f: &Path, pool_id: PoolId) -> Result<(
Err(err) => return Err(err.into()),
};

let mut des = Deserializer::new(&s, 0);
let mut des = Deserializer::new(&s);
let user_inode_limit = des.u64()?;
let user_space_limit = des.u64()?;
let group_inode_limit = des.u64()?;
Expand Down Expand Up @@ -497,7 +497,7 @@ fn quota_limits(
Err(err) => return Err(err.into()),
};

let mut des = Deserializer::new(&s, 0);
let mut des = Deserializer::new(&s);
let limits = des.seq(false, |des| QuotaEntry::deserialize(des))?;
des.finish()?;

Expand Down
3 changes: 2 additions & 1 deletion mgmtd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use db::node_nic::ReplaceNic;
use license::LicenseVerifier;
use shared::NetworkAddr;
use shared::bee_msg::target::RefreshTargetStates;
use shared::conn::{Pool, incoming};
use shared::conn::incoming;
use shared::conn::outgoing::Pool;
use shared::run_state::{self, RunStateControl};
use shared::types::{AuthSecret, MGMTD_UID, NicType, NodeId, NodeType};
use sqlite::TransactionExt;
Expand Down
1 change: 0 additions & 1 deletion shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ publish.workspace = true
bee_serde_derive = { path = "../bee_serde_derive" }

anyhow = { workspace = true }
bytes = { workspace = true }
log = { workspace = true }
pnet_datalink = "0"
protobuf = { workspace = true, optional = true }
Expand Down
168 changes: 166 additions & 2 deletions shared/src/bee_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

use crate::bee_serde::*;
use crate::types::*;
use anyhow::Result;
use anyhow::{Context, Result, anyhow};
use bee_serde_derive::BeeSerde;
use std::collections::{HashMap, HashSet};

pub mod buddy_group;
pub mod header;
pub mod misc;
pub mod node;
pub mod quota;
Expand Down Expand Up @@ -41,3 +40,168 @@ impl OpsErr {
pub const AGAIN: Self = Self(22);
pub const UNKNOWN_POOL: Self = Self(30);
}

/// The BeeMsg header
#[derive(Clone, Debug, PartialEq, Eq, BeeSerde)]
pub struct Header {
/// Total length of the serialized message, including the header itself
msg_len: u32,
/// Sometimes used for additional message specific payload and/or serialization info
pub msg_feature_flags: u16,
/// Sometimes used for additional message specific payload and/or serialization info
pub msg_compat_feature_flags: u8,
/// Sometimes used for additional message specific payload and/or serialization info
pub msg_flags: u8,
/// Fixed value to identify a BeeMsg header (see MSG_PREFIX below)
msg_prefix: u64,
/// Uniquely identifies the message type as defined in the C++ codebase in NetMessageTypes.h
msg_id: MsgId,
/// Sometimes used for additional message specific payload and/or serialization info
pub msg_target_id: TargetId,
/// Sometimes used for additional message specific payload and/or serialization info
pub msg_user_id: u32,
/// Mirroring related information
pub msg_seq: u64,
/// Mirroring related information
pub msg_seq_done: u64,
}

impl Header {
/// The serialized length of the header
pub const LEN: usize = 40;
/// Fixed value for identifying BeeMsges. In theory, this has some kind of version modifier
/// (thus the + 0), but it is unused
#[allow(clippy::identity_op)]
pub const MSG_PREFIX: u64 = (0x42474653 << 32) + 0;

/// The total length of the serialized message
pub fn msg_len(&self) -> usize {
self.msg_len as usize
}

/// The messages id
pub fn msg_id(&self) -> MsgId {
self.msg_id
}
}

impl Default for Header {
fn default() -> Self {
Self {
msg_len: 0,
msg_feature_flags: 0,
msg_compat_feature_flags: 0,
msg_flags: 0,
msg_prefix: Self::MSG_PREFIX,
msg_id: 0,
msg_target_id: 0,
msg_user_id: 0,
msg_seq: 0,
msg_seq_done: 0,
}
}
}

/// Serializes a BeeMsg body into the provided buffer.
///
/// The data is written from the beginning of the slice, it's up to the caller to pass the correct
/// sub slice if space for the header should be reserved.
///
/// # Return value
/// Returns the number of bytes written and the header modified by serialization function.
pub fn serialize_body<M: Msg + Serializable>(msg: &M, buf: &mut [u8]) -> Result<(usize, Header)> {
let mut ser = Serializer::new(buf);
msg.serialize(&mut ser)
.context("BeeMsg body serialization failed")?;

Ok((ser.bytes_written(), ser.finish()))
}

/// Serializes a BeeMsg header into the provided buffer.
///
/// # Return value
/// Returns the number of bytes written.
pub fn serialize_header(header: &Header, buf: &mut [u8]) -> Result<usize> {
let mut ser_header = Serializer::new(buf);
header
.serialize(&mut ser_header)
.context("BeeMsg header serialization failed")?;

Ok(ser_header.bytes_written())
}

/// Serializes a complete BeeMsg (header + body) into the provided buffer.
///
/// # Return value
/// Returns the number of bytes written.
pub fn serialize<M: Msg + Serializable>(msg: &M, buf: &mut [u8]) -> Result<usize> {
let (written, mut header) = serialize_body(msg, &mut buf[Header::LEN..])?;

header.msg_len = (written + Header::LEN) as u32;
header.msg_id = M::ID;

let _ = serialize_header(&header, &mut buf[0..Header::LEN])?;

Ok(header.msg_len())
}

/// Deserializes a BeeMsg header from the provided buffer.
///
/// # Return value
/// Returns the deserialized header.
pub fn deserialize_header(buf: &[u8]) -> Result<Header> {
const CTX: &str = "BeeMsg header deserialization failed";

let header_buf = buf
.get(..Header::LEN)
.ok_or_else(|| {
anyhow!(
"Header buffer must be at least {} bytes big, got {}",
Header::LEN,
buf.len()
)
})
.context(CTX)?;

let mut des = Deserializer::new(header_buf);
let header = Header::deserialize(&mut des).context(CTX)?;
des.finish().context(CTX)?;

if header.msg_prefix != Header::MSG_PREFIX {
return Err(anyhow!(
"Invalid BeeMsg prefix: Must be {}, got {}",
Header::MSG_PREFIX,
header.msg_prefix
))
.context(CTX);
}

Ok(header)
}

/// Deserializes a BeeMsg body from the provided buffer.
///
/// The data is read from the beginning of the slice, it's up to the caller to pass the correct
/// sub slice if space for the header should be excluded from the source.
///
/// # Return value
/// Returns the deserialized message.
pub fn deserialize_body<M: Msg + Deserializable>(header: &Header, buf: &[u8]) -> Result<M> {
const CTX: &str = "BeeMsg body deserialization failed";

let mut des = Deserializer::with_header(&buf[0..(header.msg_len() - Header::LEN)], header);
let des_msg = M::deserialize(&mut des).context(CTX)?;
des.finish().context(CTX)?;

Ok(des_msg)
}

/// Deserializes a complete BeeMsg (header + body) from the provided buffer.
///
/// # Return value
/// Returns the deserialized message.
pub fn deserialize<M: Msg + Deserializable>(buf: &[u8]) -> Result<M> {
let header = deserialize_header(&buf[0..Header::LEN])?;
let msg = deserialize_body(&header, &buf[Header::LEN..])?;
Ok(msg)
}
66 changes: 0 additions & 66 deletions shared/src/bee_msg/header.rs

This file was deleted.

Loading
Loading