Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 102 additions & 79 deletions crates/ff-cli/src/commands/compile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<(
let comment_ctx =
common::build_query_comment_context(&project.config, global.target.as_deref());

// Get vars merged with target overrides, then merge with CLI --vars
let base_vars = project.config.get_merged_vars(target.as_deref());
let vars = merge_vars(&base_vars, &args.vars)?;

Expand All @@ -104,7 +103,7 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<(
let template_ctx = common::build_template_context(&project, global.target.as_deref(), false);
let jinja = JinjaEnvironment::with_context(&vars, &macro_paths, &template_ctx);

// Compile all models first — filtering happens after DAG build
// Filtering happens after DAG build, so compile all models first
let all_model_names: Vec<String> = project
.model_names()
.into_iter()
Expand Down Expand Up @@ -186,17 +185,14 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<(
}
}

// Validate DAG (always done even in parse-only mode)
let dag = ModelDag::build(&dependencies).context("Failed to build dependency graph")?;
let topo_order = dag
.topological_order()
.context("Circular dependency detected")?;

// Apply selector filtering now that DAG is available
let model_names: Vec<String> = common::resolve_nodes(&project, &dag, &args.nodes)?;
let model_names_set: HashSet<String> = model_names.iter().cloned().collect();

// Filter compiled_models to only those selected
compiled_models.retain(|m| model_names_set.contains(&m.name));

if !json_mode && args.nodes.is_some() {
Expand All @@ -215,7 +211,6 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<(
);
}

// Static analysis phase (DataFusion LogicalPlan)
if !args.skip_static_analysis {
let analysis_result = run_static_analysis(
&project,
Expand Down Expand Up @@ -273,7 +268,6 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<(
}

for compiled in compiled_models {
// Skip ephemeral models - they don't get written as files
if compiled.materialization == Materialization::Ephemeral {
if !json_mode {
println!(" \u{2713} {} (ephemeral) [inlined]", compiled.name);
Expand All @@ -283,44 +277,21 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<(
model: compiled.name,
status: RunStatus::Success,
materialization: "ephemeral".to_string(),
output_path: None, // Ephemeral models don't have output files
output_path: None,
dependencies: compiled.dependencies,
error: None,
});
continue;
}

// Inline ephemeral dependencies into this model's SQL
let final_sql = if !ephemeral_sql.is_empty() {
// Collect ephemeral dependencies for this model
let is_ephemeral =
|name: &str| materializations.get(name) == Some(&Materialization::Ephemeral);
let get_sql = |name: &str| ephemeral_sql.get(name).cloned();

let (ephemeral_deps, order) = collect_ephemeral_dependencies(
&compiled.name,
&dependencies,
is_ephemeral,
get_sql,
);

if !ephemeral_deps.is_empty() {
if global.verbose {
eprintln!(
"[verbose] Inlining {} ephemeral model(s) into {}",
ephemeral_deps.len(),
compiled.name
);
}
inline_ephemeral_ctes(&compiled.sql, &ephemeral_deps, &order)?
} else {
compiled.sql
}
} else {
compiled.sql
};
let final_sql = resolve_final_sql(
&compiled,
&ephemeral_sql,
&dependencies,
&materializations,
global,
)?;

// Write the compiled SQL (with inlined ephemerals)
if args.parse_only {
if !json_mode {
println!(
Expand All @@ -341,47 +312,14 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<(
error: None,
});
} else {
if let Some(parent) = compiled.output_path.parent() {
std::fs::create_dir_all(parent).with_context(|| {
format!("Failed to create directory for model: {}", compiled.name)
})?;
}

// Attach query comment to written file, but keep in-memory SQL clean for checksums
let placement = comment_ctx
.as_ref()
.map(|c| c.config.placement)
.unwrap_or_default();
let sql_to_write = match &compiled.query_comment {
Some(comment) => {
ff_core::query_comment::attach_query_comment(&final_sql, comment, placement)
}
None => final_sql.clone(),
};
std::fs::write(&compiled.output_path, &sql_to_write).with_context(|| {
format!("Failed to write compiled SQL for model: {}", compiled.name)
})?;

// Also update the model's compiled_sql with the clean version (no comment)
if let Some(model) = project.get_model_mut(&compiled.name) {
model.compiled_sql = Some(final_sql);
}

if !json_mode {
println!(
" \u{2713} {} ({})",
compiled.name, compiled.materialization
);
}

if global.verbose {
eprintln!(
"[verbose] Compiled {} -> {}",
compiled.name,
compiled.output_path.display()
);
}

write_compiled_output(
&compiled,
&final_sql,
&comment_ctx,
&mut project,
global,
json_mode,
)?;
success_count += 1;
compile_results.push(ModelCompileResult {
model: compiled.name,
Expand Down Expand Up @@ -439,6 +377,91 @@ pub(crate) async fn execute(args: &CompileArgs, global: &GlobalArgs) -> Result<(
Ok(())
}

/// Produce the SQL that should be written for a non-ephemeral model.
///
/// When the model depends (directly or transitively) on ephemeral models,
/// their SQL is inlined via CTEs; otherwise the compiled SQL is returned
/// unchanged.
fn resolve_final_sql(
    compiled: &CompileOutput,
    ephemeral_sql: &HashMap<String, String>,
    dependencies: &HashMap<String, Vec<String>>,
    materializations: &HashMap<String, Materialization>,
    global: &GlobalArgs,
) -> Result<String> {
    // No ephemeral models exist anywhere in the project: nothing to inline.
    if ephemeral_sql.is_empty() {
        return Ok(compiled.sql.clone());
    }

    // Walk this model's dependency closure and gather the SQL of every
    // ephemeral ancestor, along with the order in which to emit the CTEs.
    let (inlined, cte_order) = collect_ephemeral_dependencies(
        &compiled.name,
        dependencies,
        |name: &str| materializations.get(name) == Some(&Materialization::Ephemeral),
        |name: &str| ephemeral_sql.get(name).cloned(),
    );

    // This particular model has no ephemeral ancestors.
    if inlined.is_empty() {
        return Ok(compiled.sql.clone());
    }

    if global.verbose {
        eprintln!(
            "[verbose] Inlining {} ephemeral model(s) into {}",
            inlined.len(),
            compiled.name
        );
    }
    Ok(inline_ephemeral_ctes(&compiled.sql, &inlined, &cte_order)?)
}

/// Persist a model's compiled SQL to its output path.
///
/// The on-disk copy may carry an attached query comment, while the copy
/// stored back on the in-memory model stays comment-free so checksums are
/// stable. Also emits per-model progress (text mode) and verbose logging.
fn write_compiled_output(
    compiled: &CompileOutput,
    final_sql: &str,
    comment_ctx: &Option<ff_core::query_comment::QueryCommentContext>,
    project: &mut Project,
    global: &GlobalArgs,
    json_mode: bool,
) -> Result<()> {
    // Ensure the target directory hierarchy exists before writing the file.
    if let Some(dir) = compiled.output_path.parent() {
        std::fs::create_dir_all(dir)
            .with_context(|| format!("Failed to create directory for model: {}", compiled.name))?;
    }

    // Comment placement comes from config when available, default otherwise.
    let placement = match comment_ctx {
        Some(ctx) => ctx.config.placement,
        None => Default::default(),
    };
    let on_disk_sql = if let Some(comment) = &compiled.query_comment {
        ff_core::query_comment::attach_query_comment(final_sql, comment, placement)
    } else {
        final_sql.to_string()
    };
    std::fs::write(&compiled.output_path, &on_disk_sql)
        .with_context(|| format!("Failed to write compiled SQL for model: {}", compiled.name))?;

    // Store the clean (comment-free) SQL on the model for checksum stability.
    if let Some(model) = project.get_model_mut(&compiled.name) {
        model.compiled_sql = Some(final_sql.to_string());
    }

    if !json_mode {
        println!(
            " \u{2713} {} ({})",
            compiled.name, compiled.materialization
        );
    }

    if global.verbose {
        eprintln!(
            "[verbose] Compiled {} -> {}",
            compiled.name,
            compiled.output_path.display()
        );
    }

    Ok(())
}

/// Merge extra vars from --vars argument into project vars
fn merge_vars(
project_vars: &HashMap<String, serde_yaml::Value>,
Expand Down
14 changes: 0 additions & 14 deletions crates/ff-cli/src/commands/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,24 +87,20 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> {
// Open meta database early (needed for smart build and execution state)
let meta_db = common::open_meta_db(&project);

// Smart build: filter out unchanged models (queries previous execution state)
let smart_skipped: HashSet<String> = if args.smart {
compute_smart_skips(&compiled_models, global, meta_db.as_ref())?
} else {
HashSet::new()
};

// Compute config hash for run state validation
let config_hash = compute_config_hash(&project);

// Determine run state path
let run_state_path = args
.state_file
.as_ref()
.map(|s| Path::new(s).to_path_buf())
.unwrap_or_else(|| project.target_dir().join("run_state.json"));

// Handle resume mode
let (execution_order, previous_run_state) = if args.resume {
state::handle_resume_mode(
&run_state_path,
Expand All @@ -118,7 +114,6 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> {
(order, None)
};

// Apply smart build filtering
let execution_order: Vec<String> = if !smart_skipped.is_empty() {
let before = execution_order.len();
let filtered: Vec<String> = execution_order
Expand Down Expand Up @@ -160,7 +155,6 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> {
);
}

// Count non-ephemeral models (ephemeral models are inlined, not executed)
let ephemeral_count = execution_order
.iter()
.filter(|name| {
Expand All @@ -172,7 +166,6 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> {
.count();
let executable_count = execution_order.len() - ephemeral_count;

// Show resume summary if applicable (text mode only)
if !json_mode {
if let Some(ref prev_state) = previous_run_state {
let summary = prev_state.summary();
Expand All @@ -190,13 +183,11 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> {
}
}

// Resolve WAP schema from config
let target = ff_core::config::Config::resolve_target(global.target.as_deref());
let wap_schema = project.config.get_wap_schema(target.as_deref());

create_schemas(&db, &compiled_models, global).await?;

// Create WAP schema if configured
if let Some(ws) = wap_schema {
db.create_schema_if_not_exists(ws)
.await
Expand Down Expand Up @@ -226,12 +217,10 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> {
config_hash,
);

// Save initial run state
if let Err(e) = run_state.save(&run_state_path) {
eprintln!("Warning: Failed to save initial run state: {}", e);
}

// Populate meta database before execution to capture model_id_map (non-fatal)
let meta_ids = meta_db
.as_ref()
.and_then(|db| common::populate_meta_phase1(db, &project, "run", args.nodes.as_deref()));
Expand All @@ -240,7 +229,6 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> {
None => (None, None),
};

// Resolve database file path for Python model execution
let db_path_str = project.config.database.path.clone();
let db_path_ref = db_path_str.as_str();

Expand All @@ -259,13 +247,11 @@ pub(crate) async fn execute(args: &RunArgs, global: &GlobalArgs) -> Result<()> {
let (run_results, success_count, failure_count, stopped_early) =
execute_models_with_state(&exec_ctx, &mut run_state, &run_state_path).await?;

// Mark run as completed
run_state.mark_run_completed();
if let Err(e) = run_state.save(&run_state_path) {
eprintln!("Warning: Failed to save final run state: {}", e);
}

// Complete meta database run (non-fatal)
if let (Some(ref meta_db), Some(run_id)) = (&meta_db, meta_run_id) {
let status = if failure_count > 0 {
"error"
Expand Down
5 changes: 4 additions & 1 deletion crates/ff-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,10 @@ impl Config {
path: path.display().to_string(),
source: e,
})?;
let config: Config = serde_yaml::from_str(&content)?;
let config: Config =
serde_yaml::from_str(&content).map_err(|e| CoreError::ConfigParseError {
message: format!("{}: {}", path.display(), e),
})?;
config.validate()?;
Ok(config)
}
Expand Down
1 change: 0 additions & 1 deletion crates/ff-core/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ impl ContractValidationResult {
violation_type,
message: message.into(),
});
// If enforced, mark as failed
if self.enforced {
self.passed = false;
}
Expand Down
Loading