Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ lmdb = "0.8"
# SQLite
rusqlite = { version = "0.31", features = ["bundled"] }

# File I/O
memmap2 = "0.9"

# Serialization
bincode = "1.3"
serde_json = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion crates/azoth-file-log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
memmap2 = { workspace = true }
parking_lot = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/azoth-file-log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! Features:
//! - Fast sequential writes (no ACID overhead)
//! - Memory-mapped reads for iteration
//! - Buffered sequential reads for iteration
//! - Automatic log rotation based on size
//! - EventId allocation via atomic counter + file sync
//! - Multiple concurrent readers, single writer
Expand Down
33 changes: 17 additions & 16 deletions crates/azoth-file-log/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use azoth_core::{
event_log::{EventLog, EventLogIterator, EventLogStats},
types::EventId,
};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
Arc,
};

/// Configuration for file-based event log
Expand Down Expand Up @@ -128,7 +129,7 @@ impl FileEventLog {

/// Save metadata to disk
fn save_meta(&self) -> Result<()> {
let meta = self.meta.lock().unwrap();
let meta = self.meta.lock();
let meta_path = self.config.base_dir.join("meta.json");
// Use compact serialization (not pretty) for better performance
let data = serde_json::to_string(&*meta)
Expand Down Expand Up @@ -156,7 +157,7 @@ impl FileEventLog {
)));
}

let mut writer = self.writer.lock().unwrap();
let mut writer = self.writer.lock();

// Write event_id (8 bytes, big-endian)
writer.write_all(&event_id.to_be_bytes())?;
Expand Down Expand Up @@ -191,7 +192,7 @@ impl FileEventLog {
fn rotate_internal(&self) -> Result<Option<PathBuf>> {
// Flush current writer
{
let mut writer = self.writer.lock().unwrap();
let mut writer = self.writer.lock();
writer.flush()?;
}

Expand All @@ -204,7 +205,7 @@ impl FileEventLog {

// Update metadata
{
let mut meta = self.meta.lock().unwrap();
let mut meta = self.meta.lock();
meta.current_file_num = new_file_num;
}
self.save_meta()?;
Expand All @@ -218,7 +219,7 @@ impl FileEventLog {

// Replace writer
{
let mut writer = self.writer.lock().unwrap();
let mut writer = self.writer.lock();
*writer = BufWriter::with_capacity(self.config.write_buffer_size, file);
}

Expand All @@ -239,13 +240,13 @@ impl EventLog for FileEventLog {

// Conditionally flush based on config
if self.config.flush_on_append {
let mut writer = self.writer.lock().unwrap();
let mut writer = self.writer.lock();
writer.flush()?;
}

// Update metadata
{
let mut meta = self.meta.lock().unwrap();
let mut meta = self.meta.lock();
meta.next_event_id = event_id + 1;
meta.total_events += 1;
}
Expand Down Expand Up @@ -321,20 +322,20 @@ impl EventLog for FileEventLog {
}

// Single lock acquisition + single write syscall
let mut writer = self.writer.lock().unwrap();
let mut writer = self.writer.lock();
writer.write_all(&buffer)?;
}

// Conditionally flush based on config
if self.config.flush_on_append {
let mut writer = self.writer.lock().unwrap();
let mut writer = self.writer.lock();
writer.flush()?;
}

// Update metadata
let last_id = first_event_id + events.len() as u64 - 1;
{
let mut meta = self.meta.lock().unwrap();
let mut meta = self.meta.lock();
meta.next_event_id = last_id + 1;
meta.total_events += events.len() as u64;
}
Expand All @@ -359,11 +360,11 @@ impl EventLog for FileEventLog {
) -> Result<Box<dyn EventLogIterator>> {
// Flush writer to ensure all data is on disk
{
let mut writer = self.writer.lock().unwrap();
let mut writer = self.writer.lock();
writer.flush()?;
}

let meta = self.meta.lock().unwrap();
let meta = self.meta.lock();
let end_id = end.unwrap_or(meta.next_event_id);

Ok(Box::new(FileEventLogIter::new(
Expand Down Expand Up @@ -405,7 +406,7 @@ impl EventLog for FileEventLog {
}

fn oldest_event_id(&self) -> Result<EventId> {
let meta = self.meta.lock().unwrap();
let meta = self.meta.lock();
Ok(meta.oldest_event_id)
}

Expand All @@ -419,15 +420,15 @@ impl EventLog for FileEventLog {
}

fn sync(&self) -> Result<()> {
let mut writer = self.writer.lock().unwrap();
let mut writer = self.writer.lock();
writer.flush()?;
writer.get_ref().sync_all()?;
self.save_meta()?;
Ok(())
}

fn stats(&self) -> Result<EventLogStats> {
let meta = self.meta.lock().unwrap();
let meta = self.meta.lock();

// Calculate total bytes across all log files
let mut total_bytes = 0u64;
Expand Down
7 changes: 4 additions & 3 deletions crates/azoth-lmdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use azoth_core::{
};
use azoth_file_log::{FileEventLog, FileEventLogConfig};
use lmdb::{Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, WriteFlags};
use parking_lot::Mutex;
use std::path::Path;
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
Arc,
};
use std::time::Duration;

Expand Down Expand Up @@ -316,7 +317,7 @@ impl CanonicalStore for LmdbCanonicalStore {
}

fn seal(&self) -> Result<EventId> {
let _guard = self.write_lock.lock().unwrap();
let _guard = self.write_lock.lock();

let mut txn = self
.env
Expand Down Expand Up @@ -426,7 +427,7 @@ impl LmdbCanonicalStore {
/// Sealing is used as a temporary barrier to create deterministic snapshots. Backups should
/// clear the seal before resuming ingestion; otherwise the DB becomes permanently read-only.
pub fn clear_seal(&self) -> Result<()> {
let _guard = self.write_lock.lock().unwrap();
let _guard = self.write_lock.lock();

let mut txn = self
.env
Expand Down
1 change: 1 addition & 0 deletions crates/azoth-sqlite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tracing = { workspace = true }
anyhow = { workspace = true }
chrono = { workspace = true }
tokio = { workspace = true, features = ["time", "sync"] }
parking_lot = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions crates/azoth-sqlite/src/read_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use azoth_core::{
error::{AzothError, Result},
ReadPoolConfig,
};
use parking_lot::Mutex;
use rusqlite::{Connection, OpenFlags};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::{Duration, Instant};
use tokio::sync::{Semaphore, SemaphorePermit};

Expand All @@ -19,7 +19,7 @@ use tokio::sync::{Semaphore, SemaphorePermit};
/// This wraps a SQLite read-only connection with automatic permit release
/// when the connection is returned to the pool.
pub struct PooledSqliteConnection<'a> {
conn: std::sync::MutexGuard<'a, Connection>,
conn: parking_lot::MutexGuard<'a, Connection>,
_permit: SemaphorePermit<'a>,
}

Expand Down Expand Up @@ -128,7 +128,7 @@ impl SqliteReadPool {
let start = self.next_idx.fetch_add(1, Ordering::Relaxed) % self.connections.len();
for i in 0..self.connections.len() {
let idx = (start + i) % self.connections.len();
if let Ok(guard) = self.connections[idx].try_lock() {
if let Some(guard) = self.connections[idx].try_lock() {
return Ok(PooledSqliteConnection {
conn: guard,
_permit: permit,
Expand All @@ -152,7 +152,7 @@ impl SqliteReadPool {
let start = self.next_idx.fetch_add(1, Ordering::Relaxed) % self.connections.len();
for i in 0..self.connections.len() {
let idx = (start + i) % self.connections.len();
if let Ok(guard) = self.connections[idx].try_lock() {
if let Some(guard) = self.connections[idx].try_lock() {
return Ok(Some(PooledSqliteConnection {
conn: guard,
_permit: permit,
Expand Down
25 changes: 13 additions & 12 deletions crates/azoth-sqlite/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use azoth_core::{
types::EventId,
ProjectionConfig,
};
use parking_lot::Mutex;
use rusqlite::{Connection, OpenFlags};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use crate::read_pool::SqliteReadPool;
use crate::schema;
Expand Down Expand Up @@ -138,7 +139,7 @@ impl SqliteProjectionStore {

let conn = self.read_conn.clone();
tokio::task::spawn_blocking(move || {
let conn_guard = conn.lock().unwrap();
let conn_guard = conn.lock();
f(&conn_guard)
})
.await
Expand Down Expand Up @@ -166,7 +167,7 @@ impl SqliteProjectionStore {
return f(conn.connection());
}

let conn_guard = self.read_conn.lock().unwrap();
let conn_guard = self.read_conn.lock();
f(&conn_guard)
}

Expand All @@ -189,7 +190,7 @@ impl SqliteProjectionStore {
{
let conn = self.write_conn.clone();
tokio::task::spawn_blocking(move || {
let conn_guard = conn.lock().unwrap();
let conn_guard = conn.lock();
f(&conn_guard)
})
.await
Expand All @@ -211,7 +212,7 @@ impl SqliteProjectionStore {
where
F: FnOnce(&Connection) -> Result<()>,
{
let conn_guard = self.write_conn.lock().unwrap();
let conn_guard = self.write_conn.lock();
f(&conn_guard)
}

Expand All @@ -233,7 +234,7 @@ impl SqliteProjectionStore {
where
F: FnOnce(&rusqlite::Transaction) -> Result<()>,
{
let mut conn_guard = self.write_conn.lock().unwrap();
let mut conn_guard = self.write_conn.lock();
let tx = conn_guard
.transaction()
.map_err(|e| AzothError::Projection(e.to_string()))?;
Expand All @@ -254,7 +255,7 @@ impl SqliteProjectionStore {
{
let conn = self.write_conn.clone();
tokio::task::spawn_blocking(move || {
let mut conn_guard = conn.lock().unwrap();
let mut conn_guard = conn.lock();
let tx = conn_guard
.transaction()
.map_err(|e| AzothError::Projection(e.to_string()))?;
Expand Down Expand Up @@ -337,13 +338,13 @@ impl ProjectionStore for SqliteProjectionStore {

fn begin_txn(&self) -> Result<Self::Txn<'_>> {
// Begin exclusive transaction using SimpleProjectionTxn (uses write connection)
let guard = self.write_conn.lock().unwrap();
let guard = self.write_conn.lock();
SimpleProjectionTxn::new(guard)
}

fn get_cursor(&self) -> Result<EventId> {
// Use read connection for this read-only operation
let conn = self.read_conn.lock().unwrap();
let conn = self.read_conn.lock();
let cursor: i64 = conn
.query_row(
"SELECT last_applied_event_id FROM projection_meta WHERE id = 0",
Expand All @@ -356,14 +357,14 @@ impl ProjectionStore for SqliteProjectionStore {
}

fn migrate(&self, target_version: u32) -> Result<()> {
let conn = self.write_conn.lock().unwrap();
let conn = self.write_conn.lock();
schema::migrate(&conn, target_version)
}

fn backup_to(&self, path: &Path) -> Result<()> {
// Checkpoint WAL to flush all changes to the main database file
{
let conn = self.write_conn.lock().unwrap();
let conn = self.write_conn.lock();
// Execute checkpoint with full iteration of results
let mut stmt = conn
.prepare("PRAGMA wal_checkpoint(RESTART)")
Expand Down Expand Up @@ -394,7 +395,7 @@ impl ProjectionStore for SqliteProjectionStore {

fn schema_version(&self) -> Result<u32> {
// Use read connection for this read-only operation
let conn = self.read_conn.lock().unwrap();
let conn = self.read_conn.lock();
let version: i64 = conn
.query_row(
"SELECT schema_version FROM projection_meta WHERE id = 0",
Expand Down
2 changes: 1 addition & 1 deletion crates/azoth-sqlite/src/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use azoth_core::{
traits::ProjectionTxn,
types::EventId,
};
use parking_lot::MutexGuard;
use rusqlite::Connection;
use std::sync::MutexGuard;

// Projection transaction that works with Connection directly
pub struct SimpleProjectionTxn<'a> {
Expand Down
Loading