diff --git a/.claude/settings.json b/.claude/settings.json index 19f69c75..13c7ac20 100644 --- a/.claude/settings.json +++ b/.claude/settings.json @@ -22,6 +22,9 @@ ] } ] + }, + "env": { + "CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS": "1" }, "permissions": { "allow": [ diff --git a/CLAUDE.md b/CLAUDE.md index ee85c44a..abc24dd0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,22 +10,6 @@ A Rust CLI tool for SQL templating and execution, similar to dbt. - duckdb-rs (database, bundled feature) - tokio (async runtime) -## Project Structure -- `crates/ff-cli`: Main binary, subcommands in `commands/` module -- `crates/ff-core`: Shared types, config, DAG logic -- `crates/ff-jinja`: Template rendering (config, var functions only) -- `crates/ff-sql`: SQL parsing, table extraction from AST -- `crates/ff-db`: Database trait + DuckDB implementation -- `crates/ff-test`: Schema test generation (unique, not_null) - -## Key Commands -```bash -make build # Build all crates -make test # Run all tests -make lint # Run clippy + fmt check -make ci # Full CI check locally -cargo run -p ff-cli -- -``` ## Architecture Notes - Dependencies extracted from SQL AST via `visit_relations`, NOT Jinja functions @@ -46,47 +30,9 @@ YAML determines the resource type: | `function` | `.sql` | User-defined SQL function | | `python` | `.py` | Python transformation (planned) | -### Unified node_paths layout (preferred) -```yaml -# featherflow.yml -node_paths: ["nodes"] -``` -``` -nodes/ - stg_orders/ - stg_orders.sql - stg_orders.yml # kind: sql - raw_orders/ - raw_orders.csv - raw_orders.yml # kind: seed - raw_ecommerce/ - raw_ecommerce.yml # kind: source - cents_to_dollars/ - cents_to_dollars.sql - cents_to_dollars.yml # kind: function -``` - -### Legacy per-type layout (still supported) -```yaml -# featherflow.yml -model_paths: ["models"] -source_paths: ["sources"] -function_paths: ["functions"] -``` - Legacy kind values (`model`, `sources`, `functions`) are normalised to their modern equivalents (`sql`, 
`source`, `function`) automatically. ## Testing - All tests: `make test` -- Unit tests only: `make test-unit` -- Integration tests: `make test-integration` -- Verbose output: `make test-verbose` -- Test fixtures in `tests/fixtures/sample_project/` -- Seed data in `testdata/seeds/` - -## Code Style -- Use `?` for error propagation, add `.context()` at boundaries -- Prefer `impl Trait` over `Box` where possible -- All public items need rustdoc comments -- No unwrap() except in tests +- End to end test harness: `make ci-e2e` \ No newline at end of file diff --git a/crates/ff-analysis/src/datafusion_bridge/lineage.rs b/crates/ff-analysis/src/datafusion_bridge/lineage.rs index 43ece97e..17be4d41 100644 --- a/crates/ff-analysis/src/datafusion_bridge/lineage.rs +++ b/crates/ff-analysis/src/datafusion_bridge/lineage.rs @@ -4,7 +4,7 @@ //! its source columns — whether it's a direct copy, a transformation, //! or merely inspected (e.g. in a WHERE clause). -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use datafusion_expr::{Expr, LogicalPlan}; use ff_core::ModelName; @@ -182,6 +182,35 @@ fn collect_column_refs(expr: &Expr, refs: &mut Vec<(String, String)>) { }); } +/// Extract alias → real table name mappings from a LogicalPlan. +/// +/// Walks the plan tree looking for `SubqueryAlias` nodes wrapping `TableScan` +/// nodes, which represent `FROM table AS alias` patterns. Returns a map from +/// alias name to real table name. 
+pub fn extract_alias_map(plan: &LogicalPlan) -> HashMap<String, String> { + let mut aliases = HashMap::new(); + collect_aliases(plan, &mut aliases); + aliases +} + +fn collect_aliases(plan: &LogicalPlan, aliases: &mut HashMap<String, String>) { + match plan { + LogicalPlan::SubqueryAlias(sa) => { + let alias_name = sa.alias.table().to_string(); + if let LogicalPlan::TableScan(scan) = sa.input.as_ref() { + aliases.insert(alias_name, scan.table_name.table().to_string()); + } else { + collect_aliases(sa.input.as_ref(), aliases); + } + } + _ => { + for input in plan.inputs() { + collect_aliases(input, aliases); + } + } + } +} + /// Deduplicate lineage edges, keeping the first occurrence per (output, source) pair pub fn deduplicate_edges(edges: &[ColumnLineageEdge]) -> Vec<ColumnLineageEdge> { let mut seen: HashSet<(&str, &str, &str)> = HashSet::with_capacity(edges.len()); diff --git a/crates/ff-analysis/src/lib.rs b/crates/ff-analysis/src/lib.rs index ba345df3..670529e1 100644 --- a/crates/ff-analysis/src/lib.rs +++ b/crates/ff-analysis/src/lib.rs @@ -26,8 +26,8 @@ pub use types::{parse_sql_type, FloatBitWidth, IntBitWidth, Nullability, SqlType // DataFusion bridge re-exports pub use datafusion_bridge::lineage::{ - deduplicate_edges, extract_column_lineage as extract_plan_column_lineage, ColumnLineageEdge, - LineageKind, ModelColumnLineage, + deduplicate_edges, extract_alias_map, extract_column_lineage as extract_plan_column_lineage, + ColumnLineageEdge, LineageKind, ModelColumnLineage, }; pub use datafusion_bridge::planner::sql_to_plan; pub use datafusion_bridge::propagation::{ diff --git a/crates/ff-analysis/src/pass/mod.rs b/crates/ff-analysis/src/pass/mod.rs index 9147e9f8..106f2eb1 100644 --- a/crates/ff-analysis/src/pass/mod.rs +++ b/crates/ff-analysis/src/pass/mod.rs @@ -2,6 +2,7 @@ pub(crate) mod expr_utils; pub(crate) mod plan_cross_model; +pub(crate) mod plan_description_drift; pub(crate) mod plan_join_keys; pub(crate) mod plan_nullability; pub mod plan_pass; @@ -50,6 +51,12 @@ pub enum DiagnosticCode {
A040, /// A041: Cross-model nullability mismatch A041, + /// A050: Copy/Rename column with missing description — suggest inheriting from upstream + A050, + /// A051: Copy/Rename column with modified description — potential documentation drift + A051, + /// A052: Transform column with missing description — needs new documentation + A052, } impl std::fmt::Display for DiagnosticCode { @@ -77,6 +84,9 @@ impl std::str::FromStr for DiagnosticCode { "A033" => Ok(DiagnosticCode::A033), "A040" => Ok(DiagnosticCode::A040), "A041" => Ok(DiagnosticCode::A041), + "A050" => Ok(DiagnosticCode::A050), + "A051" => Ok(DiagnosticCode::A051), + "A052" => Ok(DiagnosticCode::A052), _ => Err(format!("unknown diagnostic code: {s}")), } } diff --git a/crates/ff-analysis/src/pass/plan_description_drift.rs b/crates/ff-analysis/src/pass/plan_description_drift.rs new file mode 100644 index 00000000..07c5ad56 --- /dev/null +++ b/crates/ff-analysis/src/pass/plan_description_drift.rs @@ -0,0 +1,151 @@ +//! Description drift detection pass (A050-A052) +//! +//! Checks column-level lineage edges for documentation drift: +//! - A050: Copy/Rename column with missing description — suggest inheriting from upstream +//! - A051: Copy/Rename column with modified description — potential drift +//! - A052: Transform column with missing description — needs new documentation + +use std::collections::HashMap; + +use ff_core::ModelName; + +use crate::context::AnalysisContext; +use crate::datafusion_bridge::propagation::ModelPlanResult; + +use super::plan_pass::DagPlanPass; +use super::{Diagnostic, DiagnosticCode, Severity}; + +/// DAG-level pass that checks description propagation across lineage edges. 
+pub struct PlanDescriptionDrift; + +impl DagPlanPass for PlanDescriptionDrift { + fn name(&self) -> &'static str { + "description_drift" + } + + fn description(&self) -> &'static str { + "Detect missing or drifted column descriptions across lineage edges" + } + + fn run_project( + &self, + _models: &HashMap, + ctx: &AnalysisContext, + ) -> Vec { + let mut diagnostics = Vec::new(); + let lineage = ctx.lineage(); + let project = ctx.project(); + + // Build description lookup from project schemas + let desc_lookup = build_project_descriptions(project); + + for edge in &lineage.edges { + let src_desc = desc_lookup + .get(&edge.source_model) + .and_then(|cols| cols.get(&edge.source_column.to_lowercase())); + let tgt_desc = desc_lookup + .get(&edge.target_model) + .and_then(|cols| cols.get(&edge.target_column.to_lowercase())); + + // Only check edges targeting models (skip edges targeting seeds/sources) + if !project.models.contains_key(edge.target_model.as_str()) { + continue; + } + + match (edge.is_direct, src_desc, tgt_desc) { + // Copy/Rename with missing target description + (true, Some(_src), None) => { + diagnostics.push(Diagnostic { + code: DiagnosticCode::A050, + severity: Severity::Warning, + message: format!( + "Column '{}' is a direct pass-through from '{}.{}' but has no description — consider inheriting from upstream", + edge.target_column, edge.source_model, edge.source_column + ), + model: ModelName::new(&edge.target_model), + column: Some(edge.target_column.clone()), + hint: Some(format!( + "Add a description to '{}' in the YAML schema, or copy it from '{}.{}'", + edge.target_column, edge.source_model, edge.source_column + )), + pass_name: "description_drift".into(), + }); + } + // Copy/Rename with modified description + (true, Some(src), Some(tgt)) if src != tgt => { + diagnostics.push(Diagnostic { + code: DiagnosticCode::A051, + severity: Severity::Info, + message: format!( + "Column '{}' is a direct pass-through from '{}.{}' but has a different 
description — verify this is intentional", + edge.target_column, edge.source_model, edge.source_column + ), + model: ModelName::new(&edge.target_model), + column: Some(edge.target_column.clone()), + hint: None, + pass_name: "description_drift".into(), + }); + } + // Transform with missing target description + (false, _, None) => { + diagnostics.push(Diagnostic { + code: DiagnosticCode::A052, + severity: Severity::Warning, + message: format!( + "Column '{}' is a transformation but has no description — consider documenting it", + edge.target_column, + ), + model: ModelName::new(&edge.target_model), + column: Some(edge.target_column.clone()), + hint: Some(format!( + "Add a description to '{}' in the YAML schema", + edge.target_column + )), + pass_name: "description_drift".into(), + }); + } + _ => {} + } + } + + diagnostics + } +} + +/// Build a lookup of model_name -> { column_name_lowercase -> description } +/// from model YAML schemas and source definitions. +fn build_project_descriptions( + project: &ff_core::Project, +) -> HashMap> { + let mut lookup: HashMap> = HashMap::new(); + + for (name, model) in &project.models { + if let Some(schema) = &model.schema { + let mut col_descs = HashMap::new(); + for col in &schema.columns { + if let Some(ref desc) = col.description { + col_descs.insert(col.name.to_lowercase(), desc.clone()); + } + } + if !col_descs.is_empty() { + lookup.insert(name.to_string(), col_descs); + } + } + } + + for source_file in &project.sources { + for table in &source_file.tables { + let mut col_descs = HashMap::new(); + for col in &table.columns { + if let Some(ref desc) = col.description { + col_descs.insert(col.name.to_lowercase(), desc.clone()); + } + } + if !col_descs.is_empty() { + lookup.insert(table.name.clone(), col_descs); + } + } + } + + lookup +} diff --git a/crates/ff-analysis/src/pass/plan_pass.rs b/crates/ff-analysis/src/pass/plan_pass.rs index e1e2f6fe..b1f37c7c 100644 --- a/crates/ff-analysis/src/pass/plan_pass.rs +++ 
b/crates/ff-analysis/src/pass/plan_pass.rs @@ -63,6 +63,7 @@ impl PlanPassManager { dag_passes: vec![ Box::new(super::plan_unused_columns::PlanUnusedColumns), Box::new(super::plan_cross_model::CrossModelConsistency), + Box::new(super::plan_description_drift::PlanDescriptionDrift), ], } } diff --git a/crates/ff-cli/src/commands/lineage.rs b/crates/ff-cli/src/commands/lineage.rs index 81c4e003..97c649dc 100644 --- a/crates/ff-cli/src/commands/lineage.rs +++ b/crates/ff-cli/src/commands/lineage.rs @@ -1,11 +1,19 @@ //! Lineage command implementation — column-level lineage across models +//! +//! Uses DataFusion LogicalPlan as primary lineage engine, with AST fallback +//! for models that fail DataFusion planning. use anyhow::{Context, Result}; -use ff_sql::{extract_column_lineage, ProjectLineage, SqlParser}; -use std::collections::HashSet; +use ff_core::dag::ModelDag; +use ff_core::SchemaRegistry; +use ff_sql::{ + extract_column_lineage, extract_dependencies, ExprType, LineageEdge, ModelLineage, + ProjectLineage, SqlParser, +}; +use std::collections::{HashMap, HashSet}; use crate::cli::{GlobalArgs, LineageArgs, LineageDirection, LineageOutput}; -use crate::commands::common::{self, load_project}; +use crate::commands::common::{self, build_external_tables_lookup, load_project}; /// Execute the lineage command pub(crate) async fn execute(args: &LineageArgs, global: &GlobalArgs) -> Result<()> { @@ -13,13 +21,22 @@ pub(crate) async fn execute(args: &LineageArgs, global: &GlobalArgs) -> Result<( let parser = SqlParser::from_dialect_name(&project.config.dialect.to_string()) .context("Invalid SQL dialect")?; - let jinja = common::build_jinja_env(&project); + let jinja = common::build_jinja_env_with_context(&project, global.target.as_deref(), false); - let known_models: HashSet<&str> = project.models.keys().map(|k| k.as_str()).collect(); + let mut known_models: HashSet<&str> = project.models.keys().map(|k| k.as_str()).collect(); + for seed in &project.seeds { + 
known_models.insert(seed.name.as_str()); + } + for source_file in &project.sources { + for table in &source_file.tables { + known_models.insert(&table.name); + } + } - let mut project_lineage = ProjectLineage::new(); + // Phase 1: Render SQL, extract dependencies, build DAG + let mut dep_map: HashMap> = HashMap::with_capacity(project.models.len()); + let mut rendered_sql: HashMap = HashMap::with_capacity(project.models.len()); - // Compile each model and extract lineage for (name, model) in &project.models { let rendered = match jinja.render(&model.raw_sql) { Ok(sql) => sql, @@ -30,6 +47,7 @@ pub(crate) async fn execute(args: &LineageArgs, global: &GlobalArgs) -> Result<( name, e ); } + dep_map.insert(name.to_string(), vec![]); continue; } }; @@ -43,13 +61,95 @@ pub(crate) async fn execute(args: &LineageArgs, global: &GlobalArgs) -> Result<( name, e ); } + dep_map.insert(name.to_string(), vec![]); continue; } }; - if let Some(stmt) = stmts.first() { - if let Some(lineage) = extract_column_lineage(stmt, name) { - project_lineage.add_model_lineage(lineage); + let raw_deps = extract_dependencies(&stmts); + let model_deps: Vec = raw_deps + .into_iter() + .filter(|d| known_models.contains(d.as_str())) + .collect(); + dep_map.insert(name.to_string(), model_deps); + rendered_sql.insert(name.to_string(), rendered); + } + + // Phase 2: Try DataFusion static analysis pipeline for richer lineage + let external_tables = build_external_tables_lookup(&project); + let topo_order: Vec = if let Ok(dag) = ModelDag::build(&dep_map) { + dag.topological_order().unwrap_or_default() + } else { + // Fallback to model key order if DAG can't be built + project.models.keys().map(|k| k.to_string()).collect() + }; + + let sa_output = common::run_static_analysis_pipeline( + &project, + &rendered_sql, + &topo_order, + &external_tables, + ); + + // Phase 3: Build project lineage — DataFusion primary, AST fallback + let mut project_lineage = ProjectLineage::new(); + + match sa_output { + 
Ok(output) => { + let propagation = &output.result; + let mut datafusion_models: HashSet = HashSet::new(); + + // DataFusion path: convert DataFusion lineage to ff-sql types + for (model_name, plan_result) in &propagation.model_plans { + let df_lineage = + ff_analysis::extract_plan_column_lineage(model_name.clone(), &plan_result.plan); + let alias_map = ff_analysis::extract_alias_map(&plan_result.plan); + let ast_lineage = + bridge_datafusion_lineage(&df_lineage, model_name.as_str(), &alias_map); + project_lineage.add_model_lineage(ast_lineage); + datafusion_models.insert(model_name.to_string()); + } + + if global.verbose && !propagation.failures.is_empty() { + for (model, err) in &propagation.failures { + eprintln!("[verbose] DataFusion fallback for '{}': {}", model, err); + } + } + + // AST fallback for models that failed DataFusion planning + for name in project.models.keys() { + if datafusion_models.contains(name.as_str()) { + continue; + } + if let Some(sql) = rendered_sql.get(name.as_str()) { + if let Ok(stmts) = parser.parse(sql) { + if let Some(stmt) = stmts.first() { + if let Some(lineage) = extract_column_lineage(stmt, name) { + project_lineage.add_model_lineage(lineage); + } + } + } + } + } + } + Err(e) => { + // Full AST fallback if the pipeline fails entirely + if global.verbose { + eprintln!( + "[verbose] Static analysis pipeline failed: {}, using AST fallback", + e + ); + } + for name in project.models.keys() { + if let Some(sql) = rendered_sql.get(name.as_str()) { + if let Ok(stmts) = parser.parse(sql) { + if let Some(stmt) = stmts.first() { + if let Some(lineage) = extract_column_lineage(stmt, name) { + project_lineage.add_model_lineage(lineage); + } + } + } + } } } } @@ -59,6 +159,11 @@ pub(crate) async fn execute(args: &LineageArgs, global: &GlobalArgs) -> Result<( let classification_lookup = ff_core::classification::build_classification_lookup(&project); project_lineage.propagate_classifications(&classification_lookup); + // Compute description 
status from schema registry + let registry = SchemaRegistry::from_project(&project); + let desc_lookup = build_description_lookup(®istry, &project); + project_lineage.compute_description_status(&desc_lookup); + // Apply classification filter if specified if let Some(ref cls) = args.classification { project_lineage @@ -75,6 +180,114 @@ pub(crate) async fn execute(args: &LineageArgs, global: &GlobalArgs) -> Result<( Ok(()) } +/// Bridge DataFusion `ModelColumnLineage` to ff-sql `ModelLineage`. +/// +/// Converts DataFusion's flat edge list into the per-column structure expected +/// by the ff-sql `ProjectLineage::resolve_edges()` method. Resolves table +/// aliases (e.g. `c` → `stg_customers`) using the LogicalPlan so that +/// `resolve_edges()` can match source tables to known model names. +fn bridge_datafusion_lineage( + df: &ff_analysis::ModelColumnLineage, + model_name: &str, + alias_map: &HashMap, +) -> ModelLineage { + use ff_sql::ColumnRef; + + let mut lineage = ModelLineage { + model_name: model_name.to_string(), + columns: Vec::new(), + inspect_columns: Vec::new(), + table_aliases: alias_map.clone(), // alias → real table name + source_tables: HashSet::new(), + }; + + // Helper: resolve a source table name through the alias map + let resolve_table = |name: &str| -> String { + alias_map + .get(name) + .cloned() + .unwrap_or_else(|| name.to_string()) + }; + + // Group edges by output_column + let mut column_groups: HashMap<&str, Vec<&ff_analysis::ColumnLineageEdge>> = HashMap::new(); + for edge in &df.edges { + column_groups + .entry(&edge.output_column) + .or_default() + .push(edge); + } + + for (output_col, edges) in &column_groups { + if output_col.is_empty() { + // Empty output_column = Inspect edges + for edge in edges { + if edge.kind == ff_analysis::LineageKind::Inspect { + let resolved = if edge.source_table.is_empty() { + String::new() + } else { + resolve_table(&edge.source_table) + }; + let col_ref = if resolved.is_empty() { + 
ColumnRef::simple(&edge.source_column) + } else { + ColumnRef::qualified(&resolved, &edge.source_column) + }; + lineage.inspect_columns.push(col_ref); + if !resolved.is_empty() { + lineage.source_tables.insert(resolved); + } + } + } + continue; + } + + let mut col_lineage = ff_sql::ColumnLineage { + output_column: output_col.to_string(), + source_columns: HashSet::new(), + is_direct: false, + expr_type: ExprType::Unknown, + }; + + let mut all_copy = true; + for edge in edges { + let resolved = if edge.source_table.is_empty() { + String::new() + } else { + resolve_table(&edge.source_table) + }; + let col_ref = if resolved.is_empty() { + ColumnRef::simple(&edge.source_column) + } else { + ColumnRef::qualified(&resolved, &edge.source_column) + }; + col_lineage.source_columns.insert(col_ref); + + if !resolved.is_empty() { + lineage.source_tables.insert(resolved); + } + + match edge.kind { + ff_analysis::LineageKind::Copy => {} + ff_analysis::LineageKind::Transform | ff_analysis::LineageKind::Inspect => { + all_copy = false; + } + } + } + + if all_copy { + col_lineage.is_direct = true; + col_lineage.expr_type = ExprType::Column; + } else { + col_lineage.expr_type = ExprType::Expression; + } + + lineage.columns.push(col_lineage); + } + + lineage +} + /// Print lineage as JSON fn print_json(lineage: &ProjectLineage, args: &LineageArgs) -> Result<()> { if let (Some(model), Some(column)) = (&args.node, &args.column) { @@ -119,30 +332,29 @@ fn print_table(lineage: &ProjectLineage, args: &LineageArgs) { return; } - let edges: Vec<&ff_sql::LineageEdge> = - if let (Some(model), Some(column)) = (&args.node, &args.column) { - match args.direction { - LineageDirection::Upstream => lineage.trace_column_recursive(model, column), - LineageDirection::Downstream => lineage.column_consumers_recursive(model, column), - LineageDirection::Both => { - let mut all = lineage.trace_column_recursive(model, column); - all.extend(lineage.column_consumers_recursive(model, column)); - all - } + let 
edges: Vec<&LineageEdge> = if let (Some(model), Some(column)) = (&args.node, &args.column) { + match args.direction { + LineageDirection::Upstream => lineage.trace_column_recursive(model, column), + LineageDirection::Downstream => lineage.column_consumers_recursive(model, column), + LineageDirection::Both => { + let mut all = lineage.trace_column_recursive(model, column); + all.extend(lineage.column_consumers_recursive(model, column)); + all } - } else if let Some(model) = &args.node { - lineage - .edges - .iter() - .filter(|e| match args.direction { - LineageDirection::Upstream => e.target_model == *model, - LineageDirection::Downstream => e.source_model == *model, - LineageDirection::Both => e.target_model == *model || e.source_model == *model, - }) - .collect() - } else { - lineage.edges.iter().collect() - }; + } + } else if let Some(model) = &args.node { + lineage + .edges + .iter() + .filter(|e| match args.direction { + LineageDirection::Upstream => e.target_model == *model, + LineageDirection::Downstream => e.source_model == *model, + LineageDirection::Both => e.target_model == *model || e.source_model == *model, + }) + .collect() + } else { + lineage.edges.iter().collect() + }; if edges.is_empty() { println!("No matching lineage edges found."); @@ -151,19 +363,26 @@ fn print_table(lineage: &ProjectLineage, args: &LineageArgs) { // Print header println!( - "{:<25} {:<20} {:<25} {:<20} {:<10} {:<12} TYPE", - "SOURCE MODEL", "SOURCE COLUMN", "TARGET MODEL", "TARGET COLUMN", "DIRECT?", "CLASS" + "{:<25} {:<20} {:<25} {:<20} {:<10} {:<12} {:<12} TYPE", + "SOURCE MODEL", + "SOURCE COLUMN", + "TARGET MODEL", + "TARGET COLUMN", + "KIND", + "DESC STATUS", + "CLASS", ); - println!("{}", "-".repeat(125)); + println!("{}", "-".repeat(145)); for edge in &edges { println!( - "{:<25} {:<20} {:<25} {:<20} {:<10} {:<12} {}", + "{:<25} {:<20} {:<25} {:<20} {:<10} {:<12} {:<12} {}", edge.source_model, edge.source_column, edge.target_model, edge.target_column, - if 
edge.is_direct { "yes" } else { "no" }, + edge.kind, + edge.description_status, edge.classification.as_deref().unwrap_or("-"), edge.expr_type, ); @@ -171,3 +390,46 @@ fn print_table(lineage: &ProjectLineage, args: &LineageArgs) { println!("\n{} lineage edge(s) found.", edges.len()); } + +/// Build a lookup of node_name -> { column_name_lowercase -> description } +/// from the schema registry for description status computation. +fn build_description_lookup( + registry: &SchemaRegistry, + project: &ff_core::Project, +) -> HashMap> { + let mut lookup: HashMap> = HashMap::new(); + + // Models + for name in project.models.keys() { + if let Some(columns) = registry.get_columns(name.as_str()) { + let mut col_descs = HashMap::new(); + for (col_key, info) in columns { + if let Some(ref desc) = info.description { + col_descs.insert(col_key.clone(), desc.clone()); + } + } + if !col_descs.is_empty() { + lookup.insert(name.to_string(), col_descs); + } + } + } + + // Sources + for source_file in &project.sources { + for table in &source_file.tables { + if let Some(columns) = registry.get_columns(&table.name) { + let mut col_descs = HashMap::new(); + for (col_key, info) in columns { + if let Some(ref desc) = info.description { + col_descs.insert(col_key.clone(), desc.clone()); + } + } + if !col_descs.is_empty() { + lookup.insert(table.name.clone(), col_descs); + } + } + } + } + + lookup +} diff --git a/crates/ff-cli/tests/integration_tests.rs b/crates/ff-cli/tests/integration_tests.rs index ede73d8d..5fb6f36b 100644 --- a/crates/ff-cli/tests/integration_tests.rs +++ b/crates/ff-cli/tests/integration_tests.rs @@ -2121,11 +2121,24 @@ fn test_analysis_sample_project_no_false_diagnostics() { errors ); + // Filter out description drift diagnostics (A050-A052) — these are expected + // info/warning-level diagnostics on projects without full description coverage + let non_drift: Vec<_> = diagnostics + .iter() + .filter(|d| { + !matches!( + d.code, + ff_analysis::DiagnosticCode::A050 + | 
ff_analysis::DiagnosticCode::A051 + | ff_analysis::DiagnosticCode::A052 + ) + }) + .collect(); assert!( - diagnostics.is_empty(), - "Expected zero diagnostics on sample_project, got {}:\n{:#?}", - diagnostics.len(), - diagnostics + non_drift.is_empty(), + "Expected zero non-drift diagnostics on sample_project, got {}:\n{:#?}", + non_drift.len(), + non_drift ); } @@ -2142,7 +2155,8 @@ fn test_analysis_pass_names() { assert!(names.contains(&"plan_join_keys")); assert!(names.contains(&"plan_unused_columns")); assert!(names.contains(&"cross_model_consistency")); - assert_eq!(names.len(), 5); + assert!(names.contains(&"description_drift")); + assert_eq!(names.len(), 6); } // ── Phase 1: Type Inference (A002, A004, A005) ───────────────────────── @@ -2625,6 +2639,19 @@ fn test_analysis_dag_ecommerce_all_plan() { fn test_analysis_dag_ecommerce_zero_diagnostics() { let pipeline = build_analysis_pipeline("tests/fixtures/sa_dag_pass_ecommerce"); let diags = run_all_passes(&pipeline); + // Filter out description drift diagnostics (A050-A052) — these are expected + // informational/warning diagnostics from the new description_drift pass + let diags: Vec<_> = diags + .into_iter() + .filter(|d| { + !matches!( + d.code, + ff_analysis::DiagnosticCode::A050 + | ff_analysis::DiagnosticCode::A051 + | ff_analysis::DiagnosticCode::A052 + ) + }) + .collect(); assert!( diags.is_empty(), "Ecommerce project should produce zero diagnostics, got {}:\n{:#?}", @@ -2691,6 +2718,19 @@ fn test_analysis_deep_expression_plans() { fn test_analysis_guard_clean_project_zero_diagnostics() { let pipeline = build_analysis_pipeline("tests/fixtures/sa_clean_project"); let diags = run_all_passes(&pipeline); + // Filter out description drift diagnostics (A050-A052) — these are expected + // informational/warning diagnostics from the new description_drift pass + let diags: Vec<_> = diags + .into_iter() + .filter(|d| { + !matches!( + d.code, + ff_analysis::DiagnosticCode::A050 + | 
ff_analysis::DiagnosticCode::A051 + | ff_analysis::DiagnosticCode::A052 + ) + }) + .collect(); assert!( diags.is_empty(), "Clean project should have zero diagnostics, got {}:\n{:#?}", @@ -3972,3 +4012,1003 @@ fn test_edge_expr_type_across_hops() { assert_eq!(from_stg.unwrap().expr_type, ExprType::Column); assert!(from_stg.unwrap().is_direct); } + +// ── Category G: CLI-based DataFusion Lineage Tests ────────────────────── +// +// These tests invoke `ff lineage --output json` as a subprocess to exercise +// the **real DataFusion bridge path** (alias resolution, plan walking, etc.) +// rather than the AST-only path used in build_analysis_pipeline above. +// This is the test harness that actually catches alias resolution bugs. + +fn ff_bin() -> String { + env!("CARGO_BIN_EXE_ff").to_string() +} + +/// Run `ff lineage --output json` and parse the full result +fn run_lineage_json() -> serde_json::Value { + let output = std::process::Command::new(ff_bin()) + .args([ + "lineage", + "--project-dir", + "tests/fixtures/sample_project", + "--output", + "json", + ]) + .output() + .expect("Failed to run ff lineage"); + + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + output.status.success(), + "ff lineage should succeed.\nstdout: {}\nstderr: {}", + stdout, + stderr + ); + + serde_json::from_str(stdout.trim()).unwrap_or_else(|e| { + panic!( + "JSON parse failed: {}\nraw: {}", + e, + &stdout[..stdout.len().min(500)] + ) + }) +} + +/// Run `ff lineage -n --column --output json` and return the edge array +fn run_lineage_column_json(model: &str, column: &str) -> Vec { + let output = std::process::Command::new(ff_bin()) + .args([ + "lineage", + "--project-dir", + "tests/fixtures/sample_project", + "-n", + model, + "--column", + column, + "--output", + "json", + ]) + .output() + .expect("Failed to run ff lineage"); + + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = 
String::from_utf8_lossy(&output.stderr); + assert!( + output.status.success(), + "ff lineage -n {} --column {} should succeed.\nstdout: {}\nstderr: {}", + model, + column, + stdout, + stderr + ); + + serde_json::from_str(stdout.trim()).unwrap_or_else(|e| { + panic!( + "JSON parse failed for {} {}: {}\nraw: {}", + model, + column, + e, + &stdout[..stdout.len().min(500)] + ) + }) +} + +/// Helper: extract edges array from full lineage JSON +fn get_edges(data: &serde_json::Value) -> &Vec { + data["edges"] + .as_array() + .expect("lineage JSON should have 'edges' array") +} + +/// Helper: find edges in an array matching source model/column -> target model/column +fn find_edge<'a>( + edges: &'a [serde_json::Value], + source_model: &str, + source_column: &str, + target_model: &str, + target_column: &str, +) -> Option<&'a serde_json::Value> { + edges.iter().find(|e| { + e["source_model"] == source_model + && e["source_column"] == source_column + && e["target_model"] == target_model + && e["target_column"] == target_column + }) +} + +/// G1: Total project lineage has substantial edges (basic sanity) +#[test] +fn test_cli_lineage_total_edges() { + let data = run_lineage_json(); + let edges = get_edges(&data); + assert!( + edges.len() >= 50, + "Expected at least 50 lineage edges, got {}", + edges.len() + ); +} + +// ── G2: Every raw_customers column traces to stg_customers ────────────── + +#[test] +fn test_cli_lineage_raw_customers_id() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge(edges, "raw_customers", "id", "stg_customers", "customer_id"); + assert!( + edge.is_some(), + "raw_customers.id -> stg_customers.customer_id edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "rename"); +} + +#[test] +fn test_cli_lineage_raw_customers_name() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge( + edges, + "raw_customers", + "name", + "stg_customers", + "customer_name", + ); + assert!( + 
edge.is_some(), + "raw_customers.name -> stg_customers.customer_name edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "rename"); +} + +#[test] +fn test_cli_lineage_raw_customers_email() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge(edges, "raw_customers", "email", "stg_customers", "email"); + assert!( + edge.is_some(), + "raw_customers.email -> stg_customers.email edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "copy"); +} + +#[test] +fn test_cli_lineage_raw_customers_created_at() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge( + edges, + "raw_customers", + "created_at", + "stg_customers", + "signup_date", + ); + assert!( + edge.is_some(), + "raw_customers.created_at -> stg_customers.signup_date edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "rename"); +} + +#[test] +fn test_cli_lineage_raw_customers_tier() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge( + edges, + "raw_customers", + "tier", + "stg_customers", + "customer_tier", + ); + assert!( + edge.is_some(), + "raw_customers.tier -> stg_customers.customer_tier edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "rename"); +} + +// ── G3: Every raw_orders column traces to stg_orders ──────────────────── + +#[test] +fn test_cli_lineage_raw_orders_id() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge(edges, "raw_orders", "id", "stg_orders", "order_id"); + assert!( + edge.is_some(), + "raw_orders.id -> stg_orders.order_id edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "rename"); +} + +#[test] +fn test_cli_lineage_raw_orders_user_id() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge(edges, "raw_orders", "user_id", "stg_orders", "customer_id"); + assert!( + edge.is_some(), + "raw_orders.user_id -> stg_orders.customer_id edge missing" + ); + assert_eq!(edge.unwrap()["kind"], 
"rename"); +} + +#[test] +fn test_cli_lineage_raw_orders_created_at() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge( + edges, + "raw_orders", + "created_at", + "stg_orders", + "order_date", + ); + assert!( + edge.is_some(), + "raw_orders.created_at -> stg_orders.order_date edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "rename"); +} + +#[test] +fn test_cli_lineage_raw_orders_amount() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge(edges, "raw_orders", "amount", "stg_orders", "amount"); + assert!( + edge.is_some(), + "raw_orders.amount -> stg_orders.amount edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "copy"); +} + +#[test] +fn test_cli_lineage_raw_orders_status() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge(edges, "raw_orders", "status", "stg_orders", "status"); + assert!( + edge.is_some(), + "raw_orders.status -> stg_orders.status edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "copy"); +} + +// ── G4: Every raw_payments column traces to stg_payments or stg_payments_star ─ + +#[test] +fn test_cli_lineage_raw_payments_id_to_stg_payments() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge(edges, "raw_payments", "id", "stg_payments", "payment_id"); + assert!( + edge.is_some(), + "raw_payments.id -> stg_payments.payment_id edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "rename"); +} + +#[test] +fn test_cli_lineage_raw_payments_order_id_to_stg_payments() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge( + edges, + "raw_payments", + "order_id", + "stg_payments", + "order_id", + ); + assert!( + edge.is_some(), + "raw_payments.order_id -> stg_payments.order_id edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "copy"); +} + +#[test] +fn test_cli_lineage_raw_payments_amount_to_stg_payments() { + let data = run_lineage_json(); + 
let edges = get_edges(&data); + let edge = find_edge(edges, "raw_payments", "amount", "stg_payments", "amount"); + assert!( + edge.is_some(), + "raw_payments.amount -> stg_payments.amount edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "transform"); +} + +#[test] +fn test_cli_lineage_raw_payments_id_to_star() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge(edges, "raw_payments", "id", "stg_payments_star", "id"); + assert!( + edge.is_some(), + "raw_payments.id -> stg_payments_star.id edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "copy"); +} + +#[test] +fn test_cli_lineage_raw_payments_order_id_to_star() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge( + edges, + "raw_payments", + "order_id", + "stg_payments_star", + "order_id", + ); + assert!( + edge.is_some(), + "raw_payments.order_id -> stg_payments_star.order_id edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "copy"); +} + +#[test] +fn test_cli_lineage_raw_payments_payment_method_to_star() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge( + edges, + "raw_payments", + "payment_method", + "stg_payments_star", + "payment_method", + ); + assert!( + edge.is_some(), + "raw_payments.payment_method -> stg_payments_star.payment_method edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "copy"); +} + +#[test] +fn test_cli_lineage_raw_payments_amount_to_star() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge( + edges, + "raw_payments", + "amount", + "stg_payments_star", + "amount", + ); + assert!( + edge.is_some(), + "raw_payments.amount -> stg_payments_star.amount edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "copy"); +} + +#[test] +fn test_cli_lineage_raw_payments_created_at_to_star() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge( + edges, + "raw_payments", + "created_at", + 
"stg_payments_star", + "created_at", + ); + assert!( + edge.is_some(), + "raw_payments.created_at -> stg_payments_star.created_at edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "copy"); +} + +// ── G5: Every raw_products column traces to stg_products ──────────────── + +#[test] +fn test_cli_lineage_raw_products_id() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge(edges, "raw_products", "id", "stg_products", "product_id"); + assert!( + edge.is_some(), + "raw_products.id -> stg_products.product_id edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "rename"); +} + +#[test] +fn test_cli_lineage_raw_products_name() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge( + edges, + "raw_products", + "name", + "stg_products", + "product_name", + ); + assert!( + edge.is_some(), + "raw_products.name -> stg_products.product_name edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "rename"); +} + +#[test] +fn test_cli_lineage_raw_products_category() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge( + edges, + "raw_products", + "category", + "stg_products", + "category", + ); + assert!( + edge.is_some(), + "raw_products.category -> stg_products.category edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "copy"); +} + +#[test] +fn test_cli_lineage_raw_products_price() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge(edges, "raw_products", "price", "stg_products", "price"); + assert!( + edge.is_some(), + "raw_products.price -> stg_products.price edge missing" + ); + assert_eq!(edge.unwrap()["kind"], "transform"); +} + +#[test] +fn test_cli_lineage_raw_products_active() { + let data = run_lineage_json(); + let edges = get_edges(&data); + let edge = find_edge(edges, "raw_products", "active", "stg_products", "active"); + assert!( + edge.is_some(), + "raw_products.active -> stg_products.active edge missing" 
+ ); + assert_eq!(edge.unwrap()["kind"], "copy"); +} + +// ── G6: Aliased models — edges resolve through aliases correctly ──────── +// +// These are the critical tests: models with FROM table AS alias must resolve +// aliases to real table names. Without extract_alias_map(), these all fail. + +#[test] +fn test_cli_lineage_int_orders_enriched_alias_resolution() { + let data = run_lineage_json(); + let edges = get_edges(&data); + + // stg_orders aliased as `o` — edges must use real name + assert!( + find_edge( + edges, + "stg_orders", + "order_id", + "int_orders_enriched", + "order_id" + ) + .is_some(), + "stg_orders.order_id -> int_orders_enriched.order_id missing (alias: o)" + ); + assert!( + find_edge( + edges, + "stg_orders", + "customer_id", + "int_orders_enriched", + "customer_id" + ) + .is_some(), + "stg_orders.customer_id -> int_orders_enriched.customer_id missing (alias: o)" + ); + assert!( + find_edge( + edges, + "stg_orders", + "order_date", + "int_orders_enriched", + "order_date" + ) + .is_some(), + "stg_orders.order_date -> int_orders_enriched.order_date missing (alias: o)" + ); + assert!( + find_edge( + edges, + "stg_orders", + "status", + "int_orders_enriched", + "status" + ) + .is_some(), + "stg_orders.status -> int_orders_enriched.status missing (alias: o)" + ); + + // stg_payments aliased as `p` + let has_payment_edge = edges + .iter() + .any(|e| e["source_model"] == "stg_payments" && e["target_model"] == "int_orders_enriched"); + assert!( + has_payment_edge, + "int_orders_enriched should have edges from stg_payments (alias: p)" + ); +} + +#[test] +fn test_cli_lineage_int_customer_metrics_alias_resolution() { + let data = run_lineage_json(); + let edges = get_edges(&data); + + // stg_customers aliased as `c` + assert!( + find_edge( + edges, + "stg_customers", + "customer_id", + "int_customer_metrics", + "customer_id" + ) + .is_some(), + "stg_customers.customer_id -> int_customer_metrics.customer_id missing (alias: c)" + ); + assert!( + find_edge( + 
edges, + "stg_customers", + "customer_name", + "int_customer_metrics", + "customer_name" + ) + .is_some(), + "stg_customers.customer_name -> int_customer_metrics.customer_name missing (alias: c)" + ); + + // stg_orders aliased as `o` — aggregation columns + let has_orders_edge = edges + .iter() + .any(|e| e["source_model"] == "stg_orders" && e["target_model"] == "int_customer_metrics"); + assert!( + has_orders_edge, + "int_customer_metrics should have edges from stg_orders (alias: o)" + ); +} + +#[test] +fn test_cli_lineage_int_customer_ranking_alias_resolution() { + let data = run_lineage_json(); + let edges = get_edges(&data); + + // stg_customers aliased as `c` + assert!( + find_edge( + edges, + "stg_customers", + "customer_id", + "int_customer_ranking", + "customer_id" + ) + .is_some(), + "stg_customers.customer_id -> int_customer_ranking.customer_id missing (alias: c)" + ); + assert!( + find_edge( + edges, + "stg_customers", + "customer_name", + "int_customer_ranking", + "customer_name" + ) + .is_some(), + "stg_customers.customer_name -> int_customer_ranking.customer_name missing (alias: c)" + ); + + // int_customer_metrics aliased as `m` + assert!( + find_edge( + edges, + "int_customer_metrics", + "lifetime_value", + "int_customer_ranking", + "lifetime_value" + ) + .is_some(), + "int_customer_metrics.lifetime_value -> int_customer_ranking.lifetime_value missing (alias: m)" + ); + assert!( + find_edge( + edges, + "int_customer_metrics", + "lifetime_value", + "int_customer_ranking", + "value_or_zero" + ) + .is_some(), + "int_customer_metrics.lifetime_value -> int_customer_ranking.value_or_zero missing (alias: m)" + ); + assert!( + find_edge( + edges, + "int_customer_metrics", + "total_orders", + "int_customer_ranking", + "nonzero_orders" + ) + .is_some(), + "int_customer_metrics.total_orders -> int_customer_ranking.nonzero_orders missing (alias: m)" + ); +} + +#[test] +fn test_cli_lineage_dim_customers_alias_resolution() { + let data = run_lineage_json(); + let 
edges = get_edges(&data); + + // int_customer_metrics aliased as `m` + assert!( + find_edge( + edges, + "int_customer_metrics", + "customer_id", + "dim_customers", + "customer_id" + ) + .is_some(), + "int_customer_metrics.customer_id -> dim_customers.customer_id missing (alias: m)" + ); + assert!( + find_edge( + edges, + "int_customer_metrics", + "total_orders", + "dim_customers", + "total_orders" + ) + .is_some(), + "int_customer_metrics.total_orders -> dim_customers.total_orders missing (alias: m)" + ); + assert!( + find_edge( + edges, + "int_customer_metrics", + "lifetime_value", + "dim_customers", + "lifetime_value" + ) + .is_some(), + "int_customer_metrics.lifetime_value -> dim_customers.lifetime_value missing (alias: m)" + ); + + // stg_customers aliased as `c` + assert!( + find_edge( + edges, + "stg_customers", + "customer_name", + "dim_customers", + "customer_name" + ) + .is_some(), + "stg_customers.customer_name -> dim_customers.customer_name missing (alias: c)" + ); + assert!( + find_edge(edges, "stg_customers", "email", "dim_customers", "email").is_some(), + "stg_customers.email -> dim_customers.email missing (alias: c)" + ); + assert!( + find_edge( + edges, + "stg_customers", + "signup_date", + "dim_customers", + "signup_date" + ) + .is_some(), + "stg_customers.signup_date -> dim_customers.signup_date missing (alias: c)" + ); +} + +#[test] +fn test_cli_lineage_fct_orders_alias_resolution() { + let data = run_lineage_json(); + let edges = get_edges(&data); + + // int_orders_enriched aliased as `e` + assert!( + find_edge( + edges, + "int_orders_enriched", + "order_id", + "fct_orders", + "order_id" + ) + .is_some(), + "int_orders_enriched.order_id -> fct_orders.order_id missing (alias: e)" + ); + assert!( + find_edge( + edges, + "int_orders_enriched", + "customer_id", + "fct_orders", + "customer_id" + ) + .is_some(), + "int_orders_enriched.customer_id -> fct_orders.customer_id missing (alias: e)" + ); + assert!( + find_edge( + edges, + 
"int_orders_enriched", + "order_date", + "fct_orders", + "order_date" + ) + .is_some(), + "int_orders_enriched.order_date -> fct_orders.order_date missing (alias: e)" + ); + assert!( + find_edge( + edges, + "int_orders_enriched", + "status", + "fct_orders", + "status" + ) + .is_some(), + "int_orders_enriched.status -> fct_orders.status missing (alias: e)" + ); + assert!( + find_edge( + edges, + "int_orders_enriched", + "payment_total", + "fct_orders", + "payment_total" + ) + .is_some(), + "int_orders_enriched.payment_total -> fct_orders.payment_total missing (alias: e)" + ); + assert!( + find_edge( + edges, + "int_orders_enriched", + "payment_count", + "fct_orders", + "payment_count" + ) + .is_some(), + "int_orders_enriched.payment_count -> fct_orders.payment_count missing (alias: e)" + ); + + // stg_customers aliased as `c` + assert!( + find_edge( + edges, + "stg_customers", + "customer_name", + "fct_orders", + "customer_name" + ) + .is_some(), + "stg_customers.customer_name -> fct_orders.customer_name missing (alias: c)" + ); + assert!( + find_edge( + edges, + "stg_customers", + "customer_tier", + "fct_orders", + "customer_tier" + ) + .is_some(), + "stg_customers.customer_tier -> fct_orders.customer_tier missing (alias: c)" + ); +} + +#[test] +fn test_cli_lineage_rpt_customer_orders_alias_resolution() { + let data = run_lineage_json(); + let edges = get_edges(&data); + + // stg_customers aliased as `c` + assert!( + find_edge( + edges, + "stg_customers", + "customer_id", + "rpt_customer_orders", + "customer_id" + ) + .is_some(), + "stg_customers.customer_id -> rpt_customer_orders.customer_id missing (alias: c)" + ); + assert!( + find_edge( + edges, + "stg_customers", + "customer_name", + "rpt_customer_orders", + "customer_name" + ) + .is_some(), + "stg_customers.customer_name -> rpt_customer_orders.customer_name missing (alias: c)" + ); + assert!( + find_edge( + edges, + "stg_customers", + "email", + "rpt_customer_orders", + "email" + ) + .is_some(), + 
"stg_customers.email -> rpt_customer_orders.email missing (alias: c)" + ); + + // int_orders_enriched aliased as `e` + assert!( + find_edge( + edges, + "int_orders_enriched", + "order_id", + "rpt_customer_orders", + "order_id" + ) + .is_some(), + "int_orders_enriched.order_id -> rpt_customer_orders.order_id missing (alias: e)" + ); + assert!( + find_edge( + edges, + "int_orders_enriched", + "order_amount", + "rpt_customer_orders", + "order_amount" + ) + .is_some(), + "int_orders_enriched.order_amount -> rpt_customer_orders.order_amount missing (alias: e)" + ); + assert!( + find_edge( + edges, + "int_orders_enriched", + "payment_total", + "rpt_customer_orders", + "payment_total" + ) + .is_some(), + "int_orders_enriched.payment_total -> rpt_customer_orders.payment_total missing (alias: e)" + ); +} + +// ── G7: Per-column recursive tracing through aliased models ───────────── + +#[test] +fn test_cli_lineage_column_trace_customer_id_through_aliases() { + // int_customer_ranking.customer_id traces back through stg_customers -> raw_customers + let edges = run_lineage_column_json("int_customer_ranking", "customer_id"); + assert!( + !edges.is_empty(), + "int_customer_ranking.customer_id should have upstream lineage edges" + ); + + // Should trace back to raw_customers.id + let has_raw = edges.iter().any(|e| e["source_model"] == "raw_customers"); + assert!( + has_raw, + "int_customer_ranking.customer_id should trace back to raw_customers" + ); +} + +#[test] +fn test_cli_lineage_column_trace_fct_orders_order_id() { + // fct_orders.order_id traces: fct_orders <- int_orders_enriched <- stg_orders <- raw_orders + let edges = run_lineage_column_json("fct_orders", "order_id"); + assert!( + !edges.is_empty(), + "fct_orders.order_id should have upstream lineage edges" + ); + + let has_raw = edges.iter().any(|e| e["source_model"] == "raw_orders"); + assert!( + has_raw, + "fct_orders.order_id should trace back to raw_orders" + ); +} + +#[test] +fn 
test_cli_lineage_column_trace_dim_customers_email() { + // dim_customers.email traces: dim_customers <- stg_customers <- raw_customers + let edges = run_lineage_column_json("dim_customers", "email"); + assert!( + !edges.is_empty(), + "dim_customers.email should have upstream lineage edges" + ); + + let has_raw = edges.iter().any(|e| e["source_model"] == "raw_customers"); + assert!( + has_raw, + "dim_customers.email should trace back to raw_customers" + ); +} + +#[test] +fn test_cli_lineage_column_trace_fct_orders_customer_name() { + // fct_orders.customer_name traces: fct_orders <- stg_customers <- raw_customers + let edges = run_lineage_column_json("fct_orders", "customer_name"); + assert!( + !edges.is_empty(), + "fct_orders.customer_name should have upstream lineage edges" + ); + + let has_stg = edges.iter().any(|e| e["source_model"] == "stg_customers"); + assert!( + has_stg, + "fct_orders.customer_name should trace through stg_customers" + ); +} + +// ── G8: No alias leakage — verify no edges reference alias names ──────── + +#[test] +fn test_cli_lineage_no_alias_leakage() { + let data = run_lineage_json(); + let edges = get_edges(&data); + + // Known aliases used in the sample project + let alias_names = ["o", "c", "p", "m", "e"]; + + for edge in edges { + let source = edge["source_model"].as_str().unwrap_or(""); + let target = edge["target_model"].as_str().unwrap_or(""); + + for alias in &alias_names { + assert_ne!( + source, *alias, + "Edge source_model should be a real table name, not alias '{}': {:?}", + alias, edge + ); + assert_ne!( + target, *alias, + "Edge target_model should be a real table name, not alias '{}': {:?}", + alias, edge + ); + } + } +} + +// ── G9: Model-level lineage data uses real table names in source_tables ─ + +#[test] +fn test_cli_lineage_model_source_tables_resolved() { + let data = run_lineage_json(); + let models = data["models"].as_object().expect("models should be object"); + + let alias_names = ["o", "c", "p", "m", "e"]; + + 
for (model_name, model_data) in models { + if let Some(sources) = model_data["source_tables"].as_array() { + for source in sources { + let s = source.as_str().unwrap_or(""); + for alias in &alias_names { + assert_ne!( + s, *alias, + "Model '{}' source_tables contains alias '{}' instead of real table name", + model_name, alias + ); + } + } + } + } +} diff --git a/crates/ff-cli/tests/sa_integration_tests.rs b/crates/ff-cli/tests/sa_integration_tests.rs index a70f84dd..1a4079bb 100644 --- a/crates/ff-cli/tests/sa_integration_tests.rs +++ b/crates/ff-cli/tests/sa_integration_tests.rs @@ -86,6 +86,18 @@ fn assert_no_diagnostics_with_code(diagnostics: &[serde_json::Value], code: &str assert_diagnostics(diagnostics, code, 0); } +/// Filter out description drift diagnostics (A050-A052) from JSON results. +/// These are expected informational/warning diagnostics from the description_drift pass. +fn filter_description_drift(diagnostics: Vec) -> Vec { + diagnostics + .into_iter() + .filter(|d| { + let code = d.get("code").and_then(|c| c.as_str()).unwrap_or(""); + !matches!(code, "A050" | "A051" | "A052") + }) + .collect() +} + fn assert_no_error_severity(diagnostics: &[serde_json::Value]) { let errors: Vec<_> = diagnostics .iter() @@ -297,6 +309,7 @@ fn test_analyze_sample_project_no_regressions() { assert_no_diagnostics_with_code(&diagnostics, "A001"); assert_no_error_severity(&diagnostics); + let diagnostics = filter_description_drift(diagnostics); assert_eq!( diagnostics.len(), 0, @@ -504,7 +517,7 @@ fn test_sa_clean_project_no_xmodel_cli() { #[test] fn test_sa_dag_ecommerce_zero_diagnostics_cli() { - let diags = run_analyze_json("tests/fixtures/sa_dag_pass_ecommerce"); + let diags = filter_description_drift(run_analyze_json("tests/fixtures/sa_dag_pass_ecommerce")); assert_eq!( diags.len(), 0, @@ -649,7 +662,7 @@ fn test_cli_analyze_model_filter() { #[test] fn test_guard_clean_project_zero_diagnostics_cli() { - let diags = 
run_analyze_json("tests/fixtures/sa_clean_project"); + let diags = filter_description_drift(run_analyze_json("tests/fixtures/sa_clean_project")); assert_eq!( diags.len(), 0, @@ -661,7 +674,7 @@ fn test_guard_clean_project_zero_diagnostics_cli() { #[test] fn test_guard_ecommerce_zero_diagnostics_cli() { - let diags = run_analyze_json("tests/fixtures/sa_dag_pass_ecommerce"); + let diags = filter_description_drift(run_analyze_json("tests/fixtures/sa_dag_pass_ecommerce")); assert_eq!( diags.len(), 0, diff --git a/crates/ff-core/src/lib.rs b/crates/ff-core/src/lib.rs index 7a71ae07..7b5cbf7c 100644 --- a/crates/ff-core/src/lib.rs +++ b/crates/ff-core/src/lib.rs @@ -21,6 +21,7 @@ pub mod query_comment; pub mod reference_manifest; pub mod rules; pub mod run_state; +pub mod schema_registry; pub mod seed; pub mod seed_name; pub mod selector; @@ -50,6 +51,7 @@ pub use project::{Project, ProjectParts}; pub use query_comment::{ModelCommentInput, QueryCommentContext, QueryCommentMetadata}; pub use reference_manifest::{ReferenceManifest, ReferenceModelRef}; pub use run_state::{CompletedModel, FailedModel, RunState, RunStateSummary, RunStatus}; +pub use schema_registry::{ColumnInfo, SchemaRegistry}; pub use seed::Seed; pub use seed_name::SeedName; pub use selector::{apply_selectors, Selector, TraversalDepth}; diff --git a/crates/ff-core/src/schema_registry.rs b/crates/ff-core/src/schema_registry.rs new file mode 100644 index 00000000..82dd01b3 --- /dev/null +++ b/crates/ff-core/src/schema_registry.rs @@ -0,0 +1,91 @@ +//! Schema registry for column metadata lookup +//! +//! Provides a unified view of column names, types, and descriptions across +//! all project nodes (models, sources, seeds). Used by lineage to compute +//! description propagation status. + +use crate::project::Project; +use std::collections::HashMap; + +/// Metadata for a single column in a node. 
+#[derive(Debug, Clone)] +pub struct ColumnInfo { + /// Column name + pub name: String, + /// SQL data type + pub data_type: String, + /// Optional human-readable description + pub description: Option, +} + +/// Registry of column metadata indexed by node name then column name. +#[derive(Debug, Default)] +pub struct SchemaRegistry { + /// node_name -> { column_name_lowercase -> ColumnInfo } + nodes: HashMap>, +} + +impl SchemaRegistry { + /// Build a registry from a loaded project. + /// + /// Collects columns from: + /// - Model YAML schemas (`project.models`) + /// - Source table definitions (`project.sources`) + pub fn from_project(project: &Project) -> Self { + let mut registry = Self::default(); + + // Models + for (name, model) in &project.models { + if let Some(schema) = &model.schema { + let mut cols = HashMap::new(); + for col in &schema.columns { + cols.insert( + col.name.to_lowercase(), + ColumnInfo { + name: col.name.clone(), + data_type: col.data_type.clone(), + description: col.description.clone(), + }, + ); + } + if !cols.is_empty() { + registry.nodes.insert(name.to_string(), cols); + } + } + } + + // Sources + for source_file in &project.sources { + for table in &source_file.tables { + let mut cols = HashMap::new(); + for col in &table.columns { + cols.insert( + col.name.to_lowercase(), + ColumnInfo { + name: col.name.clone(), + data_type: col.data_type.clone(), + description: col.description.clone(), + }, + ); + } + if !cols.is_empty() { + registry.nodes.insert(table.name.clone(), cols); + } + } + } + + registry + } + + /// Look up a single column in a node. + pub fn get_column(&self, node: &str, column: &str) -> Option<&ColumnInfo> { + self.nodes + .get(node) + .and_then(|cols| cols.get(&column.to_lowercase())) + } + + /// Get all columns for a node. 
+ pub fn get_columns(&self, node: &str) -> Option<&HashMap> { + self.nodes.get(node) + } +} diff --git a/crates/ff-sql/src/lib.rs b/crates/ff-sql/src/lib.rs index 9ae7ca78..cfe066a1 100644 --- a/crates/ff-sql/src/lib.rs +++ b/crates/ff-sql/src/lib.rs @@ -25,8 +25,8 @@ pub use extractor::{ }; pub use inline::{collect_ephemeral_dependencies, inline_ephemeral_ctes}; pub use lineage::{ - extract_column_lineage, ColumnLineage, ColumnRef, ExprType, LineageEdge, ModelLineage, - ProjectLineage, + extract_column_lineage, ColumnLineage, ColumnRef, DescriptionStatus, ExprType, LineageEdge, + LineageKind, ModelLineage, ProjectLineage, }; pub use parser::SqlParser; pub use qualify::qualify_table_references; diff --git a/crates/ff-sql/src/lineage.rs b/crates/ff-sql/src/lineage.rs index 4034d42f..8b760200 100644 --- a/crates/ff-sql/src/lineage.rs +++ b/crates/ff-sql/src/lineage.rs @@ -5,8 +5,8 @@ use serde::{Deserialize, Serialize}; use sqlparser::ast::{ - Expr, FunctionArg, FunctionArgExpr, Query, Select, SelectItem, SelectItemQualifiedWildcardKind, - SetExpr, Statement, TableFactor, TableWithJoins, + Expr, FunctionArg, FunctionArgExpr, JoinConstraint, JoinOperator, Query, Select, SelectItem, + SelectItemQualifiedWildcardKind, SetExpr, Statement, TableFactor, TableWithJoins, }; use std::collections::{HashMap, HashSet}; @@ -41,6 +41,55 @@ fn edge_endpoints( } } +/// Classification of how a column flows through the lineage chain. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum LineageKind { + /// Column passed through without modification (same name) + #[default] + Copy, + /// Column passed through but renamed (different name) + Rename, + /// Column modified via operations (aggregations, expressions, CASE, etc.) 
+ Transform, + /// Column referenced in WHERE/JOIN/GROUP BY/HAVING but not in SELECT output + Inspect, +} + +impl std::fmt::Display for LineageKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + LineageKind::Copy => write!(f, "copy"), + LineageKind::Rename => write!(f, "rename"), + LineageKind::Transform => write!(f, "transform"), + LineageKind::Inspect => write!(f, "inspect"), + } + } +} + +/// Status of description propagation between source and target columns. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum DescriptionStatus { + /// Target inherits the same description from source + Inherited, + /// Target has a different description than source + Modified, + /// Either source or target is missing a description + #[default] + Missing, +} + +impl std::fmt::Display for DescriptionStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DescriptionStatus::Inherited => write!(f, "inherited"), + DescriptionStatus::Modified => write!(f, "modified"), + DescriptionStatus::Missing => write!(f, "missing"), + } + } +} + /// Expression type for a lineage column #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(rename_all = "snake_case")] @@ -93,7 +142,7 @@ pub struct ColumnRef { impl ColumnRef { /// Create from a simple column name - pub(crate) fn simple(column: &str) -> Self { + pub fn simple(column: &str) -> Self { Self { table: None, column: column.to_string(), @@ -101,7 +150,7 @@ impl ColumnRef { } /// Create from table.column - pub(crate) fn qualified(table: &str, column: &str) -> Self { + pub fn qualified(table: &str, column: &str) -> Self { Self { table: Some(table.to_string()), column: column.to_string(), @@ -177,8 +226,10 @@ impl ColumnLineage { pub struct ModelLineage { /// Model name pub model_name: String, - /// Column lineages for this model + /// Column lineages for 
this model (SELECT output) pub columns: Vec, + /// Columns referenced in WHERE/JOIN ON/GROUP BY/HAVING but not in SELECT output (Inspect edges) + pub inspect_columns: Vec, /// Table aliases used in the query pub table_aliases: HashMap, /// Source tables referenced @@ -191,6 +242,7 @@ impl ModelLineage { Self { model_name: model_name.to_string(), columns: Vec::new(), + inspect_columns: Vec::new(), table_aliases: HashMap::new(), source_tables: HashSet::new(), } @@ -223,6 +275,10 @@ pub struct LineageEdge { pub is_direct: bool, /// Expression type pub expr_type: ExprType, + /// Lineage kind (Copy, Rename, Transform, Inspect) + pub kind: LineageKind, + /// Description propagation status + pub description_status: DescriptionStatus, /// Data classification from source column (propagated from schema) #[serde(skip_serializing_if = "Option::is_none")] pub classification: Option, @@ -250,6 +306,7 @@ impl ProjectLineage { /// Resolve cross-model edges by matching source tables to known models pub fn resolve_edges(&mut self, known_models: &HashSet<&str>) { + // SELECT output edges (Copy/Rename/Transform) let new_edges: Vec<_> = self .models .iter() @@ -263,6 +320,19 @@ impl ProjectLineage { .collect(); self.edges.extend(new_edges); + + // Inspect edges from WHERE/JOIN ON/GROUP BY/HAVING + let inspect_edges: Vec<_> = self + .models + .iter() + .flat_map(|(target, lineage)| { + lineage.inspect_columns.iter().filter_map(move |col_ref| { + resolve_inspect_edge(target, lineage, col_ref, known_models) + }) + }) + .collect(); + + self.edges.extend(inspect_edges); } /// Trace a column upstream — find all source columns that contribute to it @@ -322,6 +392,30 @@ impl ProjectLineage { result } + /// Compute description status for all edges using a column description lookup. + /// + /// `descriptions` maps `node_name -> { column_name -> description }`. + /// Call this after `resolve_edges()`. 
+ pub fn compute_description_status( + &mut self, + descriptions: &HashMap>, + ) { + for edge in &mut self.edges { + let src_desc = descriptions + .get(&edge.source_model) + .and_then(|cols| cols.get(&edge.source_column.to_lowercase())); + let tgt_desc = descriptions + .get(&edge.target_model) + .and_then(|cols| cols.get(&edge.target_column.to_lowercase())); + + edge.description_status = match (src_desc, tgt_desc) { + (Some(s), Some(t)) if s == t => DescriptionStatus::Inherited, + (Some(_), Some(_)) => DescriptionStatus::Modified, + _ => DescriptionStatus::Missing, + }; + } + } + /// Propagate data classifications from schema definitions onto lineage edges /// /// For each edge, looks up the source column's classification in the provided @@ -361,10 +455,11 @@ impl ProjectLineage { dot.push('\n'); for edge in &self.edges { - let style = if edge.is_direct { - "" - } else { - " [style=dashed]" + let style = match edge.kind { + LineageKind::Copy => " [label=\"copy\"]", + LineageKind::Rename => " [label=\"rename\"]", + LineageKind::Transform => " [style=bold, label=\"transform\"]", + LineageKind::Inspect => " [style=dashed, label=\"inspect\"]", }; dot.push_str(&format!( " \"{}\":\"{}\" -> \"{}\":\"{}\"{};\n", @@ -399,7 +494,16 @@ fn resolve_single_edge( source_ref: &ColumnRef, known_models: &HashSet<&str>, ) -> Option { - let source_table = source_ref.table.as_deref().unwrap_or(""); + // When source_ref.table is None (unqualified column like `id` instead of `t.id`), + // infer from source_tables if there is exactly one source table. 
+ let inferred_table: Option<&str> = match source_ref.table.as_deref() { + Some(t) => Some(t), + None if lineage.source_tables.len() == 1 => { + lineage.source_tables.iter().next().map(|s| s.as_str()) + } + None => None, + }; + let source_table = inferred_table.unwrap_or(""); let resolved_table = lineage .table_aliases .get(source_table) @@ -408,6 +512,20 @@ fn resolve_single_edge( let source_model = known_models .iter() .find(|m| m.eq_ignore_ascii_case(resolved_table))?; + + let kind = if col_lineage.is_direct { + if source_ref + .column + .eq_ignore_ascii_case(&col_lineage.output_column) + { + LineageKind::Copy + } else { + LineageKind::Rename + } + } else { + LineageKind::Transform + }; + Some(LineageEdge { source_model: source_model.to_string(), source_column: source_ref.column.clone(), @@ -415,6 +533,45 @@ fn resolve_single_edge( target_column: col_lineage.output_column.clone(), is_direct: col_lineage.is_direct, expr_type: col_lineage.expr_type, + kind, + description_status: DescriptionStatus::Missing, + classification: None, + }) +} + +/// Resolve an Inspect edge for a column referenced in WHERE/JOIN/GROUP BY/HAVING. 
+fn resolve_inspect_edge( + target_model: &str, + lineage: &ModelLineage, + col_ref: &ColumnRef, + known_models: &HashSet<&str>, +) -> Option { + let inferred_table: Option<&str> = match col_ref.table.as_deref() { + Some(t) => Some(t), + None if lineage.source_tables.len() == 1 => { + lineage.source_tables.iter().next().map(|s| s.as_str()) + } + None => None, + }; + let source_table = inferred_table.unwrap_or(""); + let resolved_table = lineage + .table_aliases + .get(source_table) + .map(|s| s.as_str()) + .unwrap_or(source_table); + let source_model = known_models + .iter() + .find(|m| m.eq_ignore_ascii_case(resolved_table))?; + + Some(LineageEdge { + source_model: source_model.to_string(), + source_column: col_ref.column.clone(), + target_model: target_model.to_string(), + target_column: col_ref.column.clone(), + is_direct: false, + expr_type: ExprType::Expression, + kind: LineageKind::Inspect, + description_status: DescriptionStatus::Missing, classification: None, }) } @@ -519,6 +676,53 @@ fn extract_lineage_from_select(select: &Select, lineage: &mut ModelLineage) { } } } + + // Collect columns already in SELECT output for dedup + let select_columns: HashSet = lineage + .columns + .iter() + .flat_map(|c| c.source_columns.iter().cloned()) + .collect(); + + // Extract Inspect columns from WHERE, JOIN ON, GROUP BY, HAVING + let mut inspect_refs: HashSet = HashSet::new(); + + // WHERE clause + if let Some(ref selection) = select.selection { + let wh = extract_lineage_from_expr(selection, lineage); + inspect_refs.extend(wh.source_columns); + } + + // JOIN ON clauses + for table in &select.from { + for join in &table.joins { + if let Some(expr) = extract_join_on_expr(&join.join_operator) { + let on = extract_lineage_from_expr(expr, lineage); + inspect_refs.extend(on.source_columns); + } + } + } + + // GROUP BY + if let sqlparser::ast::GroupByExpr::Expressions(ref exprs, _) = select.group_by { + for expr in exprs { + let gb = extract_lineage_from_expr(expr, lineage); + 
inspect_refs.extend(gb.source_columns); + } + } + + // HAVING + if let Some(ref having) = select.having { + let hv = extract_lineage_from_expr(having, lineage); + inspect_refs.extend(hv.source_columns); + } + + // Only keep refs that are NOT already in SELECT output + for col_ref in inspect_refs { + if col_ref.column != "*" && !select_columns.contains(&col_ref) { + lineage.inspect_columns.push(col_ref); + } + } } /// Extract table aliases from a FROM clause table reference @@ -779,6 +983,24 @@ fn extract_columns_from_function_args( } } +/// Extract the ON expression from a join operator, if present. +fn extract_join_on_expr(op: &JoinOperator) -> Option<&Expr> { + let constraint = match op { + JoinOperator::Join(c) + | JoinOperator::Inner(c) + | JoinOperator::Left(c) + | JoinOperator::LeftOuter(c) + | JoinOperator::Right(c) + | JoinOperator::RightOuter(c) + | JoinOperator::FullOuter(c) => Some(c), + _ => None, + }; + match constraint { + Some(JoinConstraint::On(expr)) => Some(expr), + _ => None, + } +} + #[cfg(test)] #[path = "lineage_test.rs"] mod tests; diff --git a/crates/ff-sql/src/lineage_test.rs b/crates/ff-sql/src/lineage_test.rs index 21df3460..edf7a4f5 100644 --- a/crates/ff-sql/src/lineage_test.rs +++ b/crates/ff-sql/src/lineage_test.rs @@ -200,6 +200,11 @@ fn build_project_lineage(edges: Vec) -> ProjectLineage { } fn make_edge(src_model: &str, src_col: &str, tgt_model: &str, tgt_col: &str) -> LineageEdge { + let kind = if src_col == tgt_col { + LineageKind::Copy + } else { + LineageKind::Rename + }; LineageEdge { source_model: src_model.to_string(), source_column: src_col.to_string(), @@ -207,6 +212,8 @@ fn make_edge(src_model: &str, src_col: &str, tgt_model: &str, tgt_col: &str) -> target_column: tgt_col.to_string(), is_direct: true, expr_type: ExprType::Column, + kind, + description_status: DescriptionStatus::Missing, classification: None, } } @@ -318,3 +325,275 @@ fn test_recursive_no_matches() { let upstream = lineage.trace_column_recursive("A", "x"); 
assert!(upstream.is_empty()); } + +// --- Unqualified column inference tests --- + +#[test] +fn test_bare_column_resolves_to_single_source_table() { + // stg_customers: SELECT id AS customer_id FROM raw_customers + // `id` has no table qualifier, but source_tables has exactly one entry + let lineage = parse_and_extract_lineage( + "SELECT id AS customer_id FROM raw_customers", + "stg_customers", + ) + .unwrap(); + + assert_eq!(lineage.source_tables.len(), 1); + assert!(lineage.source_tables.contains("raw_customers")); + + let customer_id = lineage.get_column("customer_id").unwrap(); + // source_ref.table is None for bare `id` + assert!(customer_id + .source_columns + .contains(&ColumnRef::simple("id"))); + + // Now resolve edges — bare column should infer raw_customers + let mut project = ProjectLineage::new(); + project.add_model_lineage(lineage); + + let known: HashSet<&str> = ["raw_customers", "stg_customers"].iter().copied().collect(); + project.resolve_edges(&known); + + // Should create edge: raw_customers.id → stg_customers.customer_id + assert_eq!(project.edges.len(), 1); + assert_eq!(project.edges[0].source_model, "raw_customers"); + assert_eq!(project.edges[0].source_column, "id"); + assert_eq!(project.edges[0].target_model, "stg_customers"); + assert_eq!(project.edges[0].target_column, "customer_id"); +} + +#[test] +fn test_bare_column_with_multiple_sources_stays_unresolved() { + // When there are 2+ source tables, bare column cannot be resolved + let lineage = + parse_and_extract_lineage("SELECT id, name FROM table_a, table_b", "test_model").unwrap(); + + assert_eq!(lineage.source_tables.len(), 2); + + let mut project = ProjectLineage::new(); + project.add_model_lineage(lineage); + + let known: HashSet<&str> = ["table_a", "table_b"].iter().copied().collect(); + project.resolve_edges(&known); + + // No edges should be created — ambiguous source + assert!(project.edges.is_empty()); +} + +#[test] +fn test_multi_hop_chain_with_bare_columns() { + // Simulate: 
raw_customers -> stg_customers -> int_customer_ranking + // stg_customers: SELECT id AS customer_id FROM raw_customers (bare column) + // int_customer_ranking: SELECT c.customer_id FROM stg_customers c (qualified) + + let stg = parse_and_extract_lineage( + "SELECT id AS customer_id FROM raw_customers", + "stg_customers", + ) + .unwrap(); + + let int_rank = parse_and_extract_lineage( + "SELECT c.customer_id FROM stg_customers c", + "int_customer_ranking", + ) + .unwrap(); + + let mut project = ProjectLineage::new(); + project.add_model_lineage(stg); + project.add_model_lineage(int_rank); + + let known: HashSet<&str> = ["raw_customers", "stg_customers", "int_customer_ranking"] + .iter() + .copied() + .collect(); + project.resolve_edges(&known); + + // Should have 2 edges forming the full chain + assert_eq!(project.edges.len(), 2); + + // Trace upstream from int_customer_ranking.customer_id should find both hops + let chain = project.trace_column_recursive("int_customer_ranking", "customer_id"); + assert_eq!(chain.len(), 2, "Expected 2-hop chain, got {}", chain.len()); + + // Verify the chain: raw_customers.id -> stg_customers.customer_id -> int_customer_ranking.customer_id + assert!(chain + .iter() + .any(|e| e.source_model == "raw_customers" && e.source_column == "id")); + assert!(chain + .iter() + .any(|e| e.source_model == "stg_customers" && e.source_column == "customer_id")); +} + +// --- LineageKind tests --- + +#[test] +fn test_lineage_kind_copy() { + // SELECT customer_id FROM stg_customers — same name = Copy + let lineage = + parse_and_extract_lineage("SELECT customer_id FROM stg_customers", "test_model").unwrap(); + + let mut project = ProjectLineage::new(); + project.add_model_lineage(lineage); + let known: HashSet<&str> = ["stg_customers"].iter().copied().collect(); + project.resolve_edges(&known); + + assert_eq!(project.edges.len(), 1); + assert_eq!(project.edges[0].kind, LineageKind::Copy); +} + +#[test] +fn test_lineage_kind_rename() { + // SELECT id AS 
customer_id FROM raw_customers — different name = Rename + let lineage = parse_and_extract_lineage( + "SELECT id AS customer_id FROM raw_customers", + "stg_customers", + ) + .unwrap(); + + let mut project = ProjectLineage::new(); + project.add_model_lineage(lineage); + let known: HashSet<&str> = ["raw_customers"].iter().copied().collect(); + project.resolve_edges(&known); + + assert_eq!(project.edges.len(), 1); + assert_eq!(project.edges[0].kind, LineageKind::Rename); +} + +#[test] +fn test_lineage_kind_transform() { + // SELECT COUNT(id) AS cnt FROM orders — function = Transform + let lineage = + parse_and_extract_lineage("SELECT COUNT(o.id) AS cnt FROM orders o", "test_model").unwrap(); + + let mut project = ProjectLineage::new(); + project.add_model_lineage(lineage); + let known: HashSet<&str> = ["orders"].iter().copied().collect(); + project.resolve_edges(&known); + + assert_eq!(project.edges.len(), 1); + assert_eq!(project.edges[0].kind, LineageKind::Transform); +} + +// --- Inspect edge tests --- + +#[test] +fn test_inspect_edges_from_where() { + // WHERE references status which is NOT in SELECT output + let lineage = parse_and_extract_lineage( + "SELECT o.order_id FROM orders o WHERE o.status = 'completed'", + "test_model", + ) + .unwrap(); + + // status should be in inspect_columns + assert!( + lineage.inspect_columns.iter().any(|r| r.column == "status"), + "WHERE column 'status' should be an inspect column" + ); + + let mut project = ProjectLineage::new(); + project.add_model_lineage(lineage); + let known: HashSet<&str> = ["orders"].iter().copied().collect(); + project.resolve_edges(&known); + + // Should have 1 Copy edge (order_id) + 1 Inspect edge (status) + let copy_edges: Vec<_> = project + .edges + .iter() + .filter(|e| e.kind != LineageKind::Inspect) + .collect(); + let inspect_edges: Vec<_> = project + .edges + .iter() + .filter(|e| e.kind == LineageKind::Inspect) + .collect(); + assert_eq!(copy_edges.len(), 1, "Expected 1 copy edge for order_id"); 
+ assert_eq!(inspect_edges.len(), 1, "Expected 1 inspect edge for status"); + assert_eq!(inspect_edges[0].source_column, "status"); +} + +#[test] +fn test_inspect_edges_from_join_on() { + // JOIN ON references customer_id in both tables, but only SELECT output from one + let lineage = parse_and_extract_lineage( + "SELECT c.customer_name FROM stg_customers c + INNER JOIN stg_orders o ON c.customer_id = o.customer_id", + "test_model", + ) + .unwrap(); + + let mut project = ProjectLineage::new(); + project.add_model_lineage(lineage); + let known: HashSet<&str> = ["stg_customers", "stg_orders"].iter().copied().collect(); + project.resolve_edges(&known); + + // Should have inspect edges for customer_id from JOIN ON + let inspect_edges: Vec<_> = project + .edges + .iter() + .filter(|e| e.kind == LineageKind::Inspect) + .collect(); + assert!( + inspect_edges + .iter() + .any(|e| e.source_column == "customer_id"), + "JOIN ON customer_id should create inspect edges" + ); +} + +#[test] +fn test_inspect_edges_from_group_by() { + // GROUP BY customer_id, but customer_id is also in SELECT — should NOT be inspect + // HAVING uses total which is in SELECT — should NOT be inspect + let lineage = parse_and_extract_lineage( + "SELECT o.customer_id, count(o.order_id) as total + FROM orders o + GROUP BY o.customer_id + HAVING count(o.order_id) > 5", + "test_model", + ) + .unwrap(); + + let mut project = ProjectLineage::new(); + project.add_model_lineage(lineage); + let known: HashSet<&str> = ["orders"].iter().copied().collect(); + project.resolve_edges(&known); + + // customer_id is in both SELECT and GROUP BY — should NOT produce an inspect edge + let inspect_edges: Vec<_> = project + .edges + .iter() + .filter(|e| e.kind == LineageKind::Inspect) + .collect(); + assert!( + !inspect_edges + .iter() + .any(|e| e.source_column == "customer_id"), + "customer_id is already in SELECT, should not be an inspect edge" + ); +} + +#[test] +fn test_no_inspect_for_columns_already_in_select() { + 
// WHERE references order_id which IS in SELECT — should NOT be inspect + let lineage = parse_and_extract_lineage( + "SELECT o.order_id, o.amount FROM orders o WHERE o.order_id > 100", + "test_model", + ) + .unwrap(); + + let mut project = ProjectLineage::new(); + project.add_model_lineage(lineage); + let known: HashSet<&str> = ["orders"].iter().copied().collect(); + project.resolve_edges(&known); + + let inspect_edges: Vec<_> = project + .edges + .iter() + .filter(|e| e.kind == LineageKind::Inspect) + .collect(); + assert!( + inspect_edges.is_empty(), + "No inspect edges expected — all WHERE columns are already in SELECT" + ); +} diff --git a/docs/column-lineage-redesign.md b/docs/column-lineage-redesign.md new file mode 100644 index 00000000..e9e7b20e --- /dev/null +++ b/docs/column-lineage-redesign.md @@ -0,0 +1,1380 @@ +# Column-Level Lineage Redesign + +## Industry Research + +### dbt Fusion (SDF Labs Engine) + +dbt Fusion's column-level lineage is powered by an acquisition of SDF Labs — a Rust-based SQL compiler that builds an internal representation (IR) / logical plan for every query. 
Key characteristics: + +- **AOT (Ahead-of-Time) rendering**: compiles and validates ALL models before any warehouse execution, so lineage is available for the entire project before anything runs +- **No database connection required**: static analysis works locally +- **Schema propagation across DAG**: the engine knows what columns exist in every model, understands function type signatures, and propagates types in topological order + +**Lineage classification (3 types):** + +| Type | Meaning | +| ------------- | ------------------------------------------------------------------------------- | +| **Copy** | Column passed through without modification | +| **Transform** | Column modified via operations (aggregations, expressions) | +| **Inspect** | Column referenced/examined but lineage is ambiguous (JSON unpacking, complex ops)| + +**Column evolution (secondary classification):** + +| Type | Meaning | +| --------------- | --------------------------------------------------------------- | +| **Passthrough** | Reused without change — auto-inherits upstream descriptions | +| **Rename** | Reused with new name — still inherits descriptions | +| **Transformed** | Modified — breaks inheritance chain, needs new docs | + +**Limitations**: only tracks SELECT statements (not WHERE/JOIN usage), can't parse Python models, introspective models (`run_query()`) fall back to JIT. Enterprise-only. + +### SQLMesh / sqlglot (Tobiko Data) + +Uses sqlglot, an open-source Python SQL parser/transpiler (20+ dialects) with a built-in optimizer that powers lineage. + +**Core API**: `lineage(column, sql, schema, sources, dialect)` — returns a tree of `Node` objects representing the lineage chain. The optimizer qualifies columns (resolves which table each unqualified column belongs to) before tracing. + +**Critical requirement**: schema metadata MUST be externally provided for all root tables. Without schemas, unqualified column references (`SELECT id FROM t`) cannot be resolved correctly. 
+ +**Handles complex SQL**: UNION (positional column matching), PIVOTs, UDTFs, `SELECT *` expansion. (CTE/recursive CTE support is irrelevant to Feather-Flow since we forbid CTEs and derived tables via S005/S006.) + +**No built-in classification** — tools built on top (Recce, DataHub) add their own: Passthrough / Rename / Derived / Source / Unknown. + +**Limitations**: requires pre-existing schemas, no dynamic SQL, Python-based (slower than Rust), classification must be layered on top. + +### Key Takeaway From Both + +Both approaches require **schema awareness** as a prerequisite for correct column resolution. The critical step is **column qualification** — resolving which table each unqualified column reference belongs to. Without this, `SELECT id FROM raw_customers` loses the connection between `id` and `raw_customers`. + +--- + +## Current Feather-Flow Implementation + +### Two Independent Systems + +1. **AST-based (`ff-sql/src/lineage.rs`)** — used by `ff lineage` CLI command. Parses SQL with sqlparser-rs and walks the AST. +2. **DataFusion-based (`ff-analysis/src/datafusion_bridge/lineage.rs`)** — used only within the analysis engine. Per-model only, no cross-model resolution. + +### The Specific Bug: `int_customer_ranking.customer_id` Lineage + +**Expected full chain:** + +```text +raw_customers.id → stg_customers.customer_id → int_customer_ranking.customer_id +``` + +**Actual output (only 1 hop):** + +```text +stg_customers.customer_id → int_customer_ranking.customer_id +``` + +### Root Cause Analysis + +**Two critical bugs in `resolve_single_edge()` at `ff-sql/src/lineage.rs:395`:** + +#### Bug 1: Unqualified column references resolve to empty string + +```rust +// lineage.rs:402 +let source_table = source_ref.table.as_deref().unwrap_or(""); +``` + +When `stg_customers.sql` says `SELECT id as customer_id FROM raw_customers` (no table prefix on `id`), the extracted `ColumnRef` has `table: None`. This becomes `""`, which matches no model. 
**Edge silently dropped.** + +#### Bug 2: Seeds and sources excluded from `known_models` + +```rust +// lineage.rs (CLI lineage.rs:18) +let known_models: HashSet<&str> = project.models.keys().map(|k| k.as_str()).collect(); +``` + +Even if `raw_customers` were resolved as the source table, it's a **seed** (lives in `project.seeds`), not a model. Seeds and source tables are invisible to edge resolution. + +### Walk-through of the Failure + +1. **stg_customers extraction**: `id as customer_id` → `ColumnRef { table: None, column: "id" }`, `source_tables: { "raw_customers" }` +2. **Edge resolution**: `source_ref.table` is `None` → resolves to `""` → no match in `known_models` → **no edge created** +3. **int_customer_ranking extraction**: `c.customer_id` → alias `c` resolves to `stg_customers` → `ColumnRef { table: "stg_customers", column: "customer_id" }` +4. **Edge resolution**: `stg_customers` IS in `known_models` → edge created: `stg_customers.customer_id → int_customer_ranking.customer_id` +5. **Recursive trace**: BFS from `int_customer_ranking.customer_id` → finds hop to `stg_customers.customer_id` → looks for edges targeting `stg_customers.customer_id` → **none exist** → stops + +--- + +## Feather-Flow's Unfair Advantage + +Unlike dbt or SQLMesh, Feather-Flow **already has** the key prerequisites that make column lineage hard: + +1. **Mandatory 1:1 YAML schemas** — every node MUST have a `.yml` with column definitions. We don't need to introspect a database or guess schemas. +2. **Source YAML with full column metadata** — `raw_ecommerce.yml` already declares every column with name + type for every source table. All sources are required to have schema definitions. +3. **Static analysis validates schemas match SQL** — SA01/SA02 diagnostics ensure YAML columns match what the SQL actually produces. This means our schema metadata is **guaranteed correct** at runtime. +4. 
**DataFusion schema propagation** — `propagate_schemas()` already walks the DAG in topo order, building full schema context for every model. +5. **Seed schemas** — seed nodes have YAML with column definitions. +6. **No CTEs or derived tables** — S005/S006 enforce this. Every SELECT operates directly on named tables, making lineage tracing structurally simpler than what dbt/SQLMesh must handle. No recursive scope descent needed. + +Most tools struggle to get schema information. We have it by design, it's validated, and our SQL constraints make tracing deterministic. + +--- + +## Proposed Design: DataFusion-First Column Lineage + +### Core Principles + +1. **DataFusion is the lineage engine** — use DataFusion `LogicalPlan` for per-model column extraction (more accurate, already integrated with schema propagation). Retire AST-based lineage in `ff-sql` as the primary system. +2. **Leverage validated YAML schemas** — build a `SchemaRegistry` from our guaranteed-correct YAML metadata for column qualification and description tracking. +3. **No CTEs/derived tables simplifies everything** — since S005/S006 forbid these, every FROM/JOIN target is a named table. Column qualification is unambiguous given our schema metadata. + +### Classification System + +All four kinds answer the question "how is this column **used** relative to the SELECT output?" + +| Kind | In SELECT? | Definition | Example | +| ------------- | ---------- | ------------------------------------------------------------- | ------------------------------------------------ | +| **Copy** | Yes | Column passed through with same name, no transformation | `SELECT customer_id FROM stg_customers` | +| **Rename** | Yes | Column passed through (direct ref) but aliased to a new name | `SELECT id AS customer_id FROM raw_customers` | +| **Transform** | Yes | Column derived from expression, aggregation, function, etc. 
| `SELECT coalesce(val, 0) AS val FROM ...` | +| **Inspect** | No | Column referenced in WHERE, JOIN ON, GROUP BY, HAVING only | `WHERE status = 'active'`, `ON a.id = b.id` | + +### Description Tracking + +Each lineage edge carries a `description_status` indicating whether the column's YAML description has changed across the hop: + +| Status | Meaning | +| ------------- | -------------------------------------------------------------- | +| **Inherited** | Description text is identical to the upstream column's | +| **Modified** | Description text differs from the upstream column's | +| **Missing** | No description defined on this node's YAML | + +This enables: + +- **Lint rules**: warn when a Copy/Rename column has a modified description (likely stale or inconsistent) +- **Auto-suggest**: for Copy/Rename columns missing descriptions, suggest inheriting from upstream +- **Documentation drift detection**: surface models where descriptions diverge from their source of truth + +### Phase 1: Fix the Two Bugs (Minimal, High-Impact) + +Fix the two root causes without architectural changes: + +**1a. Resolve unqualified columns using `source_tables`** + +In `resolve_single_edge()`, when `source_ref.table` is `None`: + +- If the model has exactly 1 source table, use it (unambiguous) +- If multiple source tables, match the column name against YAML schemas for each source table to disambiguate +- Since we forbid CTEs/derived tables, every source table is a named table with known schema — this is always resolvable + +**1b. 
Expand `known_models` to include seeds and source tables**
+
+```rust
+let mut known_nodes: HashSet<&str> = project.models.keys().map(|k| k.as_str()).collect();
+for seed in &project.seeds {
+    known_nodes.insert(seed.name.as_str());
+}
+for source_file in &project.sources {
+    for table in &source_file.tables {
+        known_nodes.insert(&table.name);
+    }
+}
+```
+
+**Result after Phase 1**: The full chain `raw_customers.id → stg_customers.customer_id → int_customer_ranking.customer_id` works.
+
+### Phase 2: Schema-Powered Column Qualification + Description Tracking
+
+Build a proper column qualification pass using validated YAML metadata.
+
+**2a. Build a `SchemaRegistry`**
+
+A lookup structure built from project metadata:
+
+```rust
+struct ColumnMeta {
+    name: String,
+    data_type: Option<String>,
+    description: Option<String>,
+    classification: Option<Classification>,
+}
+
+struct SchemaRegistry {
+    /// model_name → { column_name → ColumnMeta }
+    models: HashMap<String, HashMap<String, ColumnMeta>>,
+    /// source_table_name → { column_name → ColumnMeta }
+    sources: HashMap<String, HashMap<String, ColumnMeta>>,
+    /// seed_name → { column_name → ColumnMeta }
+    seeds: HashMap<String, HashMap<String, ColumnMeta>>,
+}
+```
+
+Populated from:
+
+- Model YAML `columns:` sections
+- Source YAML `tables[].columns:` sections
+- Seed YAML `columns:` sections
+
+Since all of these are mandatory and SA-validated, the registry is always complete.
+
+**2b. Column qualification pass**
+
+Before edge resolution, qualify unresolved column references:
+
+- For every `ColumnRef { table: None, column }`, look up which source table(s) in the model's FROM clause own that column via the `SchemaRegistry`
+- For `SELECT *`, expand to all columns from the source table(s) using schemas
+- For ambiguous columns (same name in multiple JOINed tables), flag as ambiguous (this is already a SQL error that DataFusion would catch)
+
+Since CTEs and derived tables are forbidden, every table reference resolves to a known node with a known schema. No recursive scope descent needed.
+
+**2c.
Richer `LineageEdge`**
+
+Extend `LineageEdge` with:
+
+```rust
+struct LineageEdge {
+    source_model: String,
+    source_column: String,
+    target_model: String,
+    target_column: String,
+    kind: LineageKind,                     // Copy, Rename, Transform, Inspect
+    classification: Option<Classification>,
+    description_status: DescriptionStatus, // Inherited, Modified, Missing
+}
+```
+
+Classification rules:
+
+- **Copy**: `ExprType::Column` AND `source_column == target_column`
+- **Rename**: `ExprType::Column` AND `source_column != target_column`
+- **Transform**: `ExprType::Function | Expression | Cast | Case | Subquery`
+- **Inspect**: column appears in WHERE/JOIN/GROUP BY but not in SELECT
+
+### Phase 3: DataFusion-First Lineage Engine
+
+Unify the two lineage systems with DataFusion as the primary engine.
+
+**Why DataFusion over AST**:
+
+- DataFusion already runs during `propagate_schemas()` in topo order across the DAG
+- DataFusion's `LogicalPlan` resolves column references more accurately than raw AST walking (it understands join semantics, projection pushdown, etc.)
+- DataFusion already classifies Copy/Transform/Inspect from the plan
+- We already build `LogicalPlan` for every model during static analysis — lineage extraction is a natural extension
+
+**Architecture**:
+
+1. During `propagate_schemas()`, after building each model's `LogicalPlan`, extract per-model column lineage from the plan (already done in `lineage.rs` DataFusion bridge)
+2. Feed extracted per-model lineage into `ProjectLineage` for cross-model resolution (reuse the BFS traversal logic from `ff-sql`)
+3. Use `SchemaRegistry` for column qualification of unresolved references
+4. The `ff lineage` CLI command triggers the unified pipeline instead of doing its own AST extraction
+
+**What stays in `ff-sql`**: `ProjectLineage`, `LineageEdge`, cross-model resolution (`resolve_edges`), recursive traversal (`trace_column_recursive`, `column_consumers_recursive`), DOT output. These are graph operations, not SQL parsing.
+ +**What moves to DataFusion**: per-model column extraction (replaces `extract_column_lineage` AST walker). + +### Phase 4: Description Inheritance + Lint Rules + +Since we now track `kind` and `description_status`: + +- **Copy/Rename columns with `Missing` description**: suggest inheriting from upstream +- **Copy/Rename columns with `Modified` description**: warn about potential documentation drift (new diagnostic code, e.g., A050) +- **Transform columns with `Missing` description**: warn that transformed columns need documentation +- `ff validate` surfaces these as warnings +- `ff lineage --column X` shows description status in the chain view + +### Phase 5: Enhanced CLI Output + +**Chain view** (default when `--column` is specified): + +```text +$ ff lineage -n int_customer_ranking --column customer_id --direction upstream + +CHAIN: int_customer_ranking.customer_id +════════════════════════════════════════════════════════════════ + + raw_customers.id (source, INTEGER) + "Unique order identifier" + │ rename → description: modified + stg_customers.customer_id (sql, INTEGER, pii) + "Unique identifier for the customer" + │ copy → description: modified + int_customer_ranking.customer_id (sql, INTEGER, pii) + "Unique customer identifier" + +3 nodes in lineage chain. 
+``` + +**Table view** (default when no `--column`): + +```text +$ ff lineage -n int_customer_ranking + +SOURCE MODEL SOURCE COLUMN TARGET MODEL TARGET COLUMN KIND CLASS DESC STATUS +────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── +stg_customers customer_id int_customer_ranking customer_id copy pii modified +stg_customers customer_name int_customer_ranking customer_name copy pii inherited +int_customer_metrics lifetime_value int_customer_ranking lifetime_value copy - inherited +int_customer_metrics lifetime_value int_customer_ranking value_or_zero transform - modified +int_customer_metrics total_orders int_customer_ranking nonzero_orders transform - modified +int_customer_metrics customer_id int_customer_ranking customer_id inspect pii - +``` + +**JSON output** (`--output json`): full edge metadata including kind, classification, description_status, source/target types. + +**DOT output** (`--output dot`): column-level DAG with edge labels for kind, suitable for VS Code extension. + +--- + +## Implementation Priority + +| Phase | Effort | Impact | Description | +| ------- | ------ | ---------- | ------------------------------------------------------------------ | +| **1** | Small | **High** | Fix two bugs — unqualified columns + seeds/sources in known_nodes | +| **2** | Medium | **High** | SchemaRegistry + column qualification + description tracking | +| **3** | Medium | **High** | DataFusion-first engine, retire AST extraction as primary | +| **4** | Small | Medium | Description inheritance lint rules (A050+) | +| **5** | Small | Medium | Enhanced CLI output (chain view, description status) | + +Phase 1 alone fixes the reported bug. Phases 2-3 build the real engine. Phases 4-5 are the polish that makes this best-in-class. 
+ +--- + +## Comparison: Where Feather-Flow Lands After This + +| Capability | dbt Fusion | SQLMesh | Feather-Flow (after) | +| ---------------------------- | ------------------- | -------------------- | ----------------------------------------- | +| Schema source | Inferred from DAG | Must be provided | Mandatory YAML + SA validation | +| Database required | No | No | No | +| Classification | Copy/Transform/Inspect | N/A (add-on) | Copy/Rename/Transform/Inspect | +| Description inheritance | Enterprise only | No | Built-in | +| Description drift detection | No | No | Built-in (Inherited/Modified/Missing) | +| Column qualification | Built into compiler | sqlglot optimizer | SchemaRegistry from validated YAML | +| Cross-model resolution | Built-in | Per-query only | Built-in (recursive BFS) | +| Seeds/sources in lineage | Yes | Depends on config | Yes | +| Validates schemas match SQL | Yes (SA) | No | Yes (SA01/SA02) | +| CTE/derived table handling | Full support needed | Full support needed | Not needed (S005/S006 forbid them) | +| Lineage engine | Custom Rust IR | Python sqlglot | DataFusion LogicalPlan (Rust) | +| Language | Rust | Python | Rust | +| Open source | No (Enterprise) | Yes | Yes | + +--- + +## Test Harness: Complete Column Lineage Map + +This section documents the expected lineage for **every column in the sample project**, traced from raw sources through every model. This is the ground truth for the test harness. 
+ +### DAG Structure + +```text + RAW SOURCES (seeds + source YAML) + ┌──────────────┬──────────────┬──────────────┐ + │ │ │ │ +raw_customers raw_orders raw_products raw_payments + │ │ │ │ │ + ▼ ▼ ▼ ▼ ▼ +stg_customers stg_orders stg_products stg_payments stg_payments_star + │ │ │ │ │ │ │ + │ │ │ │ │ │ │ + ▼ │ ┌────▼────┤ ▼ ▼ ▼ +int_customer │ int_orders dim_products │ + _metrics │ _enriched dim_products │ + │ │ │ │ │ _extended │ + │ ▼ │ │ │ │ + │ int_customer │ │ │ + │ _ranking │ │ │ + │ │ │ │ │ + ▼─────────│──────▼ ▼───────────────────┘ + dim_customers fct_orders int_all_orders + │ int_high_value_orders + │ + rpt_customer_orders + rpt_order_volume (via table function) +``` + +### Classification Rules (for reference) + +- **Copy**: column in SELECT, passed through with same name, no transformation +- **Rename**: column in SELECT, direct reference but aliased to a different name +- **Transform**: column in SELECT, derived from expression/function/aggregation/cast +- **Inspect**: column used in WHERE, JOIN ON, GROUP BY, or HAVING **only** (not in SELECT) + +When a column appears in both SELECT and WHERE/JOIN/GROUP BY, the SELECT classification takes precedence — no separate Inspect edge. + +### Description Status Rules + +- **inherited**: both source and target YAML have descriptions, and they are identical +- **modified**: both source and target YAML have descriptions, and they differ +- **missing**: either source or target (or both) lack a description in YAML + +### Source Column Inventory + +All columns defined in raw source/seed nodes. Most have no YAML descriptions (seeds have no `columns:` section; most source columns in `raw_ecommerce.yml` omit `description:`). 
+ +**raw_customers** (seed + source): + +| Column | Type | Source YAML Description | +| ---------- | ------- | ----------------------- | +| id | INTEGER | (none) | +| name | VARCHAR | (none) | +| email | VARCHAR | (none) | +| created_at | DATE | (none) | +| tier | VARCHAR | (none) | + +**raw_orders** (seed + source): + +| Column | Type | Source YAML Description | +| ---------- | ------------- | -------------------------- | +| id | INTEGER | "Unique order identifier" | +| user_id | INTEGER | (none) | +| created_at | DATE | (none) | +| amount | DECIMAL(10,2) | (none) | +| status | VARCHAR | (none) | + +**raw_products** (seed + source): + +| Column | Type | Source YAML Description | +| -------- | ------------- | ----------------------- | +| id | INTEGER | (none) | +| name | VARCHAR | (none) | +| category | VARCHAR | (none) | +| price | DECIMAL(10,2) | (none) | +| active | BOOLEAN | (none) | + +**raw_payments** (seed + source): + +| Column | Type | Source YAML Description | +| -------------- | ------------- | ----------------------- | +| id | INTEGER | (none) | +| order_id | INTEGER | (none) | +| payment_method | VARCHAR | (none) | +| amount | DECIMAL(10,2) | (none) | +| created_at | DATE | (none) | + +--- + +### Per-Model Edge Matrix + +Every output column for every SQL model, with its immediate upstream source(s). This is the core test data — each row is one expected `LineageEdge`. 
+ +#### stg_customers + +Source: `raw_customers` (single table, no alias, all column refs unqualified) + +| # | target_column | source_model | source_column | kind | desc_status | +| - | ------------- | ------------- | ------------- | --------- | ----------- | +| 1 | customer_id | raw_customers | id | rename | missing | +| 2 | customer_name | raw_customers | name | rename | missing | +| 3 | email | raw_customers | email | copy | missing | +| 4 | signup_date | raw_customers | created_at | rename | missing | +| 5 | customer_tier | raw_customers | tier | rename | missing | + +Inspect edges: none + +#### stg_orders + +Source: `raw_orders` (single table, no alias, WHERE `created_at >= ...`) + +| # | target_column | source_model | source_column | kind | desc_status | +| - | ------------- | ------------ | ------------- | --------- | ----------- | +| 1 | order_id | raw_orders | id | rename | modified | +| 2 | customer_id | raw_orders | user_id | rename | missing | +| 3 | order_date | raw_orders | created_at | rename | missing | +| 4 | amount | raw_orders | amount | copy | missing | +| 5 | status | raw_orders | status | copy | missing | + +Inspect edges: none (`created_at` is in both SELECT and WHERE — SELECT wins as rename) + +Note: edge #1 is the only raw→staging edge where both source and target have descriptions. Source: "Unique order identifier", Target: "Unique identifier for the order" → **modified**. + +#### stg_payments + +Source: `raw_payments` (single table, no alias). 
After Jinja rendering: `{{ cents_to_dollars("amount") }}` → `amount / 100.0` + +| # | target_column | source_model | source_column | kind | desc_status | +| - | ------------- | ------------ | ------------- | --------- | ----------- | +| 1 | payment_id | raw_payments | id | rename | missing | +| 2 | order_id | raw_payments | order_id | copy | missing | +| 3 | amount | raw_payments | amount | transform | missing | + +Inspect edges: none + +Note: `amount / 100.0` is an arithmetic expression → transform (not copy, even though output name matches). + +#### stg_payments_star + +Source: `raw_payments` (single table, `SELECT *`) + +| # | target_column | source_model | source_column | kind | desc_status | +| - | -------------- | ------------ | -------------- | ---- | ----------- | +| 1 | id | raw_payments | id | copy | missing | +| 2 | order_id | raw_payments | order_id | copy | missing | +| 3 | payment_method | raw_payments | payment_method | copy | missing | +| 4 | amount | raw_payments | amount | copy | missing | +| 5 | created_at | raw_payments | created_at | copy | missing | + +Inspect edges: none + +Note: `SELECT *` requires schema-aware expansion. Each column must be resolved using the `SchemaRegistry` for `raw_payments`. This is a key test case for Phase 2. + +#### stg_products + +Source: `raw_products` (single table, no alias) + +| # | target_column | source_model | source_column | kind | desc_status | +| - | ------------- | ------------ | ------------- | --------- | ----------- | +| 1 | product_id | raw_products | id | rename | missing | +| 2 | product_name | raw_products | name | rename | missing | +| 3 | category | raw_products | category | copy | missing | +| 4 | price | raw_products | price | transform | missing | +| 5 | active | raw_products | active | copy | missing | + +Inspect edges: none + +Note: `cast(price as decimal(10, 2)) as price` is a CAST → transform, even though the output name matches the input name. 
+ +#### int_customer_metrics + +Sources: `stg_customers c` JOIN `stg_orders o` ON `c.customer_id = o.customer_id`, GROUP BY `c.customer_id, c.customer_name` + +| # | target_column | source_model | source_column | kind | desc_status | +| - | -------------- | ------------- | ------------- | --------- | ----------- | +| 1 | customer_id | stg_customers | customer_id | copy | modified | +| 2 | customer_name | stg_customers | customer_name | copy | modified | +| 3 | total_orders | stg_orders | order_id | transform | n/a | +| 4 | lifetime_value | stg_orders | amount | transform | n/a | +| 5 | last_order_date| stg_orders | order_date | transform | n/a | + +Inspect edges: + +| # | source_model | source_column | used_in | +| - | ------------ | ------------- | ------- | +| 1 | stg_orders | customer_id | JOIN ON | + +Description details for select edges: +- #1: "Unique identifier for the customer" → "Unique customer identifier" = **modified** +- #2: "Full name of the customer" → "Customer full name" = **modified** +- #3-5: transform columns — description comparison is n/a (new semantics) + +#### int_customer_ranking + +Sources: `stg_customers c` JOIN `int_customer_metrics m` ON `c.customer_id = m.customer_id` + +| # | target_column | source_model | source_column | kind | desc_status | +| - | -------------- | -------------------- | -------------- | --------- | ----------- | +| 1 | customer_id | stg_customers | customer_id | copy | modified | +| 2 | customer_name | stg_customers | customer_name | copy | modified | +| 3 | lifetime_value | int_customer_metrics | lifetime_value | copy | inherited | +| 4 | value_or_zero | int_customer_metrics | lifetime_value | transform | n/a | +| 5 | nonzero_orders | int_customer_metrics | total_orders | transform | n/a | + +Inspect edges: + +| # | source_model | source_column | used_in | +| - | -------------------- | ------------- | ------- | +| 1 | int_customer_metrics | customer_id | JOIN ON | + +Description details: +- #1: "Unique identifier 
for the customer" → "Unique customer identifier" = **modified** +- #2: "Full name of the customer" → "Customer full name" = **modified** +- #3: "Total amount spent across all orders" → "Total amount spent across all orders" = **inherited** + +#### int_orders_enriched + +Sources: `stg_orders o` JOIN `stg_payments p` ON `o.order_id = p.order_id`, GROUP BY `o.order_id, o.customer_id, o.order_date, o.amount, o.status` + +| # | target_column | source_model | source_column | kind | desc_status | +| - | ------------- | ------------ | ------------- | --------- | ----------- | +| 1 | order_id | stg_orders | order_id | copy | modified | +| 2 | customer_id | stg_orders | customer_id | copy | modified | +| 3 | order_date | stg_orders | order_date | copy | inherited | +| 4 | order_amount | stg_orders | amount | rename | n/a | +| 5 | status | stg_orders | status | copy | inherited | +| 6 | payment_total | stg_payments | amount | transform | n/a | +| 7 | payment_count | stg_payments | payment_id | transform | n/a | + +Inspect edges: + +| # | source_model | source_column | used_in | +| - | ------------ | ------------- | ------- | +| 1 | stg_payments | order_id | JOIN ON | + +Description details: +- #1: "Unique identifier for the order" → "Unique order identifier" = **modified** +- #2: "Foreign key to stg_customers" → "Reference to the customer" = **modified** +- #3: "Date the order was placed" → "Date the order was placed" = **inherited** +- #5: "Order status" → "Order status" = **inherited** + +#### int_high_value_orders + +Source: `stg_orders o`, GROUP BY `o.customer_id`, HAVING `sum(o.amount) > 100` + +| # | target_column | source_model | source_column | kind | desc_status | +| - | ------------- | ------------ | ------------- | --------- | ----------- | +| 1 | customer_id | stg_orders | customer_id | copy | modified | +| 2 | order_count | stg_orders | order_id | transform | n/a | +| 3 | total_amount | stg_orders | amount | transform | n/a | +| 4 | min_order | stg_orders | 
amount | transform | n/a | +| 5 | max_order | stg_orders | amount | transform | n/a | +| 6 | avg_order | stg_orders | amount | transform | n/a | + +Inspect edges: none (customer_id in GROUP BY is also in SELECT; amount in HAVING is also in SELECT transforms) + +Description details: +- #1: "Foreign key to stg_customers" → "Customer identifier" = **modified** + +#### int_all_orders (UNION ALL) + +Two branches. Each output column has edges from **both** branches. + +Branch 1: `int_orders_enriched` WHERE `status = 'completed'` +Branch 2: `stg_orders` WHERE `status = 'pending'` + +| # | target_column | source_model (B1) | source_column (B1) | kind (B1) | source_model (B2) | source_column (B2) | kind (B2) | +| -- | ------------- | ------------------- | ------------------ | --------- | ----------------- | ------------------ | --------- | +| 1 | order_id | int_orders_enriched | order_id | copy | stg_orders | order_id | copy | +| 2 | customer_id | int_orders_enriched | customer_id | copy | stg_orders | customer_id | copy | +| 3 | order_date | int_orders_enriched | order_date | copy | stg_orders | order_date | copy | +| 4 | order_amount | int_orders_enriched | order_amount | copy | stg_orders | amount | rename | +| 5 | status | int_orders_enriched | status | copy | stg_orders | status | copy | +| 6 | source | (literal) | — | transform | (literal) | — | transform | + +Inspect edges: none (status in WHERE is also in SELECT) + +Description status (B1 edges): +- #1: "Unique order identifier" → "Unique order identifier" = **inherited** +- #2: "Reference to the customer" → "Customer reference" = **modified** +- #3: "Date the order was placed" → "Date the order was placed" = **inherited** +- #4: "Original order amount" → "Order amount" = **modified** +- #5: "Order status" → "Order status" = **inherited** + +Description status (B2 edges): +- #1: "Unique identifier for the order" → "Unique order identifier" = **modified** +- #2: "Foreign key to stg_customers" → "Customer reference" = 
**modified** +- #3: "Date the order was placed" → "Date the order was placed" = **inherited** +- #4: "Order total in USD" → "Order amount" = **modified** +- #5: "Order status" → "Order status" = **inherited** + +Note: `source` column is a string literal with no upstream column dependency — transform with zero source columns. + +#### dim_customers + +Sources: `int_customer_metrics m` JOIN `stg_customers c` ON `m.customer_id = c.customer_id` + +| # | target_column | source_model | source_column | kind | desc_status | +| - | -------------- | -------------------- | -------------- | --------- | ----------- | +| 1 | customer_id | int_customer_metrics | customer_id | copy | inherited | +| 2 | customer_name | stg_customers | customer_name | copy | modified | +| 3 | email | stg_customers | email | copy | inherited | +| 4 | signup_date | stg_customers | signup_date | copy | inherited | +| 5 | total_orders | int_customer_metrics | total_orders | copy | inherited | +| 6 | lifetime_value | int_customer_metrics | lifetime_value | copy | inherited | +| 7 | last_order_date| int_customer_metrics | last_order_date| copy | inherited | +| 8 | computed_tier | int_customer_metrics | lifetime_value | transform | n/a | + +Inspect edges: + +| # | source_model | source_column | used_in | +| - | ------------- | ------------- | ------- | +| 1 | stg_customers | customer_id | JOIN ON | + +Description details: +- #1: "Unique customer identifier" → "Unique customer identifier" = **inherited** +- #2: "Full name of the customer" → "Customer full name" = **modified** +- #3: "Customer email address" → "Customer email address" = **inherited** +- #4: "Date the customer signed up" → "Date the customer signed up" = **inherited** +- #5: "Total number of orders placed" → "Total number of orders placed" = **inherited** +- #6: "Total amount spent across all orders" → "Total amount spent across all orders" = **inherited** +- #7: "Date of the most recent order" → "Date of the most recent order" = **inherited** 
+ +#### dim_products + +Source: `stg_products` (single table, no alias, WHERE `active = true`) + +| # | target_column | source_model | source_column | kind | desc_status | +| - | -------------- | ------------ | ------------- | --------- | ----------- | +| 1 | product_id | stg_products | product_id | copy | inherited | +| 2 | product_name | stg_products | product_name | copy | inherited | +| 3 | category | stg_products | category | copy | modified | +| 4 | price | stg_products | price | copy | inherited | +| 5 | category_group | stg_products | category | transform | n/a | +| 6 | price_tier | stg_products | price | transform | n/a | + +Inspect edges: + +| # | source_model | source_column | used_in | +| - | ------------ | ------------- | ------- | +| 1 | stg_products | active | WHERE | + +Description details: +- #1: "Unique product identifier" → "Unique product identifier" = **inherited** +- #2: "Product display name" → "Product display name" = **inherited** +- #3: "Product category" → "Original product category" = **modified** +- #4: "Product price in dollars" → "Product price in dollars" = **inherited** + +#### dim_products_extended + +Source: `stg_products` (single table, no alias, SELECT DISTINCT) + +| # | target_column | source_model | source_column | kind | desc_status | +| - | ----------------- | ------------ | ------------- | --------- | ----------- | +| 1 | product_id | stg_products | product_id | copy | inherited | +| 2 | product_name | stg_products | product_name | copy | inherited | +| 3 | category | stg_products | category | copy | modified | +| 4 | price | stg_products | price | copy | inherited | +| 5 | id_scaled | stg_products | product_id | transform | n/a | +| 6 | detailed_category | stg_products | category | transform | n/a | +| 7 | detailed_category | stg_products | price | transform | n/a | + +Inspect edges: none + +Note: `detailed_category` has **two** source columns — both `category` and `price` feed into the nested CASE expression. 
This produces two edges for the same target column. + +Description details: +- #1: "Unique product identifier" → "Unique product identifier" = **inherited** +- #2: "Product display name" → "Product display name" = **inherited** +- #3: "Product category" → "Original product category" = **modified** +- #4: "Product price in dollars" → "Product price in dollars" = **inherited** + +#### fct_orders + +Sources: `int_orders_enriched e` JOIN `stg_customers c` ON `e.customer_id = c.customer_id` + +| # | target_column | source_model | source_column | kind | desc_status | +| -- | ------------- | ------------------- | ------------- | --------- | ----------- | +| 1 | order_id | int_orders_enriched | order_id | copy | modified | +| 2 | customer_id | int_orders_enriched | customer_id | copy | inherited | +| 3 | customer_name | stg_customers | customer_name | copy | modified | +| 4 | customer_tier | stg_customers | customer_tier | copy | modified | +| 5 | order_date | int_orders_enriched | order_date | copy | inherited | +| 6 | amount | int_orders_enriched | order_amount | rename | n/a | +| 7 | status | int_orders_enriched | status | copy | inherited | +| 8 | payment_total | int_orders_enriched | payment_total | copy | modified | +| 9 | payment_count | int_orders_enriched | payment_count | copy | modified | +| 10 | balance_due | int_orders_enriched | order_amount | transform | n/a | +| 11 | balance_due | int_orders_enriched | payment_total | transform | n/a | +| 12 | payment_ratio | int_orders_enriched | payment_total | transform | n/a | +| 13 | payment_ratio | int_orders_enriched | order_amount | transform | n/a | + +Inspect edges: + +| # | source_model | source_column | used_in | +| - | ------------- | ------------- | ------- | +| 1 | stg_customers | customer_id | JOIN ON | + +Description details: +- #1: "Unique order identifier" → "Unique identifier for the order" = **modified** +- #2: "Reference to the customer" → "Reference to the customer" = **inherited** +- #3: "Full name 
of the customer" → "Customer full name" = **modified** +- #4: "Customer tier (gold, silver, bronze)" → "Customer tier at time of order" = **modified** +- #5: "Date the order was placed" → "Date the order was placed" = **inherited** +- #7: "Order status" → "Order status" = **inherited** +- #8: "Total payments received for this order" → "Total payments received" = **modified** +- #9: "Number of payments made for this order" → "Number of payments made" = **modified** + +#### rpt_customer_orders + +Sources: `stg_customers c` JOIN `int_orders_enriched e` ON `c.customer_id = e.customer_id` JOIN `stg_orders o` ON `e.order_id = o.order_id`, WHERE `e.order_amount BETWEEN o.amount AND o.amount` + +| # | target_column | source_model | source_column | kind | desc_status | +| - | --------------- | ------------------- | ------------- | --------- | ----------- | +| 1 | customer_id | stg_customers | customer_id | copy | modified | +| 2 | customer_name | stg_customers | customer_name | copy | modified | +| 3 | email | stg_customers | email | copy | inherited | +| 4 | order_id | int_orders_enriched | order_id | copy | modified | +| 5 | order_amount | int_orders_enriched | order_amount | copy | inherited | +| 6 | payment_total | int_orders_enriched | payment_total | copy | modified | +| 7 | balance_with_fee| int_orders_enriched | order_amount | transform | n/a | +| 8 | balance_with_fee| int_orders_enriched | payment_total | transform | n/a | +| 9 | combined_metric | int_orders_enriched | order_amount | transform | n/a | +| 10| combined_metric | int_orders_enriched | payment_total | transform | n/a | +| 11| combined_metric | int_orders_enriched | payment_count | transform | n/a | + +Inspect edges: + +| # | source_model | source_column | used_in | +| - | ------------------- | ------------- | ------- | +| 1 | int_orders_enriched | customer_id | JOIN ON | +| 2 | stg_orders | order_id | JOIN ON | +| 3 | stg_orders | amount | WHERE | + +Description details: +- #1: "Unique identifier for 
the customer" → "Unique customer identifier" = **modified** +- #2: "Full name of the customer" → "Customer full name" = **modified** +- #3: "Customer email address" → "Customer email address" = **inherited** +- #4: "Unique order identifier" → "Order identifier" = **modified** +- #5: "Original order amount" → "Original order amount" = **inherited** +- #6: "Total payments received for this order" → "Total payments for order" = **modified** + +#### rpt_order_volume + +Source: `order_volume_by_status({{ var("min_order_count") }})` — table function call + +| # | target_column | source_model | source_column | kind | desc_status | +| - | -------------- | ------------------------ | ------------- | --------- | ----------- | +| 1 | status | order_volume_by_status | status | copy | n/a | +| 2 | order_count | order_volume_by_status | order_count | copy | n/a | +| 3 | pct_of_hundred | order_volume_by_status | order_count | transform | n/a | + +**Special case**: lineage stops at the table function boundary. The function body internally reads from `fct_orders`, but tracing through function definitions is a future enhancement. Test assertions should verify edges exist from the function output; cross-function lineage is out of scope for initial implementation. + +--- + +### Full Upstream Chains: Every Mart/Report Column to Raw Sources + +These chains trace every terminal column all the way back to raw_ sources. Each line is one hop. The test harness should verify that `trace_column_recursive(model, column)` returns exactly these edges. 
+ +#### dim_customers + +**dim_customers.customer_id**: + +```text +raw_customers.id ──rename──▶ stg_customers.customer_id +stg_customers.customer_id ──copy──▶ int_customer_metrics.customer_id +int_customer_metrics.customer_id ──copy──▶ dim_customers.customer_id +``` + +**dim_customers.customer_name**: + +```text +raw_customers.name ──rename──▶ stg_customers.customer_name +stg_customers.customer_name ──copy──▶ dim_customers.customer_name +``` + +**dim_customers.email**: + +```text +raw_customers.email ──copy──▶ stg_customers.email +stg_customers.email ──copy──▶ dim_customers.email +``` + +**dim_customers.signup_date**: + +```text +raw_customers.created_at ──rename──▶ stg_customers.signup_date +stg_customers.signup_date ──copy──▶ dim_customers.signup_date +``` + +**dim_customers.total_orders**: + +```text +raw_orders.id ──rename──▶ stg_orders.order_id +stg_orders.order_id ──transform(count)──▶ int_customer_metrics.total_orders +int_customer_metrics.total_orders ──copy──▶ dim_customers.total_orders +``` + +**dim_customers.lifetime_value**: + +```text +raw_orders.amount ──copy──▶ stg_orders.amount +stg_orders.amount ──transform(coalesce+sum)──▶ int_customer_metrics.lifetime_value +int_customer_metrics.lifetime_value ──copy──▶ dim_customers.lifetime_value +``` + +**dim_customers.last_order_date**: + +```text +raw_orders.created_at ──rename──▶ stg_orders.order_date +stg_orders.order_date ──transform(max)──▶ int_customer_metrics.last_order_date +int_customer_metrics.last_order_date ──copy──▶ dim_customers.last_order_date +``` + +**dim_customers.computed_tier**: + +```text +raw_orders.amount ──copy──▶ stg_orders.amount +stg_orders.amount ──transform(coalesce+sum)──▶ int_customer_metrics.lifetime_value +int_customer_metrics.lifetime_value ──transform(case)──▶ dim_customers.computed_tier +``` + +#### fct_orders + +**fct_orders.order_id**: + +```text +raw_orders.id ──rename──▶ stg_orders.order_id +stg_orders.order_id ──copy──▶ int_orders_enriched.order_id 
+int_orders_enriched.order_id ──copy──▶ fct_orders.order_id +``` + +**fct_orders.customer_id**: + +```text +raw_orders.user_id ──rename──▶ stg_orders.customer_id +stg_orders.customer_id ──copy──▶ int_orders_enriched.customer_id +int_orders_enriched.customer_id ──copy──▶ fct_orders.customer_id +``` + +**fct_orders.customer_name**: + +```text +raw_customers.name ──rename──▶ stg_customers.customer_name +stg_customers.customer_name ──copy──▶ fct_orders.customer_name +``` + +**fct_orders.customer_tier**: + +```text +raw_customers.tier ──rename──▶ stg_customers.customer_tier +stg_customers.customer_tier ──copy──▶ fct_orders.customer_tier +``` + +**fct_orders.order_date**: + +```text +raw_orders.created_at ──rename──▶ stg_orders.order_date +stg_orders.order_date ──copy──▶ int_orders_enriched.order_date +int_orders_enriched.order_date ──copy──▶ fct_orders.order_date +``` + +**fct_orders.amount**: + +```text +raw_orders.amount ──copy──▶ stg_orders.amount +stg_orders.amount ──rename──▶ int_orders_enriched.order_amount +int_orders_enriched.order_amount ──rename──▶ fct_orders.amount +``` + +**fct_orders.status**: + +```text +raw_orders.status ──copy──▶ stg_orders.status +stg_orders.status ──copy──▶ int_orders_enriched.status +int_orders_enriched.status ──copy──▶ fct_orders.status +``` + +**fct_orders.payment_total**: + +```text +raw_payments.amount ──transform(cents_to_dollars)──▶ stg_payments.amount +stg_payments.amount ──transform(coalesce+sum)──▶ int_orders_enriched.payment_total +int_orders_enriched.payment_total ──copy──▶ fct_orders.payment_total +``` + +**fct_orders.payment_count**: + +```text +raw_payments.id ──rename──▶ stg_payments.payment_id +stg_payments.payment_id ──transform(count)──▶ int_orders_enriched.payment_count +int_orders_enriched.payment_count ──copy──▶ fct_orders.payment_count +``` + +**fct_orders.balance_due** (multi-source transform): + +```text +Path A: + raw_orders.amount ──copy──▶ stg_orders.amount + stg_orders.amount ──rename──▶ 
int_orders_enriched.order_amount + int_orders_enriched.order_amount ──transform(subtraction)──▶ fct_orders.balance_due + +Path B: + raw_payments.amount ──transform(cents_to_dollars)──▶ stg_payments.amount + stg_payments.amount ──transform(coalesce+sum)──▶ int_orders_enriched.payment_total + int_orders_enriched.payment_total ──transform(subtraction)──▶ fct_orders.balance_due +``` + +**fct_orders.payment_ratio** (multi-source transform): + +```text +Path A: + raw_payments.amount ──transform(cents_to_dollars)──▶ stg_payments.amount + stg_payments.amount ──transform(coalesce+sum)──▶ int_orders_enriched.payment_total + int_orders_enriched.payment_total ──transform(safe_divide)──▶ fct_orders.payment_ratio + +Path B: + raw_orders.amount ──copy──▶ stg_orders.amount + stg_orders.amount ──rename──▶ int_orders_enriched.order_amount + int_orders_enriched.order_amount ──transform(safe_divide)──▶ fct_orders.payment_ratio +``` + +#### dim_products + +**dim_products.product_id**: + +```text +raw_products.id ──rename──▶ stg_products.product_id +stg_products.product_id ──copy──▶ dim_products.product_id +``` + +**dim_products.product_name**: + +```text +raw_products.name ──rename──▶ stg_products.product_name +stg_products.product_name ──copy──▶ dim_products.product_name +``` + +**dim_products.category**: + +```text +raw_products.category ──copy──▶ stg_products.category +stg_products.category ──copy──▶ dim_products.category +``` + +**dim_products.price**: + +```text +raw_products.price ──transform(cast)──▶ stg_products.price +stg_products.price ──copy──▶ dim_products.price +``` + +**dim_products.category_group**: + +```text +raw_products.category ──copy──▶ stg_products.category +stg_products.category ──transform(case)──▶ dim_products.category_group +``` + +**dim_products.price_tier**: + +```text +raw_products.price ──transform(cast)──▶ stg_products.price +stg_products.price ──transform(case)──▶ dim_products.price_tier +``` + +#### dim_products_extended + 
+**dim_products_extended.product_id**: + +```text +raw_products.id ──rename──▶ stg_products.product_id +stg_products.product_id ──copy──▶ dim_products_extended.product_id +``` + +**dim_products_extended.product_name**: + +```text +raw_products.name ──rename──▶ stg_products.product_name +stg_products.product_name ──copy──▶ dim_products_extended.product_name +``` + +**dim_products_extended.category**: + +```text +raw_products.category ──copy──▶ stg_products.category +stg_products.category ──copy──▶ dim_products_extended.category +``` + +**dim_products_extended.price**: + +```text +raw_products.price ──transform(cast)──▶ stg_products.price +stg_products.price ──copy──▶ dim_products_extended.price +``` + +**dim_products_extended.id_scaled**: + +```text +raw_products.id ──rename──▶ stg_products.product_id +stg_products.product_id ──transform(cast+multiply)──▶ dim_products_extended.id_scaled +``` + +**dim_products_extended.detailed_category** (multi-source transform): + +```text +Path A: + raw_products.category ──copy──▶ stg_products.category + stg_products.category ──transform(nested case)──▶ dim_products_extended.detailed_category + +Path B: + raw_products.price ──transform(cast)──▶ stg_products.price + stg_products.price ──transform(nested case)──▶ dim_products_extended.detailed_category +``` + +#### rpt_customer_orders + +**rpt_customer_orders.customer_id**: + +```text +raw_customers.id ──rename──▶ stg_customers.customer_id +stg_customers.customer_id ──copy──▶ rpt_customer_orders.customer_id +``` + +**rpt_customer_orders.customer_name**: + +```text +raw_customers.name ──rename──▶ stg_customers.customer_name +stg_customers.customer_name ──copy──▶ rpt_customer_orders.customer_name +``` + +**rpt_customer_orders.email**: + +```text +raw_customers.email ──copy──▶ stg_customers.email +stg_customers.email ──copy──▶ rpt_customer_orders.email +``` + +**rpt_customer_orders.order_id**: + +```text +raw_orders.id ──rename──▶ stg_orders.order_id +stg_orders.order_id ──copy──▶ 
int_orders_enriched.order_id +int_orders_enriched.order_id ──copy──▶ rpt_customer_orders.order_id +``` + +**rpt_customer_orders.order_amount**: + +```text +raw_orders.amount ──copy──▶ stg_orders.amount +stg_orders.amount ──rename──▶ int_orders_enriched.order_amount +int_orders_enriched.order_amount ──copy──▶ rpt_customer_orders.order_amount +``` + +**rpt_customer_orders.payment_total**: + +```text +raw_payments.amount ──transform(cents_to_dollars)──▶ stg_payments.amount +stg_payments.amount ──transform(coalesce+sum)──▶ int_orders_enriched.payment_total +int_orders_enriched.payment_total ──copy──▶ rpt_customer_orders.payment_total +``` + +**rpt_customer_orders.balance_with_fee** (multi-source transform): + +```text +Path A: + raw_orders.amount ──copy──▶ stg_orders.amount + stg_orders.amount ──rename──▶ int_orders_enriched.order_amount + int_orders_enriched.order_amount ──transform(expression)──▶ rpt_customer_orders.balance_with_fee + +Path B: + raw_payments.amount ──transform(cents_to_dollars)──▶ stg_payments.amount + stg_payments.amount ──transform(coalesce+sum)──▶ int_orders_enriched.payment_total + int_orders_enriched.payment_total ──transform(expression)──▶ rpt_customer_orders.balance_with_fee +``` + +**rpt_customer_orders.combined_metric** (multi-source transform): + +```text +Path A: int_orders_enriched.order_amount (same chain as balance_with_fee Path A) +Path B: int_orders_enriched.payment_total (same chain as balance_with_fee Path B) +Path C: + raw_payments.id ──rename──▶ stg_payments.payment_id + stg_payments.payment_id ──transform(count)──▶ int_orders_enriched.payment_count + int_orders_enriched.payment_count ──transform(expression)──▶ rpt_customer_orders.combined_metric +``` + +#### int_customer_ranking (the original bug report) + +**int_customer_ranking.customer_id**: + +```text +raw_customers.id ──rename──▶ stg_customers.customer_id +stg_customers.customer_id ──copy──▶ int_customer_ranking.customer_id +``` + +**int_customer_ranking.customer_name**: + 
+```text +raw_customers.name ──rename──▶ stg_customers.customer_name +stg_customers.customer_name ──copy──▶ int_customer_ranking.customer_name +``` + +**int_customer_ranking.lifetime_value**: + +```text +raw_orders.amount ──copy──▶ stg_orders.amount +stg_orders.amount ──transform(coalesce+sum)──▶ int_customer_metrics.lifetime_value +int_customer_metrics.lifetime_value ──copy──▶ int_customer_ranking.lifetime_value +``` + +**int_customer_ranking.value_or_zero**: + +```text +raw_orders.amount ──copy──▶ stg_orders.amount +stg_orders.amount ──transform(coalesce+sum)──▶ int_customer_metrics.lifetime_value +int_customer_metrics.lifetime_value ──transform(coalesce)──▶ int_customer_ranking.value_or_zero +``` + +**int_customer_ranking.nonzero_orders**: + +```text +raw_orders.id ──rename──▶ stg_orders.order_id +stg_orders.order_id ──transform(count)──▶ int_customer_metrics.total_orders +int_customer_metrics.total_orders ──transform(nullif)──▶ int_customer_ranking.nonzero_orders +``` + +--- + +### Downstream Trace Test Cases + +The test harness should also verify `column_consumers_recursive`. 
Key downstream traces: + +**raw_customers.id downstream**: + +```text +raw_customers.id ──rename──▶ stg_customers.customer_id +stg_customers.customer_id ──copy──▶ int_customer_metrics.customer_id +stg_customers.customer_id ──copy──▶ int_customer_ranking.customer_id +stg_customers.customer_id ──copy──▶ rpt_customer_orders.customer_id +stg_customers.customer_id ──inspect──▶ dim_customers (JOIN ON) +stg_customers.customer_id ──inspect──▶ fct_orders (JOIN ON) +int_customer_metrics.customer_id ──copy──▶ dim_customers.customer_id +int_customer_metrics.customer_id ──inspect──▶ int_customer_ranking (JOIN ON) +``` + +**raw_orders.amount downstream**: + +```text +raw_orders.amount ──copy──▶ stg_orders.amount +stg_orders.amount ──rename──▶ int_orders_enriched.order_amount +stg_orders.amount ──transform──▶ int_customer_metrics.lifetime_value +stg_orders.amount ──transform──▶ int_high_value_orders.total_amount +stg_orders.amount ──transform──▶ int_high_value_orders.min_order +stg_orders.amount ──transform──▶ int_high_value_orders.max_order +stg_orders.amount ──transform──▶ int_high_value_orders.avg_order +stg_orders.amount ──rename──▶ int_all_orders.order_amount (B2) +stg_orders.amount ──inspect──▶ rpt_customer_orders (WHERE) +int_orders_enriched.order_amount ──rename──▶ fct_orders.amount +int_orders_enriched.order_amount ──transform──▶ fct_orders.balance_due +int_orders_enriched.order_amount ──transform──▶ fct_orders.payment_ratio +int_orders_enriched.order_amount ──copy──▶ int_all_orders.order_amount (B1) +int_orders_enriched.order_amount ──copy──▶ rpt_customer_orders.order_amount +int_orders_enriched.order_amount ──transform──▶ rpt_customer_orders.balance_with_fee +int_orders_enriched.order_amount ──transform──▶ rpt_customer_orders.combined_metric +int_customer_metrics.lifetime_value ──copy──▶ dim_customers.lifetime_value +int_customer_metrics.lifetime_value ──transform──▶ dim_customers.computed_tier +int_customer_metrics.lifetime_value ──copy──▶ 
int_customer_ranking.lifetime_value +int_customer_metrics.lifetime_value ──transform──▶ int_customer_ranking.value_or_zero +``` + +--- + +### Edge Count Summary + +Expected total SELECT edges per model (for test assertions): + +| Model | SELECT Edges | Inspect Edges | Notes | +| ---------------------- | ------------ | ------------- | ------------------------------ | +| stg_customers | 5 | 0 | | +| stg_orders | 5 | 0 | | +| stg_payments | 3 | 0 | | +| stg_payments_star | 5 | 0 | SELECT * expansion | +| stg_products | 5 | 0 | | +| int_customer_metrics | 5 | 1 | | +| int_customer_ranking | 5 | 1 | | +| int_orders_enriched | 7 | 1 | | +| int_high_value_orders | 6 | 0 | | +| int_all_orders | 10 | 0 | 5 cols x 2 branches; literal `source` excluded | +| dim_customers | 8 | 1 | | +| dim_products | 6 | 1 | | +| dim_products_extended | 7 | 0 | detailed_category has 2 sources| +| fct_orders | 13 | 1 | balance_due, payment_ratio multi-source | +| rpt_customer_orders | 11 | 3 | balance_with_fee, combined_metric multi-source | +| rpt_order_volume | 3 | 0 | Table function boundary | +| **TOTAL** | **104** | **9** | | + +Note: the int_all_orders `source` column is a string literal with no upstream model edge. Its 10 SELECT edges are the 5 non-literal columns x 2 branches; the 2 literal `source` entries are transform edges with no source_model and are not counted as model edges. + +--- + +### Special Cases for Test Coverage + +#### 1. Unqualified Column References (Phase 1 fix) + +Models where SQL uses bare column names without table prefix. 
The column qualification pass must resolve these using `source_tables`: + +- **stg_customers**: all 5 columns are unqualified (`id`, `name`, `email`, `created_at`, `tier`) — single source table `raw_customers` +- **stg_orders**: all 5 columns are unqualified — single source table `raw_orders` +- **stg_payments**: all 3 columns are unqualified — single source table `raw_payments` +- **stg_products**: all 5 columns are unqualified — single source table `raw_products` +- **stg_payments_star**: `SELECT *` — requires schema expansion +- **dim_products**: `product_id`, `product_name`, `category`, `price`, `active` are unqualified — single source table `stg_products` +- **dim_products_extended**: same as dim_products — unqualified, single source +- **int_all_orders** branch 1: `order_id`, `customer_id`, `order_date`, `order_amount`, `status` are unqualified — single source table `int_orders_enriched` + +#### 2. Seeds and Sources in known_nodes (Phase 1 fix) + +These models reference seeds/sources that must be in `known_nodes`: + +- **stg_customers** → `raw_customers` (seed) +- **stg_orders** → `raw_orders` (seed) +- **stg_payments** → `raw_payments` (seed) +- **stg_payments_star** → `raw_payments` (seed) +- **stg_products** → `raw_products` (seed) + +#### 3. SELECT * Expansion (Phase 2) + +- **stg_payments_star**: `SELECT * FROM raw_payments` must expand to 5 columns using `SchemaRegistry` for `raw_payments`: `id`, `order_id`, `payment_method`, `amount`, `created_at` + +#### 4. UNION ALL (positional matching) + +- **int_all_orders**: columns matched positionally across branches. Branch 2 `amount` maps to output `order_amount` (rename). Test must verify both branches produce edges. + +#### 5. 
Multi-Source Transform Columns + +Columns with edges from 2+ source columns: + +- **fct_orders.balance_due**: `order_amount - payment_total` +- **fct_orders.payment_ratio**: `safe_divide(payment_total, order_amount)` +- **rpt_customer_orders.balance_with_fee**: `(order_amount - payment_total) * 1.1` +- **rpt_customer_orders.combined_metric**: `order_amount + payment_total + payment_count` +- **dim_products_extended.detailed_category**: `CASE WHEN category ... WHEN price ...` + +#### 6. Table Function Boundary + +- **rpt_order_volume**: lineage stops at `order_volume_by_status` function output. Does not trace through to `fct_orders` (future enhancement). + +#### 7. Jinja-Rendered Function Call + +- **stg_payments**: `{{ cents_to_dollars("amount") }}` renders to `amount / 100.0` — the lineage must be extracted from the **rendered** SQL, not the Jinja template. + +#### 8. Inspect-Only Columns + +Columns used in WHERE/JOIN/GROUP BY but NOT in SELECT: + +| Model | Inspect Column | Source Model | Used In | +| -------------------- | ---------------------------------- | ------------------- | ------- | +| int_customer_metrics | stg_orders.customer_id | stg_orders | JOIN ON | +| int_customer_ranking | int_customer_metrics.customer_id | int_customer_metrics| JOIN ON | +| int_orders_enriched | stg_payments.order_id | stg_payments | JOIN ON | +| dim_customers | stg_customers.customer_id | stg_customers | JOIN ON | +| dim_products | stg_products.active | stg_products | WHERE | +| fct_orders | stg_customers.customer_id | stg_customers | JOIN ON | +| rpt_customer_orders | int_orders_enriched.customer_id | int_orders_enriched | JOIN ON | +| rpt_customer_orders | stg_orders.order_id | stg_orders | JOIN ON | +| rpt_customer_orders | stg_orders.amount | stg_orders | WHERE | + +--- + +### Test Assertion Format + +Each test should assert against the per-model edge matrix above. 
Suggested Rust test structure: + +```rust +#[test] +fn test_stg_customers_lineage_edges() { + let project_lineage = build_sample_project_lineage(); + + // SELECT edges + assert_edge(&project_lineage, "raw_customers", "id", "stg_customers", "customer_id", LineageKind::Rename); + assert_edge(&project_lineage, "raw_customers", "name", "stg_customers", "customer_name", LineageKind::Rename); + assert_edge(&project_lineage, "raw_customers", "email", "stg_customers", "email", LineageKind::Copy); + assert_edge(&project_lineage, "raw_customers", "created_at", "stg_customers", "signup_date", LineageKind::Rename); + assert_edge(&project_lineage, "raw_customers", "tier", "stg_customers", "customer_tier", LineageKind::Rename); + + // No inspect edges + assert_no_inspect_edges(&project_lineage, "stg_customers"); + + // Edge count + assert_select_edge_count(&project_lineage, "stg_customers", 5); +} + +#[test] +fn test_full_upstream_chain_dim_customers_customer_id() { + let project_lineage = build_sample_project_lineage(); + let chain = project_lineage.trace_column_recursive("dim_customers", "customer_id"); + + assert_eq!(chain.len(), 3); + assert_chain_contains(&chain, "raw_customers", "id", "stg_customers", "customer_id", LineageKind::Rename); + assert_chain_contains(&chain, "stg_customers", "customer_id", "int_customer_metrics", "customer_id", LineageKind::Copy); + assert_chain_contains(&chain, "int_customer_metrics", "customer_id", "dim_customers", "customer_id", LineageKind::Copy); +} + +#[test] +fn test_description_status_tracking() { + let project_lineage = build_sample_project_lineage(); + + // Inherited: exact match + assert_desc_status(&project_lineage, "int_customer_metrics", "lifetime_value", "dim_customers", "lifetime_value", DescriptionStatus::Inherited); + + // Modified: both exist, differ + assert_desc_status(&project_lineage, "stg_customers", "customer_name", "dim_customers", "customer_name", DescriptionStatus::Modified); + + // Missing: source has no description 
+ assert_desc_status(&project_lineage, "raw_customers", "id", "stg_customers", "customer_id", DescriptionStatus::Missing); +} +```