From 4e62a2bbf776e99c43c7d131c4304b3a0e42c1b5 Mon Sep 17 00:00:00 2001 From: johnny Date: Fri, 27 Feb 2026 07:53:22 -0500 Subject: [PATCH] add u8 columns --- src/catalog/types.rs | 22 ++- src/commit/assertions.rs | 24 ++- src/commit/validation.rs | 2 + src/lib.rs | 13 ++ src/lib_helpers.rs | 14 ++ src/lib_tests.rs | 384 ++++++++++++++++++++++++++++++++++++- src/query/executor.rs | 355 ++++++++++++++++++++++++++++------ src/query/operators.rs | 33 +++- src/query/plan.rs | 6 + src/repository.rs | 1 + src/storage/encoded_key.rs | 4 + src/storage/keyspace.rs | 1 + 12 files changed, 775 insertions(+), 84 deletions(-) diff --git a/src/catalog/types.rs b/src/catalog/types.rs index 8581f6a..2de66a9 100644 --- a/src/catalog/types.rs +++ b/src/catalog/types.rs @@ -8,6 +8,7 @@ use std::hash::{Hash, Hasher}; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum ColumnType { Text, + U8, Integer, Float, Boolean, @@ -111,6 +112,7 @@ where #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Value { Text(CompactString), + U8(u8), Integer(i64), Float(f64), Boolean(bool), @@ -135,6 +137,7 @@ impl Hash for Value { self.kind_rank().hash(state); match self { Value::Text(v) => v.hash(state), + Value::U8(v) => v.hash(state), Value::Integer(v) => v.hash(state), Value::Float(v) => v.to_bits().hash(state), Value::Boolean(v) => v.hash(state), @@ -164,14 +167,15 @@ impl Value { match self { Value::Null => 0, Value::Boolean(_) => 1, - Value::Integer(_) => 2, - Value::U256(_) => 3, - Value::I256(_) => 4, - Value::Timestamp(_) => 5, - Value::Float(_) => 6, - Value::Text(_) => 7, - Value::Json(_) => 8, - Value::Blob(_) => 9, + Value::U8(_) => 2, + Value::Integer(_) => 3, + Value::U256(_) => 4, + Value::I256(_) => 5, + Value::Timestamp(_) => 6, + Value::Float(_) => 7, + Value::Text(_) => 8, + Value::Json(_) => 9, + Value::Blob(_) => 10, } } } @@ -200,6 +204,7 @@ impl Ord for Value { match (self, other) { (Value::Null, Value::Null) => Ordering::Equal, (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b), + (Value::U8(a), Value::U8(b)) => a.cmp(b), (Value::Integer(a), Value::Integer(b)) => a.cmp(b), (Value::U256(a), Value::U256(b)) => a.cmp(b), (Value::I256(a), Value::I256(b)) => a.cmp(b), @@ -221,6 +226,7 @@ mod tests { fn arb_value() -> impl Strategy { let leaf = prop_oneof![ any::().prop_map(Value::Boolean), + any::().prop_map(Value::U8), any::().prop_map(Value::Integer), prop::array::uniform32(any::()).prop_map(Value::U256), prop::array::uniform32(any::()).prop_map(Value::I256), diff --git a/src/commit/assertions.rs b/src/commit/assertions.rs index 0fdb800..b096206 100644 --- a/src/commit/assertions.rs +++ b/src/commit/assertions.rs @@ -133,7 +133,11 @@ fn validate_assertion(catalog: &Catalog, assertion: &ReadAssertion) -> Result<() }; if !matches!( col.col_type, - ColumnType::Integer | ColumnType::Float | ColumnType::U256 | ColumnType::Timestamp + ColumnType::U8 + | ColumnType::Integer + | ColumnType::Float + | ColumnType::U256 + | ColumnType::Timestamp ) { return Err(AedbError::Validation(format!( "column {column} is not numeric for SumCompare" @@ -470,17 +474,25 @@ fn sum_rows_for_column<'a>( ) -> Result { let col_type = &schema.columns[column_idx].col_type; match col_type { - ColumnType::Integer | ColumnType::Timestamp => { + ColumnType::U8 | ColumnType::Integer | ColumnType::Timestamp => { let mut sum: i64 = 0; for row in rows { if !match_filter(row, schema, filter)? { continue; } - if let Some(Value::Integer(v) | Value::Timestamp(v)) = row.values.get(column_idx) { - sum = sum.checked_add(*v).ok_or(AedbError::Overflow)?; + if let Some(value) = row.values.get(column_idx) { + let addend = match value { + Value::U8(x) => *x as i64, + Value::Integer(x) | Value::Timestamp(x) => *x, + _ => continue, + }; + sum = sum.checked_add(addend).ok_or(AedbError::Overflow)?; } } - if matches!(col_type, ColumnType::Timestamp) { + if matches!(col_type, ColumnType::U8) { + let as_u8 = u8::try_from(sum).map_err(|_| AedbError::Overflow)?; + Ok(Value::U8(as_u8)) + } else if matches!(col_type, ColumnType::Timestamp) { Ok(Value::Timestamp(sum)) } else { Ok(Value::Integer(sum)) @@ -521,6 +533,7 @@ fn sum_rows_for_column<'a>( fn zero_for_threshold(threshold: &Value) -> Value { match threshold { + Value::U8(_) => Value::U8(0), Value::Integer(_) => Value::Integer(0), Value::Timestamp(_) => Value::Timestamp(0), Value::Float(_) => Value::Float(0.0), @@ -559,6 +572,7 @@ fn value_matches_type(value: &Value, col_type: &ColumnType) -> bool { matches!( (value, col_type), (Value::Text(_), ColumnType::Text) + | (Value::U8(_), ColumnType::U8) | (Value::Integer(_), ColumnType::Integer) | (Value::Float(_), ColumnType::Float) | (Value::Boolean(_), ColumnType::Boolean) diff --git a/src/commit/validation.rs b/src/commit/validation.rs index 8f0ae42..bf0fb92 100644 --- a/src/commit/validation.rs +++ b/src/commit/validation.rs @@ -1253,6 +1253,7 @@ fn value_matches_type(value: &Value, ty: &ColumnType) -> bool { matches!( (value, ty), (Value::Text(_), ColumnType::Text) + | (Value::U8(_), ColumnType::U8) | (Value::Integer(_), ColumnType::Integer) | (Value::Float(_), ColumnType::Float) | (Value::Boolean(_), ColumnType::Boolean) @@ -1267,6 +1268,7 @@ fn value_matches_type(value: &Value, ty: &ColumnType) -> bool { fn value_type_name(value: &Value) -> &'static str { match value { Value::Text(_) => "Text", + Value::U8(_) => "U8", Value::Integer(_) => "Integer", Value::Float(_) => "Float", Value::Boolean(_) => "Boolean", diff --git a/src/lib.rs b/src/lib.rs index d3af717..c0c2833 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -341,11 +341,24 @@ pub struct QueryDiagnostics { pub estimated_scan_rows: u64, pub max_scan_rows: u64, pub index_used: Option, + pub selected_indexes: Vec, + pub predicate_evaluation_path: PredicateEvaluationPath, + pub plan_trace: Vec, pub stages: Vec, pub bounded_by_limit_or_cursor: bool, pub has_joins: bool, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PredicateEvaluationPath { + None, + PrimaryKeyEqLookup, + SecondaryIndexLookup, + AsyncIndexProjection, + FullScanFilter, + JoinExecution, +} + #[derive(Debug, Clone)] pub struct QueryWithDiagnosticsResult { pub result: QueryResult, diff --git a/src/lib_helpers.rs b/src/lib_helpers.rs index 5c0f1ec..3a909c7 100644 --- a/src/lib_helpers.rs +++ b/src/lib_helpers.rs @@ -80,6 +80,14 @@ pub(crate) fn explain_query_against_view( .clone() .or_else(|| query.use_index.clone()); let bounded_by_limit_or_cursor = query.limit.is_some() || options.cursor.is_some(); + let access_path = crate::query::executor::explain_access_path_for_query( + snapshot.as_ref(), + catalog.as_ref(), + project_id, + scope_id, + &query, + options, + )?; if !query.joins.is_empty() { let mut estimated = snapshot .table(project_id, scope_id, &query.table) @@ -111,6 +119,9 @@ pub(crate) fn explain_query_against_view( estimated_scan_rows: estimated, max_scan_rows: max_scan_rows as u64, index_used, + selected_indexes: access_path.selected_indexes, + predicate_evaluation_path: access_path.predicate_evaluation_path, + plan_trace: access_path.plan_trace, stages, bounded_by_limit_or_cursor, has_joins: true, @@ -141,6 +152,9 @@ pub(crate) fn explain_query_against_view( estimated_scan_rows, max_scan_rows: max_scan_rows as u64, index_used, + selected_indexes: access_path.selected_indexes, + predicate_evaluation_path: access_path.predicate_evaluation_path, + plan_trace: access_path.plan_trace, stages: planned.stages, bounded_by_limit_or_cursor, has_joins: false, diff --git a/src/lib_tests.rs b/src/lib_tests.rs index eefeb57..9141666 100644 --- a/src/lib_tests.rs +++ b/src/lib_tests.rs @@ -3,7 +3,8 @@ use super::{ QueryBatchItem, QueryCommitTelemetryHook, QueryTelemetryEvent, ReadOnlySqlAdapter, RecoveryCache, RemoteBackupAdapter, SYSTEM_CALLER_ID, }; -use crate::catalog::schema::ColumnDef; +use crate::PredicateEvaluationPath; +use crate::catalog::schema::{ColumnDef, IndexType}; use crate::catalog::types::{ColumnType, Row, Value}; use crate::catalog::{DdlOperation, ResourceType}; use crate::commit::tx::{TransactionEnvelope, WriteClass, WriteIntent}; @@ -4149,6 +4150,10 @@ async fn exists_and_explain_diagnostics_work() { .expect("explain"); assert!(explain.estimated_scan_rows >= 1); assert!(explain.stages.contains(&ExecutionStage::Scan)); + assert!( + !explain.plan_trace.is_empty(), + "explain should include access-path trace" + ); let with_diag = tx .query_with_diagnostics( @@ -4166,6 +4171,383 @@ async fn exists_and_explain_diagnostics_work() { assert_eq!(with_diag.diagnostics.snapshot_seq, tx.snapshot_seq()); } +#[tokio::test] +async fn non_pk_text_eq_regression_in_project_scope_indexed_and_non_indexed_paths() { + let dir = tempdir().expect("temp"); + let db = AedbInstance::open(AedbConfig::default(), dir.path()).expect("open"); + db.create_project("p").await.expect("project"); + create_table( + &db, + "p", + "app", + "sessions", + vec![ + ColumnDef { + name: "id".into(), + col_type: ColumnType::Integer, + nullable: false, + }, + ColumnDef { + name: "user_id".into(), + col_type: ColumnType::Text, + nullable: false, + }, + ColumnDef { + name: "status".into(), + col_type: ColumnType::Text, + nullable: false, + }, + ], + vec!["id"], + ) + .await; + + for (id, user_id, status) in [ + (1_i64, "8a25f1bc-ea96-48d0-8535-47b784a2df1d", "open"), + (2, "8a25f1bc-ea96-48d0-8535-47b784a2df1d", "closed"), + (3, "2cf2434c-ed95-4f35-b786-4853592e6f25", "open"), + ] { + db.commit(Mutation::Upsert { + project_id: "p".into(), + scope_id: "app".into(), + table_name: "sessions".into(), + primary_key: vec![Value::Integer(id)], + row: Row::from_values(vec![ + Value::Integer(id), + Value::Text(user_id.into()), + Value::Text(status.into()), + ]), + }) + .await + .expect("seed session"); + } + + let query = Query::select(&["id", "user_id"]) + .from("sessions") + .where_(Expr::Eq( + "user_id".into(), + Value::Text("8a25f1bc-ea96-48d0-8535-47b784a2df1d".into()), + )); + let pre_index_result = db + .query("p", "app", query.clone()) + .await + .expect("eq query without index"); + assert_eq!(pre_index_result.rows.len(), 2); + + let pre_index_explain = db + .explain_query("p", "app", query.clone(), QueryOptions::default()) + .await + .expect("explain without index"); + assert_eq!( + pre_index_explain.predicate_evaluation_path, + PredicateEvaluationPath::FullScanFilter + ); + assert!( + pre_index_explain.selected_indexes.is_empty(), + "non-index path should not report selected indexes" + ); + + db.commit(Mutation::Ddl(DdlOperation::CreateIndex { + project_id: "p".into(), + scope_id: "app".into(), + table_name: "sessions".into(), + index_name: "by_user_id".into(), + if_not_exists: false, + columns: vec!["user_id".into()], + index_type: IndexType::BTree, + partial_filter: None, + })) + .await + .expect("user_id index"); + + let indexed_result = db + .query("p", "app", query.clone()) + .await + .expect("eq query with index"); + assert_eq!(indexed_result.rows.len(), 2); + assert!( + indexed_result.rows_examined <= pre_index_result.rows_examined, + "indexed equality should not examine more rows" + ); + + let indexed_explain = db + .explain_query("p", "app", query, QueryOptions::default()) + .await + .expect("explain with index"); + assert_eq!( + indexed_explain.predicate_evaluation_path, + PredicateEvaluationPath::SecondaryIndexLookup + ); + assert!( + indexed_explain + .selected_indexes + .contains(&"by_user_id".to_string()), + "explain should report selected secondary index" + ); + assert!( + indexed_explain + .plan_trace + .iter() + .any(|line| line.contains("by_user_id")), + "plan trace should include selected index name" + ); +} + +#[tokio::test] +async fn uuid_text_equality_parity_between_primary_key_and_secondary_index_paths() { + let dir = tempdir().expect("temp"); + let db = AedbInstance::open(AedbConfig::default(), dir.path()).expect("open"); + db.create_project("p").await.expect("project"); + db.create_scope("p", "app").await.expect("scope"); + + create_table( + &db, + "p", + "app", + "users_pk_uuid", + vec![ + ColumnDef { + name: "user_uuid".into(), + col_type: ColumnType::Text, + nullable: false, + }, + ColumnDef { + name: "display_name".into(), + col_type: ColumnType::Text, + nullable: false, + }, + ], + vec!["user_uuid"], + ) + .await; + create_table( + &db, + "p", + "app", + "users_secondary_uuid", + vec![ + ColumnDef { + name: "id".into(), + col_type: ColumnType::Integer, + nullable: false, + }, + ColumnDef { + name: "user_uuid".into(), + col_type: ColumnType::Text, + nullable: false, + }, + ColumnDef { + name: "display_name".into(), + col_type: ColumnType::Text, + nullable: false, + }, + ], + vec!["id"], + ) + .await; + db.commit(Mutation::Ddl(DdlOperation::CreateIndex { + project_id: "p".into(), + scope_id: "app".into(), + table_name: "users_secondary_uuid".into(), + index_name: "by_user_uuid".into(), + if_not_exists: false, + columns: vec!["user_uuid".into()], + index_type: IndexType::BTree, + partial_filter: None, + })) + .await + .expect("secondary uuid index"); + + let rows = [ + (1_i64, "8e1e917f-f8f8-4f06-bf38-3f2c37dcd857", "alice"), + (2, "6e0df6fd-5095-4e37-bf9d-1f6b5d6dfcb8", "bob"), + (3, "1b631635-766d-4207-aa53-f5367b9bf13a", "carol"), + ]; + for (_id, user_uuid, display_name) in rows { + db.commit(Mutation::Upsert { + project_id: "p".into(), + scope_id: "app".into(), + table_name: "users_pk_uuid".into(), + primary_key: vec![Value::Text(user_uuid.to_string().into())], + row: Row::from_values(vec![ + Value::Text(user_uuid.to_string().into()), + Value::Text(display_name.into()), + ]), + }) + .await + .expect("seed pk uuid"); + } + + for (id, user_uuid, display_name) in rows { + db.commit(Mutation::Upsert { + project_id: "p".into(), + scope_id: "app".into(), + table_name: "users_secondary_uuid".into(), + primary_key: vec![Value::Integer(id)], + row: Row::from_values(vec![ + Value::Integer(id), + Value::Text(user_uuid.into()), + Value::Text(display_name.into()), + ]), + }) + .await + .expect("seed secondary uuid"); + } + + for (_id, user_uuid, _display_name) in rows { + let pk_query = Query::select(&["display_name"]) + .from("users_pk_uuid") + .where_(Expr::Eq( + "user_uuid".into(), + Value::Text(user_uuid.to_string().into()), + )); + let secondary_query = Query::select(&["display_name"]) + .from("users_secondary_uuid") + .where_(Expr::Eq( + "user_uuid".into(), + Value::Text(user_uuid.to_string().into()), + )); + + let pk_result = db + .query("p", "app", pk_query.clone()) + .await + .expect("pk query"); + let secondary_result = db + .query("p", "app", secondary_query.clone()) + .await + .expect("secondary query"); + assert_eq!(pk_result.rows.len(), 1); + assert_eq!(secondary_result.rows.len(), 1); + assert_eq!(pk_result.rows[0].values, secondary_result.rows[0].values); + } + + let pk_query = Query::select(&["display_name"]) + .from("users_pk_uuid") + .where_(Expr::Eq( + "user_uuid".into(), + Value::Text("8e1e917f-f8f8-4f06-bf38-3f2c37dcd857".into()), + )); + let secondary_query = Query::select(&["display_name"]) + .from("users_secondary_uuid") + .where_(Expr::Eq( + "user_uuid".into(), + Value::Text("8e1e917f-f8f8-4f06-bf38-3f2c37dcd857".into()), + )); + + let pk_explain = db + .explain_query("p", "app", pk_query, QueryOptions::default()) + .await + .expect("pk explain"); + assert_eq!( + pk_explain.predicate_evaluation_path, + PredicateEvaluationPath::PrimaryKeyEqLookup + ); + + let secondary_explain = db + .explain_query("p", "app", secondary_query, QueryOptions::default()) + .await + .expect("secondary explain"); + assert_eq!( + secondary_explain.predicate_evaluation_path, + PredicateEvaluationPath::SecondaryIndexLookup + ); + assert!( + secondary_explain + .selected_indexes + .contains(&"by_user_uuid".to_string()) + ); +} + +#[tokio::test] +async fn u8_column_type_supports_write_read_and_indexed_equality() { + let dir = tempdir().expect("temp"); + let db = AedbInstance::open(AedbConfig::default(), dir.path()).expect("open"); + db.create_project("p").await.expect("project"); + db.create_scope("p", "app").await.expect("scope"); + + create_table( + &db, + "p", + "app", + "levels", + vec![ + ColumnDef { + name: "id".into(), + col_type: ColumnType::Integer, + nullable: false, + }, + ColumnDef { + name: "level".into(), + col_type: ColumnType::U8, + nullable: false, + }, + ], + vec!["id"], + ) + .await; + + db.commit(Mutation::Upsert { + project_id: "p".into(), + scope_id: "app".into(), + table_name: "levels".into(), + primary_key: vec![Value::Integer(1)], + row: Row::from_values(vec![Value::Integer(1), Value::U8(7)]), + }) + .await + .expect("seed u8 row"); + + let without_index = db + .query( + "p", + "app", + Query::select(&["id", "level"]) + .from("levels") + .where_(Expr::Eq("level".into(), Value::Integer(7))), + ) + .await + .expect("u8 equality via integer literal"); + assert_eq!(without_index.rows.len(), 1); + assert_eq!(without_index.rows[0].values[1], Value::U8(7)); + + db.commit(Mutation::Ddl(DdlOperation::CreateIndex { + project_id: "p".into(), + scope_id: "app".into(), + table_name: "levels".into(), + index_name: "by_level".into(), + if_not_exists: false, + columns: vec!["level".into()], + index_type: IndexType::BTree, + partial_filter: None, + })) + .await + .expect("create u8 index"); + + let with_index = db + .query( + "p", + "app", + Query::select(&["id", "level"]) + .from("levels") + .where_(Expr::Eq("level".into(), Value::U8(7))), + ) + .await + .expect("u8 equality with index"); + assert_eq!(with_index.rows.len(), 1); + assert!(with_index.rows_examined <= without_index.rows_examined); + + let err = db + .commit(Mutation::Upsert { + project_id: "p".into(), + scope_id: "app".into(), + table_name: "levels".into(), + primary_key: vec![Value::Integer(2)], + row: Row::from_values(vec![Value::Integer(2), Value::Integer(8)]), + }) + .await + .expect_err("integer value should not satisfy U8 column type"); + assert!(matches!(err, AedbError::TypeMismatch { .. })); +} + #[tokio::test] async fn sql_transaction_plan_helpers_commit() { let dir = tempdir().expect("temp"); diff --git a/src/query/executor.rs b/src/query/executor.rs index 1bdee78..fc40333 100644 --- a/src/query/executor.rs +++ b/src/query/executor.rs @@ -17,6 +17,7 @@ use std::ops::Bound; type IndexBounds = (Bound, Bound); +#[derive(Debug, Clone)] enum IndexLookup { Range { column: String, bounds: IndexBounds }, MultiEq { column: String, values: Vec }, @@ -31,6 +32,13 @@ pub struct QueryResult { pub materialized_seq: Option, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct AccessPathDiagnostics { + pub selected_indexes: Vec, + pub predicate_evaluation_path: crate::PredicateEvaluationPath, + pub plan_trace: Vec, +} + pub fn execute_query( snapshot: &KeyspaceSnapshot, catalog: &Catalog, @@ -113,15 +121,16 @@ pub fn execute_query_with_options( ); } - let table_key = (namespace_key(project_id, scope_id), query.table.clone()); + let (q_project, q_scope, q_table) = resolve_table_ref(project_id, scope_id, &query.table); + let table_key = (namespace_key(&q_project, &q_scope), q_table.clone()); let schema = catalog .tables .get(&table_key) .ok_or_else(|| QueryError::TableNotFound { - project_id: project_id.to_string(), - table: query.table.clone(), + project_id: q_project.clone(), + table: q_table.clone(), })?; - let table = snapshot.table(project_id, scope_id, &query.table); + let table = snapshot.table(&q_project, &q_scope, &q_table); let mut materialized_seq = None; validate_query(schema, &query)?; @@ -140,46 +149,41 @@ pub fn execute_query_with_options( } let estimated_rows: usize; - let row_source: Box + Send> = - if let Some(async_index) = &options.async_index { - let projection = snapshot - .async_index(project_id, scope_id, &query.table, async_index) - .ok_or_else(|| QueryError::InvalidQuery { - reason: "async index not found".into(), - })?; - materialized_seq = Some(projection.materialized_seq); - estimated_rows = projection.rows.len(); - let rows = projection.rows.clone(); - Box::new(rows.into_iter().map(|(_, row)| row)) - } else if let (Some(predicate), Some(table)) = (&query.predicate, table) { - let table_rows = table.rows.clone(); - let indexed_pks = indexed_pks_for_predicate( - catalog, - project_id, - scope_id, - &query.table, - table, - predicate, - )?; - match indexed_pks { - Some(pks) => { - estimated_rows = pks.len(); - Box::new( - pks.into_iter() - .filter_map(move |pk| table_rows.get(&pk).cloned()), - ) - } - None => { - estimated_rows = table.rows.len(); - let rows = table.rows.clone(); - Box::new(rows.into_iter().map(|(_, row)| row)) - } + let row_source: Box + Send> = if let Some(async_index) = + &options.async_index + { + let projection = snapshot + .async_index(project_id, scope_id, &query.table, async_index) + .ok_or_else(|| QueryError::InvalidQuery { + reason: "async index not found".into(), + })?; + materialized_seq = Some(projection.materialized_seq); + estimated_rows = projection.rows.len(); + let rows = projection.rows.clone(); + Box::new(rows.into_iter().map(|(_, row)| row)) + } else if let (Some(predicate), Some(table)) = (&query.predicate, table) { + let table_rows = table.rows.clone(); + let indexed_pks = + indexed_pks_for_predicate(catalog, &q_project, &q_scope, &q_table, table, predicate)?; + match indexed_pks { + Some(pks) => { + estimated_rows = pks.len(); + Box::new( + pks.into_iter() + .filter_map(move |pk| table_rows.get(&pk).cloned()), + ) } - } else { - let rows = table.map(|t| t.rows.clone()).unwrap_or_default(); - estimated_rows = rows.len(); - Box::new(rows.into_iter().map(|(_, row)| row)) - }; + None => { + estimated_rows = table.rows.len(); + let rows = table.rows.clone(); + Box::new(rows.into_iter().map(|(_, row)| row)) + } + } + } else { + let rows = table.map(|t| t.rows.clone()).unwrap_or_default(); + estimated_rows = rows.len(); + Box::new(rows.into_iter().map(|(_, row)| row)) + }; if estimated_rows > max_scan_rows && query.limit.is_none() && options.cursor.is_none() { return Err(QueryError::ScanBoundExceeded { @@ -375,6 +379,135 @@ pub fn execute_query_with_options( }) } +pub(crate) fn explain_access_path_for_query( + snapshot: &KeyspaceSnapshot, + catalog: &Catalog, + project_id: &str, + scope_id: &str, + query: &Query, + options: &QueryOptions, +) -> Result { + if !query.joins.is_empty() { + let mut trace = Vec::new(); + trace.push("join query: predicate evaluation happens after join execution".to_string()); + if query.predicate.is_some() { + trace.push("post-join filter stage evaluates query predicate".to_string()); + } + return Ok(AccessPathDiagnostics { + selected_indexes: Vec::new(), + predicate_evaluation_path: crate::PredicateEvaluationPath::JoinExecution, + plan_trace: trace, + }); + } + + let mut selected_indexes = Vec::new(); + let mut trace = Vec::new(); + let mut predicate_evaluation_path = crate::PredicateEvaluationPath::None; + + let mut effective_options = options.clone(); + if effective_options.async_index.is_none() { + effective_options.async_index = query.use_index.clone(); + } + + if let Some(async_index) = &effective_options.async_index { + selected_indexes.push(async_index.clone()); + trace.push(format!( + "selected async index projection '{async_index}' as row source" + )); + predicate_evaluation_path = crate::PredicateEvaluationPath::AsyncIndexProjection; + if query.predicate.is_some() { + trace.push("query predicate is evaluated as filter on projected rows".to_string()); + } + return Ok(AccessPathDiagnostics { + selected_indexes, + predicate_evaluation_path, + plan_trace: trace, + }); + } + + let table_key = (namespace_key(project_id, scope_id), query.table.clone()); + let schema = catalog + .tables + .get(&table_key) + .ok_or_else(|| QueryError::TableNotFound { + project_id: project_id.to_string(), + table: query.table.clone(), + })?; + let table = snapshot.table(project_id, scope_id, &query.table); + + if let Some(predicate) = query.predicate.as_ref() { + if query.limit != Some(0) + && query.group_by.is_empty() + && query.aggregates.is_empty() + && query.having.is_none() + && query.order_by.is_empty() + && options.cursor.is_none() + && extract_primary_key_values(predicate, &schema.primary_key).is_some() + { + trace.push("primary-key equality predicate detected; using direct row lookup".into()); + return Ok(AccessPathDiagnostics { + selected_indexes, + predicate_evaluation_path: crate::PredicateEvaluationPath::PrimaryKeyEqLookup, + plan_trace: trace, + }); + } + + if let Some(table) = table { + let indexed = indexed_pks_for_predicate_with_trace( + catalog, + project_id, + scope_id, + &query.table, + table, + predicate, + )?; + if let Some(indexed) = indexed { + if !indexed.selected_indexes.is_empty() { + selected_indexes.extend(indexed.selected_indexes.clone()); + predicate_evaluation_path = + crate::PredicateEvaluationPath::SecondaryIndexLookup; + } else { + predicate_evaluation_path = crate::PredicateEvaluationPath::FullScanFilter; + } + trace.extend(indexed.plan_trace); + if matches!( + predicate_evaluation_path, + crate::PredicateEvaluationPath::FullScanFilter + ) { + trace.push( + "no matching secondary index; evaluating predicate during table scan" + .to_string(), + ); + } else { + trace.push( + "residual predicate is evaluated on rows returned by index lookup" + .to_string(), + ); + } + return Ok(AccessPathDiagnostics { + selected_indexes, + predicate_evaluation_path, + plan_trace: trace, + }); + } + } + + trace.push("predicate not indexable for current schema/index set".to_string()); + return Ok(AccessPathDiagnostics { + selected_indexes, + predicate_evaluation_path: crate::PredicateEvaluationPath::FullScanFilter, + plan_trace: trace, + }); + } + + trace.push("no predicate supplied; full table scan path".to_string()); + Ok(AccessPathDiagnostics { + selected_indexes, + predicate_evaluation_path, + plan_trace: trace, + }) +} + #[allow(clippy::too_many_arguments)] fn execute_join_query( snapshot: &KeyspaceSnapshot, @@ -950,17 +1083,21 @@ fn validate_expr_types( let value_compatible = |col_type: &ColumnType, value: &Value| -> bool { matches!(value, Value::Null) || match col_type { + ColumnType::U8 => matches!( + value, + Value::U8(_) | Value::Integer(_) | Value::Float(_) | Value::Timestamp(_) + ), ColumnType::Integer => matches!( value, - Value::Integer(_) | Value::Float(_) | Value::Timestamp(_) + Value::U8(_) | Value::Integer(_) | Value::Float(_) | Value::Timestamp(_) ), ColumnType::Float => matches!( value, - Value::Integer(_) | Value::Float(_) | Value::Timestamp(_) + Value::U8(_) | Value::Integer(_) | Value::Float(_) | Value::Timestamp(_) ), ColumnType::Timestamp => matches!( value, - Value::Integer(_) | Value::Float(_) | Value::Timestamp(_) + Value::U8(_) | Value::Integer(_) | Value::Float(_) | Value::Timestamp(_) ), ColumnType::Text => matches!(value, Value::Text(_)), ColumnType::Boolean => matches!(value, Value::Boolean(_)), @@ -1108,28 +1245,87 @@ fn indexed_pks_for_predicate( table: &crate::storage::keyspace::TableData, predicate: &crate::query::plan::Expr, ) -> Result>, QueryError> { + Ok(indexed_pks_for_predicate_with_trace( + catalog, project_id, scope_id, table_name, table, predicate, + )? + .map(|result| result.pks)) +} + +#[derive(Debug, Clone)] +struct IndexLookupResult { + pks: Vec, + selected_indexes: Vec, + plan_trace: Vec, +} + +fn indexed_pks_for_predicate_with_trace( + catalog: &Catalog, + project_id: &str, + scope_id: &str, + table_name: &str, + table: &crate::storage::keyspace::TableData, + predicate: &crate::query::plan::Expr, +) -> Result, QueryError> { use crate::query::plan::Expr; match predicate { Expr::And(lhs, rhs) => { - let left = - indexed_pks_for_predicate(catalog, project_id, scope_id, table_name, table, lhs)?; - let right = - indexed_pks_for_predicate(catalog, project_id, scope_id, table_name, table, rhs)?; + let left = indexed_pks_for_predicate_with_trace( + catalog, project_id, scope_id, table_name, table, lhs, + )?; + let right = indexed_pks_for_predicate_with_trace( + catalog, project_id, scope_id, table_name, table, rhs, + )?; return Ok(match (left, right) { - (Some(left), Some(right)) => Some(intersect_pks(left, right)), - (Some(left), None) => Some(left), - (None, Some(right)) => Some(right), + (Some(left), Some(right)) => Some(IndexLookupResult { + pks: intersect_pks(left.pks, right.pks), + selected_indexes: merge_selected_indexes( + left.selected_indexes, + right.selected_indexes, + ), + plan_trace: merge_trace( + "AND predicate combines indexed candidates with intersection", + left.plan_trace, + right.plan_trace, + ), + }), + (Some(left), None) => Some(IndexLookupResult { + plan_trace: merge_trace_single( + "AND predicate uses indexed left side; right side will be residual filter", + left.plan_trace, + ), + ..left + }), + (None, Some(right)) => Some(IndexLookupResult { + plan_trace: merge_trace_single( + "AND predicate uses indexed right side; left side will be residual filter", + right.plan_trace, + ), + ..right + }), (None, None) => None, }); } Expr::Or(lhs, rhs) => { - let left = - indexed_pks_for_predicate(catalog, project_id, scope_id, table_name, table, lhs)?; - let right = - indexed_pks_for_predicate(catalog, project_id, scope_id, table_name, table, rhs)?; + let left = indexed_pks_for_predicate_with_trace( + catalog, project_id, scope_id, table_name, table, lhs, + )?; + let right = indexed_pks_for_predicate_with_trace( + catalog, project_id, scope_id, table_name, table, rhs, + )?; return Ok(match (left, right) { - (Some(left), Some(right)) => Some(union_pks(left, right)), + (Some(left), Some(right)) => Some(IndexLookupResult { + pks: union_pks(left.pks, right.pks), + selected_indexes: merge_selected_indexes( + left.selected_indexes, + right.selected_indexes, + ), + plan_trace: merge_trace( + "OR predicate combines indexed candidates with union", + left.plan_trace, + right.plan_trace, + ), + }), _ => None, }); } @@ -1196,7 +1392,13 @@ fn indexed_pks_for_predicate( } else { index.scan_prefix(&encoded) }; - return Ok(Some(pks)); + return Ok(Some(IndexLookupResult { + pks, + selected_indexes: vec![idx_name.clone()], + plan_trace: vec![format!( + "selected composite index '{idx_name}' with leftmost prefix columns={prefix_cols}" + )], + })); }; let column = match &lookup { IndexLookup::Range { column, .. } => column, @@ -1229,14 +1431,45 @@ fn indexed_pks_for_predicate( return Ok(None); }; - let pks = match lookup { + let pks = match lookup.clone() { IndexLookup::Range { bounds, .. } => index.scan_range(bounds.0, bounds.1), IndexLookup::MultiEq { values, .. } => values .into_iter() .flat_map(|v| index.scan_eq(&EncodedKey::from_values(&[v]))) .collect(), }; - Ok(Some(pks)) + Ok(Some(IndexLookupResult { + pks, + selected_indexes: vec![index_name.clone()], + plan_trace: vec![format!( + "selected single-column index '{index_name}' for predicate on '{column}'" + )], + })) +} + +fn merge_selected_indexes(left: Vec, right: Vec) -> Vec { + let mut out = Vec::with_capacity(left.len() + right.len()); + for name in left.into_iter().chain(right) { + if !out.contains(&name) { + out.push(name); + } + } + out +} + +fn merge_trace(header: &str, mut left: Vec, right: Vec) -> Vec { + let mut out = Vec::with_capacity(1 + left.len() + right.len()); + out.push(header.to_string()); + out.append(&mut left); + out.extend(right); + out +} + +fn merge_trace_single(header: &str, mut trace: Vec) -> Vec { + let mut out = Vec::with_capacity(1 + trace.len()); + out.push(header.to_string()); + out.append(&mut trace); + out } fn intersect_pks(left: Vec, right: Vec) -> Vec { diff --git a/src/query/operators.rs b/src/query/operators.rs index ed78877..e38be44 100644 --- a/src/query/operators.rs +++ b/src/query/operators.rs @@ -431,10 +431,13 @@ impl AggregateState { *v = v.saturating_add(1); } (AggregateState::Sum(sum), Aggregate::Sum(_)) => { - if let Some(idx) = col_idx - && let Value::Integer(v) = &row.values[idx] - { - *sum = sum.saturating_add(*v); + if let Some(idx) = col_idx { + let v = match &row.values[idx] { + Value::Integer(v) => *v, + Value::U8(v) => *v as i64, + _ => 0, + }; + *sum = sum.saturating_add(v); } } (AggregateState::Min(state), Aggregate::Min(_)) => { @@ -454,11 +457,16 @@ impl AggregateState { } } (AggregateState::Avg { total, count }, Aggregate::Avg(_)) => { - if let Some(idx) = col_idx - && let Value::Integer(v) = &row.values[idx] - { - *total = total.saturating_add(*v); - *count = count.saturating_add(1); + if let Some(idx) = col_idx { + let maybe_v = match &row.values[idx] { + Value::Integer(v) => Some(*v), + Value::U8(v) => Some(*v as i64), + _ => None, + }; + if let Some(v) = maybe_v { + *total = total.saturating_add(v); + *count = count.saturating_add(1); + } } } _ => {} @@ -645,6 +653,13 @@ fn like_match(value: &str, pattern: &str) -> bool { fn compare_values(left: &Value, right: &Value) -> Option { match (left, right) { (Value::Null, _) | (_, Value::Null) => None, + (Value::U8(a), Value::U8(b)) => a.partial_cmp(b), + (Value::U8(a), Value::Integer(b)) => (*a as i64).partial_cmp(b), + (Value::Integer(a), Value::U8(b)) => a.partial_cmp(&(*b as i64)), + (Value::U8(a), Value::Float(b)) => (*a as f64).partial_cmp(b), + (Value::Float(a), Value::U8(b)) => a.partial_cmp(&(*b as f64)), + (Value::U8(a), Value::Timestamp(b)) => (*a as i64).partial_cmp(b), + (Value::Timestamp(a), Value::U8(b)) => a.partial_cmp(&(*b as i64)), (Value::Integer(a), Value::Float(b)) => (*a as f64).partial_cmp(b), (Value::Float(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)), (Value::Timestamp(a), Value::Integer(b)) => a.partial_cmp(b), diff --git a/src/query/plan.rs b/src/query/plan.rs index ad35572..182b0ea 100644 --- a/src/query/plan.rs +++ b/src/query/plan.rs @@ -303,6 +303,12 @@ impl IntoQueryValue for i32 { } } +impl IntoQueryValue for u8 { + fn into_query_value(self) -> Value { + Value::U8(self) + } +} + impl IntoQueryValue for u64 { fn into_query_value(self) -> Value { Value::Integer(self as i64) diff --git a/src/repository.rs b/src/repository.rs index 1296c7b..476464d 100644 --- a/src/repository.rs +++ b/src/repository.rs @@ -468,6 +468,7 @@ pub fn u256_at(row: &Row, index: usize, column: &str) -> Result<[u8; 32], RowDec fn value_kind(value: &Value) -> &'static str { match value { Value::Text(_) => "Text", + Value::U8(_) => "U8", Value::Integer(_) => "Integer", Value::Float(_) => "Float", Value::Boolean(_) => "Boolean", diff --git a/src/storage/encoded_key.rs b/src/storage/encoded_key.rs index c63a809..005b38e 100644 --- a/src/storage/encoded_key.rs +++ b/src/storage/encoded_key.rs @@ -45,6 +45,10 @@ pub fn prefix_successor(prefix: &EncodedKey) -> Option { fn encode_value(v: &Value, out: &mut SmallVec<[u8; 64]>) { match v { + Value::U8(i) => { + out.push(0x0F); + out.push(*i); + } Value::Integer(i) => { out.push(0x10); let shifted = (*i as u64) ^ 0x8000_0000_0000_0000; diff --git a/src/storage/keyspace.rs b/src/storage/keyspace.rs index f84d59e..cd4a36a 100644 --- a/src/storage/keyspace.rs +++ b/src/storage/keyspace.rs @@ -833,6 +833,7 @@ fn estimate_row_bytes(row: &Row) -> usize { fn estimate_value_bytes(v: &Value) -> usize { match v { Value::Text(s) | Value::Json(s) => s.len(), + Value::U8(_) => 1, Value::Integer(_) | Value::Float(_) | Value::Timestamp(_) => 8, Value::Boolean(_) => 1, Value::U256(_) | Value::I256(_) => 32,