Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/ff-analysis/src/datafusion_bridge/propagation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ pub fn propagate_schemas(
let mut failures: HashMap<ModelName, String> = HashMap::new();

let registry = FunctionRegistry::with_user_functions(user_functions, user_table_functions);
let mut provider = FeatherFlowProvider::new(&catalog, &registry);

for model_name in topo_order {
let sql = match sql_sources.get(model_name) {
Expand All @@ -157,7 +158,6 @@ pub fn propagate_schemas(
}
};

let provider = FeatherFlowProvider::new(&catalog, &registry);
let plan = match sql_to_plan(sql, &provider) {
Ok(p) => p,
Err(e) => {
Expand All @@ -175,6 +175,7 @@ pub fn propagate_schemas(
};

catalog.insert(model_name.to_string(), Arc::clone(&inferred_schema));
provider.insert_schema(model_name.to_string(), &inferred_schema);

model_plans.insert(
model_name.clone(),
Expand Down
8 changes: 8 additions & 0 deletions crates/ff-analysis/src/datafusion_bridge/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@ impl<'a> FeatherFlowProvider<'a> {
}
}

/// Insert (or update) a single schema entry without rebuilding the entire provider.
pub fn insert_schema(&mut self, name: String, schema: &RelSchema) {
    // Convert once and reuse the result for both lookup maps.
    let converted = Self::rel_schema_to_arrow(schema);
    let lowered = name.to_lowercase();
    self.lowercase_schemas.insert(lowered, converted.clone());
    self.arrow_schemas.insert(name, converted);
}

/// Convert a RelSchema to an Arrow SchemaRef
fn rel_schema_to_arrow(schema: &RelSchema) -> SchemaRef {
let fields: Vec<Field> = schema
Expand Down
26 changes: 14 additions & 12 deletions crates/ff-cli/src/commands/compile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use ff_core::model::ModelConfig;
use ff_core::Project;
use ff_jinja::JinjaEnvironment;
use ff_sql::{
collect_ephemeral_dependencies, extract_dependencies, inline_ephemeral_ctes, SqlParser,
collect_ephemeral_dependencies, extract_dependencies, inline_ephemeral_ctes,
qualify_statements, SqlParser, Statement,
};
use serde::Serialize;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -47,6 +48,8 @@ use crate::commands::common::parse_hooks_from_config;
struct CompileOutput {
name: String,
sql: String,
/// Parsed AST statements — kept to avoid re-parsing during qualification.
statements: Vec<Statement>,
materialization: Materialization,
dependencies: Vec<String>,
output_path: std::path::PathBuf,
Expand Down Expand Up @@ -242,15 +245,13 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<(
let qualification_map = common::build_qualification_map(&project, &compiled_schemas);

for compiled in &mut compiled_models {
match ff_sql::qualify_table_references(&compiled.sql, &qualification_map) {
Ok(qualified) => compiled.sql = qualified,
Err(e) => {
eprintln!(
"Warning: Failed to qualify table references in '{}': {}",
compiled.name, e
);
}
}
qualify_statements(&mut compiled.statements, &qualification_map);
compiled.sql = compiled
.statements
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>()
.join(";\n");
}

let ephemeral_sql: HashMap<String, String> = compiled_models
Expand Down Expand Up @@ -484,7 +485,7 @@ fn compile_model_phase1(
ctx: &CompileContext<'_>,
) -> Result<CompileOutput> {
// Immutable borrow block: render + parse + extract deps + resolve functions
let (rendered, config_values, model_deps, ext_deps) = {
let (rendered, config_values, model_deps, ext_deps, statements) = {
let model = project
.get_model(name)
.with_context(|| format!("Model not found: {}", name))?;
Expand Down Expand Up @@ -528,7 +529,7 @@ fn compile_model_phase1(
);
}

(rendered, config_values, model_deps, ext_deps)
(rendered, config_values, model_deps, ext_deps, statements)
};

// Mutable borrow: apply results to model
Expand Down Expand Up @@ -572,6 +573,7 @@ fn compile_model_phase1(
Ok(CompileOutput {
name: name.to_string(),
sql: rendered,
statements,
materialization: mat,
dependencies: model_deps,
output_path,
Expand Down
8 changes: 7 additions & 1 deletion crates/ff-core/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,15 @@ impl FunctionDef {
path: yaml_path.display().to_string(),
source: e,
})?;
Self::load_from_str(&content, yaml_path)
}

/// Load a function definition from already-read YAML content.
///
/// Expects a matching `.sql` file in the same directory as `yaml_path`.
pub fn load_from_str(content: &str, yaml_path: &Path) -> CoreResult<Self> {
let schema: FunctionSchema =
serde_yaml::from_str(&content).map_err(|e| CoreError::FunctionParseError {
serde_yaml::from_str(content).map_err(|e| CoreError::FunctionParseError {
path: yaml_path.display().to_string(),
details: e.to_string(),
})?;
Expand Down
49 changes: 49 additions & 0 deletions crates/ff-core/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,55 @@ impl Model {
})
}

/// Create a new model from a file path, using already-read YAML schema content.
///
/// Avoids re-reading and re-parsing the YAML file when the caller has already
/// read the content (e.g., during node discovery).
pub fn from_file_with_schema_content(
path: PathBuf,
schema_content: &str,
schema_path: &std::path::Path,
) -> Result<Self, crate::error::CoreError> {
let name = path
.file_stem()
.and_then(|s| s.to_str())
.ok_or_else(|| crate::error::CoreError::ModelParseError {
name: path.display().to_string(),
message: "Cannot extract model name from path".to_string(),
})?
.to_string();

let raw_sql = std::fs::read_to_string(&path).map_err(|e| CoreError::IoWithPath {
path: path.display().to_string(),
source: e,
})?;

if raw_sql.trim().is_empty() {
return Err(CoreError::ModelParseError {
name,
message: "SQL file is empty".into(),
});
}

let schema = Some(ModelSchema::load_from_str(schema_content, schema_path)?);

let (base_name, version) = Self::parse_version(&name);

Ok(Self {
name: ModelName::new(name),
path,
raw_sql,
compiled_sql: None,
config: ModelConfig::default(),
depends_on: HashSet::new(),
external_deps: HashSet::new(),
schema,
base_name,
version,
kind: ModelKind::Model,
})
}

/// Parse version suffix from model name
///
/// Returns (base_name, version) where base_name is Some if the model follows _v{N} convention
Expand Down
7 changes: 6 additions & 1 deletion crates/ff-core/src/model/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,12 @@ impl ModelSchema {
path: path.display().to_string(),
source: e,
})?;
let schema: ModelSchema = serde_yaml::from_str(&content).map_err(|e| {
Self::load_from_str(&content, path)
}

/// Load schema from already-read YAML content, using `path` only for error messages.
pub fn load_from_str(content: &str, path: &std::path::Path) -> Result<Self, CoreError> {
let schema: ModelSchema = serde_yaml::from_str(content).map_err(|e| {
use serde::de::Error as _;
CoreError::YamlParse(serde_yaml::Error::custom(format!(
"{}: {}",
Expand Down
26 changes: 18 additions & 8 deletions crates/ff-core/src/project/loading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,10 @@ impl Project {
};

match raw_kind.normalize() {
NodeKind::Sql => Self::load_sql_node(path, &dir_name, models),
NodeKind::Sql => Self::load_sql_node(path, &dir_name, &content, &config_path, models),
NodeKind::Seed => Self::load_seed_node(path, &dir_name, seeds),
NodeKind::Source => Self::load_source_node(&config_path, sources),
NodeKind::Function => Self::load_function_node(&config_path, functions),
NodeKind::Source => Self::load_source_node(&content, &config_path, sources),
NodeKind::Function => Self::load_function_node(&content, &config_path, functions),
kind => Err(CoreError::NodeUnsupportedKind {
directory: dir_name,
kind: kind.to_string(),
Expand All @@ -330,6 +330,8 @@ impl Project {
fn load_sql_node(
dir: &Path,
dir_name: &str,
yaml_content: &str,
yaml_path: &Path,
models: &mut HashMap<ModelName, Model>,
) -> CoreResult<()> {
let sql_path = dir.join(format!("{}.sql", dir_name));
Expand All @@ -341,7 +343,7 @@ impl Project {
});
}

let model = Model::from_file(sql_path)?;
let model = Model::from_file_with_schema_content(sql_path, yaml_content, yaml_path)?;

if models.contains_key(model.name.as_str()) {
return Err(CoreError::DuplicateModel {
Expand Down Expand Up @@ -380,15 +382,23 @@ impl Project {
}

/// Load a `kind: source` node as a [`SourceFile`].
fn load_source_node(yaml_path: &Path, sources: &mut Vec<SourceFile>) -> CoreResult<()> {
let source = SourceFile::load(yaml_path)?;
fn load_source_node(
    yaml_content: &str,
    yaml_path: &Path,
    sources: &mut Vec<SourceFile>,
) -> CoreResult<()> {
    // Parse the already-read content; `yaml_path` is only used for error messages.
    sources.push(SourceFile::load_from_str(yaml_content, yaml_path)?);
    Ok(())
}

/// Load a `kind: function` node as a [`FunctionDef`].
fn load_function_node(yaml_path: &Path, functions: &mut Vec<FunctionDef>) -> CoreResult<()> {
let func = FunctionDef::load(yaml_path)?;
fn load_function_node(
    yaml_content: &str,
    yaml_path: &Path,
    functions: &mut Vec<FunctionDef>,
) -> CoreResult<()> {
    // Parse the already-read content; `yaml_path` is only used for error messages.
    functions.push(FunctionDef::load_from_str(yaml_content, yaml_path)?);
    Ok(())
}
Expand Down
6 changes: 5 additions & 1 deletion crates/ff-core/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,13 @@ impl SourceFile {
path: path.display().to_string(),
source: e,
})?;
Self::load_from_str(&content, path)
}

/// Load and validate from already-read YAML content, using `path` only for error messages.
pub fn load_from_str(content: &str, path: &Path) -> CoreResult<Self> {
let source: SourceFile =
serde_yaml::from_str(&content).map_err(|e| CoreError::SourceParseError {
serde_yaml::from_str(content).map_err(|e| CoreError::SourceParseError {
path: path.display().to_string(),
details: e.to_string(),
})?;
Expand Down
3 changes: 2 additions & 1 deletion crates/ff-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ pub use lineage::{
LineageKind, ModelLineage, ProjectLineage,
};
pub use parser::SqlParser;
pub use qualify::qualify_table_references;
pub use qualify::{qualify_statements, qualify_table_references};
pub use sqlparser::ast::Statement;
pub use suggestions::{suggest_tests, ColumnSuggestions, ModelSuggestions, TestSuggestion};
pub use validator::validate_no_complex_queries;

Expand Down
28 changes: 21 additions & 7 deletions crates/ff-sql/src/qualify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! only for cross-database (attached) references. Only single-part (bare)
//! names are qualified; already-qualified references are left unchanged.

use sqlparser::ast::{visit_relations_mut, Ident, ObjectName, ObjectNamePart};
use sqlparser::ast::{visit_relations_mut, Ident, ObjectName, ObjectNamePart, Statement};
use sqlparser::dialect::DuckDbDialect;
use sqlparser::parser::Parser;
use std::collections::HashMap;
Expand Down Expand Up @@ -56,12 +56,7 @@ pub fn qualify_table_references(
}
})?;

for stmt in &mut statements {
let _ = visit_relations_mut(stmt, |name: &mut ObjectName| {
qualify_single_name(name, qualification_map);
std::ops::ControlFlow::<()>::Continue(())
});
}
qualify_statements(&mut statements, qualification_map);

Ok(statements
.iter()
Expand All @@ -70,6 +65,25 @@ pub fn qualify_table_references(
.join(";\n"))
}

/// Qualify bare table references in already-parsed statements (no re-parse).
///
/// Mutates statements in-place. Use this when you already have a parsed AST
/// to avoid the cost of re-parsing SQL text.
pub fn qualify_statements(
    statements: &mut [Statement],
    qualification_map: &HashMap<String, QualifiedRef>,
) {
    // With no mappings there is nothing to rewrite — skip the AST walk entirely.
    if qualification_map.is_empty() {
        return;
    }
    statements.iter_mut().for_each(|stmt| {
        // The visitor never aborts, so the ControlFlow result is discarded.
        let _ = visit_relations_mut(stmt, |name: &mut ObjectName| {
            qualify_single_name(name, qualification_map);
            std::ops::ControlFlow::<()>::Continue(())
        });
    });
}

/// Qualify a single-part (bare) table name using the qualification map.
///
/// Only rewrites names with exactly one part. Already-qualified names are
Expand Down