Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 203 additions & 29 deletions libazureinit/src/kvp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ use uuid::Uuid;
const HV_KVP_EXCHANGE_MAX_KEY_SIZE: usize = 512;
const HV_KVP_EXCHANGE_MAX_VALUE_SIZE: usize = 2048;
const HV_KVP_AZURE_MAX_VALUE_SIZE: usize = 1022;
const EVENT_PREFIX: &str = concat!("azure-init-", env!("CARGO_PKG_VERSION"));
/// The default event prefix used when no custom prefix is provided.
pub const EVENT_PREFIX: &str =
concat!("azure-init-", env!("CARGO_PKG_VERSION"));

/// Encapsulates the KVP (Key-Value Pair) tracing infrastructure.
///
Expand Down Expand Up @@ -75,6 +77,7 @@ impl Kvp {
pub(crate) fn new(
file_path: std::path::PathBuf,
vm_id: &str,
event_prefix: &str,
graceful_shutdown: CancellationToken,
) -> Result<Self, anyhow::Error> {
truncate_guest_pool_file(&file_path)?;
Expand All @@ -96,6 +99,7 @@ impl Kvp {
tracing_layer: EmitKVPLayer {
events_tx,
vm_id: vm_id.to_string(),
event_prefix: event_prefix.to_string(),
},
writer,
})
Expand Down Expand Up @@ -167,7 +171,10 @@ impl Kvp {
/// # Arguments
/// * `file` - A mutable reference to the file to write to.
/// * `kvps` - A slice of encoded KVP messages to write.
fn write_kvps(file: &mut File, kvps: &[Vec<u8>]) -> io::Result<()> {
pub(crate) fn write_kvps(
file: &mut File,
kvps: &[Vec<u8>],
) -> io::Result<()> {
FileExt::lock_exclusive(file).map_err(|e| {
io::Error::other(format!("Failed to lock KVP file: {e}"))
})?;
Expand Down Expand Up @@ -254,6 +261,7 @@ impl Visit for StringVisitor<'_> {
pub struct EmitKVPLayer {
/// Sending half of an unbounded channel carrying encoded KVP byte buffers.
/// NOTE(review): presumably drained by the KVP writer created alongside
/// this layer in `Kvp::new` — confirm against the writer task.
events_tx: UnboundedSender<Vec<u8>>,
/// VM identifier; passed as the `vm_id` component when generating event keys.
vm_id: String,
/// Prefix identifying the emitting application or library; passed as the
/// leading component when generating event keys.
event_prefix: String,
}

impl EmitKVPLayer {
Expand All @@ -275,8 +283,13 @@ impl EmitKVPLayer {
span_id: &str,
event_value: &str,
) {
let event_key =
generate_event_key(&self.vm_id, event_level, event_name, span_id);
let event_key = generate_event_key(
&self.event_prefix,
&self.vm_id,
event_level,
event_name,
span_id,
);
let encoded_kvp = encode_kvp_item(&event_key, event_value);
let encoded_kvp_flattened: Vec<u8> = encoded_kvp.concat();
self.send_event(encoded_kvp_flattened);
Expand Down Expand Up @@ -425,19 +438,24 @@ where
}
}

/// Generates a unique event key by combining the event level, name, and span ID.
/// Generates a unique event key by combining the event prefix, VM ID, level,
/// name, and span ID.
///
/// # Arguments
/// * `event_prefix` - A prefix identifying the emitting application or library
/// (e.g., `"azure-init-0.1.1"` or `"my-library-2.0"`).
/// * `vm_id` - The unique identifier for the VM.
/// * `event_level` - The logging level (e.g., "INFO", "DEBUG").
/// * `event_name` - The name of the event.
/// * `span_id` - A unique identifier for the span.
fn generate_event_key(
event_prefix: &str,
vm_id: &str,
event_level: &str,
event_name: &str,
span_id: &str,
) -> String {
format!("{EVENT_PREFIX}|{vm_id}|{event_level}|{event_name}|{span_id}")
format!("{event_prefix}|{vm_id}|{event_level}|{event_name}|{span_id}")
}

/// Encodes a key-value pair (KVP) into one or more byte slices. If the value
Expand All @@ -452,7 +470,7 @@ fn generate_event_key(
/// # Arguments
/// * `key` - The key as a string slice.
/// * `value` - The value associated with the key.
fn encode_kvp_item(key: &str, value: &str) -> Vec<Vec<u8>> {
pub(crate) fn encode_kvp_item(key: &str, value: &str) -> Vec<Vec<u8>> {
let key_buf = key
.as_bytes()
.iter()
Expand Down Expand Up @@ -532,37 +550,50 @@ pub fn decode_kvp_item(
}

/// Truncates the guest pool KVP file if it contains stale data (i.e., data
/// older than the system's boot time). Logs whether the file was truncated
/// or no action was needed.
/// older than the system's boot time).
///
/// An exclusive `flock` is held while checking metadata and truncating so
/// that concurrent processes don't race on the same check-then-truncate
/// sequence.
fn truncate_guest_pool_file(kvp_file: &Path) -> Result<(), anyhow::Error> {
let boot_time = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
- get_uptime().as_secs();

match kvp_file.metadata() {
Ok(metadata) => {
if metadata.mtime() < boot_time as i64 {
OpenOptions::new()
.write(true)
.truncate(true)
.open(kvp_file)?;
println!("Truncated the KVP file due to stale data.");
} else {
println!(
"File has been truncated since boot, no action taken."
);
}
}
// Try to open the file; if it doesn't exist there is nothing to truncate.
let file = match OpenOptions::new().read(true).write(true).open(kvp_file) {
Ok(f) => f,
Err(ref e) if e.kind() == ErrorKind::NotFound => {
println!("File not found: {kvp_file:?}");
return Ok(());
}
Err(e) => {
return Err(anyhow::Error::from(e)
.context("Failed to access file metadata"));
.context("Failed to open KVP file for truncation"));
}
}
};

// Hold an exclusive lock for the metadata-check + truncate window so
// that two concurrent callers cannot both decide the file is stale and
// truncate data the other has already written.
FileExt::lock_exclusive(&file).map_err(|e| {
anyhow::Error::from(e).context("Failed to lock KVP file for truncation")
})?;

let result = (|| -> Result<(), anyhow::Error> {
let metadata = file.metadata()?;
if metadata.mtime() < boot_time as i64 {
file.set_len(0)?;
println!("Truncated the KVP file due to stale data.");
} else {
println!("File has been truncated since boot, no action taken.");
}
Ok(())
})();

Ok(())
// Always release the lock, even if the inner operation failed.
let _ = FileExt::unlock(&file);

result
}

/// Retrieves the system's uptime using the `sysinfo` crate, returning the duration
Expand Down Expand Up @@ -686,9 +717,13 @@ mod tests {
let test_vm_id = "00000000-0000-0000-0000-000000000001";

let graceful_shutdown = CancellationToken::new();
let kvp =
Kvp::new(temp_path.clone(), test_vm_id, graceful_shutdown.clone())
.expect("Failed to create Kvp");
let kvp = Kvp::new(
temp_path.clone(),
test_vm_id,
EVENT_PREFIX,
graceful_shutdown.clone(),
)
.expect("Failed to create Kvp");

let subscriber = Registry::default().with(kvp.tracing_layer);
let default_guard = tracing::subscriber::set_default(subscriber);
Expand Down Expand Up @@ -871,6 +906,7 @@ mod tests {
Kvp::new(
temp_path.clone(),
test_vm_id,
EVENT_PREFIX,
graceful_shutdown.clone(),
)
.expect("Failed to create Kvp")
Expand Down Expand Up @@ -899,4 +935,142 @@ mod tests {

println!("KVP file is empty as expected because kvp_diagnostics is disabled.");
}

/// Test helper: asserts that the file at `path` holds exactly `expected`
/// fixed-size KVP records and that each of them decodes successfully.
///
/// One record occupies
/// `HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE` bytes.
///
/// # Panics
/// Panics if the file cannot be read, if its size is not a whole number of
/// records, if the record count differs from `expected`, or if any record
/// fails to decode.
fn assert_kvp_record_count(path: &std::path::Path, expected: usize) {
    let record_size =
        HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE;
    let contents = std::fs::read(path).expect("Failed to read KVP file");
    let total_bytes = contents.len();

    // The file must consist solely of whole records.
    assert_eq!(
        total_bytes % record_size,
        0,
        "File size ({}) is not a multiple of the record size ({record_size})",
        total_bytes
    );

    let actual = total_bytes / record_size;
    assert_eq!(
        actual, expected,
        "Expected {expected} KVP records but found {actual}"
    );

    // Walk the file one record at a time and make sure each one decodes.
    for (i, record) in contents.chunks_exact(record_size).enumerate() {
        if let Err(e) = decode_kvp_item(record) {
            panic!("Record {i} failed to decode: {e}");
        }
    }
}

/// 4 threads × 5,000 iterations writing to the same file via separate FDs.
#[test]
fn test_multi_thread_kvp_concurrent_writes() {
let temp_file =
NamedTempFile::new().expect("Failed to create tempfile");
let temp_path = temp_file.path().to_path_buf();

let num_threads: usize = 4;
let iterations: usize = 5_000;

let handles: Vec<_> = (0..num_threads)
.map(|tid| {
let path = temp_path.clone();
std::thread::spawn(move || {
// Each thread opens its own file descriptor.
let mut file = OpenOptions::new()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if `Kvp::new()` should be responsible for opening the file, since we're already opening it there?

Seems like having the interface dictate initialization avoids allowing the caller to do unexpected things when opening the file.

.append(true)
.create(true)
.open(&path)
.expect("Failed to open KVP file");

for i in 0..iterations {
let key = format!("thread-{tid}-iter-{i}");
let value = format!("value-{tid}-{i}");
let encoded = encode_kvp_item(&key, &value).concat();
Kvp::write_kvps(&mut file, &[encoded])
.expect("write_kvps failed");
}
})
})
.collect();

for h in handles {
h.join().expect("Thread panicked");
}

let expected_records = num_threads * iterations;
assert_kvp_record_count(&temp_path, expected_records);
println!(
"Multi-thread test passed: {expected_records} records verified."
);
}

/// 4 child processes × 5,000 iterations writing to the same file.
///
/// When the env var `__KVP_CHILD_WORKER_PATH` is set, the process acts as a
/// worker: it encodes records and appends them to the file at that path.
/// Otherwise it acts as the orchestrator: it re-runs this test executable
/// (filtered to this exact test) once per child with the env var set, waits
/// for every child, and then verifies the record count in the shared file.
#[test]
fn test_multi_process_kvp_concurrent_writes() {
    const CHILD_PATH_ENV: &str = "__KVP_CHILD_WORKER_PATH";

    let num_processes: usize = 4;
    let iterations: usize = 5_000;

    // --- Child worker path ---
    // `var_os` keeps the worker/orchestrator decision independent of whether
    // the path happens to be valid UTF-8.
    if let Some(path) = std::env::var_os(CHILD_PATH_ENV) {
        let pid = std::process::id();
        let mut file = OpenOptions::new()
            .append(true)
            .create(true)
            .open(&path)
            .expect("Child: failed to open KVP file");

        for i in 0..iterations {
            let key = format!("proc-{pid}-iter-{i}");
            let value = format!("value-{pid}-{i}");
            let encoded = encode_kvp_item(&key, &value).concat();
            Kvp::write_kvps(&mut file, &[encoded])
                .expect("Child: write_kvps failed");
        }
        return; // done – the parent will verify the file.
    }

    // --- Parent orchestrator path ---
    let temp_file =
        NamedTempFile::new().expect("Failed to create tempfile");
    let temp_path = temp_file.path().to_path_buf();

    let test_exe = std::env::current_exe()
        .expect("Failed to determine test executable path");

    let children: Vec<_> = (0..num_processes)
        .map(|_| {
            std::process::Command::new(&test_exe)
                // Pass the path as an OsStr so no UTF-8 conversion can panic.
                .env(CHILD_PATH_ENV, &temp_path)
                .arg("--exact")
                .arg("kvp::tests::test_multi_process_kvp_concurrent_writes")
                .arg("--nocapture")
                .stdout(std::process::Stdio::piped())
                .stderr(std::process::Stdio::piped())
                .spawn()
                .expect("Failed to spawn child process")
        })
        .collect();

    for child in children {
        // Drain the piped stdout/stderr while waiting: a child can never
        // block on a full pipe, and its diagnostics are available when the
        // assertion below fails.
        let output = child
            .wait_with_output()
            .expect("Failed to wait on child");
        assert!(
            output.status.success(),
            "Child process exited with failure: {}\n--- child stderr ---\n{}",
            output.status,
            String::from_utf8_lossy(&output.stderr)
        );
    }

    let expected_records = num_processes * iterations;
    assert_kvp_record_count(&temp_path, expected_records);
    println!(
        "Multi-process test passed: {expected_records} records verified."
    );
}
}
14 changes: 12 additions & 2 deletions libazureinit/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tracing_subscriber::{
};

use crate::config::Config;
pub use crate::kvp::EVENT_PREFIX;
use crate::kvp::{EmitKVPLayer, Kvp as KvpInternal};

pub type LoggingSetup = (
Expand Down Expand Up @@ -153,7 +154,7 @@ struct KvpLayer<S: Subscriber>(Filtered<EmitKVPLayer, EnvFilter, S>);
///
/// # #[tokio::main]
/// # async fn main() -> anyhow::Result<()> {
/// let mut kvp = Kvp::new("a-unique-id")?;
/// let mut kvp = Kvp::new("a-unique-id", None)?;
/// let registry = tracing_subscriber::Registry::default().with(kvp.layer());
///
/// // When it's time to shut down, doing this ensures all writes are flushed
Expand All @@ -173,12 +174,20 @@ pub struct Kvp<S: Subscriber> {
impl<S: Subscriber + for<'lookup> LookupSpan<'lookup>> Kvp<S> {
/// Create a new tracing layer for KVP.
///
/// When `event_prefix` is `None`, the default [`EVENT_PREFIX`] is used.
/// Pass `Some("my-library-1.0")` to identify a different emitting library.
///
/// Refer to [`libazureinit::get_vm_id`] to retrieve the VM's unique identifier.
pub fn new<T: AsRef<str>>(vm_id: T) -> Result<Self, anyhow::Error> {
pub fn new<T: AsRef<str>>(
vm_id: T,
event_prefix: Option<&str>,
) -> Result<Self, anyhow::Error> {
let event_prefix = event_prefix.unwrap_or(EVENT_PREFIX);
let shutdown = CancellationToken::new();
let inner = KvpInternal::new(
std::path::PathBuf::from("/var/lib/hyperv/.kvp_pool_1"),
vm_id.as_ref(),
event_prefix,
shutdown.clone(),
)?;

Expand Down Expand Up @@ -244,6 +253,7 @@ pub fn setup_layers(
match KvpInternal::new(
std::path::PathBuf::from("/var/lib/hyperv/.kvp_pool_1"),
vm_id,
EVENT_PREFIX,
graceful_shutdown,
) {
Ok(kvp) => {
Expand Down