Skip to content

Server nodes IPv6 support #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions mgmtd/assets/beegfs-mgmtd.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,35 @@
# The private key file belonging to the above certificate.
# tls-key-file = "/etc/beegfs/key.pem"

# Restricts network interfaces reported to other nodes for incoming BeeMsg communication.
# The interfaces are reported in the given order. If not given, all suitable interfaces can be used.
# interfaces = ["eth0", "eth1"]
# Restricts and prioritizes network interfaces reported to other nodes for incoming BeeMsg
# communication.
#
# Accepts a list of interface/nic filters. Interfaces can be filtered by name, address and protocol
# (ipv4 or ipv6). Each filter entry has the form `[!] [<name>|*] [<addr>|*] [<protocol>|*]`, where
# protocol can be "4" or "6". Each field can be set to "*" to match any value. Stars on the right
# can be omitted. The order of the filter entries determines the priority of the interfaces as they
# should be used by other nodes for BeeMsg communication. The first entry an interface matches is
# that interfaces priority - the earlier the match, the higher the priority. Any interface that
# doesn't match any entry is not reported and will thus not be contacted by other nodes. A single
# `!` before the entry blacklists the matching interfaces - it is not reported even if a later entry
# does match it.
#
# If not given, all suitable interfaces can be used and are reported in default order.
#
# EXAMPLES:
#
# * Prefer IPv6: ["* * 6", "* * 4"]
# * IPv6 only: ["* * 6"]
# * Only the eth0 interface using IPv6: ["eth0 * 6"]
# * Prefer one IPv6 address, allow only IPv4 otherwise: ["* fd00::1", "* * 4"]
# * Deny eth0 interface, allow everything else: ["! eth0", "*"]
#
# interfaces = ["*"]

# Prefers an interfaces ipv6 addresses over ipv4.
# By default, ipv4 addresses are preferred. If the interface filter is given, the interface
# order has higher priority than the address family, which is sorted per interface.
# interfaces_prefer_ipv6 = false

# Maximum number of outgoing connections per node.
# connection-limit = 12
Expand Down
2 changes: 1 addition & 1 deletion mgmtd/src/bee_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub async fn notify_nodes<M: Msg + Serializable>(
node_types: &'static [NodeType],
msg: &M,
) {
log::trace!("NOTIFICATION to {:?}: {:?}", node_types, msg);
log::trace!("NOTIFICATION to {node_types:?}: {msg:?}");

if let Err(err) = async {
for t in node_types {
Expand Down
2 changes: 1 addition & 1 deletion mgmtd/src/bee_msg/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl HandleWithResponse for SetStorageTargetInfo {
})
.await?;

log::debug!("Updated {:?} target info", node_type,);
log::debug!("Updated {node_type:?} target info");

// in the old mgmtd, a notice to refresh cap pools is sent out here if a cap pool
// changed I consider this being to expensive to check here and just don't
Expand Down
34 changes: 27 additions & 7 deletions mgmtd/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use anyhow::{Context, Result, bail};
use clap::{Parser, ValueEnum};
use log::LevelFilter;
use serde::{Deserialize, Deserializer};
use shared::nic::{self, NicFilter};
use shared::parser::{duration, integer_range};
use shared::types::{Port, QuotaId};
use std::fmt::Debug;
Expand Down Expand Up @@ -203,14 +204,33 @@ generate_structs! {
#[arg(value_name = "PATH")]
tls_key_file: PathBuf = "/etc/beegfs/key.pem".into(),

/// Restricts network interfaces reported to other nodes for incoming BeeMsg communication.
/// Restricts and prioritizes network interfaces reported to other nodes for incoming BeeMsg
/// communication.
///
/// Accepts a comma separated list of interface names. They are reported in the given order. If
/// not given, all suitable interfaces can be used.
/// Accepts a comma separated list of interface/nic filters. Interfaces can be filtered by
/// name, address and protocol (ipv4 or ipv6). Each filter entry has the form `[!] [<name>|*]
/// [<addr>|*] [<protocol>|*]`, where protocol can be "4" or "6". Each field can be set to
/// "*" to match any value. Stars on the right can be omitted. The order of the filter entries
/// determines the priority of the interfaces as they should be used by other nodes for BeeMsg
/// communication. The first entry an interface matches is that interfaces priority - the
/// earlier the match, the higher the priority. Any interface that doesn't match any entry is
/// not reported and will thus not be contacted by other nodes. A single `!` before the entry
/// blacklists the matching interfaces - it is not reported even if a later entry does match it.
///
/// If not given, all suitable interfaces can be used and are reported in default order.
///
/// EXAMPLES:
///
/// * Prefer IPv6: `* * 6,* * 4`
/// * IPv6 only: `* * 6`
/// * Only the eth0 interface using IPv6: `eth0 * 6`
/// * Prefer one IPv6 address, allow only IPv4 otherwise: `* fd00::1,* * 4`
/// * Deny eth0 interface, allow everything else: `! eth0,*`
#[arg(long)]
#[arg(value_name = "NAMES")]
#[arg(value_name = "FILTERS")]
#[arg(value_delimiter = ',')]
interfaces: Vec<String> = vec![],
#[arg(value_parser = nic::NicFilter::parse)]
interfaces: Vec<NicFilter> = vec![],

/// Maximum number of outgoing BeeMsg connections per node. [default: 12]
#[arg(long)]
Expand Down Expand Up @@ -482,13 +502,13 @@ pub fn load_and_parse() -> Result<(Config, Vec<String>)> {
let file_config: OptionalConfig =
toml::from_str(toml_config).with_context(|| "Could not parse config file")?;

info_log.push(format!("Loaded config file from {:?}", config_file));
info_log.push(format!("Loaded config file from {config_file:?}"));
config.update_from_optional(file_config);
}
Err(err) => {
if config_file != &config.config_file {
return Err(err)
.with_context(|| format!("Could not open config file at {:?}", config_file));
.with_context(|| format!("Could not open config file at {config_file:?}"));
}

info_log.push("No config file found at default location, ignoring".to_string());
Expand Down
9 changes: 3 additions & 6 deletions mgmtd/src/db/import_v7.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,7 @@ fn quota(tx: &Transaction, quota_path: &Path) -> Result<()> {

quota_default_limits(tx, &e.path().join("quotaDefaultLimits.store"), pool_id)
.with_context(|| {
format!(
"quota default limits ({}/quotaDefaultLimits.store)",
pool_id
)
format!("quota default limits ({pool_id}/quotaDefaultLimits.store)")
})?;

quota_limits(
Expand All @@ -386,15 +383,15 @@ fn quota(tx: &Transaction, quota_path: &Path) -> Result<()> {
pool_id,
QuotaIdType::User,
)
.with_context(|| format!("quota user limits ({}/quotaUserLimits.store)", pool_id))?;
.with_context(|| format!("quota user limits ({pool_id}/quotaUserLimits.store)"))?;

quota_limits(
tx,
&e.path().join("quotaGroupLimits.store"),
pool_id,
QuotaIdType::Group,
)
.with_context(|| format!("quota group limits ({}/quotaGroupLimits.store)", pool_id))?;
.with_context(|| format!("quota group limits ({pool_id}/quotaGroupLimits.store)"))?;

// We intentionally ignore the quota usage data - it is fetched and updated from the
// nodes on a regular basis anyway.
Expand Down
4 changes: 2 additions & 2 deletions mgmtd/src/db/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ mod test {

assert_eq!(19, targets.len());

assert!(targets.iter().any(|e| *e == new_target_id));
assert!(targets.iter().any(|e| *e == 1000));
assert!(targets.contains(&new_target_id));
assert!(targets.contains(&1000));
})
}
}
2 changes: 1 addition & 1 deletion mgmtd/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ pub(crate) fn serve(ctx: Context, mut shutdown: RunStateHandle) -> Result<()> {
builder
};

let serve_addr = SocketAddr::new("0.0.0.0".parse()?, ctx.info.user_config.grpc_port);
let serve_addr = SocketAddr::new("::".parse()?, ctx.info.user_config.grpc_port);

let service = pm::management_server::ManagementServer::with_interceptor(
ManagementService { ctx: ctx.clone() },
Expand Down
2 changes: 1 addition & 1 deletion mgmtd/src/grpc/buddy_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ Primary result: {:?}, Secondary result: {:?}",
.await?;

if execute {
log::info!("Buddy group deleted: {}", group);
log::info!("Buddy group deleted: {group}");
}

Ok(pm::DeleteBuddyGroupResponse {
Expand Down
8 changes: 7 additions & 1 deletion mgmtd/src/grpc/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::*;
use shared::bee_msg::node::RemoveNode;
use std::net::{IpAddr, Ipv6Addr};
use std::str::FromStr;

/// Delivers a list of nodes
pub(crate) async fn get(ctx: Context, req: pm::GetNodesRequest) -> Result<pm::GetNodesResponse> {
Expand Down Expand Up @@ -139,7 +141,11 @@ pub(crate) async fn get(ctx: Context, req: pm::GetNodesRequest) -> Result<pm::Ge
.filter(|(uid, _)| node.id.as_ref().is_some_and(|e| e.uid == Some(*uid)))
.cloned()
.map(|(_, mut nic)| {
nic.addr = format!("{}:{}", nic.addr, node.port);
nic.addr = SocketAddr::new(
IpAddr::from_str(&nic.addr).unwrap_or(Ipv6Addr::UNSPECIFIED.into()),
node.port as u16,
)
.to_string();
nic
})
.collect();
Expand Down
10 changes: 5 additions & 5 deletions mgmtd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use anyhow::Result;
use bee_msg::notify_nodes;
use db::node_nic::ReplaceNic;
use license::LicenseVerifier;
use shared::NetworkAddr;
use shared::bee_msg::target::RefreshTargetStates;
use shared::conn::{Pool, incoming};
use shared::nic::Nic;
use shared::run_state::{self, RunStateControl};
use shared::types::{AuthSecret, MGMTD_UID, NicType, NodeId, NodeType};
use sqlite::TransactionExt;
Expand All @@ -39,7 +39,7 @@ use types::SqliteEnumExt;
pub struct StaticInfo {
pub user_config: Config,
pub auth_secret: Option<AuthSecret>,
pub network_addrs: Vec<NetworkAddr>,
pub network_addrs: Vec<Nic>,
}

/// Starts the management service.
Expand All @@ -63,7 +63,7 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result<RunCont
// UDP socket for in- and outgoing messages
let udp_socket = Arc::new(
UdpSocket::bind(SocketAddr::new(
"0.0.0.0".parse()?,
"::0".parse()?,
info.user_config.beemsg_port,
))
.await?,
Expand Down Expand Up @@ -94,7 +94,7 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result<RunCont
MGMTD_UID,
info.network_addrs.iter().map(|e| ReplaceNic {
nic_type: NicType::Ethernet,
addr: &e.addr,
addr: &e.address,
name: e.name.as_str().into(),
}),
)
Expand Down Expand Up @@ -122,7 +122,7 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result<RunCont

// Listen for incoming TCP connections
incoming::listen_tcp(
SocketAddr::new("0.0.0.0".parse()?, ctx.info.user_config.beemsg_port),
SocketAddr::new("::0".parse()?, ctx.info.user_config.beemsg_port),
ctx.clone(),
info.auth_secret.is_some(),
run_state.clone(),
Expand Down
2 changes: 1 addition & 1 deletion mgmtd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ doc.beegfs.io.",
None
};

let network_addrs = shared::ethernet_interfaces(&user_config.interfaces)?;
let network_addrs = shared::nic::query_nics(&user_config.interfaces)?;

// Configure the tokio runtime
let rt = tokio::runtime::Builder::new_multi_thread()
Expand Down
2 changes: 1 addition & 1 deletion mgmtd/src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn delete_stale_clients(ctx: Context, mut run_state: RunStateHandle) {
{
Ok(affected) => {
if affected > 0 {
log::info!("Deleted {} stale clients", affected);
log::info!("Deleted {affected} stale clients");
}
}
Err(err) => log::error!("Deleting stale clients failed: {err:#}"),
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.85"
channel = "1.88"
profile = "default"
6 changes: 3 additions & 3 deletions shared/src/conn/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Pool {
node_uid: Uid,
msg: &M,
) -> Result<R> {
log::trace!("REQUEST to {:?}: {:?}", node_uid, msg);
log::trace!("REQUEST to {node_uid:?}: {msg:?}");

let mut buf = self.store.pop_buf().unwrap_or_default();

Expand All @@ -58,14 +58,14 @@ impl Pool {

self.store.push_buf(buf);

log::trace!("RESPONSE RECEIVED from {:?}: {:?}", node_uid, resp);
log::trace!("RESPONSE RECEIVED from {node_uid:?}: {resp:?}");

Ok(resp)
}

/// Sends a [Msg] to a node and does **not** receive a response.
pub async fn send<M: Msg + Serializable>(&self, node_uid: Uid, msg: &M) -> Result<()> {
log::trace!("SEND to {:?}: {:?}", node_uid, msg);
log::trace!("SEND to {node_uid:?}: {msg:?}");

let mut buf = self.store.pop_buf().unwrap_or_default();

Expand Down
61 changes: 1 addition & 60 deletions shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,66 +13,7 @@ pub mod conn;
#[cfg(feature = "grpc")]
pub mod grpc;
pub mod journald_logger;
pub mod nic;
pub mod parser;
pub mod run_state;
pub mod types;

use anyhow::{Result, bail};
use std::net::IpAddr;

#[derive(Debug, Clone)]
pub struct NetworkAddr {
pub addr: IpAddr,
pub name: String,
}

/// Retrieve the systems available network interfaces with their addresses
///
/// Only interfaces matching one of the given names in `filter` will be returned, unless the list
/// is empty.
pub fn ethernet_interfaces(filter: &[impl AsRef<str>]) -> Result<Vec<NetworkAddr>> {
let mut filtered_nics = vec![];
for interface in pnet_datalink::interfaces() {
if !filter.is_empty() && !filter.iter().any(|e| interface.name == e.as_ref()) {
continue;
}

for ip in interface.ips {
// TODO Ipv6: Remove the Ipv4 filter when protocol changes (https://github.com/ThinkParQ/beegfs-rs/issues/145)
if !ip.is_ipv4() {
continue;
}

filtered_nics.push(NetworkAddr {
addr: ip.ip(),
name: interface.name.clone(),
});
}
}

// Check all filters have been used
if !filter
.iter()
.all(|e| filtered_nics.iter().any(|g| g.name == e.as_ref()))
{
bail!("At least one network interface doesn't exist");
}

// Sort
filtered_nics.sort_unstable_by_key(|k| {
if filter.is_empty() {
// Move loopbacks to the back
k.addr.is_loopback() as usize
} else {
// Sort by filter
filter
.iter()
.enumerate()
.find(|e| e.1.as_ref() == k.name)
.map(|e| e.0)
.unwrap_or(usize::MAX)
}
});

Ok(filtered_nics)
}
Loading
Loading