diff --git a/apps/qa/src/pages/cscl/cscl.py b/apps/qa/src/pages/cscl/cscl.py index 32b7b79fa6..67959d327e 100644 --- a/apps/qa/src/pages/cscl/cscl.py +++ b/apps/qa/src/pages/cscl/cscl.py @@ -55,3 +55,50 @@ def cscl(): ), }, ) + + st.subheader("Diff Row Viewer") + files_with_diffs = diffs_summary[diffs_summary["Has diffs"]]["File name"].tolist() + if not files_with_diffs: + st.info("No files with diffs.") + return + + selected_file = st.selectbox( + "Select a file to view diff rows", + options=files_with_diffs, + ) + + if selected_file: + with st.spinner(f"Loading diff rows for {selected_file}..."): + diff_rows = helpers.get_diff_rows(selected_build, selected_file) + st.caption( + f"{len(diff_rows)} rows present in dev but not in prod (prod version {prod_version})" + ) + st.text_area( + label="Diff rows", + value="\n".join(diff_rows), + height=400, + label_visibility="collapsed", + ) + + st.subheader("Field-Level Diffs (dbt QA Models)") + st.markdown( + body=""" + These tables are generated by dbt models in `models/etl_dev_qa/` and provide + field-level comparisons between dev and prod outputs. + Not all output files have a corresponding dbt model. 
+ """ + ) + for group, tables in helpers.DBT_QA_TABLES.items(): + with st.expander(group): + selected_table = st.selectbox( + "Select a table", + options=tables, + key=f"dbt_table_{group}", + ) + if selected_table: + try: + with st.spinner(f"Loading {selected_table}..."): + df = helpers.get_dbt_qa_table(selected_build, selected_table) + st.dataframe(df, use_container_width=True) + except Exception as e: + st.warning(f"Could not load `{selected_table}`: {e}") diff --git a/apps/qa/src/pages/cscl/helpers.py b/apps/qa/src/pages/cscl/helpers.py index b5bb9b9f50..0de8cabce3 100644 --- a/apps/qa/src/pages/cscl/helpers.py +++ b/apps/qa/src/pages/cscl/helpers.py @@ -2,8 +2,33 @@ import streamlit as st from dcpy.connectors.edm import publishing +from dcpy.models.connectors.edm.publishing import BuildKey +from dcpy.utils import postgres, s3 PRODUCT = "db-cscl" +PROD_BUCKET = "edm-private" + +# dbt QA models in etl_dev_qa/ and the files they relate to (for display grouping) +# Schema = build name (BUILD_ENGINE_SCHEMA is set to the build name in CI) +DBT_QA_TABLES: dict[str, list[str]] = { + "LION DAT": [ + "qa__lion_dat_summary", + "qa__lion_dat_by_row", + "qa__lion_dat_individual_diffs", + ], + "ThinLION": [ + "qa__thinlion_all_comparison", + "qa__thinlion_bronx_comparison", + "qa__thinlion_brooklyn_comparison", + "qa__thinlion_manhattan_comparison", + "qa__thinlion_queens_comparison", + "qa__thinlion_statenisland_comparison", + ], + "RPL": [ + "qa__rpl_order", + "qa__rpl_order_diffs", + ], +} @st.cache_data(show_spinner=False) @@ -13,11 +38,56 @@ def get_builds() -> list[str]: @st.cache_data(show_spinner=False) def get_build_version(build: str) -> str: - product_key = publishing.BuildKey(PRODUCT, build) + product_key = BuildKey(PRODUCT, build) return publishing.get_build_metadata(product_key).version @st.cache_data(show_spinner=False) def get_diffs_summary(build: str) -> pd.DataFrame: - product_key = publishing.BuildKey(PRODUCT, build) + product_key = BuildKey(PRODUCT, 
build) return publishing.read_csv(product_key, "validation_output/diffs_summary.csv") + + +@st.cache_data(show_spinner=False) +def get_diff_rows(build: str, filename: str) -> list[str]: +    """Stream dev and prod files from S3 and return lines present in dev but not prod. + +    Equivalent to comm -23 <(sort dev) <(sort prod). +    Cached so repeated selections don't re-fetch from S3. +    """ +    build_key = BuildKey(PRODUCT, build) +    prod_version = get_build_version(build) + +    dev_buffer = publishing.get_file(build_key, filename) +    dev_lines = { +        line +        for line in dev_buffer.read().decode("latin-1").splitlines() +        if line.strip() +    } + +    prod_buffer = s3.get_file_as_stream( +        PROD_BUCKET, f"cscl_etl/{prod_version}/{filename}" +    ) +    prod_lines = { +        line +        for line in prod_buffer.read().decode("latin-1").splitlines() +        if line.strip() +    } + +    return sorted(dev_lines - prod_lines) + + +def get_pg_client(build: str) -> postgres.PostgresClient: +    """Return a PostgresClient scoped to the dbt schema for the given build. + +    In CI, BUILD_ENGINE_SCHEMA is set to the build name, so the dbt tables +    for build 'nightly_qa' live in schema 'nightly_qa'. +    """ +    return postgres.PostgresClient(database=PRODUCT, schema=build) + + +@st.cache_data(show_spinner=False) +def get_dbt_qa_table(build: str, table_name: str) -> pd.DataFrame: +    """Read a dbt QA model table from Postgres into a DataFrame.""" +    client = get_pg_client(build) +    return client.read_table_df(table_name) diff --git a/products/cscl/poc_validation/run_validation.py b/products/cscl/poc_validation/run_validation.py new file mode 100644 index 0000000000..3b13430847 --- /dev/null +++ b/products/cscl/poc_validation/run_validation.py @@ -0,0 +1,125 @@ +""" +Streams CSCL build outputs and production files from S3 and computes diffs — without +downloading input files to disk or re-running the full build. 
+ +Run from the products/cscl directory: + python poc_validation/run_validation.py --build + +Diff results are written to output/validation_output/ (one file per CSCL output file) +along with a validation_summary.csv. No input files are written to disk. + +The prod version is read from the build metadata by default; override with --prod-version. +""" + +import csv +from pathlib import Path + +import typer + +from dcpy.connectors.edm import publishing +from dcpy.models.connectors.edm.publishing import BuildKey +from dcpy.utils import s3 + +PRODUCT = "db-cscl" +PROD_BUCKET = "edm-private" +VALIDATION_OUTPUT_DIR = Path("output/validation_output") + +# Files present in the build S3 folder that are not CSCL output files +BUILD_METADATA_FILES = { +    "build_metadata.json", +    "log.csv", +    "output.zip", +    "recipe.lock.yml", +    "source_data_versions.csv", +} + +app = typer.Typer(add_completion=False) + + +def _compute_file_diff( +    build_key: BuildKey, filename: str, prod_version: str +) -> tuple[list[str], int]: +    """Stream both files from S3 and return (diff_rows, prod_row_count). + +    diff_rows contains lines present in the dev build but not in prod, +    equivalent to comm -23 <(sort dev) <(sort prod). +    """ +    dev_buffer = publishing.get_file(build_key, filename) +    dev_lines = { +        line +        for line in dev_buffer.read().decode("latin-1").splitlines() +        if line.strip() +    } + +    prod_buffer = s3.get_file_as_stream( +        PROD_BUCKET, f"cscl_etl/{prod_version}/{filename}" +    ) +    prod_lines = { +        line +        for line in prod_buffer.read().decode("latin-1").splitlines() +        if line.strip() +    } + +    return sorted(dev_lines - prod_lines), len(prod_lines) + + +@app.command() +def run( +    build: str = typer.Option(..., "--build", "-b", help="Build name, e.g. nightly_qa"), +    prod_version: str | None = typer.Option( +        None, +        "--prod-version", +        "-p", +        help="Prod CSCL version to compare against, e.g. 25a. 
Defaults to version in build metadata.", +    ), +) -> None: +    """Stream build outputs and prod files from S3 and compute diffs without using disk.""" +    build_key = BuildKey(PRODUCT, build) + +    if prod_version is None: +        print(f"Reading prod version from build metadata for '{build}'...") +        prod_version = publishing.get_build_metadata(build_key).version +    print(f"Prod version: {prod_version}") + +    all_filenames = publishing.get_filenames(build_key) +    cscl_output_filenames = sorted( +        f for f in all_filenames if "/" not in f and f not in BUILD_METADATA_FILES +    ) +    print(f"\nValidating {len(cscl_output_filenames)} files...") + +    VALIDATION_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) +    summary_rows = [] +    total_records = 0 +    total_mismatched = 0 + +    for filename in cscl_output_filenames: +        print(f"Validating {filename}") +        diff_rows, prod_row_count = _compute_file_diff( +            build_key, filename, prod_version +        ) +        n_mismatched = len(diff_rows) +        print(f"  Total records: {prod_row_count}") +        print(f"  Mismatched records: {n_mismatched}") + +        total_records += prod_row_count +        total_mismatched += n_mismatched + +        (VALIDATION_OUTPUT_DIR / filename).write_text( +            "\n".join(diff_rows), encoding="latin-1" +        ) +        summary_rows.append((filename, prod_row_count, n_mismatched)) + +    summary_path = VALIDATION_OUTPUT_DIR / "validation_summary.csv" +    with summary_path.open("w", newline="") as f: +        writer = csv.writer(f) +        writer.writerow(["filename", "prod_row_count", "mismatched_rows"]) +        writer.writerows(summary_rows) + +    print("\nComparison complete!") +    print(f"Total records: {total_records}") +    print(f"Mismatched records: {total_mismatched}") +    print(f"Summary written to {summary_path}") + + +if __name__ == "__main__": +    app() diff --git a/products/cscl/poc_validation/summarize_diffs.py b/products/cscl/poc_validation/summarize_diffs.py index 570f92899e..e6e247699a 100644 --- a/products/cscl/poc_validation/summarize_diffs.py +++ b/products/cscl/poc_validation/summarize_diffs.py @@ -14,16 
+14,7 @@ VALIDATION_DIR = Path("output/validation_output") OUTPUT_PATH = VALIDATION_DIR / "diffs_summary.csv" - -FIELDNAMES = [ -    "Group", -    "File name", -    "Table name", -    "Name", -    "Has diffs", -    "# of rows with diffs", -    "DE Note", -] +VALIDATION_SUMMARY_PATH = VALIDATION_DIR / "validation_summary.csv" # Files to exclude from the summary EXCLUDE = { @@ -31,6 +22,7 @@ "source_data_versions.csv", "build_metadata.json", "diffs_summary.csv", +    "validation_summary.csv", "log.csv", } @@ -73,9 +65,17 @@ def count_diff_rows(path: Path) -> int: return sum(1 for line in text.splitlines() if line.strip()) +def load_prod_row_counts() -> dict[str, int]: +    with VALIDATION_SUMMARY_PATH.open(newline="") as f: +        return { +            row["filename"]: int(row["prod_row_count"]) for row in csv.DictReader(f) +        } + + def main() -> None: filename_to_table = build_filename_to_table_name() export_name_to_group = build_export_name_to_group() +    prod_row_counts = load_prod_row_counts() files = sorted( p for p in VALIDATION_DIR.iterdir() if p.is_file() and p.name not in EXCLUDE @@ -83,7 +83,11 @@ rows = [] for path in files: +        prod_row_count = prod_row_counts.get(path.name) diff_count = count_diff_rows(path) +        percent_of_rows_with_diffs = ( +            (diff_count / prod_row_count) if prod_row_count else None +        ) table_name = filename_to_table.get(path.name, "") group_name = export_name_to_group.get(table_name, "") display_name = " ".join([group_name, path.name.split(".")[0]]).strip() @@ -94,7 +98,11 @@ "Table name": table_name, "Name": display_name, "Has diffs": diff_count > 0, +        "# of rows in prod": prod_row_count, "# of rows with diffs": diff_count, +        "% of rows with diffs": f"{percent_of_rows_with_diffs:.2%}" +        if percent_of_rows_with_diffs is not None +        else None, "DE Note": "", } ) @@ -102,7 +110,7 @@ rows.sort(key=lambda r: r["Group"]) with OUTPUT_PATH.open("w", newline="") as f: -        writer = csv.DictWriter(f, fieldnames=FIELDNAMES) +        writer = 
csv.DictWriter(f, fieldnames=list(rows[0].keys()) if rows else []) writer.writeheader() writer.writerows(rows) diff --git a/products/cscl/poc_validation/validate_outputs.sh b/products/cscl/poc_validation/validate_outputs.sh index d321a1ee14..dfe5a333d2 100755 --- a/products/cscl/poc_validation/validate_outputs.sh +++ b/products/cscl/poc_validation/validate_outputs.sh @@ -1,9 +1,24 @@ #!/bin/bash -# Expects two folders in current directory -# output - contains outputs of build -# .data/prod - contains "production" 25a (or whatever version) for comparison -mkdir output/validation_output +# Compares dev build output files against production files and writes per-file diff results. +# +# NOTE: This script is a legacy local tool. The primary method for generating diffs is +# poc_validation/run_validation.py, which streams files from S3 without requiring local +# copies and is used by both the nightly build and the QA app. +# Use this script only if you have local copies of both output/ and .data/prod/ already. +# +# For each file in output/, performs a line-level comparison against the matching file in +# .data/prod/ and writes the mismatched (dev-only) rows to output/validation_output/. +# Also writes a summary CSV (validation_summary.csv) with per-file prod row counts and +# mismatched row counts. 
+# +# Expects two folders in the current directory: +# output/ - contains outputs of the current dev build +# .data/prod/ - contains the production files to compare against +mkdir -p output/validation_output + +csv_file="output/validation_output/validation_summary.csv" +echo "filename,prod_row_count,mismatched_rows" > "$csv_file" total_records=0 total_mismatched=0 @@ -14,9 +29,9 @@ for filepath in output/*; do fi echo "Validating $file" - n_records="$(cat output/$file | wc -l | awk '{print $1}')" - echo "Total records: $n_records" - total_records=$(($total_records + $n_records)) + prod_row_count="$(cat .data/prod/$file | wc -l | awk '{print $1}')" + echo "Total records: $prod_row_count" + total_records=$(($total_records + $prod_row_count)) mismatched_rows=$(comm -23 <(sort output/$file) <(sort .data/prod/$file)) if [ -z "$mismatched_rows" ]; then @@ -28,6 +43,7 @@ for filepath in output/*; do total_mismatched=$(($total_mismatched + $n_mismatched)) echo -e "$mismatched_rows" > output/validation_output/$file + echo "$file,$prod_row_count,$n_mismatched" >> "$csv_file" echo "" done