Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions benchmark/bench_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ struct CliArgs {
/// IO mode, available options: uring, uring-direct, std-blocking, tokio, std-spawn-blocking
#[arg(long = "io-mode", default_value = "uring-multi-async")]
io_mode: IoMode,

#[arg(long = "fixed-buffer-pool-size-mb", default_value = "0")]
fixed_buffer_pool_size_mb: usize,
}

#[tokio::main]
Expand Down Expand Up @@ -80,6 +83,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Box::new(LiquidPolicy::new()),
squeeze_policy,
Some(args.io_mode),
args.fixed_buffer_pool_size_mb,
)?;

let liquid_cache_server = Arc::new(liquid_cache_server);
Expand Down
9 changes: 7 additions & 2 deletions benchmark/in_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use fastrace::prelude::*;
use liquid_cache_benchmarks::{
BenchmarkManifest, InProcessBenchmarkMode, InProcessBenchmarkRunner, setup_observability,
};
use liquid_cache_common::IoMode;
use liquid_cache_common::{IoMode, memory::pool::FixedBufferPool};
use mimalloc::MiMalloc;
use serde::Serialize;
use std::path::PathBuf;
Expand Down Expand Up @@ -66,6 +66,9 @@ struct InProcessBenchmark {
/// IO mode, available options: uring, uring-direct, std-blocking, tokio, std-spawn-blocking
#[arg(long = "io-mode", default_value = "uring-multi-async")]
io_mode: IoMode,

#[arg(long = "fixed-buffer-pool-size-mb", default_value = "0")]
fixed_buffer_pool_size_mb: usize,
}

impl InProcessBenchmark {
Expand All @@ -83,7 +86,8 @@ impl InProcessBenchmark {
.with_cache_dir(self.cache_dir.clone())
.with_query_filter(self.query_index)
.with_io_mode(self.io_mode)
.with_output_dir(self.output_dir.clone());
.with_output_dir(self.output_dir.clone())
.with_fixed_buffer_pool_size_mb(self.fixed_buffer_pool_size_mb);
runner.run(manifest, self, output).await?;
Ok(())
}
Expand All @@ -97,6 +101,7 @@ async fn main() -> Result<()> {
let _guard = root.set_local_parent();

benchmark.run().await?;
FixedBufferPool::print_stats();
fastrace::flush();
Ok(())
}
9 changes: 9 additions & 0 deletions benchmark/src/inprocess_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub struct InProcessBenchmarkRunner {
pub cache_dir: Option<PathBuf>,
pub io_mode: IoMode,
pub output_dir: Option<PathBuf>,
pub fixed_buffer_pool_size_mb: usize,
}

impl Default for InProcessBenchmarkRunner {
Expand All @@ -129,6 +130,7 @@ impl InProcessBenchmarkRunner {
cache_dir: None,
io_mode: IoMode::default(),
output_dir: None,
fixed_buffer_pool_size_mb: 0,
}
}

Expand Down Expand Up @@ -182,6 +184,11 @@ impl InProcessBenchmarkRunner {
self
}

/// Builder-style setter: stores `fixed_buffer_pool_size_mb` on the runner and
/// returns the runner so further `with_*` calls can be chained.
///
/// The value is forwarded unchanged to the cache builder's
/// `with_fixed_buffer_pool_size_mb`; the runner's constructor initialises it to `0`.
pub fn with_fixed_buffer_pool_size_mb(mut self, fixed_buffer_pool_size_mb: usize) -> Self {
self.fixed_buffer_pool_size_mb = fixed_buffer_pool_size_mb;
self
}

#[fastrace::trace]
async fn setup_context(
&self,
Expand Down Expand Up @@ -245,6 +252,7 @@ impl InProcessBenchmarkRunner {
.with_cache_policy(Box::new(LiquidPolicy::new()))
.with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
.with_io_mode(self.io_mode)
.with_fixed_buffer_pool_size_mb(self.fixed_buffer_pool_size_mb)
.build(session_config)?;
(v.0, Some(v.1))
}
Expand All @@ -255,6 +263,7 @@ impl InProcessBenchmarkRunner {
.with_cache_policy(Box::new(LiquidPolicy::new()))
.with_squeeze_policy(Box::new(TranscodeEvict))
.with_io_mode(self.io_mode)
.with_fixed_buffer_pool_size_mb(self.fixed_buffer_pool_size_mb)
.build(session_config)?;
(v.0, Some(v.1))
}
Expand Down
18 changes: 17 additions & 1 deletion dev/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ LiquidCache exports OpenTelemetry traces. Spin up a Jaeger v2
```bash
docker run \
--name jaeger \
--replace \
-e COLLECTOR_OTLP_ENABLED=true \
-p 16686:16686 \
-p 4317:4317 \
-p 4318:4318 \
cr.jaegertracing.io/jaegertracing/jaeger:2.11.0
```

If a container named `jaeger` already exists, remove it first: `docker rm -f jaeger` (or `podman rm -f jaeger`).

This image contains the Jaeger v2 distribution.
Port 16686 exposes the frontend UI at http://localhost:16686.
4317 and 4318 expose OTLP over gRPC and HTTP respectively.
Expand Down Expand Up @@ -76,6 +77,21 @@ This will trace the execution of `iteration = 2` (`arg1 == 2`) and print the `io
[512, 1K) 194 |@@@ |
```

```bash
sudo bpftrace -e '
usdt:./target/release/in_process:liquid_benchmark:iteration_start /arg1 == 2/ {@enable = 1;}
usdt:./target/release/in_process:liquid_benchmark:iteration_start /arg1 > 2/ {@enable = 0;}
usdt:./target/release/in_process:io_submitted /@enable/ {
@t[arg0] = nsecs;
}
usdt:./target/release/in_process:io_completed /@enable && @t[arg0]/ {
$us = (nsecs - @t[arg0]) / 1000;
@lat = hist($us);
delete(@t[arg0]);
}
'
```

If you're using a blocking I/O mode, try this instead:
```bash
sudo bpftrace -e '
Expand Down
1 change: 1 addition & 0 deletions examples/example_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Box::new(LruPolicy::new()),
Box::new(TranscodeSqueezeEvict),
Some(IoMode::default()),
0,
)?;

let flight = FlightServiceServer::new(liquid_cache);
Expand Down
6 changes: 6 additions & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ arrow-flight = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
chrono = "0.4.42"
crossbeam = "0.8.4"
futures = { workspace = true }
io-uring = "0.7.11"
libc = "0.2.177"
log.workspace = true
object_store = { workspace = true }
prost = { workspace = true }
rand = "0.9.2"
serde = { workspace = true }
tempfile.workspace = true
thiserror = "2.0.17"
tokio = { workspace = true }
url = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ pub mod mock_store;
pub mod rpc;
pub mod utils;
pub use io_mode::IoMode;
pub mod memory;
125 changes: 125 additions & 0 deletions src/common/src/memory/arena.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use std::{io, os::raw::c_void, ptr::null_mut};

use io_uring::IoUring;

use crate::memory::{
page::Slice, pool::{FIXED_BUFFER_BITS, FIXED_BUFFER_SIZE_BYTES}, segment::{SEGMENT_SIZE, SEGMENT_SIZE_BITS, Segment}
};

/// A block of memory obtained from the OS and carved into `SEGMENT_SIZE`-byte
/// slices that are handed out as segments.
pub struct Arena {
/// Capacity in bytes requested in `new`; also the length passed to `munmap` on drop.
size: usize,
/// One entry per `SEGMENT_SIZE`-byte slice carved out of the mapping.
slices: Vec<Slice>,
/// One byte per entry of `slices`: `0` means free, non-zero means in use
/// (`allocate_segment` writes `1`, `retire_segment` writes `0`).
used_bitmap: Vec<u8>,
/// Segments need to be aligned to 32MB boundaries. Hence the first segment's
/// starting address could be different from the starting address of the
/// allocated memory.
///
/// Segment-aligned base address computed in `new`; returned by `start_ptr` and
/// used as the address of the first buffer in `register_buffers_with_ring`.
aligned_start_ptr: *mut u8,
/// Address returned by the OS allocation in `new`; this is what gets unmapped on drop.
actual_start_ptr: *mut u8,
/// Set by `register_buffers_with_ring` to whether the registration call succeeded.
buffers_registered: bool,
}

// `Arena` holds raw pointers, so it is neither `Send` nor `Sync` automatically;
// these impls opt in by hand.
// NOTE(review): every mutating method visible here takes `&mut self`, which is
// presumably what makes sharing sound — confirm that no caller mutates the arena
// through a shared reference or hands out aliasing segment pointers across threads.
unsafe impl Send for Arena {}
unsafe impl Sync for Arena {}

impl Arena {
pub fn new(capacity: usize) -> Arena {
let mem_start = Self::allocate_memory_from_os(capacity);
assert_ne!(mem_start, null_mut());
let mem_end = mem_start.wrapping_add(capacity);
let ptr_aligned = (mem_start as usize >> SEGMENT_SIZE_BITS) << SEGMENT_SIZE_BITS;
let mut slice_start = ptr_aligned;
if ptr_aligned != (mem_start as usize) {
slice_start = ptr_aligned + SEGMENT_SIZE;
}
let mut slices = Vec::new();
while slice_start + SEGMENT_SIZE <= mem_end as usize {
slices.push(Slice {
ptr: slice_start as *mut u8,
size: SEGMENT_SIZE,
});
slice_start += SEGMENT_SIZE;
}
let mut used_bitmap = Vec::new();
used_bitmap.resize(slices.len(), 0);

Arena {
size: capacity,
slices: slices,
used_bitmap: used_bitmap,
aligned_start_ptr: ptr_aligned as *mut u8,
actual_start_ptr: mem_start,
buffers_registered: false,
}
}

/// Requests `capacity` bytes of private, anonymous, read/write memory from the OS.
///
/// Returns a null pointer if the mapping fails. `mmap` itself reports failure
/// as `MAP_FAILED`, which is translated here so that callers can use an
/// ordinary null check.
fn allocate_memory_from_os(capacity: usize) -> *mut u8 {
    let prot = libc::PROT_READ | libc::PROT_WRITE;
    let flags = libc::MAP_ANONYMOUS | libc::MAP_PRIVATE;
    // SAFETY: an anonymous mapping with a null address hint and no file
    // descriptor does not touch any memory this process already owns.
    let mapping = unsafe { libc::mmap64(null_mut(), capacity, prot, flags, -1, 0) };
    if mapping == libc::MAP_FAILED {
        null_mut()
    } else {
        mapping.cast::<u8>()
    }
}

pub fn allocate_segment(self: &mut Self, size: usize) -> Option<*mut Segment> {
let num_slices = (size + SEGMENT_SIZE - 1) / SEGMENT_SIZE;
let mut contiguous = 0;
let mut result: i32 = -1;

for index in 0..self.used_bitmap.len() {
let bit = self.used_bitmap[index];
if bit == 0 {
contiguous += 1;
if contiguous == num_slices {
result = (index + 1 - contiguous) as i32;
break;
}
} else {
contiguous = 0;
}
}
if result == -1 {
return None;
}
for i in 0..contiguous {
self.used_bitmap[result as usize + i] = 1;
}
let combined_slice = Slice {
ptr: self.slices[result as usize].ptr,
size: num_slices * SEGMENT_SIZE,
};
Some(Segment::new_from_slice(combined_slice))
}

pub(crate) fn start_ptr(self: &Self) -> *mut u8 {
self.aligned_start_ptr
}

pub(crate) fn retire_segment(self: &mut Self, segment: *mut Segment) {
debug_assert!((self.slices[0].ptr as usize) <= segment as usize);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

self.slices can be empty if the arena is created with a capacity smaller than SEGMENT_SIZE. Accessing self.slices[0] on this line would cause a panic in that case. It's better to handle this case gracefully, for example by adding a check for self.slices.is_empty() before this line.

let segment_idx = (segment as usize - self.slices[0].ptr as usize) / SEGMENT_SIZE;
self.used_bitmap[segment_idx] = 0;
Comment on lines +96 to +99
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The retire_segment function incorrectly assumes that a segment is always composed of a single slice from the arena, as it only frees one entry in used_bitmap. However, allocate_segment is capable of allocating multiple contiguous slices. If a segment is ever allocated with more than one slice, this function will only free the first one, leading to a memory leak within the arena. To fix this, the Segment should store the number of slices it's composed of, and retire_segment should use this information to free all corresponding entries in used_bitmap.

}
Comment on lines +96 to +100
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The retire_segment function will panic on self.slices[0] if self.slices is empty. This can happen if the arena is created with a capacity smaller than SEGMENT_SIZE. While current call paths might avoid this, it's safer to make this internal function more robust. Adding a check for self.slices.is_empty() at the beginning of the function would prevent this potential panic, especially in debug builds.

    pub(crate) fn retire_segment(self: &mut Self, segment: *mut Segment) {
        if self.slices.is_empty() {
            return;
        }
        debug_assert!((self.slices[0].ptr as usize) <= segment as usize);
        let segment_idx = (segment as usize - self.slices[0].ptr as usize) / SEGMENT_SIZE;
        self.used_bitmap[segment_idx] = 0;
    }


pub(crate) fn register_buffers_with_ring(self: &mut Self, ring: &IoUring) -> io::Result<()> {
let num_buffers = self.size >> FIXED_BUFFER_BITS;
let mut buffers = Vec::<libc::iovec>::new();
buffers.reserve(num_buffers);
let mut base_ptr = self.aligned_start_ptr;
for _i in 0..num_buffers {
buffers.push(libc::iovec {iov_base: base_ptr as *mut std::ffi::c_void, iov_len: FIXED_BUFFER_SIZE_BYTES});
base_ptr = base_ptr.wrapping_add(FIXED_BUFFER_SIZE_BYTES);
}
let res = unsafe {
ring.submitter().register_buffers(&buffers)
};
self.buffers_registered = res.is_ok();
res
}
}

impl Drop for Arena {
    /// Hands the whole mapping back to the OS; the result of `munmap` is ignored.
    fn drop(&mut self) {
        let mapping = self.actual_start_ptr.cast::<c_void>();
        let length = self.size;
        // `actual_start_ptr` and `size` are the address and length recorded in
        // `Arena::new` for the memory obtained from the OS.
        unsafe {
            libc::munmap(mapping, length);
        }
    }
}
Empty file.
6 changes: 6 additions & 0 deletions src/common/src/memory/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub mod page;
pub mod pool;
mod segment;
mod arena;
mod global_pool;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This module declaration for global_pool will cause a compilation error because the corresponding file src/common/src/memory/global_pool.rs is not included in this pull request. If this module is intended for future work, it should be commented out to prevent breaking the build.

Suggested change
mod global_pool;
mod global_pool; // This module seems to be missing.

mod tcache;
Loading