Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions tests/test_project_example_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def test_project_example_golden_path(tmp_path: Path, monkeypatch):
assert clean_meta["read_params_source"]
assert clean_meta["sql"] == "sql/clean.sql"
assert clean_meta["sql_rendered"] == "data/clean/project_example/2022/_run/clean_rendered.sql"
assert clean_meta["output_profile"]["row_count"] > 0
assert clean_meta["output_profile"]["columns"]
assert any(item["name"] == "regione" for item in clean_meta["output_profile"]["columns"])
assert "debug" not in clean_meta

_assert_no_absolute_paths_in_json_payload(clean_meta, root)
Expand All @@ -158,6 +161,13 @@ def test_project_example_golden_path(tmp_path: Path, monkeypatch):
"data/mart/project_example/2022/rd_by_regione.parquet",
"data/mart/project_example/2022/rd_by_provincia.parquet",
]
assert mart_meta["clean_input_profile"]["row_count"] == clean_meta["output_profile"]["row_count"]
assert set(mart_meta["table_profiles"].keys()) == {"rd_by_regione", "rd_by_provincia"}
assert len(mart_meta["transition_profiles"]) == 2
assert {item["target_name"] for item in mart_meta["transition_profiles"]} == {
"rd_by_regione",
"rd_by_provincia",
}
assert mart_meta["tables"] == [
{
"name": "rd_by_regione",
Expand All @@ -172,6 +182,14 @@ def test_project_example_golden_path(tmp_path: Path, monkeypatch):
"output": "data/mart/project_example/2022/rd_by_provincia.parquet",
},
]
for item in mart_meta["transition_profiles"]:
assert item["from"] == "clean"
assert item["to"] == "mart"
assert isinstance(item["source_row_count"], int)
assert isinstance(item["target_row_count"], int)
assert isinstance(item["added_columns"], list)
assert isinstance(item["removed_columns"], list)
assert isinstance(item["type_changes"], list)
assert "debug" not in mart_meta

_assert_no_absolute_paths_in_json_payload(mart_meta, root)
Expand Down
21 changes: 17 additions & 4 deletions toolkit/clean/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from toolkit.clean.input_selection import select_raw_input
from toolkit.core.artifacts import ARTIFACT_POLICY_DEBUG, resolve_artifact_policy, should_write
from toolkit.core.layer_profile import profile_relation
from toolkit.core.metadata import config_hash_for_year, file_record, write_layer_manifest, write_metadata
from toolkit.core.paths import layer_year_dir, resolve_root, to_root_relative
from toolkit.core.template import build_runtime_template_ctx, public_template_ctx, render_template
Expand Down Expand Up @@ -194,21 +195,30 @@ def _run_sql(
read_cfg: dict[str, Any] | None = None,
read_mode: str = "fallback",
logger=None,
) -> tuple[str, dict[str, Any], int]:
) -> tuple[str, dict[str, Any], dict[str, Any]]:
con = duckdb.connect(":memory:")
try:
read_info = read_raw_to_relation(con, input_files, read_cfg, read_mode, logger)
con.execute(f"CREATE TABLE clean_out AS {sql_query}")
output_profile = profile_relation(con, "clean_out")
output_path.parent.mkdir(parents=True, exist_ok=True)
con.execute(
f"COPY clean_out TO '{sql_path(output_path)}' (FORMAT PARQUET);"
)
row_count: int = con.execute("SELECT count(*) FROM clean_out").fetchone()[0]
return read_info.source, read_info.params_used, row_count
return read_info.source, read_info.params_used, output_profile
finally:
con.close()


def _normalize_output_profile(output_profile: dict[str, Any] | int) -> dict[str, Any]:
if isinstance(output_profile, dict):
return output_profile
return {
"row_count": int(output_profile),
"columns": [],
}


def run_clean(
dataset: str,
year: int,
Expand Down Expand Up @@ -261,14 +271,15 @@ def run_clean(
)

output_path = out_dir / f"{dataset}_{year}_clean.parquet"
read_source_used, read_params_used, output_rows = _run_sql(
read_source_used, read_params_used, output_profile = _run_sql(
input_files,
sql,
output_path,
read_cfg=relation_read_cfg,
read_mode=read_mode,
logger=logger,
)
output_profile = _normalize_output_profile(output_profile)
output_bytes: int | None = output_path.stat().st_size if output_path.exists() else None

outputs = [file_record(output_path)]
Expand All @@ -289,6 +300,7 @@ def run_clean(
outputs=outputs,
policy=policy,
)
metadata_payload["output_profile"] = output_profile
metadata_path = write_metadata(
out_dir,
metadata_payload,
Expand All @@ -303,4 +315,5 @@ def run_clean(
warnings_count=None,
)
logger.info(f"CLEAN -> {output_path}")
output_rows = int(output_profile.get("row_count") or 0)
return {"output_rows": output_rows, "output_bytes": output_bytes}
85 changes: 85 additions & 0 deletions toolkit/core/layer_profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from __future__ import annotations

from pathlib import Path
from typing import Any

import duckdb


def _q_ident(value: str) -> str:
return '"' + value.replace('"', '""') + '"'


def profile_relation(con: duckdb.DuckDBPyConnection, relation_name: str) -> dict[str, Any]:
    """Profile a relation on an open DuckDB connection.

    Returns a dict with:
    - ``row_count``: total rows in the relation (int)
    - ``columns``: list of ``{"name", "type"}`` dicts, one per column,
      in DESCRIBE order.
    """
    quoted = _q_ident(relation_name)
    (count,) = con.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()
    schema_rows = con.execute(f"DESCRIBE {quoted}").fetchall()
    # DESCRIBE yields (name, type, null, key, default, extra); only the
    # first two fields matter for the profile.
    columns = [{"name": name, "type": col_type} for name, col_type, *_ in schema_rows]
    return {"row_count": int(count), "columns": columns}


def profile_parquet_files(files: list[Path]) -> dict[str, Any]:
    """Profile one or more parquet files as a single combined relation.

    Opens an in-memory DuckDB connection, exposes the file(s) through a
    ``read_parquet`` view, and returns the ``profile_relation`` payload
    (row count + column name/type pairs).

    Raises:
        ValueError: if *files* is empty.
    """
    if not files:
        raise ValueError("Cannot profile empty parquet file list")

    def _q_lit(path: Path) -> str:
        # Quote a path as a SQL string literal, doubling embedded single
        # quotes — the original interpolation broke on paths containing "'".
        return "'" + path.as_posix().replace("'", "''") + "'"

    con = duckdb.connect(":memory:")
    try:
        if len(files) == 1:
            source = f"read_parquet({_q_lit(files[0])})"
        else:
            source = "read_parquet([" + ",".join(_q_lit(p) for p in files) + "])"
        con.execute(f"CREATE VIEW profiled_input AS SELECT * FROM {source}")
        return profile_relation(con, "profiled_input")
    finally:
        con.close()


def compare_layer_profiles(
source: dict[str, Any] | None,
target: dict[str, Any] | None,
*,
source_layer: str,
target_layer: str,
target_name: str | None = None,
) -> dict[str, Any] | None:
if source is None or target is None:
return None

source_columns = {item["name"]: item["type"] for item in source.get("columns", [])}
target_columns = {item["name"]: item["type"] for item in target.get("columns", [])}

source_names = set(source_columns.keys())
target_names = set(target_columns.keys())
shared_names = sorted(source_names & target_names)

type_changes = []
for name in shared_names:
if source_columns[name] != target_columns[name]:
type_changes.append(
{
"column": name,
"from": source_columns[name],
"to": target_columns[name],
}
)

payload = {
"from": source_layer,
"to": target_layer,
"source_row_count": source.get("row_count"),
"target_row_count": target.get("row_count"),
"row_count_delta": (target.get("row_count") or 0) - (source.get("row_count") or 0),
"added_columns": sorted(target_names - source_names),
"removed_columns": sorted(source_names - target_names),
"type_changes": type_changes,
}
if target_name is not None:
payload["target_name"] = target_name
return payload
21 changes: 20 additions & 1 deletion toolkit/mart/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import duckdb

from toolkit.core.artifacts import ARTIFACT_POLICY_DEBUG, resolve_artifact_policy, should_write
from toolkit.core.layer_profile import compare_layer_profiles, profile_relation, profile_parquet_files
from toolkit.core.metadata import config_hash_for_year, file_record, write_layer_manifest, write_metadata
from toolkit.core.paths import layer_year_dir, resolve_root, to_root_relative
from toolkit.core.support import flatten_support_template_ctx, resolve_support_payloads
Expand Down Expand Up @@ -61,6 +62,7 @@ def run_mart(
clean_files = list(clean_dir.glob("*.parquet"))
if not clean_files:
raise FileNotFoundError(f"No CLEAN parquet found in {clean_dir}")
clean_input_profile = profile_parquet_files(clean_files) if clean_files else None

con = duckdb.connect(":memory:")
try:
Expand Down Expand Up @@ -95,6 +97,8 @@ def run_mart(

written: list[Path] = []
executed: list[dict[str, Any]] = []
table_profiles: dict[str, Any] = {}
transition_profiles: list[dict[str, Any]] = []
debug_tables: list[dict[str, Any]] = []
total_rows = 0

Expand Down Expand Up @@ -127,12 +131,24 @@ def run_mart(

# Create table and export
con.execute(f"CREATE OR REPLACE TABLE {name} AS {sql}")
total_rows += con.execute(f"SELECT count(*) FROM {name}").fetchone()[0]
output_profile = profile_relation(con, name)
row_count = int(output_profile.get("row_count") or 0)
total_rows += row_count

out = mart_dir / f"{name}.parquet"
con.execute(f"COPY {name} TO '{out}' (FORMAT PARQUET);")

written.append(out)
table_profiles[name] = output_profile
transition_profile = compare_layer_profiles(
clean_input_profile,
output_profile,
source_layer="clean",
target_layer="mart",
target_name=name,
)
if transition_profile is not None:
transition_profiles.append(transition_profile)
executed.append(
{
"name": name,
Expand Down Expand Up @@ -165,6 +181,9 @@ def run_mart(
"output_paths": [_serialize_metadata_path(p, root_dir) for p in written],
"template_ctx": public_template_ctx(template_ctx),
"tables": executed,
"clean_input_profile": clean_input_profile,
"table_profiles": table_profiles,
"transition_profiles": transition_profiles,
}
if policy == ARTIFACT_POLICY_DEBUG:
metadata_payload["debug"] = {
Expand Down
Loading