From bb2dac86292f6223938375e1f78ccc6aa388346d Mon Sep 17 00:00:00 2001 From: brian moore Date: Thu, 19 Feb 2026 20:50:47 -0500 Subject: [PATCH] bm/cleanup-cleanup --- crates/ff-analysis/src/pass/expr_utils.rs | 11 +- .../src/pass/plan_unused_columns.rs | 19 +-- crates/ff-analysis/src/schema.rs | 2 - crates/ff-cli/src/commands/compile.rs | 77 ++++++----- crates/ff-cli/src/commands/docs/generate.rs | 121 ++++++++---------- crates/ff-cli/src/commands/parse.rs | 10 +- crates/ff-cli/src/commands/run/compile.rs | 36 +++--- crates/ff-cli/src/commands/run/execute.rs | 34 ++--- crates/ff-core/src/dag.rs | 15 +-- crates/ff-core/src/project/loading.rs | 53 ++++---- crates/ff-meta/src/lib.rs | 1 + crates/ff-sql/src/dialect.rs | 2 +- crates/ff-sql/src/inline.rs | 2 +- crates/ff-sql/src/lineage.rs | 56 +++++--- 14 files changed, 228 insertions(+), 211 deletions(-) diff --git a/crates/ff-analysis/src/pass/expr_utils.rs b/crates/ff-analysis/src/pass/expr_utils.rs index 7776ee10..2addb8ed 100644 --- a/crates/ff-analysis/src/pass/expr_utils.rs +++ b/crates/ff-analysis/src/pass/expr_utils.rs @@ -35,16 +35,7 @@ where Expr::Cast(cast) => walk_expr_columns(&cast.expr, collector), Expr::TryCast(try_cast) => walk_expr_columns(&try_cast.expr, collector), Expr::Case(case) => { - if let Some(ref operand) = case.expr { - walk_expr_columns(operand, collector); - } - for (when, then) in &case.when_then_expr { - walk_expr_columns(when, collector); - walk_expr_columns(then, collector); - } - if let Some(ref else_expr) = case.else_expr { - walk_expr_columns(else_expr, collector); - } + for_each_case_subexpr(case, |e| walk_expr_columns(e, collector)); } Expr::IsNull(inner) | Expr::IsNotNull(inner) | Expr::Not(inner) | Expr::Negative(inner) => { walk_expr_columns(inner, collector); diff --git a/crates/ff-analysis/src/pass/plan_unused_columns.rs b/crates/ff-analysis/src/pass/plan_unused_columns.rs index 8c6e2930..f7a0ff6d 100644 --- a/crates/ff-analysis/src/pass/plan_unused_columns.rs +++ b/crates/ff-analysis/src/pass/plan_unused_columns.rs @@ -113,13 +113,18 @@ fn collect_consumed_columns( consumed } +/// Collect lowercased column refs from a slice of expressions. +fn collect_refs_from_exprs(exprs: &[Expr], consumed: &mut HashSet) { + for expr in exprs { + collect_column_refs_lowercase(expr, consumed); + } +} + /// Walk a LogicalPlan tree and collect referenced column names (lowercased) fn collect_column_refs_from_plan(plan: &LogicalPlan, consumed: &mut HashSet) { match plan { LogicalPlan::Projection(proj) => { - for expr in &proj.expr { - collect_column_refs_lowercase(expr, consumed); - } + collect_refs_from_exprs(&proj.expr, consumed); collect_column_refs_from_plan(&proj.input, consumed); } LogicalPlan::Filter(filter) => { @@ -132,12 +137,8 @@ fn collect_column_refs_from_plan(plan: &LogicalPlan, consumed: &mut HashSet { - for expr in &agg.group_expr { - collect_column_refs_lowercase(expr, consumed); - } - for expr in &agg.aggr_expr { - collect_column_refs_lowercase(expr, consumed); - } + collect_refs_from_exprs(&agg.group_expr, consumed); + collect_refs_from_exprs(&agg.aggr_expr, consumed); collect_column_refs_from_plan(&agg.input, consumed); } LogicalPlan::Sort(sort) => { diff --git a/crates/ff-analysis/src/schema.rs b/crates/ff-analysis/src/schema.rs index c2228a5e..9d172e5b 100644 --- a/crates/ff-analysis/src/schema.rs +++ b/crates/ff-analysis/src/schema.rs @@ -47,7 +47,6 @@ impl RelSchema { /// If `source_table` metadata is available, filters by it first. /// Falls back to column-name-only lookup when table info is missing. pub fn find_qualified(&self, table: &str, column: &str) -> Option<&TypedColumn> { - // Try to find a column that matches both table and name let qualified_match = self.columns.iter().find(|c| { c.name.eq_ignore_ascii_case(column) && c.source_table @@ -55,7 +54,6 @@ impl RelSchema { .is_some_and(|t| t.eq_ignore_ascii_case(table)) }); - // Fall back to column-name-only if no qualified match qualified_match.or_else(|| self.find_column(column)) } diff --git a/crates/ff-cli/src/commands/compile.rs b/crates/ff-cli/src/commands/compile.rs index 7b15b308..742a4b10 100644 --- a/crates/ff-cli/src/commands/compile.rs +++ b/crates/ff-cli/src/commands/compile.rs @@ -654,6 +654,49 @@ fn compute_compiled_path( Ok(output_dir.join(filename)) } +/// Populate meta database rows for a single successfully compiled model. +fn populate_single_model_compile( + conn: &ff_meta::DuckDbConnection, + project: &Project, + result: &ModelCompileResult, + model_id: i64, + dependencies: &HashMap>, + model_id_map: &HashMap, +) -> ff_meta::MetaResult<()> { + let compiled_sql = project + .get_model(&result.model) + .and_then(|m| m.compiled_sql.as_deref()) + .unwrap_or(""); + let compiled_path = result.output_path.as_deref().unwrap_or(""); + let checksum = ff_core::compute_checksum(compiled_sql); + + ff_meta::populate::compilation::update_model_compiled( + conn, + model_id, + compiled_sql, + compiled_path, + &checksum, + )?; + + if let Some(deps) = dependencies.get(&result.model) { + let dep_ids: Vec = deps + .iter() + .filter_map(|d| model_id_map.get(d.as_str()).copied()) + .collect(); + ff_meta::populate::compilation::populate_dependencies(conn, model_id, &dep_ids)?; + } + + let ext_deps: Vec<&str> = project + .get_model(&result.model) + .map(|m| m.external_deps.iter().map(|t| t.as_ref()).collect()) + .unwrap_or_default(); + if !ext_deps.is_empty() { + ff_meta::populate::compilation::populate_external_dependencies(conn, model_id, &ext_deps)?; + } + + Ok(()) +} + /// Populate the meta database with compile-phase data (non-fatal). fn populate_meta_compile( project: &Project, @@ -679,38 +722,14 @@ fn populate_meta_compile( let Some(&model_id) = model_id_map.get(result.model.as_str()) else { continue; }; - let compiled_sql = project - .get_model(&result.model) - .and_then(|m| m.compiled_sql.as_deref()) - .unwrap_or(""); - let compiled_path = result.output_path.as_deref().unwrap_or(""); - let checksum = ff_core::compute_checksum(compiled_sql); - - ff_meta::populate::compilation::update_model_compiled( + populate_single_model_compile( conn, + project, + result, model_id, - compiled_sql, - compiled_path, - &checksum, + dependencies, + &model_id_map, )?; - - if let Some(deps) = dependencies.get(&result.model) { - let dep_ids: Vec = deps - .iter() - .filter_map(|d| model_id_map.get(d.as_str()).copied()) - .collect(); - ff_meta::populate::compilation::populate_dependencies(conn, model_id, &dep_ids)?; - } - - let ext_deps: Vec<&str> = project - .get_model(&result.model) - .map(|m| m.external_deps.iter().map(|t| t.as_ref()).collect()) - .unwrap_or_default(); - if !ext_deps.is_empty() { - ff_meta::populate::compilation::populate_external_dependencies( - conn, model_id, &ext_deps, - )?; - } } Ok(()) }); diff --git a/crates/ff-cli/src/commands/docs/generate.rs b/crates/ff-cli/src/commands/docs/generate.rs index fe84be05..9111ac1c 100644 --- a/crates/ff-cli/src/commands/docs/generate.rs +++ b/crates/ff-cli/src/commands/docs/generate.rs @@ -914,75 +914,66 @@ fn generate_lineage_dot(project: &Project) -> String { } } - dot.push_str( - "\n // Model nodes (blue for views, green for tables, gold for incremental, gray for ephemeral)\n", - ); - if let Some(ref manifest) = manifest { - for (name, model) in &manifest.models { - let color = match model.materialized { - ff_core::config::Materialization::Table => COLOR_TABLE, - ff_core::config::Materialization::View => COLOR_VIEW, - ff_core::config::Materialization::Incremental => COLOR_INCREMENTAL, - ff_core::config::Materialization::Ephemeral => COLOR_EPHEMERAL, - }; - dot.push_str(&format!( - " \"{}\" [label=\"{}\" fillcolor=\"{}\"];\n", - name, name, color - )); - } - } else { - for (name, model) in &project.models { - let mat = model.materialization(project.config.materialization); - let color = match mat { - ff_core::config::Materialization::Table => COLOR_TABLE, - ff_core::config::Materialization::View => COLOR_VIEW, - ff_core::config::Materialization::Incremental => COLOR_INCREMENTAL, - ff_core::config::Materialization::Ephemeral => COLOR_EPHEMERAL, - }; - dot.push_str(&format!( - " \"{}\" [label=\"{}\" fillcolor=\"{}\"];\n", - name, name, color - )); - } + dot.push('\n'); + + // Collect model info from manifest (preferred) or project (fallback) + struct ModelDotInfo<'a> { + name: &'a str, + materialization: ff_core::config::Materialization, + depends_on: Vec<&'a str>, + external_deps: Vec<&'a str>, } + let model_infos: Vec> = if let Some(ref manifest) = manifest { + manifest + .models + .iter() + .map(|(name, model)| ModelDotInfo { + name: name.as_str(), + materialization: model.materialized, + depends_on: model.depends_on.iter().map(|d| d.as_str()).collect(), + external_deps: model.external_deps.iter().map(|e| e.as_str()).collect(), + }) + .collect() + } else { + project + .models + .iter() + .map(|(name, model)| ModelDotInfo { + name: name.as_str(), + materialization: model.materialization(project.config.materialization), + depends_on: model.depends_on.iter().map(|d| d.as_str()).collect(), + external_deps: model.external_deps.iter().map(|e| e.as_ref()).collect(), + }) + .collect() + }; - dot.push_str("\n // Dependencies (edges)\n"); - if let Some(ref manifest) = manifest { - // Use manifest for accurate dependencies - for (name, model) in &manifest.models { - for dep in &model.depends_on { - dot.push_str(&format!(" \"{}\" -> \"{}\";\n", dep, name)); - } - - for ext in &model.external_deps { - let source_node = project - .sources - .iter() - .flat_map(|s| s.tables.iter().map(move |t| (s, t))) - .find(|(_, t)| *ext == t.name) - .map(|(s, t)| format!("{}_{}", s.name, t.name)) - .unwrap_or_else(|| ext.to_string()); + for info in &model_infos { + let color = match info.materialization { + ff_core::config::Materialization::Table => COLOR_TABLE, + ff_core::config::Materialization::View => COLOR_VIEW, + ff_core::config::Materialization::Incremental => COLOR_INCREMENTAL, + ff_core::config::Materialization::Ephemeral => COLOR_EPHEMERAL, + }; + dot.push_str(&format!( + " \"{}\" [label=\"{}\" fillcolor=\"{}\"];\n", + info.name, info.name, color + )); + } - dot.push_str(&format!(" \"{}\" -> \"{}\";\n", source_node, name)); - } + dot.push('\n'); + for info in &model_infos { + for dep in &info.depends_on { + dot.push_str(&format!(" \"{}\" -> \"{}\";\n", dep, info.name)); } - } else { - // Fall back to project model info (may be incomplete) - for (name, model) in &project.models { - for dep in &model.depends_on { - dot.push_str(&format!(" \"{}\" -> \"{}\";\n", dep, name)); - } - for ext in &model.external_deps { - let source_node = project - .sources - .iter() - .flat_map(|s| s.tables.iter().map(move |t| (s, t))) - .find(|(_, t)| *ext == t.name) - .map(|(s, t)| format!("{}_{}", s.name, t.name)) - .unwrap_or_else(|| ext.to_string()); - - dot.push_str(&format!(" \"{}\" -> \"{}\";\n", source_node, name)); - } + for ext in &info.external_deps { + let source_node = project + .sources + .iter() + .flat_map(|s| s.tables.iter().map(move |t| (s, t))) + .find(|(_, t)| *ext == t.name) + .map(|(s, t)| format!("{}_{}", s.name, t.name)) + .unwrap_or_else(|| ext.to_string()); + dot.push_str(&format!(" \"{}\" -> \"{}\";\n", source_node, info.name)); } } diff --git a/crates/ff-cli/src/commands/parse.rs b/crates/ff-cli/src/commands/parse.rs index 4687bca8..d4f59467 100644 --- a/crates/ff-cli/src/commands/parse.rs +++ b/crates/ff-cli/src/commands/parse.rs @@ -44,12 +44,10 @@ pub(crate) async fn execute(args: &ParseArgs, global: &GlobalArgs) -> Result<()> .with_context(|| format!("Failed to parse SQL for model: {}", name))?; let deps = extract_dependencies(&statements); + let all_tables: Vec = deps.iter().cloned().collect(); - let (model_deps, ext_deps) = ff_sql::extractor::categorize_dependencies( - deps.clone(), - &known_models, - &external_tables, - ); + let (model_deps, ext_deps) = + ff_sql::extractor::categorize_dependencies(deps, &known_models, &external_tables); dep_map.insert(name.clone(), model_deps.clone()); @@ -58,7 +56,7 @@ pub(crate) async fn execute(args: &ParseArgs, global: &GlobalArgs) -> Result<()> path: model.path.display().to_string(), model_dependencies: model_deps, external_dependencies: ext_deps, - all_tables: deps.into_iter().collect(), + all_tables, }); } diff --git a/crates/ff-cli/src/commands/run/compile.rs b/crates/ff-cli/src/commands/run/compile.rs index a10d42bc..1c5c99bc 100644 --- a/crates/ff-cli/src/commands/run/compile.rs +++ b/crates/ff-cli/src/commands/run/compile.rs @@ -342,24 +342,24 @@ fn resolve_deferred_dependencies( // Also check for transitive dependencies of deferred models let mut to_check: Vec = deferred_models.iter().cloned().collect(); while let Some(model_name) = to_check.pop() { - if let Some(manifest_model) = deferred_manifest.get_model(&model_name) { - for dep in &manifest_model.depends_on { - let dep_str = dep.as_str(); - if !selected_set.contains(dep_str) - && !deferred_models.contains(dep_str) - && deferred_manifest.get_model(dep_str).is_some() - { - deferred_models.insert(dep_str.to_string()); - to_check.push(dep_str.to_string()); - if global.verbose { - eprintln!( - "[verbose] Deferring {} to production manifest (transitive)", - dep - ); - } - } - // Note: Don't fail on transitive deps missing from manifest - // They might be external tables or already executed + let Some(manifest_model) = deferred_manifest.get_model(&model_name) else { + continue; + }; + for dep in &manifest_model.depends_on { + let dep_str = dep.as_str(); + if selected_set.contains(dep_str) + || deferred_models.contains(dep_str) + || deferred_manifest.get_model(dep_str).is_none() + { + continue; + } + deferred_models.insert(dep_str.to_string()); + to_check.push(dep_str.to_string()); + if global.verbose { + eprintln!( + "[verbose] Deferring {} to production manifest (transitive)", + dep + ); } } } diff --git a/crates/ff-cli/src/commands/run/execute.rs b/crates/ff-cli/src/commands/run/execute.rs index 86acbbb3..8b662258 100644 --- a/crates/ff-cli/src/commands/run/execute.rs +++ b/crates/ff-cli/src/commands/run/execute.rs @@ -553,7 +553,6 @@ async fn execute_models_sequential( } else { success_count += 1; - // Try to get row count for state tracking (non-blocking) let qualified_name = build_qualified_name(compiled.schema.as_deref(), name); let row_count = match ctx .db @@ -608,19 +607,19 @@ struct ParallelExecutionState { /// Prepare a model for parallel execution. /// -/// Returns `None` if the model should be skipped (missing or ephemeral). +/// Returns `false` if the model should be skipped (missing or ephemeral). /// Ephemeral models record a success result as a side effect. fn prepare_level_model( name: &str, compiled_models: &HashMap, state: &Arc, -) -> Option> { +) -> bool { let Some(compiled) = compiled_models.get(name) else { eprintln!( "[warn] Model '{}' missing from compiled_models, skipping", name ); - return None; + return false; }; if compiled.materialization == Materialization::Ephemeral { @@ -633,17 +632,19 @@ fn prepare_level_model( error: None, }); recover_mutex(&state.completed).insert(name.to_string()); - return None; + return false; } - Some(Arc::new(compiled.clone())) + true } /// Async task body for executing a single model in parallel mode. +/// +/// Borrows the compiled model from `state.all_compiled_models` instead of +/// receiving a cloned `Arc`, avoiding a deep copy per model. async fn execute_model_task( db: Arc, name: String, - compiled: Arc, state: Arc, ) { let Ok(_permit) = state.semaphore.acquire().await else { @@ -654,11 +655,15 @@ async fn execute_model_task( return; } + let Some(compiled) = state.all_compiled_models.get(&name) else { + return; + }; + let model_result = if compiled.is_python { super::python::run_python_model( &db, &name, - &compiled, + compiled, &state.all_compiled_models, state.db_path.as_deref().unwrap_or(":memory:"), ) @@ -667,7 +672,7 @@ async fn execute_model_task( run_single_model( &db, &name, - &compiled, + compiled, state.full_refresh, state.wap_schema.as_deref(), ) @@ -764,17 +769,12 @@ async fn execute_models_parallel( break; } - let Some(compiled) = prepare_level_model(name, &ctx.compiled_models, &state) else { + if !prepare_level_model(name, &ctx.compiled_models, &state) { continue; - }; + } let db = Arc::clone(ctx.db); - set.spawn(execute_model_task( - db, - name.clone(), - compiled, - Arc::clone(&state), - )); + set.spawn(execute_model_task(db, name.clone(), Arc::clone(&state))); } while let Some(res) = set.join_next().await { diff --git a/crates/ff-core/src/dag.rs b/crates/ff-core/src/dag.rs index e979f44a..8af3842e 100644 --- a/crates/ff-core/src/dag.rs +++ b/crates/ff-core/src/dag.rs @@ -29,15 +29,14 @@ impl ModelDag { /// Add a model to the DAG pub fn add_model(&mut self, name: &str) -> CoreResult { if let Some(&idx) = self.node_map.get(name) { - Ok(idx) - } else { - let model_name = ModelName::try_new(name).ok_or_else(|| CoreError::EmptyName { - context: "model name in DAG".into(), - })?; - let idx = self.graph.add_node(model_name.clone()); - self.node_map.insert(model_name, idx); - Ok(idx) + return Ok(idx); } + let model_name = ModelName::try_new(name).ok_or_else(|| CoreError::EmptyName { + context: "model name in DAG".into(), + })?; + let idx = self.graph.add_node(model_name.clone()); + self.node_map.insert(model_name, idx); + Ok(idx) } /// Add a dependency edge (from depends on to) diff --git a/crates/ff-core/src/project/loading.rs b/crates/ff-core/src/project/loading.rs index 64347761..db9ea046 100644 --- a/crates/ff-core/src/project/loading.rs +++ b/crates/ff-core/src/project/loading.rs @@ -40,20 +40,23 @@ where let path = entry.path(); if path.is_dir() { discover_yaml_recursive(&path, items, probe, load)?; - } else if path.extension().is_some_and(|e| e == "yml" || e == "yaml") { - let content = match std::fs::read_to_string(&path) { - Ok(c) => c, - Err(e) => { - log::warn!("Cannot read {}: {}", path.display(), e); - continue; - } - }; - if !probe(&content) { + continue; + } + if !path.extension().is_some_and(|e| e == "yml" || e == "yaml") { + continue; + } + let content = match std::fs::read_to_string(&path) { + Ok(c) => c, + Err(e) => { + log::warn!("Cannot read {}: {}", path.display(), e); continue; } - let item = load(&path)?; - items.push(item); + }; + if !probe(&content) { + continue; } + let item = load(&path)?; + items.push(item); } Ok(()) } @@ -87,23 +90,17 @@ fn categorize_dir_files(dir: &Path) -> CoreResult { .filter(|p| p.is_file() && !is_hidden_file(p)) .collect(); - let sql = all_visible - .iter() - .filter(|p| p.extension().is_some_and(|e| e == "sql")) - .cloned() - .collect(); - - let csv = all_visible - .iter() - .filter(|p| p.extension().is_some_and(|e| e == "csv")) - .cloned() - .collect(); - - let py = all_visible - .iter() - .filter(|p| p.extension().is_some_and(|e| e == "py")) - .cloned() - .collect(); + let mut sql = Vec::new(); + let mut csv = Vec::new(); + let mut py = Vec::new(); + for p in &all_visible { + match file_extension_str(p) { + "sql" => sql.push(p.clone()), + "csv" => csv.push(p.clone()), + "py" => py.push(p.clone()), + _ => {} + } + } Ok(CategorizedFiles { all_visible, diff --git a/crates/ff-meta/src/lib.rs b/crates/ff-meta/src/lib.rs index fc0c7156..bbd40803 100644 --- a/crates/ff-meta/src/lib.rs +++ b/crates/ff-meta/src/lib.rs @@ -15,5 +15,6 @@ pub(crate) mod row_helpers; pub mod rules; pub use connection::MetaDb; +pub use duckdb::Connection as DuckDbConnection; pub use error::{MetaError, MetaResult}; pub use manifest::Manifest; diff --git a/crates/ff-sql/src/dialect.rs b/crates/ff-sql/src/dialect.rs index ef89b0ee..73745df2 100644 --- a/crates/ff-sql/src/dialect.rs +++ b/crates/ff-sql/src/dialect.rs @@ -91,7 +91,7 @@ impl ResolvedIdent { /// Debug-asserts that `parts` is non-empty. Every call-site in production /// passes at least one part (from `resolve_object_name`). pub fn from_parts(parts: Vec) -> Self { - debug_assert!( + assert!( !parts.is_empty(), "ResolvedIdent requires at least one part" ); diff --git a/crates/ff-sql/src/inline.rs b/crates/ff-sql/src/inline.rs index 3f44b86a..c51c5b44 100644 --- a/crates/ff-sql/src/inline.rs +++ b/crates/ff-sql/src/inline.rs @@ -237,7 +237,7 @@ fn collect_ephemeral_recursive( ); continue; }; - let dep = dep.clone(); + let dep = dep.to_string(); ephemeral_sql.insert(dep.clone(), sql); order.push(dep); } diff --git a/crates/ff-sql/src/lineage.rs b/crates/ff-sql/src/lineage.rs index 3d781e1d..4034d42f 100644 --- a/crates/ff-sql/src/lineage.rs +++ b/crates/ff-sql/src/lineage.rs @@ -345,13 +345,17 @@ impl ProjectLineage { let mut dot = String::from("digraph lineage {\n rankdir=LR;\n node [shape=record];\n\n"); for (name, lineage) in &self.models { - let cols: Vec<&str> = lineage + let cols: Vec = lineage .columns .iter() - .map(|c| c.output_column.as_str()) + .map(|c| dot_escape(&c.output_column)) .collect(); - let label = format!("{}|{}", name, cols.join("\\l")); - dot.push_str(&format!(" \"{}\" [label=\"{{{}}}\"];\n", name, label)); + let escaped_name = dot_escape(name); + let label = format!("{}|{}", escaped_name, cols.join("\\l")); + dot.push_str(&format!( + " \"{}\" [label=\"{{{}}}\"];\n", + escaped_name, label + )); } dot.push('\n'); @@ -364,7 +368,11 @@ impl ProjectLineage { }; dot.push_str(&format!( " \"{}\":\"{}\" -> \"{}\":\"{}\"{};\n", - edge.source_model, edge.source_column, edge.target_model, edge.target_column, style + dot_escape(&edge.source_model), + dot_escape(&edge.source_column), + dot_escape(&edge.target_model), + dot_escape(&edge.target_column), + style )); } @@ -373,6 +381,17 @@ impl ProjectLineage { } } +/// Escape DOT/Graphviz special characters in a string used inside labels or IDs. +fn dot_escape(s: &str) -> String { + s.replace('\\', "\\\\") + .replace('"', "\\\"") + .replace('{', "\\{") + .replace('}', "\\}") + .replace('<', "\\<") + .replace('>', "\\>") + .replace('|', "\\|") +} + fn resolve_single_edge( target_model: &str, lineage: &ModelLineage, @@ -453,6 +472,20 @@ fn extract_lineage_from_set_expr(set_expr: &SetExpr, lineage: &mut ModelLineage) } } +/// Build lineage for a `table.*` qualified wildcard. +fn extract_qualified_wildcard_lineage(kind: &SelectItemQualifiedWildcardKind) -> ColumnLineage { + let table_name = match kind { + SelectItemQualifiedWildcardKind::ObjectName(name) => crate::object_name_to_string(name), + SelectItemQualifiedWildcardKind::Expr(expr) => format!("{expr}"), + }; + let mut col_lineage = ColumnLineage::new(&format!("{}.*", table_name)); + col_lineage.expr_type = ExprType::Wildcard; + col_lineage + .source_columns + .insert(ColumnRef::qualified(&table_name, "*")); + col_lineage +} + /// Extract lineage from a SELECT clause fn extract_lineage_from_select(select: &Select, lineage: &mut ModelLineage) { for table in &select.from { @@ -471,18 +504,7 @@ fn extract_lineage_from_select(select: &Select, lineage: &mut ModelLineage) { lineage.add_column(col_lineage); } SelectItem::QualifiedWildcard(kind, _) => { - let table_name = match kind { - SelectItemQualifiedWildcardKind::ObjectName(name) => { - crate::object_name_to_string(name) - } - SelectItemQualifiedWildcardKind::Expr(expr) => format!("{expr}"), - }; - let mut col_lineage = ColumnLineage::new(&format!("{}.*", table_name)); - col_lineage.expr_type = ExprType::Wildcard; - col_lineage - .source_columns - .insert(ColumnRef::qualified(&table_name, "*")); - lineage.add_column(col_lineage); + lineage.add_column(extract_qualified_wildcard_lineage(kind)); } SelectItem::Wildcard(_) => { let mut col_lineage = ColumnLineage::new("*");