Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aedb"
version = "0.1.3"
version = "0.1.4"
edition = "2024"
description = "Embedded Rust storage engine with transactional commits, WAL durability, and snapshot-consistent reads"
license = "MIT OR Apache-2.0"
Expand Down
5 changes: 5 additions & 0 deletions src/commit/executor/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ pub(super) fn process_commit_epoch(
wal_sync_micros,
sync_executed,
catalog_changed,
..EpochProcessResult::default()
};
}

Expand Down Expand Up @@ -673,6 +674,7 @@ pub(super) fn process_commit_epoch(
wal_sync_micros,
sync_executed,
catalog_changed,
..EpochProcessResult::default()
};
}

Expand Down Expand Up @@ -736,6 +738,7 @@ pub(super) fn process_commit_epoch(
wal_sync_micros,
sync_executed,
catalog_changed,
..EpochProcessResult::default()
};
}
wal_append_ops = wal_append_ops.saturating_add(1);
Expand Down Expand Up @@ -773,6 +776,7 @@ pub(super) fn process_commit_epoch(
wal_sync_micros,
sync_executed,
catalog_changed,
..EpochProcessResult::default()
};
}
wal_sync_ops = wal_sync_ops.saturating_add(1);
Expand Down Expand Up @@ -901,6 +905,7 @@ pub(super) fn process_commit_epoch(
wal_sync_micros,
sync_executed,
catalog_changed,
..EpochProcessResult::default()
}
}

Expand Down
75 changes: 55 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4517,9 +4517,9 @@ impl AedbInstance {
for pk in &schema.primary_key {
query = query.order_by(pk, Order::Asc);
}
let query_result = if let Some(caller_ref) = caller.as_ref() {
self.query_with_options_as(
Some(caller_ref),
let query_result = self
.query_with_options_as(
caller.as_ref(),
project_id,
scope_id,
query,
Expand All @@ -4530,21 +4530,7 @@ impl AedbInstance {
},
)
.await
.map_err(query_error_to_aedb)?
} else {
self.query_with_options(
project_id,
scope_id,
query,
QueryOptions {
consistency: ConsistencyMode::AtLatest,
allow_full_scan: true,
..QueryOptions::default()
},
)
.await
.map_err(query_error_to_aedb)?
};
.map_err(query_error_to_aedb)?;

let Some(before) = query_result.rows.first().cloned() else {
return Ok(None);
Expand Down Expand Up @@ -6350,7 +6336,11 @@ impl ReadTx<'_> {
hydrate_query: Query,
hydrate_key_column: &str,
) -> Result<(QueryResult, QueryResult), QueryError> {
let source = self.query(project_id, scope_id, source_query).await?;
let mut source = self.query(project_id, scope_id, source_query).await?;
// This helper treats the source query's limit as the caller's complete key set.
// Do not surface pagination state from the underlying query engine here.
source.cursor = None;
source.truncated = false;
let keys = source
.rows
.iter()
Expand All @@ -6363,6 +6353,7 @@ impl ReadTx<'_> {
rows: Vec::new(),
rows_examined: 0,
cursor: None,
truncated: false,
snapshot_seq: self.lease.view.seq,
materialized_seq: None,
},
Expand All @@ -6376,7 +6367,51 @@ impl ReadTx<'_> {
}),
..hydrate_query
};
let hydrated = self.query(project_id, scope_id, hydrate_query).await?;
let page_size = self.db._config.max_scan_rows.min(100).max(1);
let mut hydrate_query = ensure_stable_order_from_catalog(
project_id,
scope_id,
&self.lease.view.catalog,
hydrate_query,
);
hydrate_query.limit = Some(page_size);

let mut all_rows = Vec::new();
let mut total_rows_examined = 0usize;
let mut next_cursor: Option<String> = None;
let mut materialized_seq = None;
loop {
let page = self
.query_with_options(
project_id,
scope_id,
hydrate_query.clone(),
QueryOptions {
consistency: ConsistencyMode::AtSeq(self.lease.view.seq),
cursor: next_cursor.clone(),
..QueryOptions::default()
},
)
.await?;
total_rows_examined = total_rows_examined.saturating_add(page.rows_examined);
if materialized_seq.is_none() {
materialized_seq = page.materialized_seq;
}
all_rows.extend(page.rows);
if let Some(cursor) = page.cursor {
next_cursor = Some(cursor);
continue;
}
break;
}
let hydrated = QueryResult {
rows: all_rows,
rows_examined: total_rows_examined,
cursor: None,
truncated: false,
snapshot_seq: self.lease.view.seq,
materialized_seq,
};
Ok((source, hydrated))
}
}
92 changes: 92 additions & 0 deletions src/lib_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6616,7 +6616,99 @@ async fn list_batch_and_lookup_helpers_work() {
.await
.expect("lookup/hydrate");
assert_eq!(source.rows.len(), 3);
assert!(!source.truncated);
assert_eq!(hydrated.rows.len(), 3);
assert!(!hydrated.truncated);
}

#[tokio::test]
async fn lookup_then_hydrate_fetches_all_pages_for_large_key_sets() {
let dir = tempdir().expect("temp");
let db = AedbInstance::open(AedbConfig::default(), dir.path()).expect("open");
db.create_project("p").await.expect("project");
db.create_scope("p", "app").await.expect("scope");

create_table(
&db,
"p",
"app",
"items",
vec![
ColumnDef {
name: "id".into(),
col_type: ColumnType::Integer,
nullable: false,
},
ColumnDef {
name: "user_id".into(),
col_type: ColumnType::Integer,
nullable: false,
},
],
vec!["id"],
)
.await;
create_table(
&db,
"p",
"app",
"users",
vec![
ColumnDef {
name: "id".into(),
col_type: ColumnType::Integer,
nullable: false,
},
ColumnDef {
name: "username".into(),
col_type: ColumnType::Text,
nullable: false,
},
],
vec!["id"],
)
.await;

for id in 1_i64..=1_000_i64 {
db.commit(Mutation::Upsert {
project_id: "p".into(),
scope_id: "app".into(),
table_name: "items".into(),
primary_key: vec![Value::Integer(id)],
row: Row::from_values(vec![Value::Integer(id), Value::Integer(id)]),
})
.await
.expect("insert item");
db.commit(Mutation::Upsert {
project_id: "p".into(),
scope_id: "app".into(),
table_name: "users".into(),
primary_key: vec![Value::Integer(id)],
row: Row::from_values(vec![
Value::Integer(id),
Value::Text(format!("u{id}").into()),
]),
})
.await
.expect("insert user");
}

let (source, hydrated) = db
.lookup_then_hydrate(
"p",
"app",
Query::select(&["user_id"]).from("items").limit(1_000),
0,
Query::select(&["id", "username"]).from("users"),
"id",
ConsistencyMode::AtLatest,
)
.await
.expect("lookup/hydrate");
assert_eq!(source.rows.len(), 1_000);
assert!(!source.truncated);
assert_eq!(hydrated.rows.len(), 1_000);
assert!(!hydrated.truncated);
}

#[tokio::test]
Expand Down
Loading
Loading