Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions crates/ff-analysis/src/pass/expr_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,7 @@ where
Expr::Cast(cast) => walk_expr_columns(&cast.expr, collector),
Expr::TryCast(try_cast) => walk_expr_columns(&try_cast.expr, collector),
Expr::Case(case) => {
if let Some(ref operand) = case.expr {
walk_expr_columns(operand, collector);
}
for (when, then) in &case.when_then_expr {
walk_expr_columns(when, collector);
walk_expr_columns(then, collector);
}
if let Some(ref else_expr) = case.else_expr {
walk_expr_columns(else_expr, collector);
}
for_each_case_subexpr(case, |e| walk_expr_columns(e, collector));
}
Expr::IsNull(inner) | Expr::IsNotNull(inner) | Expr::Not(inner) | Expr::Negative(inner) => {
walk_expr_columns(inner, collector);
Expand Down
19 changes: 10 additions & 9 deletions crates/ff-analysis/src/pass/plan_unused_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,18 @@ fn collect_consumed_columns(
consumed
}

/// Collect lowercased column refs from a slice of expressions.
fn collect_refs_from_exprs(exprs: &[Expr], consumed: &mut HashSet<String>) {
for expr in exprs {
collect_column_refs_lowercase(expr, consumed);
}
}

/// Walk a LogicalPlan tree and collect referenced column names (lowercased)
fn collect_column_refs_from_plan(plan: &LogicalPlan, consumed: &mut HashSet<String>) {
match plan {
LogicalPlan::Projection(proj) => {
for expr in &proj.expr {
collect_column_refs_lowercase(expr, consumed);
}
collect_refs_from_exprs(&proj.expr, consumed);
collect_column_refs_from_plan(&proj.input, consumed);
}
LogicalPlan::Filter(filter) => {
Expand All @@ -132,12 +137,8 @@ fn collect_column_refs_from_plan(plan: &LogicalPlan, consumed: &mut HashSet<Stri
collect_column_refs_from_plan(&join.right, consumed);
}
LogicalPlan::Aggregate(agg) => {
for expr in &agg.group_expr {
collect_column_refs_lowercase(expr, consumed);
}
for expr in &agg.aggr_expr {
collect_column_refs_lowercase(expr, consumed);
}
collect_refs_from_exprs(&agg.group_expr, consumed);
collect_refs_from_exprs(&agg.aggr_expr, consumed);
collect_column_refs_from_plan(&agg.input, consumed);
}
LogicalPlan::Sort(sort) => {
Expand Down
2 changes: 0 additions & 2 deletions crates/ff-analysis/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,13 @@ impl RelSchema {
/// If `source_table` metadata is available, filters by it first.
/// Falls back to column-name-only lookup when table info is missing.
pub fn find_qualified(&self, table: &str, column: &str) -> Option<&TypedColumn> {
// Try to find a column that matches both table and name
let qualified_match = self.columns.iter().find(|c| {
c.name.eq_ignore_ascii_case(column)
&& c.source_table
.as_ref()
.is_some_and(|t| t.eq_ignore_ascii_case(table))
});

// Fall back to column-name-only if no qualified match
qualified_match.or_else(|| self.find_column(column))
}

Expand Down
77 changes: 48 additions & 29 deletions crates/ff-cli/src/commands/compile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,49 @@ fn compute_compiled_path(
Ok(output_dir.join(filename))
}

/// Populate meta database rows for a single successfully compiled model.
fn populate_single_model_compile(
conn: &ff_meta::DuckDbConnection,
project: &Project,
result: &ModelCompileResult,
model_id: i64,
dependencies: &HashMap<String, Vec<String>>,
model_id_map: &HashMap<ff_core::ModelName, i64>,
) -> ff_meta::MetaResult<()> {
let compiled_sql = project
.get_model(&result.model)
.and_then(|m| m.compiled_sql.as_deref())
.unwrap_or("");
let compiled_path = result.output_path.as_deref().unwrap_or("");
let checksum = ff_core::compute_checksum(compiled_sql);

ff_meta::populate::compilation::update_model_compiled(
conn,
model_id,
compiled_sql,
compiled_path,
&checksum,
)?;

if let Some(deps) = dependencies.get(&result.model) {
let dep_ids: Vec<i64> = deps
.iter()
.filter_map(|d| model_id_map.get(d.as_str()).copied())
.collect();
ff_meta::populate::compilation::populate_dependencies(conn, model_id, &dep_ids)?;
}

let ext_deps: Vec<&str> = project
.get_model(&result.model)
.map(|m| m.external_deps.iter().map(|t| t.as_ref()).collect())
.unwrap_or_default();
if !ext_deps.is_empty() {
ff_meta::populate::compilation::populate_external_dependencies(conn, model_id, &ext_deps)?;
}

Ok(())
}

/// Populate the meta database with compile-phase data (non-fatal).
fn populate_meta_compile(
project: &Project,
Expand All @@ -679,38 +722,14 @@ fn populate_meta_compile(
let Some(&model_id) = model_id_map.get(result.model.as_str()) else {
continue;
};
let compiled_sql = project
.get_model(&result.model)
.and_then(|m| m.compiled_sql.as_deref())
.unwrap_or("");
let compiled_path = result.output_path.as_deref().unwrap_or("");
let checksum = ff_core::compute_checksum(compiled_sql);

ff_meta::populate::compilation::update_model_compiled(
populate_single_model_compile(
conn,
project,
result,
model_id,
compiled_sql,
compiled_path,
&checksum,
dependencies,
&model_id_map,
)?;

if let Some(deps) = dependencies.get(&result.model) {
let dep_ids: Vec<i64> = deps
.iter()
.filter_map(|d| model_id_map.get(d.as_str()).copied())
.collect();
ff_meta::populate::compilation::populate_dependencies(conn, model_id, &dep_ids)?;
}

let ext_deps: Vec<&str> = project
.get_model(&result.model)
.map(|m| m.external_deps.iter().map(|t| t.as_ref()).collect())
.unwrap_or_default();
if !ext_deps.is_empty() {
ff_meta::populate::compilation::populate_external_dependencies(
conn, model_id, &ext_deps,
)?;
}
}
Ok(())
});
Expand Down
121 changes: 56 additions & 65 deletions crates/ff-cli/src/commands/docs/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -914,75 +914,66 @@ fn generate_lineage_dot(project: &Project) -> String {
}
}

dot.push_str(
"\n // Model nodes (blue for views, green for tables, gold for incremental, gray for ephemeral)\n",
);
if let Some(ref manifest) = manifest {
for (name, model) in &manifest.models {
let color = match model.materialized {
ff_core::config::Materialization::Table => COLOR_TABLE,
ff_core::config::Materialization::View => COLOR_VIEW,
ff_core::config::Materialization::Incremental => COLOR_INCREMENTAL,
ff_core::config::Materialization::Ephemeral => COLOR_EPHEMERAL,
};
dot.push_str(&format!(
" \"{}\" [label=\"{}\" fillcolor=\"{}\"];\n",
name, name, color
));
}
} else {
for (name, model) in &project.models {
let mat = model.materialization(project.config.materialization);
let color = match mat {
ff_core::config::Materialization::Table => COLOR_TABLE,
ff_core::config::Materialization::View => COLOR_VIEW,
ff_core::config::Materialization::Incremental => COLOR_INCREMENTAL,
ff_core::config::Materialization::Ephemeral => COLOR_EPHEMERAL,
};
dot.push_str(&format!(
" \"{}\" [label=\"{}\" fillcolor=\"{}\"];\n",
name, name, color
));
}
dot.push('\n');

// Collect model info from manifest (preferred) or project (fallback)
struct ModelDotInfo<'a> {
name: &'a str,
materialization: ff_core::config::Materialization,
depends_on: Vec<&'a str>,
external_deps: Vec<&'a str>,
}
let model_infos: Vec<ModelDotInfo<'_>> = if let Some(ref manifest) = manifest {
manifest
.models
.iter()
.map(|(name, model)| ModelDotInfo {
name: name.as_str(),
materialization: model.materialized,
depends_on: model.depends_on.iter().map(|d| d.as_str()).collect(),
external_deps: model.external_deps.iter().map(|e| e.as_str()).collect(),
})
.collect()
} else {
project
.models
.iter()
.map(|(name, model)| ModelDotInfo {
name: name.as_str(),
materialization: model.materialization(project.config.materialization),
depends_on: model.depends_on.iter().map(|d| d.as_str()).collect(),
external_deps: model.external_deps.iter().map(|e| e.as_ref()).collect(),
})
.collect()
};

dot.push_str("\n // Dependencies (edges)\n");
if let Some(ref manifest) = manifest {
// Use manifest for accurate dependencies
for (name, model) in &manifest.models {
for dep in &model.depends_on {
dot.push_str(&format!(" \"{}\" -> \"{}\";\n", dep, name));
}

for ext in &model.external_deps {
let source_node = project
.sources
.iter()
.flat_map(|s| s.tables.iter().map(move |t| (s, t)))
.find(|(_, t)| *ext == t.name)
.map(|(s, t)| format!("{}_{}", s.name, t.name))
.unwrap_or_else(|| ext.to_string());
for info in &model_infos {
let color = match info.materialization {
ff_core::config::Materialization::Table => COLOR_TABLE,
ff_core::config::Materialization::View => COLOR_VIEW,
ff_core::config::Materialization::Incremental => COLOR_INCREMENTAL,
ff_core::config::Materialization::Ephemeral => COLOR_EPHEMERAL,
};
dot.push_str(&format!(
" \"{}\" [label=\"{}\" fillcolor=\"{}\"];\n",
info.name, info.name, color
));
}

dot.push_str(&format!(" \"{}\" -> \"{}\";\n", source_node, name));
}
dot.push('\n');
for info in &model_infos {
for dep in &info.depends_on {
dot.push_str(&format!(" \"{}\" -> \"{}\";\n", dep, info.name));
}
} else {
// Fall back to project model info (may be incomplete)
for (name, model) in &project.models {
for dep in &model.depends_on {
dot.push_str(&format!(" \"{}\" -> \"{}\";\n", dep, name));
}
for ext in &model.external_deps {
let source_node = project
.sources
.iter()
.flat_map(|s| s.tables.iter().map(move |t| (s, t)))
.find(|(_, t)| *ext == t.name)
.map(|(s, t)| format!("{}_{}", s.name, t.name))
.unwrap_or_else(|| ext.to_string());

dot.push_str(&format!(" \"{}\" -> \"{}\";\n", source_node, name));
}
for ext in &info.external_deps {
let source_node = project
.sources
.iter()
.flat_map(|s| s.tables.iter().map(move |t| (s, t)))
.find(|(_, t)| *ext == t.name)
.map(|(s, t)| format!("{}_{}", s.name, t.name))
.unwrap_or_else(|| ext.to_string());
dot.push_str(&format!(" \"{}\" -> \"{}\";\n", source_node, info.name));
}
}

Expand Down
10 changes: 4 additions & 6 deletions crates/ff-cli/src/commands/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,10 @@ pub(crate) async fn execute(args: &ParseArgs, global: &GlobalArgs) -> Result<()>
.with_context(|| format!("Failed to parse SQL for model: {}", name))?;

let deps = extract_dependencies(&statements);
let all_tables: Vec<String> = deps.iter().cloned().collect();

let (model_deps, ext_deps) = ff_sql::extractor::categorize_dependencies(
deps.clone(),
&known_models,
&external_tables,
);
let (model_deps, ext_deps) =
ff_sql::extractor::categorize_dependencies(deps, &known_models, &external_tables);

dep_map.insert(name.clone(), model_deps.clone());

Expand All @@ -58,7 +56,7 @@ pub(crate) async fn execute(args: &ParseArgs, global: &GlobalArgs) -> Result<()>
path: model.path.display().to_string(),
model_dependencies: model_deps,
external_dependencies: ext_deps,
all_tables: deps.into_iter().collect(),
all_tables,
});
}

Expand Down
36 changes: 18 additions & 18 deletions crates/ff-cli/src/commands/run/compile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,24 +342,24 @@ fn resolve_deferred_dependencies(
// Also check for transitive dependencies of deferred models
let mut to_check: Vec<String> = deferred_models.iter().cloned().collect();
while let Some(model_name) = to_check.pop() {
if let Some(manifest_model) = deferred_manifest.get_model(&model_name) {
for dep in &manifest_model.depends_on {
let dep_str = dep.as_str();
if !selected_set.contains(dep_str)
&& !deferred_models.contains(dep_str)
&& deferred_manifest.get_model(dep_str).is_some()
{
deferred_models.insert(dep_str.to_string());
to_check.push(dep_str.to_string());
if global.verbose {
eprintln!(
"[verbose] Deferring {} to production manifest (transitive)",
dep
);
}
}
// Note: Don't fail on transitive deps missing from manifest
// They might be external tables or already executed
let Some(manifest_model) = deferred_manifest.get_model(&model_name) else {
continue;
};
for dep in &manifest_model.depends_on {
let dep_str = dep.as_str();
if selected_set.contains(dep_str)
|| deferred_models.contains(dep_str)
|| deferred_manifest.get_model(dep_str).is_none()
{
continue;
}
deferred_models.insert(dep_str.to_string());
to_check.push(dep_str.to_string());
if global.verbose {
eprintln!(
"[verbose] Deferring {} to production manifest (transitive)",
dep
);
}
}
}
Expand Down
Loading