From ca90c0a5e92888c1c7d537ab3f9fbda7180f052c Mon Sep 17 00:00:00 2001 From: brian moore Date: Thu, 19 Feb 2026 18:33:25 -0500 Subject: [PATCH] bm/dry-the-code-up --- crates/ff-cli/src/commands/compile.rs | 181 +++++++++++++++----------- crates/ff-cli/src/commands/run/mod.rs | 14 -- crates/ff-core/src/config.rs | 5 +- crates/ff-core/src/contract.rs | 1 - crates/ff-core/src/function.rs | 48 ++----- crates/ff-core/src/model/schema.rs | 2 - crates/ff-core/src/project/loading.rs | 44 +++++++ crates/ff-core/src/project/mod.rs | 2 +- crates/ff-core/src/source.rs | 46 ++----- crates/ff-db/src/duckdb.rs | 23 +++- crates/ff-sql/src/dialect.rs | 3 +- crates/ff-sql/src/extractor.rs | 1 - crates/ff-sql/src/qualify.rs | 5 +- 13 files changed, 195 insertions(+), 180 deletions(-) diff --git a/crates/ff-cli/src/commands/compile.rs b/crates/ff-cli/src/commands/compile.rs index 470a3f6..7b15b30 100644 --- a/crates/ff-cli/src/commands/compile.rs +++ b/crates/ff-cli/src/commands/compile.rs @@ -88,7 +88,6 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<( let comment_ctx = common::build_query_comment_context(&project.config, global.target.as_deref()); - // Get vars merged with target overrides, then merge with CLI --vars let base_vars = project.config.get_merged_vars(target.as_deref()); let vars = merge_vars(&base_vars, &args.vars)?; @@ -104,7 +103,7 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<( let template_ctx = common::build_template_context(&project, global.target.as_deref(), false); let jinja = JinjaEnvironment::with_context(&vars, ¯o_paths, &template_ctx); - // Compile all models first — filtering happens after DAG build + // Filtering happens after DAG build, so compile all models first let all_model_names: Vec = project .model_names() .into_iter() @@ -186,17 +185,14 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<( } } - // Validate DAG (always done even in parse-only mode) let dag = ModelDag::build(&dependencies).context("Failed to build dependency graph")?; let topo_order = dag .topological_order() .context("Circular dependency detected")?; - // Apply selector filtering now that DAG is available let model_names: Vec = common::resolve_nodes(&project, &dag, &args.nodes)?; let model_names_set: HashSet = model_names.iter().cloned().collect(); - // Filter compiled_models to only those selected compiled_models.retain(|m| model_names_set.contains(&m.name)); if !json_mode && args.nodes.is_some() { @@ -215,7 +211,6 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<( ); } - // Static analysis phase (DataFusion LogicalPlan) if !args.skip_static_analysis { let analysis_result = run_static_analysis( &project, @@ -273,7 +268,6 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<( } for compiled in compiled_models { - // Skip ephemeral models - they don't get written as files if compiled.materialization == Materialization::Ephemeral { if !json_mode { println!(" \u{2713} {} (ephemeral) [inlined]", compiled.name); @@ -283,44 +277,21 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<( model: compiled.name, status: RunStatus::Success, materialization: "ephemeral".to_string(), - output_path: None, // Ephemeral models don't have output files + output_path: None, dependencies: compiled.dependencies, error: None, }); continue; } - // Inline ephemeral dependencies into this model's SQL - let final_sql = if !ephemeral_sql.is_empty() { - // Collect ephemeral dependencies for this model - let is_ephemeral = - |name: &str| materializations.get(name) == Some(&Materialization::Ephemeral); - let get_sql = |name: &str| ephemeral_sql.get(name).cloned(); - - let (ephemeral_deps, order) = collect_ephemeral_dependencies( - &compiled.name, - &dependencies, - is_ephemeral, - get_sql, - ); - - if !ephemeral_deps.is_empty() { - if global.verbose { - eprintln!( - "[verbose] Inlining {} ephemeral model(s) into {}", - ephemeral_deps.len(), - compiled.name - ); - } - inline_ephemeral_ctes(&compiled.sql, &ephemeral_deps, &order)? - } else { - compiled.sql - } - } else { - compiled.sql - }; + let final_sql = resolve_final_sql( + &compiled, + &ephemeral_sql, + &dependencies, + &materializations, + global, + )?; - // Write the compiled SQL (with inlined ephemerals) if args.parse_only { if !json_mode { println!( @@ -341,47 +312,14 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<( error: None, }); } else { - if let Some(parent) = compiled.output_path.parent() { - std::fs::create_dir_all(parent).with_context(|| { - format!("Failed to create directory for model: {}", compiled.name) - })?; - } - - // Attach query comment to written file, but keep in-memory SQL clean for checksums - let placement = comment_ctx - .as_ref() - .map(|c| c.config.placement) - .unwrap_or_default(); - let sql_to_write = match &compiled.query_comment { - Some(comment) => { - ff_core::query_comment::attach_query_comment(&final_sql, comment, placement) - } - None => final_sql.clone(), - }; - std::fs::write(&compiled.output_path, &sql_to_write).with_context(|| { - format!("Failed to write compiled SQL for model: {}", compiled.name) - })?; - - // Also update the model's compiled_sql with the clean version (no comment) - if let Some(model) = project.get_model_mut(&compiled.name) { - model.compiled_sql = Some(final_sql); - } - - if !json_mode { - println!( - " \u{2713} {} ({})", - compiled.name, compiled.materialization - ); - } - - if global.verbose { - eprintln!( - "[verbose] Compiled {} -> {}", - compiled.name, - compiled.output_path.display() - ); - } - + write_compiled_output( + &compiled, + &final_sql, + &comment_ctx, + &mut project, + global, + json_mode, + )?; success_count += 1; compile_results.push(ModelCompileResult { model: compiled.name, @@ -439,6 +377,91 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<( Ok(()) } +/// Resolve the final SQL for a non-ephemeral model, inlining any ephemeral dependencies. +fn resolve_final_sql( + compiled: &CompileOutput, + ephemeral_sql: &HashMap, + dependencies: &HashMap>, + materializations: &HashMap, + global: &GlobalArgs, +) -> Result { + if ephemeral_sql.is_empty() { + return Ok(compiled.sql.clone()); + } + + let is_ephemeral = |name: &str| materializations.get(name) == Some(&Materialization::Ephemeral); + let get_sql = |name: &str| ephemeral_sql.get(name).cloned(); + + let (ephemeral_deps, order) = + collect_ephemeral_dependencies(&compiled.name, dependencies, is_ephemeral, get_sql); + + if ephemeral_deps.is_empty() { + return Ok(compiled.sql.clone()); + } + + if global.verbose { + eprintln!( + "[verbose] Inlining {} ephemeral model(s) into {}", + ephemeral_deps.len(), + compiled.name + ); + } + Ok(inline_ephemeral_ctes( + &compiled.sql, + &ephemeral_deps, + &order, + )?) +} + +/// Write compiled SQL to disk, attaching query comments and updating the in-memory model. +fn write_compiled_output( + compiled: &CompileOutput, + final_sql: &str, + comment_ctx: &Option, + project: &mut Project, + global: &GlobalArgs, + json_mode: bool, +) -> Result<()> { + if let Some(parent) = compiled.output_path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("Failed to create directory for model: {}", compiled.name))?; + } + + let placement = comment_ctx + .as_ref() + .map(|c| c.config.placement) + .unwrap_or_default(); + let sql_to_write = match &compiled.query_comment { + Some(comment) => { + ff_core::query_comment::attach_query_comment(final_sql, comment, placement) + } + None => final_sql.to_string(), + }; + std::fs::write(&compiled.output_path, &sql_to_write) + .with_context(|| format!("Failed to write compiled SQL for model: {}", compiled.name))?; + + if let Some(model) = project.get_model_mut(&compiled.name) { + model.compiled_sql = Some(final_sql.to_string()); + } + + if !json_mode { + println!( + " \u{2713} {} ({})", + compiled.name, compiled.materialization + ); + } + + if global.verbose { + eprintln!( + "[verbose] Compiled {} -> {}", + compiled.name, + compiled.output_path.display() + ); + } + + Ok(()) +} + /// Merge extra vars from --vars argument into project vars fn merge_vars( project_vars: &HashMap, diff --git a/crates/ff-cli/src/commands/run/mod.rs b/crates/ff-cli/src/commands/run/mod.rs index 7165b5d..a2a9e87 100644 --- a/crates/ff-cli/src/commands/run/mod.rs +++ b/crates/ff-cli/src/commands/run/mod.rs @@ -87,24 +87,20 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> { // Open meta database early (needed for smart build and execution state) let meta_db = common::open_meta_db(&project); - // Smart build: filter out unchanged models (queries previous execution state) let smart_skipped: HashSet = if args.smart { compute_smart_skips(&compiled_models, global, meta_db.as_ref())? } else { HashSet::new() }; - // Compute config hash for run state validation let config_hash = compute_config_hash(&project); - // Determine run state path let run_state_path = args .state_file .as_ref() .map(|s| Path::new(s).to_path_buf()) .unwrap_or_else(|| project.target_dir().join("run_state.json")); - // Handle resume mode let (execution_order, previous_run_state) = if args.resume { state::handle_resume_mode( &run_state_path, @@ -118,7 +114,6 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> { (order, None) }; - // Apply smart build filtering let execution_order: Vec = if !smart_skipped.is_empty() { let before = execution_order.len(); let filtered: Vec = execution_order @@ -160,7 +155,6 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> { ); } - // Count non-ephemeral models (ephemeral models are inlined, not executed) let ephemeral_count = execution_order .iter() .filter(|name| { @@ -172,7 +166,6 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> { .count(); let executable_count = execution_order.len() - ephemeral_count; - // Show resume summary if applicable (text mode only) if !json_mode { if let Some(ref prev_state) = previous_run_state { let summary = prev_state.summary(); @@ -190,13 +183,11 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> { } } - // Resolve WAP schema from config let target = ff_core::config::Config::resolve_target(global.target.as_deref()); let wap_schema = project.config.get_wap_schema(target.as_deref()); create_schemas(&db, &compiled_models, global).await?; - // Create WAP schema if configured if let Some(ws) = wap_schema { db.create_schema_if_not_exists(ws) .await @@ -226,12 +217,10 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> { config_hash, ); - // Save initial run state if let Err(e) = run_state.save(&run_state_path) { eprintln!("Warning: Failed to save initial run state: {}", e); } - // Populate meta database before execution to capture model_id_map (non-fatal) let meta_ids = meta_db .as_ref() .and_then(|db| common::populate_meta_phase1(db, &project, "run", args.nodes.as_deref())); @@ -240,7 +229,6 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> { None => (None, None), }; - // Resolve database file path for Python model execution let db_path_str = project.config.database.path.clone(); let db_path_ref = db_path_str.as_str(); @@ -259,13 +247,11 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> { let (run_results, success_count, failure_count, stopped_early) = execute_models_with_state(&exec_ctx, &mut run_state, &run_state_path).await?; - // Mark run as completed run_state.mark_run_completed(); if let Err(e) = run_state.save(&run_state_path) { eprintln!("Warning: Failed to save final run state: {}", e); } - // Complete meta database run (non-fatal) if let (Some(ref meta_db), Some(run_id)) = (&meta_db, meta_run_id) { let status = if failure_count > 0 { "error" diff --git a/crates/ff-core/src/config.rs b/crates/ff-core/src/config.rs index f2edefb..64b4a70 100644 --- a/crates/ff-core/src/config.rs +++ b/crates/ff-core/src/config.rs @@ -296,7 +296,10 @@ impl Config { path: path.display().to_string(), source: e, })?; - let config: Config = serde_yaml::from_str(&content)?; + let config: Config = + serde_yaml::from_str(&content).map_err(|e| CoreError::ConfigParseError { + message: format!("{}: {}", path.display(), e), + })?; config.validate()?; Ok(config) } diff --git a/crates/ff-core/src/contract.rs b/crates/ff-core/src/contract.rs index 29d2157..d250bf2 100644 --- a/crates/ff-core/src/contract.rs +++ b/crates/ff-core/src/contract.rs @@ -70,7 +70,6 @@ impl ContractValidationResult { violation_type, message: message.into(), }); - // If enforced, mark as failed if self.enforced { self.passed = false; } diff --git a/crates/ff-core/src/function.rs b/crates/ff-core/src/function.rs index 825c5db..bede879 100644 --- a/crates/ff-core/src/function.rs +++ b/crates/ff-core/src/function.rs @@ -441,47 +441,21 @@ struct YamlKindProbe { /// Recursively discover function files in a directory fn discover_functions_recursive(dir: &Path, functions: &mut Vec) -> CoreResult<()> { - for entry in std::fs::read_dir(dir).map_err(|e| CoreError::IoWithPath { - path: dir.display().to_string(), - source: e, - })? { - let entry = entry.map_err(|e| CoreError::IoWithPath { - path: dir.display().to_string(), - source: e, - })?; - let path = entry.path(); - - if path.is_dir() { - discover_functions_recursive(&path, functions)?; - } else if path.extension().is_some_and(|e| e == "yml" || e == "yaml") { - let content = match std::fs::read_to_string(&path) { - Ok(c) => c, - Err(e) => { - log::warn!("Cannot read {}: {}", path.display(), e); - continue; - } - }; - - // Deserialize just the `kind` field to decide whether this is a function YAML - let probe: YamlKindProbe = match serde_yaml::from_str(&content) { + crate::project::loading::discover_yaml_recursive( + dir, + functions, + |content| { + let probe: YamlKindProbe = match serde_yaml::from_str(content) { Ok(p) => p, - Err(_) => continue, // Not valid YAML or unrelated file + Err(_) => return false, }; - - if !matches!( + matches!( probe.kind, Some(FunctionKind::Functions | FunctionKind::Function) - ) { - continue; - } - - // Kind probe confirmed this is a function file — parse errors are real - let func = FunctionDef::load(&path)?; - functions.push(func); - } - } - - Ok(()) + ) + }, + FunctionDef::load, + ) } /// Build a lookup map from function name to function definition. diff --git a/crates/ff-core/src/model/schema.rs b/crates/ff-core/src/model/schema.rs index b9f19b0..9bb1b81 100644 --- a/crates/ff-core/src/model/schema.rs +++ b/crates/ff-core/src/model/schema.rs @@ -254,11 +254,9 @@ impl ModelSchema { /// Get owner - prefers direct owner field, falls back to meta.owner pub fn get_owner(&self) -> Option { - // First check direct owner field if let Some(owner) = &self.owner { return Some(owner.clone()); } - // Fall back to meta.owner self.get_meta_string("owner") } diff --git a/crates/ff-core/src/project/loading.rs b/crates/ff-core/src/project/loading.rs index 46d9b31..6434776 100644 --- a/crates/ff-core/src/project/loading.rs +++ b/crates/ff-core/src/project/loading.rs @@ -14,6 +14,50 @@ use std::path::Path; use super::{Project, ProjectParts}; +/// Recursively discover YAML files in a directory, probing each for a matching +/// `kind` and loading matches with a caller-supplied loader. +/// +/// This is the shared logic behind `discover_functions_recursive` and +/// `discover_sources_recursive`. +pub(crate) fn discover_yaml_recursive( + dir: &Path, + items: &mut Vec, + probe: P, + load: L, +) -> CoreResult<()> +where + P: Fn(&str) -> bool + Copy, + L: Fn(&Path) -> CoreResult + Copy, +{ + for entry in std::fs::read_dir(dir).map_err(|e| CoreError::IoWithPath { + path: dir.display().to_string(), + source: e, + })? { + let entry = entry.map_err(|e| CoreError::IoWithPath { + path: dir.display().to_string(), + source: e, + })?; + let path = entry.path(); + if path.is_dir() { + discover_yaml_recursive(&path, items, probe, load)?; + } else if path.extension().is_some_and(|e| e == "yml" || e == "yaml") { + let content = match std::fs::read_to_string(&path) { + Ok(c) => c, + Err(e) => { + log::warn!("Cannot read {}: {}", path.display(), e); + continue; + } + }; + if !probe(&content) { + continue; + } + let item = load(&path)?; + items.push(item); + } + } + Ok(()) +} + /// Extract the file extension as a `&str`, returning `""` for paths without one. fn file_extension_str(path: &Path) -> &str { path.extension().and_then(|e| e.to_str()).unwrap_or("") diff --git a/crates/ff-core/src/project/mod.rs b/crates/ff-core/src/project/mod.rs index 173f429..2d7a3c4 100644 --- a/crates/ff-core/src/project/mod.rs +++ b/crates/ff-core/src/project/mod.rs @@ -1,6 +1,6 @@ //! Project discovery and loading -mod loading; +pub(crate) mod loading; mod versioning; pub(crate) use loading::find_yaml_path; diff --git a/crates/ff-core/src/source.rs b/crates/ff-core/src/source.rs index 3a06911..f731436 100644 --- a/crates/ff-core/src/source.rs +++ b/crates/ff-core/src/source.rs @@ -200,44 +200,18 @@ struct SourceKindProbe { /// Recursively discover source files in a directory fn discover_sources_recursive(dir: &Path, sources: &mut Vec) -> CoreResult<()> { - for entry in std::fs::read_dir(dir).map_err(|e| CoreError::IoWithPath { - path: dir.display().to_string(), - source: e, - })? { - let entry = entry.map_err(|e| CoreError::IoWithPath { - path: dir.display().to_string(), - source: e, - })?; - let path = entry.path(); - - if path.is_dir() { - discover_sources_recursive(&path, sources)?; - } else if path.extension().is_some_and(|e| e == "yml" || e == "yaml") { - let content = match std::fs::read_to_string(&path) { - Ok(c) => c, - Err(e) => { - log::warn!("Cannot read {}: {}", path.display(), e); - continue; - } - }; - - // Probe the kind field before attempting a full parse - let probe: SourceKindProbe = match serde_yaml::from_str(&content) { + crate::project::loading::discover_yaml_recursive( + dir, + sources, + |content| { + let probe: SourceKindProbe = match serde_yaml::from_str(content) { Ok(p) => p, - Err(_) => continue, + Err(_) => return false, }; - - if !matches!(probe.kind, Some(SourceKind::Sources | SourceKind::Source)) { - continue; - } - - // Kind probe confirmed this is a sources file — parse errors are real - let source = SourceFile::load(&path)?; - sources.push(source); - } - } - - Ok(()) + matches!(probe.kind, Some(SourceKind::Sources | SourceKind::Source)) + }, + SourceFile::load, + ) } /// Build lookup of known source tables for dependency categorization diff --git a/crates/ff-db/src/duckdb.rs b/crates/ff-db/src/duckdb.rs index 73fa015..8ce14d2 100644 --- a/crates/ff-db/src/duckdb.rs +++ b/crates/ff-db/src/duckdb.rs @@ -381,7 +381,11 @@ impl DuckDbBackend { /// Execute batch SQL synchronously fn execute_batch_sync(&self, sql: &str) -> DbResult<()> { let conn = self.lock_conn()?; - conn.execute_batch(sql)?; + conn.execute_batch(sql) + .map_err(|e| DbError::ExecutionFailed { + context: truncate_sql_for_error(sql), + source: e, + })?; Ok(()) } @@ -402,7 +406,12 @@ impl DuckDbBackend { let conn = self.lock_conn()?; let limited_sql = format!("SELECT * FROM ({}) AS subq LIMIT {}", sql, limit); - let mut stmt = conn.prepare(&limited_sql)?; + let mut stmt = conn + .prepare(&limited_sql) + .map_err(|e| DbError::ExecutionFailed { + context: truncate_sql_for_error(&limited_sql), + source: e, + })?; let mut rows = Vec::new(); let mut result_rows = stmt.query([])?; let column_count = result_rows.as_ref().map_or(0, |r| r.column_count()); @@ -448,8 +457,14 @@ impl DatabaseCore for DuckDbBackend { async fn query_one(&self, sql: &str) -> DbResult> { let conn = self.lock_conn()?; - let mut stmt = conn.prepare(sql)?; - let mut result_rows = stmt.query([])?; + let mut stmt = conn.prepare(sql).map_err(|e| DbError::ExecutionFailed { + context: truncate_sql_for_error(sql), + source: e, + })?; + let mut result_rows = stmt.query([]).map_err(|e| DbError::ExecutionFailed { + context: truncate_sql_for_error(sql), + source: e, + })?; let Some(row) = result_rows.next()? else { return Ok(None); diff --git a/crates/ff-sql/src/dialect.rs b/crates/ff-sql/src/dialect.rs index 9c6c991..ef89b0e 100644 --- a/crates/ff-sql/src/dialect.rs +++ b/crates/ff-sql/src/dialect.rs @@ -132,7 +132,6 @@ pub trait SqlDialect: Send + Sync { fn parse(&self, sql: &str) -> SqlResult> { Parser::parse_sql(self.parser_dialect(), sql).map_err(|e| { let msg = e.to_string(); - // Extract line/column from error message (format: "... at Line: X, Column: Y") let (line, column) = parse_location_from_error(&msg); SqlError::ParseError { message: msg, @@ -194,7 +193,7 @@ pub trait SqlDialect: Send + Sync { /// /// sqlparser 0.60's `ParserError` is a simple string wrapper with no structured /// location data, so we extract "Line: N, Column: M" from the error message text. -fn parse_location_from_error(msg: &str) -> (usize, usize) { +pub(crate) fn parse_location_from_error(msg: &str) -> (usize, usize) { let Some(line_idx) = msg.find("Line: ") else { return (0, 0); }; diff --git a/crates/ff-sql/src/extractor.rs b/crates/ff-sql/src/extractor.rs index b449f8b..81a5b24 100644 --- a/crates/ff-sql/src/extractor.rs +++ b/crates/ff-sql/src/extractor.rs @@ -165,7 +165,6 @@ pub fn extract_dependencies_resolved( return std::ops::ControlFlow::<()>::Continue(()); } - // Dedup by resolved name if seen.insert(resolved.name.clone()) { deps.push(resolved); } diff --git a/crates/ff-sql/src/qualify.rs b/crates/ff-sql/src/qualify.rs index f1aba3c..d8315e5 100644 --- a/crates/ff-sql/src/qualify.rs +++ b/crates/ff-sql/src/qualify.rs @@ -48,10 +48,11 @@ pub fn qualify_table_references( let dialect = DuckDbDialect {}; let mut statements = Parser::parse_sql(&dialect, sql).map_err(|e| { let msg = e.to_string(); + let (line, column) = crate::dialect::parse_location_from_error(&msg); SqlError::ParseError { message: msg, - line: 0, - column: 0, + line, + column, } })?;