From cf24c2ab5f93544462d49d0b68568fe6ea78a545 Mon Sep 17 00:00:00 2001 From: Ben Kornmeier Date: Tue, 3 Oct 2023 20:51:53 -0600 Subject: [PATCH 1/2] messing around with a ring buffer --- src/log/mod.rs | 71 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/src/log/mod.rs b/src/log/mod.rs index 302ed0e..d5eed0c 100644 --- a/src/log/mod.rs +++ b/src/log/mod.rs @@ -1,16 +1,83 @@ use crate::error::*; use bytes::{Buf, BufMut, BytesMut}; -use parking_lot::Mutex; +use parking_lot::{Mutex, Condvar}; use std::{ cmp::Ordering, fs::File, io::{Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, - sync::Arc, + sync::{Arc, atomic::{AtomicUsize, AtomicBool}}, }; +use std::sync::atomic::{Ordering as AOrd}; + //pub struct Recv +macro_rules! load { + () => { + + }; + ($($to_load:tt)*) => { + { + ($($to_load)*).load(AOrd::SeqCst) + } + }; +} + +#[derive(Clone, Default)] +struct Blocker(Arc<(Mutex<()>, Condvar )>); + +#[derive(Clone, Default)] +struct LogState { + writer_pos: Arc, // max position of the writer, reader can not pass this. + reader_pos: Arc, // position + writer_is_behind_reader: Arc, // writer can circle back if the reader is far enough along + max_size: usize, // our size limit that writer can't pass + blocker: Blocker, +} + + +impl LogState { + fn new(max_size: usize) -> Self { + Self { + max_size, + ..Default::default() + } + } + /// + /// Returns the position to start writing at + fn reserve_write(&self, size: usize) { + if load!(self.writer_is_behind_reader) { + // writer has wrapped behind reader + //let new_writer_pos = self.writer_pos.load(A) + } + + } +} + pub struct LogQueue { path: PathBuf, + state: LogState +} + +impl LogQueue { + pub fn new(path: PathBuf, max_size: usize) -> Self { + Self { + path, + state: LogState::default(), + } + } } + +pub struct LogSender { + inner: LogQueue, + state: LogState +} + +impl LogSender { + pub fn send(&self, data: &[u8]) -> Result<()> { + Ok(()) + + } + +} \ No newline at end of file From 9cb142c5de33b45d12fe7549a0fdcc43ec0a3f1d Mon Sep 17 00:00:00 2001 From: Ben Kornmeier Date: Wed, 31 Jan 2024 18:28:30 -0700 Subject: [PATCH 2/2] stuff --- src/log/mod.rs | 49 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/src/log/mod.rs b/src/log/mod.rs index d5eed0c..f8df347 100644 --- a/src/log/mod.rs +++ b/src/log/mod.rs @@ -9,14 +9,12 @@ use std::{ sync::{Arc, atomic::{AtomicUsize, AtomicBool}}, }; -use std::sync::atomic::{Ordering as AOrd}; - -//pub struct Recv +use std::sync::atomic::Ordering as AOrd; macro_rules! load { - () => { + // () => { - }; + // }; ($($to_load:tt)*) => { { ($($to_load)*).load(AOrd::SeqCst) @@ -27,11 +25,14 @@ macro_rules! load { #[derive(Clone, Default)] struct Blocker(Arc<(Mutex<()>, Condvar )>); -#[derive(Clone, Default)] + +#[derive(Default)] struct LogState { - writer_pos: Arc, // max position of the writer, reader can not pass this. - reader_pos: Arc, // position - writer_is_behind_reader: Arc, // writer can circle back if the reader is far enough along + writer_reserve_pos: AtomicUsize, // max position of the writer, reader can not pass this. + writer_end_position: AtomicUsize, // max position of the writer, reader can not pass this. + max_reader_pos: AtomicUsize, // max position of the writer, reader can not pass this. + min_reader_pos: AtomicUsize, // max position of the writer, reader can not pass this. + writer_is_behind_reader: AtomicBool, // writer can circle back if the reader is far enough along max_size: usize, // our size limit that writer can't pass blocker: Blocker, } @@ -46,10 +47,36 @@ impl LogState { } /// /// Returns the position to start writing at - fn reserve_write(&self, size: usize) { + fn reserve_write(&self, size: usize) -> Option { + // reserve space before hand + + let current_writer_end_position = load!(self.writer_end_position); + + let current_writer_position = self.writer_reserve_pos.fetch_add(size, AOrd::SeqCst); + let next_writer_position = current_writer_position + size; + + let writer_is_behind = load!(self.writer_is_behind_reader); + let min_reader_pos = load!(self.min_reader_pos); + + if next_writer_position >= self.max_size { + //wrap around + self.writer_is_behind_reader.store(true, AOrd::SeqCst); + self.writer_reserve_pos.fetch_sub(size, AOrd::SeqCst); //Rollback our previous reservation + self.writer_end_position.fetch_max(current_writer_end_position, AOrd::SeqCst); + return self.reserve_write(size) + } + + if writer_is_behind && next_writer_position >= min_reader_pos { + //need to block here + + } + if load!(self.writer_is_behind_reader) { + // writer has wrapped behind reader - //let new_writer_pos = self.writer_pos.load(A) + } else { + + } }