Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .claude/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
]
}
]
},
"env": {
"CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS": "1"
},
"permissions": {
"allow": [
Expand Down
56 changes: 1 addition & 55 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,6 @@ A Rust CLI tool for SQL templating and execution, similar to dbt.
- duckdb-rs (database, bundled feature)
- tokio (async runtime)

## Project Structure
- `crates/ff-cli`: Main binary, subcommands in `commands/` module
- `crates/ff-core`: Shared types, config, DAG logic
- `crates/ff-jinja`: Template rendering (config, var functions only)
- `crates/ff-sql`: SQL parsing, table extraction from AST
- `crates/ff-db`: Database trait + DuckDB implementation
- `crates/ff-test`: Schema test generation (unique, not_null)

## Key Commands
```bash
make build # Build all crates
make test # Run all tests
make lint # Run clippy + fmt check
make ci # Full CI check locally
cargo run -p ff-cli -- <subcommand>
```

## Architecture Notes
- Dependencies extracted from SQL AST via `visit_relations`, NOT Jinja functions
Expand All @@ -46,47 +30,9 @@ YAML determines the resource type:
| `function` | `<name>.sql` | User-defined SQL function |
| `python` | `<name>.py` | Python transformation (planned) |

### Unified node_paths layout (preferred)
```yaml
# featherflow.yml
node_paths: ["nodes"]
```
```
nodes/
stg_orders/
stg_orders.sql
stg_orders.yml # kind: sql
raw_orders/
raw_orders.csv
raw_orders.yml # kind: seed
raw_ecommerce/
raw_ecommerce.yml # kind: source
cents_to_dollars/
cents_to_dollars.sql
cents_to_dollars.yml # kind: function
```

### Legacy per-type layout (still supported)
```yaml
# featherflow.yml
model_paths: ["models"]
source_paths: ["sources"]
function_paths: ["functions"]
```

Legacy kind values (`model`, `sources`, `functions`) are normalised to their
modern equivalents (`sql`, `source`, `function`) automatically.

## Testing
- All tests: `make test`
- Unit tests only: `make test-unit`
- Integration tests: `make test-integration`
- Verbose output: `make test-verbose`
- Test fixtures in `tests/fixtures/sample_project/`
- Seed data in `testdata/seeds/`

## Code Style
- Use `?` for error propagation, add `.context()` at boundaries
- Prefer `impl Trait` over `Box<dyn Trait>` where possible
- All public items need rustdoc comments
- No unwrap() except in tests
- End-to-end test harness: `make ci-e2e`
31 changes: 30 additions & 1 deletion crates/ff-analysis/src/datafusion_bridge/lineage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! its source columns — whether it's a direct copy, a transformation,
//! or merely inspected (e.g. in a WHERE clause).

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use datafusion_expr::{Expr, LogicalPlan};
use ff_core::ModelName;
Expand Down Expand Up @@ -182,6 +182,35 @@ fn collect_column_refs(expr: &Expr, refs: &mut Vec<(String, String)>) {
});
}

/// Build a lookup from table alias to the real table name it refers to.
///
/// The plan tree is traversed via `collect_aliases`, which records every
/// `SubqueryAlias` node that directly wraps a `TableScan` (the shape produced
/// by `FROM table AS alias`). Keys are alias names, values are the scanned
/// table names; a plan without such aliases yields an empty map.
pub fn extract_alias_map(plan: &LogicalPlan) -> HashMap<String, String> {
    let mut alias_to_table: HashMap<String, String> = HashMap::new();
    collect_aliases(plan, &mut alias_to_table);
    alias_to_table
}

/// Recursively gather `alias -> table` pairs from `plan` into `aliases`.
///
/// A `SubqueryAlias` whose input is a `TableScan` contributes one entry
/// (alias name mapped to the scanned table's name). A `SubqueryAlias` over any
/// other node is looked through by recursing into its input. Every other node
/// kind simply forwards the walk to each of its inputs.
fn collect_aliases(plan: &LogicalPlan, aliases: &mut HashMap<String, String>) {
    if let LogicalPlan::SubqueryAlias(subquery) = plan {
        match subquery.input.as_ref() {
            LogicalPlan::TableScan(scan) => {
                let alias = subquery.alias.table().to_string();
                let real_table = scan.table_name.table().to_string();
                aliases.insert(alias, real_table);
            }
            // Aliased derived table / nested plan: keep walking underneath it.
            nested => collect_aliases(nested, aliases),
        }
        return;
    }

    for child in plan.inputs() {
        collect_aliases(child, aliases);
    }
}

/// Deduplicate lineage edges, keeping the first occurrence per (output, source) pair
pub fn deduplicate_edges(edges: &[ColumnLineageEdge]) -> Vec<ColumnLineageEdge> {
let mut seen: HashSet<(&str, &str, &str)> = HashSet::with_capacity(edges.len());
Expand Down
4 changes: 2 additions & 2 deletions crates/ff-analysis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub use types::{parse_sql_type, FloatBitWidth, IntBitWidth, Nullability, SqlType

// DataFusion bridge re-exports
pub use datafusion_bridge::lineage::{
deduplicate_edges, extract_column_lineage as extract_plan_column_lineage, ColumnLineageEdge,
LineageKind, ModelColumnLineage,
deduplicate_edges, extract_alias_map, extract_column_lineage as extract_plan_column_lineage,
ColumnLineageEdge, LineageKind, ModelColumnLineage,
};
pub use datafusion_bridge::planner::sql_to_plan;
pub use datafusion_bridge::propagation::{
Expand Down
10 changes: 10 additions & 0 deletions crates/ff-analysis/src/pass/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub(crate) mod expr_utils;
pub(crate) mod plan_cross_model;
pub(crate) mod plan_description_drift;
pub(crate) mod plan_join_keys;
pub(crate) mod plan_nullability;
pub mod plan_pass;
Expand Down Expand Up @@ -50,6 +51,12 @@ pub enum DiagnosticCode {
A040,
/// A041: Cross-model nullability mismatch
A041,
/// A050: Copy/Rename column with missing description — suggest inheriting from upstream
A050,
/// A051: Copy/Rename column with modified description — potential documentation drift
A051,
/// A052: Transform column with missing description — needs new documentation
A052,
}

impl std::fmt::Display for DiagnosticCode {
Expand Down Expand Up @@ -77,6 +84,9 @@ impl std::str::FromStr for DiagnosticCode {
"A033" => Ok(DiagnosticCode::A033),
"A040" => Ok(DiagnosticCode::A040),
"A041" => Ok(DiagnosticCode::A041),
"A050" => Ok(DiagnosticCode::A050),
"A051" => Ok(DiagnosticCode::A051),
"A052" => Ok(DiagnosticCode::A052),
_ => Err(format!("unknown diagnostic code: {s}")),
}
}
Expand Down
151 changes: 151 additions & 0 deletions crates/ff-analysis/src/pass/plan_description_drift.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
//! Description drift detection pass (A050-A052)
//!
//! Checks column-level lineage edges for documentation drift:
//! - A050: Copy/Rename column with missing description — suggest inheriting from upstream
//! - A051: Copy/Rename column with modified description — potential drift
//! - A052: Transform column with missing description — needs new documentation

use std::collections::HashMap;

use ff_core::ModelName;

use crate::context::AnalysisContext;
use crate::datafusion_bridge::propagation::ModelPlanResult;

use super::plan_pass::DagPlanPass;
use super::{Diagnostic, DiagnosticCode, Severity};

/// DAG-level pass that checks description propagation across lineage edges.
///
/// For every lineage edge whose target is a project model it emits:
/// - A050 (warning) when a direct pass-through column has no description but
///   its source column does,
/// - A051 (info) when a direct pass-through column's description differs from
///   its source column's description,
/// - A052 (warning) when a transformed column has no description.
pub struct PlanDescriptionDrift;

impl DagPlanPass for PlanDescriptionDrift {
    fn name(&self) -> &'static str {
        "description_drift"
    }

    fn description(&self) -> &'static str {
        "Detect missing or drifted column descriptions across lineage edges"
    }

    /// Run the description-drift checks over all lineage edges in `ctx`.
    ///
    /// `_models` is unused: the pass only needs the project schemas and the
    /// column lineage exposed by `ctx`. Column names are matched against the
    /// YAML schemas case-insensitively (both sides are lower-cased).
    ///
    /// Returns the diagnostics in lineage-edge order. A052 is reported at most
    /// once per target column, even when several source columns feed it.
    fn run_project(
        &self,
        _models: &HashMap<ModelName, ModelPlanResult>,
        ctx: &AnalysisContext,
    ) -> Vec<Diagnostic> {
        let mut diagnostics = Vec::new();
        let lineage = ctx.lineage();
        let project = ctx.project();

        // Build description lookup from project schemas
        let desc_lookup = build_project_descriptions(project);

        // A transformed column built from several source columns has one
        // lineage edge per source. The A052 message only mentions the target
        // column, so without this set the same diagnostic would be emitted
        // once per contributing source column.
        let mut reported_transforms: std::collections::HashSet<(String, String)> =
            std::collections::HashSet::new();

        for edge in &lineage.edges {
            // Only check edges targeting models (skip edges targeting seeds/sources).
            // Done first so skipped edges cost no description lookups.
            if !project.models.contains_key(edge.target_model.as_str()) {
                continue;
            }

            let target_column_key = edge.target_column.to_lowercase();
            let src_desc = desc_lookup
                .get(&edge.source_model)
                .and_then(|cols| cols.get(&edge.source_column.to_lowercase()));
            let tgt_desc = desc_lookup
                .get(&edge.target_model)
                .and_then(|cols| cols.get(&target_column_key));

            match (edge.is_direct, src_desc, tgt_desc) {
                // Copy/Rename with missing target description
                (true, Some(_src), None) => {
                    diagnostics.push(Diagnostic {
                        code: DiagnosticCode::A050,
                        severity: Severity::Warning,
                        message: format!(
                            "Column '{}' is a direct pass-through from '{}.{}' but has no description — consider inheriting from upstream",
                            edge.target_column, edge.source_model, edge.source_column
                        ),
                        model: ModelName::new(&edge.target_model),
                        column: Some(edge.target_column.clone()),
                        hint: Some(format!(
                            "Add a description to '{}' in the YAML schema, or copy it from '{}.{}'",
                            edge.target_column, edge.source_model, edge.source_column
                        )),
                        pass_name: self.name().into(),
                    });
                }
                // Copy/Rename with modified description
                (true, Some(src), Some(tgt)) if src != tgt => {
                    diagnostics.push(Diagnostic {
                        code: DiagnosticCode::A051,
                        severity: Severity::Info,
                        message: format!(
                            "Column '{}' is a direct pass-through from '{}.{}' but has a different description — verify this is intentional",
                            edge.target_column, edge.source_model, edge.source_column
                        ),
                        model: ModelName::new(&edge.target_model),
                        column: Some(edge.target_column.clone()),
                        hint: None,
                        pass_name: self.name().into(),
                    });
                }
                // Transform with missing target description
                (false, _, None) => {
                    let key = (edge.target_model.as_str().to_owned(), target_column_key);
                    // `insert` returns false when this target column was already reported.
                    if reported_transforms.insert(key) {
                        diagnostics.push(Diagnostic {
                            code: DiagnosticCode::A052,
                            severity: Severity::Warning,
                            message: format!(
                                "Column '{}' is a transformation but has no description — consider documenting it",
                                edge.target_column,
                            ),
                            model: ModelName::new(&edge.target_model),
                            column: Some(edge.target_column.clone()),
                            hint: Some(format!(
                                "Add a description to '{}' in the YAML schema",
                                edge.target_column
                            )),
                            pass_name: self.name().into(),
                        });
                    }
                }
                // Everything else (descriptions match, or a direct column whose
                // source is itself undocumented) needs no diagnostic.
                _ => {}
            }
        }

        diagnostics
    }
}

/// Build a lookup of model_name -> { column_name_lowercase -> description }
/// from model YAML schemas and source definitions.
fn build_project_descriptions(
project: &ff_core::Project,
) -> HashMap<String, HashMap<String, String>> {
let mut lookup: HashMap<String, HashMap<String, String>> = HashMap::new();

for (name, model) in &project.models {
if let Some(schema) = &model.schema {
let mut col_descs = HashMap::new();
for col in &schema.columns {
if let Some(ref desc) = col.description {
col_descs.insert(col.name.to_lowercase(), desc.clone());
}
}
if !col_descs.is_empty() {
lookup.insert(name.to_string(), col_descs);
}
}
}

for source_file in &project.sources {
for table in &source_file.tables {
let mut col_descs = HashMap::new();
for col in &table.columns {
if let Some(ref desc) = col.description {
col_descs.insert(col.name.to_lowercase(), desc.clone());
}
}
if !col_descs.is_empty() {
lookup.insert(table.name.clone(), col_descs);
}
}
}

lookup
}
1 change: 1 addition & 0 deletions crates/ff-analysis/src/pass/plan_pass.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl PlanPassManager {
dag_passes: vec![
Box::new(super::plan_unused_columns::PlanUnusedColumns),
Box::new(super::plan_cross_model::CrossModelConsistency),
Box::new(super::plan_description_drift::PlanDescriptionDrift),
],
}
}
Expand Down
Loading