diff --git a/crates/ff-analysis/src/datafusion_bridge/propagation.rs b/crates/ff-analysis/src/datafusion_bridge/propagation.rs index 43b2d98..5525e74 100644 --- a/crates/ff-analysis/src/datafusion_bridge/propagation.rs +++ b/crates/ff-analysis/src/datafusion_bridge/propagation.rs @@ -147,6 +147,7 @@ pub fn propagate_schemas( let mut failures: HashMap = HashMap::new(); let registry = FunctionRegistry::with_user_functions(user_functions, user_table_functions); + let mut provider = FeatherFlowProvider::new(&catalog, &registry); for model_name in topo_order { let sql = match sql_sources.get(model_name) { @@ -157,7 +158,6 @@ pub fn propagate_schemas( } }; - let provider = FeatherFlowProvider::new(&catalog, &registry); let plan = match sql_to_plan(sql, &provider) { Ok(p) => p, Err(e) => { @@ -175,6 +175,7 @@ pub fn propagate_schemas( }; catalog.insert(model_name.to_string(), Arc::clone(&inferred_schema)); + provider.insert_schema(model_name.to_string(), &inferred_schema); model_plans.insert( model_name.clone(), diff --git a/crates/ff-analysis/src/datafusion_bridge/provider.rs b/crates/ff-analysis/src/datafusion_bridge/provider.rs index 5a6fba8..d453e62 100644 --- a/crates/ff-analysis/src/datafusion_bridge/provider.rs +++ b/crates/ff-analysis/src/datafusion_bridge/provider.rs @@ -209,6 +209,14 @@ impl<'a> FeatherFlowProvider<'a> { } } + /// Insert (or update) a single schema entry without rebuilding the entire provider. 
+ pub fn insert_schema(&mut self, name: String, schema: &RelSchema) { + let arrow = Self::rel_schema_to_arrow(schema); + self.lowercase_schemas + .insert(name.to_lowercase(), arrow.clone()); + self.arrow_schemas.insert(name, arrow); + } + /// Convert a RelSchema to an Arrow SchemaRef fn rel_schema_to_arrow(schema: &RelSchema) -> SchemaRef { let fields: Vec = schema diff --git a/crates/ff-cli/src/commands/compile.rs b/crates/ff-cli/src/commands/compile.rs index 742a4b1..cb62a55 100644 --- a/crates/ff-cli/src/commands/compile.rs +++ b/crates/ff-cli/src/commands/compile.rs @@ -8,7 +8,8 @@ use ff_core::model::ModelConfig; use ff_core::Project; use ff_jinja::JinjaEnvironment; use ff_sql::{ - collect_ephemeral_dependencies, extract_dependencies, inline_ephemeral_ctes, SqlParser, + collect_ephemeral_dependencies, extract_dependencies, inline_ephemeral_ctes, + qualify_statements, SqlParser, Statement, }; use serde::Serialize; use std::collections::{HashMap, HashSet}; @@ -47,6 +48,8 @@ use crate::commands::common::parse_hooks_from_config; struct CompileOutput { name: String, sql: String, + /// Parsed AST statements — kept to avoid re-parsing during qualification. 
+ statements: Vec, materialization: Materialization, dependencies: Vec, output_path: std::path::PathBuf, @@ -242,15 +245,13 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<( let qualification_map = common::build_qualification_map(&project, &compiled_schemas); for compiled in &mut compiled_models { - match ff_sql::qualify_table_references(&compiled.sql, &qualification_map) { - Ok(qualified) => compiled.sql = qualified, - Err(e) => { - eprintln!( - "Warning: Failed to qualify table references in '{}': {}", - compiled.name, e - ); - } - } + qualify_statements(&mut compiled.statements, &qualification_map); + compiled.sql = compiled + .statements + .iter() + .map(|s| s.to_string()) + .collect::>() + .join(";\n"); } let ephemeral_sql: HashMap = compiled_models @@ -484,7 +485,7 @@ fn compile_model_phase1( ctx: &CompileContext<'_>, ) -> Result { // Immutable borrow block: render + parse + extract deps + resolve functions - let (rendered, config_values, model_deps, ext_deps) = { + let (rendered, config_values, model_deps, ext_deps, statements) = { let model = project .get_model(name) .with_context(|| format!("Model not found: {}", name))?; @@ -528,7 +529,7 @@ fn compile_model_phase1( ); } - (rendered, config_values, model_deps, ext_deps) + (rendered, config_values, model_deps, ext_deps, statements) }; // Mutable borrow: apply results to model @@ -572,6 +573,7 @@ fn compile_model_phase1( Ok(CompileOutput { name: name.to_string(), sql: rendered, + statements, materialization: mat, dependencies: model_deps, output_path, diff --git a/crates/ff-core/src/function.rs b/crates/ff-core/src/function.rs index bede879..22df2f9 100644 --- a/crates/ff-core/src/function.rs +++ b/crates/ff-core/src/function.rs @@ -194,9 +194,15 @@ impl FunctionDef { path: yaml_path.display().to_string(), source: e, })?; + Self::load_from_str(&content, yaml_path) + } + /// Load a function definition from already-read YAML content. 
+ /// + /// Expects a matching `.sql` file in the same directory as `yaml_path`. + pub fn load_from_str(content: &str, yaml_path: &Path) -> CoreResult { let schema: FunctionSchema = - serde_yaml::from_str(&content).map_err(|e| CoreError::FunctionParseError { + serde_yaml::from_str(content).map_err(|e| CoreError::FunctionParseError { path: yaml_path.display().to_string(), details: e.to_string(), })?; diff --git a/crates/ff-core/src/model/mod.rs b/crates/ff-core/src/model/mod.rs index 12a0496..ed57d2c 100644 --- a/crates/ff-core/src/model/mod.rs +++ b/crates/ff-core/src/model/mod.rs @@ -224,6 +224,55 @@ impl Model { }) } + /// Create a new model from a file path, using already-read YAML schema content. + /// + /// Avoids re-reading and re-parsing the YAML file when the caller has already + /// read the content (e.g., during node discovery). + pub fn from_file_with_schema_content( + path: PathBuf, + schema_content: &str, + schema_path: &std::path::Path, + ) -> Result { + let name = path + .file_stem() + .and_then(|s| s.to_str()) + .ok_or_else(|| crate::error::CoreError::ModelParseError { + name: path.display().to_string(), + message: "Cannot extract model name from path".to_string(), + })? 
+ .to_string(); + + let raw_sql = std::fs::read_to_string(&path).map_err(|e| CoreError::IoWithPath { + path: path.display().to_string(), + source: e, + })?; + + if raw_sql.trim().is_empty() { + return Err(CoreError::ModelParseError { + name, + message: "SQL file is empty".into(), + }); + } + + let schema = Some(ModelSchema::load_from_str(schema_content, schema_path)?); + + let (base_name, version) = Self::parse_version(&name); + + Ok(Self { + name: ModelName::new(name), + path, + raw_sql, + compiled_sql: None, + config: ModelConfig::default(), + depends_on: HashSet::new(), + external_deps: HashSet::new(), + schema, + base_name, + version, + kind: ModelKind::Model, + }) + } + /// Parse version suffix from model name /// /// Returns (base_name, version) where base_name is Some if the model follows _v{N} convention diff --git a/crates/ff-core/src/model/schema.rs b/crates/ff-core/src/model/schema.rs index 9bb1b81..16cdfe6 100644 --- a/crates/ff-core/src/model/schema.rs +++ b/crates/ff-core/src/model/schema.rs @@ -214,7 +214,12 @@ impl ModelSchema { path: path.display().to_string(), source: e, })?; - let schema: ModelSchema = serde_yaml::from_str(&content).map_err(|e| { + Self::load_from_str(&content, path) + } + + /// Load schema from already-read YAML content, using `path` only for error messages. 
+ pub fn load_from_str(content: &str, path: &std::path::Path) -> Result { + let schema: ModelSchema = serde_yaml::from_str(content).map_err(|e| { use serde::de::Error as _; CoreError::YamlParse(serde_yaml::Error::custom(format!( "{}: {}", diff --git a/crates/ff-core/src/project/loading.rs b/crates/ff-core/src/project/loading.rs index db9ea04..d1296f2 100644 --- a/crates/ff-core/src/project/loading.rs +++ b/crates/ff-core/src/project/loading.rs @@ -315,10 +315,10 @@ impl Project { }; match raw_kind.normalize() { - NodeKind::Sql => Self::load_sql_node(path, &dir_name, models), + NodeKind::Sql => Self::load_sql_node(path, &dir_name, &content, &config_path, models), NodeKind::Seed => Self::load_seed_node(path, &dir_name, seeds), - NodeKind::Source => Self::load_source_node(&config_path, sources), - NodeKind::Function => Self::load_function_node(&config_path, functions), + NodeKind::Source => Self::load_source_node(&content, &config_path, sources), + NodeKind::Function => Self::load_function_node(&content, &config_path, functions), kind => Err(CoreError::NodeUnsupportedKind { directory: dir_name, kind: kind.to_string(), @@ -330,6 +330,8 @@ impl Project { fn load_sql_node( dir: &Path, dir_name: &str, + yaml_content: &str, + yaml_path: &Path, models: &mut HashMap, ) -> CoreResult<()> { let sql_path = dir.join(format!("{}.sql", dir_name)); @@ -341,7 +343,7 @@ impl Project { }); } - let model = Model::from_file(sql_path)?; + let model = Model::from_file_with_schema_content(sql_path, yaml_content, yaml_path)?; if models.contains_key(model.name.as_str()) { return Err(CoreError::DuplicateModel { @@ -380,15 +382,23 @@ impl Project { } /// Load a `kind: source` node as a [`SourceFile`]. 
- fn load_source_node(yaml_path: &Path, sources: &mut Vec) -> CoreResult<()> { - let source = SourceFile::load(yaml_path)?; + fn load_source_node( + yaml_content: &str, + yaml_path: &Path, + sources: &mut Vec, + ) -> CoreResult<()> { + let source = SourceFile::load_from_str(yaml_content, yaml_path)?; sources.push(source); Ok(()) } /// Load a `kind: function` node as a [`FunctionDef`]. - fn load_function_node(yaml_path: &Path, functions: &mut Vec) -> CoreResult<()> { - let func = FunctionDef::load(yaml_path)?; + fn load_function_node( + yaml_content: &str, + yaml_path: &Path, + functions: &mut Vec, + ) -> CoreResult<()> { + let func = FunctionDef::load_from_str(yaml_content, yaml_path)?; functions.push(func); Ok(()) } diff --git a/crates/ff-core/src/source.rs b/crates/ff-core/src/source.rs index f731436..3f2d702 100644 --- a/crates/ff-core/src/source.rs +++ b/crates/ff-core/src/source.rs @@ -105,9 +105,13 @@ impl SourceFile { path: path.display().to_string(), source: e, })?; + Self::load_from_str(&content, path) + } + /// Load and validate from already-read YAML content, using `path` only for error messages. 
+ pub fn load_from_str(content: &str, path: &Path) -> CoreResult { let source: SourceFile = - serde_yaml::from_str(&content).map_err(|e| CoreError::SourceParseError { + serde_yaml::from_str(content).map_err(|e| CoreError::SourceParseError { path: path.display().to_string(), details: e.to_string(), })?; diff --git a/crates/ff-sql/src/lib.rs b/crates/ff-sql/src/lib.rs index cfe066a..878f6a1 100644 --- a/crates/ff-sql/src/lib.rs +++ b/crates/ff-sql/src/lib.rs @@ -29,7 +29,8 @@ pub use lineage::{ LineageKind, ModelLineage, ProjectLineage, }; pub use parser::SqlParser; -pub use qualify::qualify_table_references; +pub use qualify::{qualify_statements, qualify_table_references}; +pub use sqlparser::ast::Statement; pub use suggestions::{suggest_tests, ColumnSuggestions, ModelSuggestions, TestSuggestion}; pub use validator::validate_no_complex_queries; diff --git a/crates/ff-sql/src/qualify.rs b/crates/ff-sql/src/qualify.rs index d8315e5..b12d7a1 100644 --- a/crates/ff-sql/src/qualify.rs +++ b/crates/ff-sql/src/qualify.rs @@ -6,7 +6,7 @@ //! only for cross-database (attached) references. Only single-part (bare) //! names are qualified; already-qualified references are left unchanged. -use sqlparser::ast::{visit_relations_mut, Ident, ObjectName, ObjectNamePart}; +use sqlparser::ast::{visit_relations_mut, Ident, ObjectName, ObjectNamePart, Statement}; use sqlparser::dialect::DuckDbDialect; use sqlparser::parser::Parser; use std::collections::HashMap; @@ -56,12 +56,7 @@ pub fn qualify_table_references( } })?; - for stmt in &mut statements { - let _ = visit_relations_mut(stmt, |name: &mut ObjectName| { - qualify_single_name(name, qualification_map); - std::ops::ControlFlow::<()>::Continue(()) - }); - } + qualify_statements(&mut statements, qualification_map); Ok(statements .iter() @@ -70,6 +65,25 @@ pub fn qualify_table_references( .join(";\n")) } +/// Qualify bare table references in already-parsed statements (no re-parse). +/// +/// Mutates statements in-place. 
Use this when you already have a parsed AST +/// to avoid the cost of re-parsing SQL text. +pub fn qualify_statements( + statements: &mut [Statement], + qualification_map: &HashMap, +) { + if qualification_map.is_empty() { + return; + } + for stmt in statements.iter_mut() { + let _ = visit_relations_mut(stmt, |name: &mut ObjectName| { + qualify_single_name(name, qualification_map); + std::ops::ControlFlow::<()>::Continue(()) + }); + } +} + /// Qualify a single-part (bare) table name using the qualification map. /// /// Only rewrites names with exactly one part. Already-qualified names are