From 258435045cd1e780aa1e5064a7fbe86131e55cc8 Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Sat, 4 Apr 2026 16:34:34 +0100 Subject: [PATCH 1/2] Aggiungi profili minimi e confronto clean->mart --- tests/test_project_example_e2e.py | 18 +++++++ toolkit/clean/run.py | 11 ++-- toolkit/core/layer_profile.py | 85 +++++++++++++++++++++++++++++++ toolkit/mart/run.py | 21 +++++++- 4 files changed, 130 insertions(+), 5 deletions(-) create mode 100644 toolkit/core/layer_profile.py diff --git a/tests/test_project_example_e2e.py b/tests/test_project_example_e2e.py index e77d75d..a147dbe 100644 --- a/tests/test_project_example_e2e.py +++ b/tests/test_project_example_e2e.py @@ -137,6 +137,9 @@ def test_project_example_golden_path(tmp_path: Path, monkeypatch): assert clean_meta["read_params_source"] assert clean_meta["sql"] == "sql/clean.sql" assert clean_meta["sql_rendered"] == "data/clean/project_example/2022/_run/clean_rendered.sql" + assert clean_meta["output_profile"]["row_count"] > 0 + assert clean_meta["output_profile"]["columns"] + assert clean_meta["output_profile"]["columns"][0]["name"] == "regione" assert "debug" not in clean_meta _assert_no_absolute_paths_in_json_payload(clean_meta, root) @@ -158,6 +161,13 @@ def test_project_example_golden_path(tmp_path: Path, monkeypatch): "data/mart/project_example/2022/rd_by_regione.parquet", "data/mart/project_example/2022/rd_by_provincia.parquet", ] + assert mart_meta["clean_input_profile"]["row_count"] == clean_meta["output_profile"]["row_count"] + assert set(mart_meta["table_profiles"].keys()) == {"rd_by_regione", "rd_by_provincia"} + assert len(mart_meta["transition_profiles"]) == 2 + assert {item["target_name"] for item in mart_meta["transition_profiles"]} == { + "rd_by_regione", + "rd_by_provincia", + } assert mart_meta["tables"] == [ { "name": "rd_by_regione", @@ -172,6 +182,14 @@ def test_project_example_golden_path(tmp_path: Path, monkeypatch): "output": "data/mart/project_example/2022/rd_by_provincia.parquet", }, ] + for item in mart_meta["transition_profiles"]: + assert item["from"] == "clean" + assert item["to"] == "mart" + assert isinstance(item["source_row_count"], int) + assert isinstance(item["target_row_count"], int) + assert isinstance(item["added_columns"], list) + assert isinstance(item["removed_columns"], list) + assert isinstance(item["type_changes"], list) assert "debug" not in mart_meta _assert_no_absolute_paths_in_json_payload(mart_meta, root) diff --git a/toolkit/clean/run.py b/toolkit/clean/run.py index 3fa9d18..3b8c7ad 100644 --- a/toolkit/clean/run.py +++ b/toolkit/clean/run.py @@ -13,6 +13,7 @@ ) from toolkit.clean.input_selection import select_raw_input from toolkit.core.artifacts import ARTIFACT_POLICY_DEBUG, resolve_artifact_policy, should_write +from toolkit.core.layer_profile import profile_relation from toolkit.core.metadata import config_hash_for_year, file_record, write_layer_manifest, write_metadata from toolkit.core.paths import layer_year_dir, resolve_root, to_root_relative from toolkit.core.template import build_runtime_template_ctx, public_template_ctx, render_template @@ -194,17 +195,17 @@ def _run_sql( read_cfg: dict[str, Any] | None = None, read_mode: str = "fallback", logger=None, -) -> tuple[str, dict[str, Any], int]: +) -> tuple[str, dict[str, Any], dict[str, Any]]: con = duckdb.connect(":memory:") try: read_info = read_raw_to_relation(con, input_files, read_cfg, read_mode, logger) con.execute(f"CREATE TABLE clean_out AS {sql_query}") + output_profile = profile_relation(con, "clean_out") output_path.parent.mkdir(parents=True, exist_ok=True) con.execute( f"COPY clean_out TO '{sql_path(output_path)}' (FORMAT PARQUET);" ) - row_count: int = con.execute("SELECT count(*) FROM clean_out").fetchone()[0] - return read_info.source, read_info.params_used, row_count + return read_info.source, read_info.params_used, output_profile finally: con.close() @@ -261,7 +262,7 @@ def run_clean( ) output_path = out_dir / f"{dataset}_{year}_clean.parquet" - read_source_used, read_params_used, output_rows = _run_sql( + read_source_used, read_params_used, output_profile = _run_sql( input_files, sql, output_path, @@ -289,6 +290,7 @@ def run_clean( outputs=outputs, policy=policy, ) + metadata_payload["output_profile"] = output_profile metadata_path = write_metadata( out_dir, metadata_payload, @@ -303,4 +305,5 @@ def run_clean( warnings_count=None, ) logger.info(f"CLEAN -> {output_path}") + output_rows = int(output_profile.get("row_count") or 0) return {"output_rows": output_rows, "output_bytes": output_bytes} diff --git a/toolkit/core/layer_profile.py b/toolkit/core/layer_profile.py new file mode 100644 index 0000000..74ef17c --- /dev/null +++ b/toolkit/core/layer_profile.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import duckdb + + +def _q_ident(value: str) -> str: + return '"' + value.replace('"', '""') + '"' + + +def profile_relation(con: duckdb.DuckDBPyConnection, relation_name: str) -> dict[str, Any]: + q_relation = _q_ident(relation_name) + row_count = int(con.execute(f"SELECT COUNT(*) FROM {q_relation}").fetchone()[0]) + described = con.execute(f"DESCRIBE {q_relation}").fetchall() + columns = [{"name": row[0], "type": row[1]} for row in described] + return { + "row_count": row_count, + "columns": columns, + } + + +def profile_parquet_files(files: list[Path]) -> dict[str, Any]: + if not files: + raise ValueError("Cannot profile empty parquet file list") + + con = duckdb.connect(":memory:") + try: + if len(files) == 1: + con.execute( + f"CREATE VIEW profiled_input AS SELECT * FROM read_parquet('{files[0].as_posix()}')" + ) + else: + paths = ",".join(f"'{path.as_posix()}'" for path in files) + con.execute( + f"CREATE VIEW profiled_input AS SELECT * FROM read_parquet([{paths}])" + ) + return profile_relation(con, "profiled_input") + finally: + con.close() + + +def compare_layer_profiles( + source: dict[str, Any] | None, + target: dict[str, Any] | None, + *, + source_layer: str, + target_layer: str, + target_name: str | None = None, +) -> dict[str, Any] | None: + if source is None or target is None: + return None + + source_columns = {item["name"]: item["type"] for item in source.get("columns", [])} + target_columns = {item["name"]: item["type"] for item in target.get("columns", [])} + + source_names = set(source_columns.keys()) + target_names = set(target_columns.keys()) + shared_names = sorted(source_names & target_names) + + type_changes = [] + for name in shared_names: + if source_columns[name] != target_columns[name]: + type_changes.append( + { + "column": name, + "from": source_columns[name], + "to": target_columns[name], + } + ) + + payload = { + "from": source_layer, + "to": target_layer, + "source_row_count": source.get("row_count"), + "target_row_count": target.get("row_count"), + "row_count_delta": (target.get("row_count") or 0) - (source.get("row_count") or 0), + "added_columns": sorted(target_names - source_names), + "removed_columns": sorted(source_names - target_names), + "type_changes": type_changes, + } + if target_name is not None: + payload["target_name"] = target_name + return payload diff --git a/toolkit/mart/run.py b/toolkit/mart/run.py index 65c4104..e50f0ec 100644 --- a/toolkit/mart/run.py +++ b/toolkit/mart/run.py @@ -7,6 +7,7 @@ import duckdb from toolkit.core.artifacts import ARTIFACT_POLICY_DEBUG, resolve_artifact_policy, should_write +from toolkit.core.layer_profile import compare_layer_profiles, profile_relation, profile_parquet_files from toolkit.core.metadata import config_hash_for_year, file_record, write_layer_manifest, write_metadata from toolkit.core.paths import layer_year_dir, resolve_root, to_root_relative from toolkit.core.support import flatten_support_template_ctx, resolve_support_payloads @@ -61,6 +62,7 @@ def run_mart( clean_files = list(clean_dir.glob("*.parquet")) if not clean_files: raise FileNotFoundError(f"No CLEAN parquet found in {clean_dir}") + clean_input_profile = profile_parquet_files(clean_files) if clean_files else None con = duckdb.connect(":memory:") try: @@ -95,6 +97,8 @@ def run_mart( written: list[Path] = [] executed: list[dict[str, Any]] = [] + table_profiles: dict[str, Any] = {} + transition_profiles: list[dict[str, Any]] = [] debug_tables: list[dict[str, Any]] = [] total_rows = 0 @@ -127,12 +131,24 @@ def run_mart( # Create table and export con.execute(f"CREATE OR REPLACE TABLE {name} AS {sql}") - total_rows += con.execute(f"SELECT count(*) FROM {name}").fetchone()[0] + output_profile = profile_relation(con, name) + row_count = int(output_profile.get("row_count") or 0) + total_rows += row_count out = mart_dir / f"{name}.parquet" con.execute(f"COPY {name} TO '{out}' (FORMAT PARQUET);") written.append(out) + table_profiles[name] = output_profile + transition_profile = compare_layer_profiles( + clean_input_profile, + output_profile, + source_layer="clean", + target_layer="mart", + target_name=name, + ) + if transition_profile is not None: + transition_profiles.append(transition_profile) executed.append( { "name": name, @@ -165,6 +181,9 @@ def run_mart( "output_paths": [_serialize_metadata_path(p, root_dir) for p in written], "template_ctx": public_template_ctx(template_ctx), "tables": executed, + "clean_input_profile": clean_input_profile, + "table_profiles": table_profiles, + "transition_profiles": transition_profiles, } if policy == ARTIFACT_POLICY_DEBUG: metadata_payload["debug"] = { From a4bdcf323abbef7f5bfa135a4b94aa150c941d3c Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Sat, 4 Apr 2026 16:38:50 +0100 Subject: [PATCH 2/2] Compatta profilo clean con test esistenti --- tests/test_project_example_e2e.py | 2 +- toolkit/clean/run.py | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/test_project_example_e2e.py b/tests/test_project_example_e2e.py index a147dbe..b43b197 100644 --- a/tests/test_project_example_e2e.py +++ b/tests/test_project_example_e2e.py @@ -139,7 +139,7 @@ def test_project_example_golden_path(tmp_path: Path, monkeypatch): assert clean_meta["sql_rendered"] == "data/clean/project_example/2022/_run/clean_rendered.sql" assert clean_meta["output_profile"]["row_count"] > 0 assert clean_meta["output_profile"]["columns"] - assert clean_meta["output_profile"]["columns"][0]["name"] == "regione" + assert any(item["name"] == "regione" for item in clean_meta["output_profile"]["columns"]) assert "debug" not in clean_meta _assert_no_absolute_paths_in_json_payload(clean_meta, root) diff --git a/toolkit/clean/run.py b/toolkit/clean/run.py index 3b8c7ad..adf5918 100644 --- a/toolkit/clean/run.py +++ b/toolkit/clean/run.py @@ -210,6 +210,15 @@ def _run_sql( con.close() +def _normalize_output_profile(output_profile: dict[str, Any] | int) -> dict[str, Any]: + if isinstance(output_profile, dict): + return output_profile + return { + "row_count": int(output_profile), + "columns": [], + } + + def run_clean( dataset: str, year: int, @@ -270,6 +279,7 @@ def run_clean( read_mode=read_mode, logger=logger, ) + output_profile = _normalize_output_profile(output_profile) output_bytes: int | None = output_path.stat().st_size if output_path.exists() else None outputs = [file_record(output_path)]