From aedf424e09ed93af2d89945288133671dce0805e Mon Sep 17 00:00:00 2001 From: Damon McCullough Date: Thu, 30 Apr 2026 13:41:44 -0400 Subject: [PATCH 1/4] include production row counts in validation outputs --- .../cscl/poc_validation/summarize_diffs.py | 30 ++++++++++++------- .../cscl/poc_validation/validate_outputs.sh | 23 ++++++++++---- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/products/cscl/poc_validation/summarize_diffs.py b/products/cscl/poc_validation/summarize_diffs.py index 570f92899e..e6e247699a 100644 --- a/products/cscl/poc_validation/summarize_diffs.py +++ b/products/cscl/poc_validation/summarize_diffs.py @@ -14,16 +14,7 @@ VALIDATION_DIR = Path("output/validation_output") OUTPUT_PATH = VALIDATION_DIR / "diffs_summary.csv" - -FIELDNAMES = [ - "Group", - "File name", - "Table name", - "Name", - "Has diffs", - "# of rows with diffs", - "DE Note", -] +VALIDATION_SUMMARY_PATH = VALIDATION_DIR / "validation_summary.csv" # Files to exclude from the summary EXCLUDE = { @@ -31,6 +22,7 @@ "source_data_versions.csv", "build_metadata.json", "diffs_summary.csv", + "validation_summary.csv", "log.csv", } @@ -73,9 +65,17 @@ def count_diff_rows(path: Path) -> int: return sum(1 for line in text.splitlines() if line.strip()) +def load_prod_row_counts() -> dict[str, int]: + with VALIDATION_SUMMARY_PATH.open(newline="") as f: + return { + row["filename"]: int(row["prod_row_count"]) for row in csv.DictReader(f) + } + + def main() -> None: filename_to_table = build_filename_to_table_name() export_name_to_group = build_export_name_to_group() + prod_row_counts = load_prod_row_counts() files = sorted( p for p in VALIDATION_DIR.iterdir() if p.is_file() and p.name not in EXCLUDE @@ -83,7 +83,11 @@ def main() -> None: rows = [] for path in files: + prod_row_count = prod_row_counts.get(path.name) diff_count = count_diff_rows(path) + percennt_of_rows_with_diffs = ( + (diff_count / prod_row_count) if prod_row_count else None + ) table_name = filename_to_table.get(path.name, "") group_name = export_name_to_group.get(table_name, "") display_name = " ".join([group_name, path.name.split(".")[0]]).strip() @@ -94,7 +98,11 @@ def main() -> None: "Table name": table_name, "Name": display_name, "Has diffs": diff_count > 0, + "# of rows in prod": prod_row_count, "# of rows with diffs": diff_count, + "% of rows with diffs": f"{percennt_of_rows_with_diffs:.2%}" + if percennt_of_rows_with_diffs is not None + else None, "DE Note": "", } ) @@ -102,7 +110,7 @@ def main() -> None: rows.sort(key=lambda r: r["Group"]) with OUTPUT_PATH.open("w", newline="") as f: - writer = csv.DictWriter(f, fieldnames=FIELDNAMES) + writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()) if rows else []) writer.writeheader() writer.writerows(rows) diff --git a/products/cscl/poc_validation/validate_outputs.sh b/products/cscl/poc_validation/validate_outputs.sh index d321a1ee14..56bd6d3490 100755 --- a/products/cscl/poc_validation/validate_outputs.sh +++ b/products/cscl/poc_validation/validate_outputs.sh @@ -1,10 +1,20 @@ #!/bin/bash -# Expects two folders in current directory -# output - contains outputs of build -# .data/prod - contains "production" 25a (or whatever version) for comparison +# Compares dev build output files against production files and writes per-file diff results. +# +# For each file in output/, performs a line-level comparison against the matching file in +# .data/prod/ and writes the mismatched (dev-only) rows to output/validation_output/. +# Also writes a summary CSV (validation_summary.csv) with per-file prod row counts and +# mismatched row counts. +# +# Expects two folders in the current directory: +# output/ - contains outputs of the current dev build +# .data/prod/ - contains the production files to compare against mkdir output/validation_output +csv_file="output/validation_output/validation_summary.csv" +echo "filename,prod_row_count,mismatched_rows" > "$csv_file" + total_records=0 total_mismatched=0 for filepath in output/*; do @@ -14,9 +24,9 @@ for filepath in output/*; do fi echo "Validating $file" - n_records="$(cat output/$file | wc -l | awk '{print $1}')" - echo "Total records: $n_records" - total_records=$(($total_records + $n_records)) + prod_row_count="$(cat .data/prod/$file | wc -l | awk '{print $1}')" + echo "Total records: $prod_row_count" + total_records=$(($total_records + $prod_row_count)) mismatched_rows=$(comm -23 <(sort output/$file) <(sort .data/prod/$file)) if [ -z "$mismatched_rows" ]; then @@ -28,6 +38,7 @@ for filepath in output/*; do total_mismatched=$(($total_mismatched + $n_mismatched)) echo -e "$mismatched_rows" > output/validation_output/$file + echo "$file,$prod_row_count,$n_mismatched" >> "$csv_file" echo "" done From 53fb10ca2096d30ab72a6283dafcd776f89cd3cd Mon Sep 17 00:00:00 2001 From: Damon McCullough Date: Sun, 3 May 2026 20:41:43 -0400 Subject: [PATCH 2/4] add script to run CSCL validation independently of builds --- .../cscl/poc_validation/run_validation.py | 87 +++++++++++++++++++ .../cscl/poc_validation/validate_outputs.sh | 2 +- 2 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 products/cscl/poc_validation/run_validation.py diff --git a/products/cscl/poc_validation/run_validation.py b/products/cscl/poc_validation/run_validation.py new file mode 100644 index 0000000000..6c232af3a4 --- /dev/null +++ b/products/cscl/poc_validation/run_validation.py @@ -0,0 +1,87 @@ +""" +Downloads a CSCL build's output files and the corresponding prod files from S3, +then runs diff validation locally — without needing to re-run the full build. + +Run from the products/cscl directory: + python poc_validation/run_validation.py --build + +The build's output files are downloaded to output/ and the prod files to .data/prod/. +Then validate_outputs.sh is run to generate the diffs in output/validation_output/. + +The prod version is read from the build metadata by default; override with --prod-version. +""" + +import subprocess +from pathlib import Path + +import typer + +from dcpy.configuration import PUBLISHING_BUCKET +from dcpy.connectors.edm import publishing +from dcpy.models.connectors.edm.publishing import BuildKey +from dcpy.utils import s3 + +PRODUCT = "db-cscl" +PROD_BUCKET = "edm-private" +OUTPUT_DIR = Path("output") +PROD_DIR = Path(".data/prod") + +app = typer.Typer(add_completion=False) + + +@app.command() +def run( + build: str = typer.Option(..., "--build", "-b", help="Build name, e.g. nightly_qa"), + prod_version: str | None = typer.Option( + None, + "--prod-version", + "-p", + help="Prod CSCL version to compare against, e.g. 25a. Defaults to version in build metadata.", + ), +) -> None: + """Download build outputs and prod files from S3, then run diff validation locally.""" + build_key = BuildKey(PRODUCT, build) + + if prod_version is None: + print(f"Reading prod version from build metadata for '{build}'...") + prod_version = publishing.get_build_metadata(build_key).version + print(f"Prod version: {prod_version}") + + # Files present in the build S3 folder that are not CSCL output files + BUILD_METADATA_FILES = { + "build_metadata.json", + "log.csv", + "output.zip", + "recipe.lock.yml", + "source_data_versions.csv", + } + + bucket = PUBLISHING_BUCKET + all_filenames = publishing.get_filenames(build_key) + # Only top-level files (no subdirectory separator) are the build outputs + output_filenames = {f for f in all_filenames if "/" not in f} + cscl_output_filenames = output_filenames - BUILD_METADATA_FILES + + OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + print(f"\nDownloading {len(cscl_output_filenames)} build output files to {OUTPUT_DIR}/...") + for filename in sorted(cscl_output_filenames): + s3.download_file(bucket, f"{build_key.path}/{filename}", OUTPUT_DIR / filename) + + PROD_DIR.mkdir(parents=True, exist_ok=True) + print(f"\nDownloading prod files (version {prod_version}) to {PROD_DIR}/...") + for filename in sorted(cscl_output_filenames): + s3.download_file( + PROD_BUCKET, + f"cscl_etl/{prod_version}/{filename}", + PROD_DIR / filename, + ) + + print("\nRunning validation...") + subprocess.run( + ["bash", "poc_validation/validate_outputs.sh"], + check=True, + ) + + +if __name__ == "__main__": + app() diff --git a/products/cscl/poc_validation/validate_outputs.sh b/products/cscl/poc_validation/validate_outputs.sh index 56bd6d3490..960aa38299 100755 --- a/products/cscl/poc_validation/validate_outputs.sh +++ b/products/cscl/poc_validation/validate_outputs.sh @@ -10,7 +10,7 @@ # Expects two folders in the current directory: # output/ - contains outputs of the current dev build # .data/prod/ - contains the production files to compare against -mkdir output/validation_output +mkdir -p output/validation_output csv_file="output/validation_output/validation_summary.csv" echo "filename,prod_row_count,mismatched_rows" > "$csv_file" From 22f2a4f236c28d43201938e78d559849e7fbd4da Mon Sep 17 00:00:00 2001 From: Damon McCullough Date: Sun, 3 May 2026 21:01:35 -0400 Subject: [PATCH 3/4] add diff generation to CSCL page --- apps/qa/src/pages/cscl/cscl.py | 24 ++++ apps/qa/src/pages/cscl/helpers.py | 36 +++++- .../cscl/poc_validation/run_validation.py | 118 ++++++++++++------ .../cscl/poc_validation/validate_outputs.sh | 5 + 4 files changed, 141 insertions(+), 42 deletions(-) diff --git a/apps/qa/src/pages/cscl/cscl.py b/apps/qa/src/pages/cscl/cscl.py index 32b7b79fa6..b2e29c08a0 100644 --- a/apps/qa/src/pages/cscl/cscl.py +++ b/apps/qa/src/pages/cscl/cscl.py @@ -55,3 +55,27 @@ def cscl(): ), }, ) + + st.subheader("Diff Row Viewer") + files_with_diffs = diffs_summary[diffs_summary["Has diffs"]]["File name"].tolist() + if not files_with_diffs: + st.info("No files with diffs.") + return + + selected_file = st.selectbox( + "Select a file to view diff rows", + options=files_with_diffs, + ) + + if selected_file: + with st.spinner(f"Loading diff rows for {selected_file}..."): + diff_rows = helpers.get_diff_rows(selected_build, selected_file) + st.caption( + f"{len(diff_rows)} rows present in dev but not in prod (prod version {prod_version})" + ) + st.text_area( + label="Diff rows", + value="\n".join(diff_rows), + height=400, + label_visibility="collapsed", + ) diff --git a/apps/qa/src/pages/cscl/helpers.py b/apps/qa/src/pages/cscl/helpers.py index b5bb9b9f50..727d87e896 100644 --- a/apps/qa/src/pages/cscl/helpers.py +++ b/apps/qa/src/pages/cscl/helpers.py @@ -2,8 +2,11 @@ import streamlit as st from dcpy.connectors.edm import publishing +from dcpy.models.connectors.edm.publishing import BuildKey +from dcpy.utils import s3 PRODUCT = "db-cscl" +PROD_BUCKET = "edm-private" @st.cache_data(show_spinner=False) @@ -13,11 +16,40 @@ def get_builds() -> list[str]: @st.cache_data(show_spinner=False) def get_build_version(build: str) -> str: - product_key = publishing.BuildKey(PRODUCT, build) + product_key = BuildKey(PRODUCT, build) return publishing.get_build_metadata(product_key).version @st.cache_data(show_spinner=False) def get_diffs_summary(build: str) -> pd.DataFrame: - product_key = publishing.BuildKey(PRODUCT, build) + product_key = BuildKey(PRODUCT, build) return publishing.read_csv(product_key, "validation_output/diffs_summary.csv") + + +@st.cache_data(show_spinner=False) +def get_diff_rows(build: str, filename: str) -> list[str]: + """Stream dev and prod files from S3 and return lines present in dev but not prod. + + Equivalent to comm -23 <(sort dev) <(sort prod). + Cached so repeated selections don't re-fetch from S3. + """ + build_key = BuildKey(PRODUCT, build) + prod_version = get_build_version(build) + + dev_buffer = publishing.get_file(build_key, filename) + dev_lines = { + line + for line in dev_buffer.read().decode("latin-1").splitlines() + if line.strip() + } + + prod_buffer = s3.get_file_as_stream( + PROD_BUCKET, f"cscl_etl/{prod_version}/{filename}" + ) + prod_lines = { + line + for line in prod_buffer.read().decode("latin-1").splitlines() + if line.strip() + } + + return sorted(dev_lines - prod_lines) diff --git a/products/cscl/poc_validation/run_validation.py b/products/cscl/poc_validation/run_validation.py index 6c232af3a4..3b13430847 100644 --- a/products/cscl/poc_validation/run_validation.py +++ b/products/cscl/poc_validation/run_validation.py @@ -1,34 +1,68 @@ """ -Downloads a CSCL build's output files and the corresponding prod files from S3, -then runs diff validation locally — without needing to re-run the full build. +Streams CSCL build outputs and production files from S3 and computes diffs — without +downloading input files to disk or re-running the full build. Run from the products/cscl directory: python poc_validation/run_validation.py --build -The build's output files are downloaded to output/ and the prod files to .data/prod/. -Then validate_outputs.sh is run to generate the diffs in output/validation_output/. +Diff results are written to output/validation_output/ (one file per CSCL output file) +along with a validation_summary.csv. No input files are written to disk. The prod version is read from the build metadata by default; override with --prod-version. """ -import subprocess +import csv from pathlib import Path import typer -from dcpy.configuration import PUBLISHING_BUCKET from dcpy.connectors.edm import publishing from dcpy.models.connectors.edm.publishing import BuildKey from dcpy.utils import s3 PRODUCT = "db-cscl" PROD_BUCKET = "edm-private" -OUTPUT_DIR = Path("output") -PROD_DIR = Path(".data/prod") +VALIDATION_OUTPUT_DIR = Path("output/validation_output") + +# Files present in the build S3 folder that are not CSCL output files +BUILD_METADATA_FILES = { + "build_metadata.json", + "log.csv", + "output.zip", + "recipe.lock.yml", + "source_data_versions.csv", +} app = typer.Typer(add_completion=False) +def _compute_file_diff( + build_key: BuildKey, filename: str, prod_version: str +) -> tuple[list[str], int]: + """Stream both files from S3 and return (diff_rows, prod_row_count). + + diff_rows contains lines present in the dev build but not in prod, + equivalent to comm -23 <(sort dev) <(sort prod). + """ + dev_buffer = publishing.get_file(build_key, filename) + dev_lines = { + line + for line in dev_buffer.read().decode("latin-1").splitlines() + if line.strip() + } + + prod_buffer = s3.get_file_as_stream( + PROD_BUCKET, f"cscl_etl/{prod_version}/{filename}" + ) + prod_lines = { + line + for line in prod_buffer.read().decode("latin-1").splitlines() + if line.strip() + } + + return sorted(dev_lines - prod_lines), len(prod_lines) + + @app.command() def run( build: str = typer.Option(..., "--build", "-b", help="Build name, e.g. nightly_qa"), @@ -39,7 +73,7 @@ def run( help="Prod CSCL version to compare against, e.g. 25a. Defaults to version in build metadata.", ), ) -> None: - """Download build outputs and prod files from S3, then run diff validation locally.""" + """Stream build outputs and prod files from S3 and compute diffs without using disk.""" build_key = BuildKey(PRODUCT, build) if prod_version is None: @@ -47,40 +81,44 @@ def run( prod_version = publishing.get_build_metadata(build_key).version print(f"Prod version: {prod_version}") - # Files present in the build S3 folder that are not CSCL output files - BUILD_METADATA_FILES = { - "build_metadata.json", - "log.csv", - "output.zip", - "recipe.lock.yml", - "source_data_versions.csv", - } - - bucket = PUBLISHING_BUCKET all_filenames = publishing.get_filenames(build_key) - # Only top-level files (no subdirectory separator) are the build outputs - output_filenames = {f for f in all_filenames if "/" not in f} - cscl_output_filenames = output_filenames - BUILD_METADATA_FILES - - OUTPUT_DIR.mkdir(parents=True, exist_ok=True) - print(f"\nDownloading {len(cscl_output_filenames)} build output files to {OUTPUT_DIR}/...") - for filename in sorted(cscl_output_filenames): - s3.download_file(bucket, f"{build_key.path}/{filename}", OUTPUT_DIR / filename) - - PROD_DIR.mkdir(parents=True, exist_ok=True) - print(f"\nDownloading prod files (version {prod_version}) to {PROD_DIR}/...") - for filename in sorted(cscl_output_filenames): - s3.download_file( - PROD_BUCKET, - f"cscl_etl/{prod_version}/{filename}", - PROD_DIR / filename, + cscl_output_filenames = sorted( + f for f in all_filenames if "/" not in f and f not in BUILD_METADATA_FILES + ) + print(f"\nValidating {len(cscl_output_filenames)} files...") + + VALIDATION_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + summary_rows = [] + total_records = 0 + total_mismatched = 0 + + for filename in cscl_output_filenames: + print(f"Validating {filename}") + diff_rows, prod_row_count = _compute_file_diff( + build_key, filename, prod_version ) + n_mismatched = len(diff_rows) + print(f" Total records: {prod_row_count}") + print(f" Mismatched records: {n_mismatched}") - print("\nRunning validation...") - subprocess.run( - ["bash", "poc_validation/validate_outputs.sh"], - check=True, - ) + total_records += prod_row_count + total_mismatched += n_mismatched + + (VALIDATION_OUTPUT_DIR / filename).write_text( + "\n".join(diff_rows), encoding="latin-1" + ) + summary_rows.append((filename, prod_row_count, n_mismatched)) + + summary_path = VALIDATION_OUTPUT_DIR / "validation_summary.csv" + with summary_path.open("w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["filename", "prod_row_count", "mismatched_rows"]) + writer.writerows(summary_rows) + + print("\nComparison complete!") + print(f"Total records: {total_records}") + print(f"Mismatched records: {total_mismatched}") + print(f"Summary written to {summary_path}") if __name__ == "__main__": diff --git a/products/cscl/poc_validation/validate_outputs.sh b/products/cscl/poc_validation/validate_outputs.sh index 960aa38299..dfe5a333d2 100755 --- a/products/cscl/poc_validation/validate_outputs.sh +++ b/products/cscl/poc_validation/validate_outputs.sh @@ -2,6 +2,11 @@ # Compares dev build output files against production files and writes per-file diff results. # +# NOTE: This script is a legacy local tool. The primary method for generating diffs is +# poc_validation/run_validation.py, which streams files from S3 without requiring local +# copies and is used by both the nightly build and the QA app. +# Use this script only if you have local copies of both output/ and .data/prod/ already. +# # For each file in output/, performs a line-level comparison against the matching file in # .data/prod/ and writes the mismatched (dev-only) rows to output/validation_output/. # Also writes a summary CSV (validation_summary.csv) with per-file prod row counts and From 482d42f4912746dd11256208ab662e5e92f6bcce Mon Sep 17 00:00:00 2001 From: Damon McCullough Date: Sun, 3 May 2026 21:45:42 -0400 Subject: [PATCH 4/4] add views of relevant dbt QA models --- apps/qa/src/pages/cscl/cscl.py | 23 ++++++++++++++++++ apps/qa/src/pages/cscl/helpers.py | 40 ++++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/apps/qa/src/pages/cscl/cscl.py b/apps/qa/src/pages/cscl/cscl.py index b2e29c08a0..67959d327e 100644 --- a/apps/qa/src/pages/cscl/cscl.py +++ b/apps/qa/src/pages/cscl/cscl.py @@ -79,3 +79,26 @@ def cscl(): height=400, label_visibility="collapsed", ) + + st.subheader("Field-Level Diffs (dbt QA Models)") + st.markdown( + body=""" + These tables are generated by dbt models in `models/etl_dev_qa/` and provide + field-level comparisons between dev and prod outputs. + Not all output files have a corresponding dbt model. + """ + ) + for group, tables in helpers.DBT_QA_TABLES.items(): + with st.expander(group): + selected_table = st.selectbox( + "Select a table", + options=tables, + key=f"dbt_table_{group}", + ) + if selected_table: + try: + with st.spinner(f"Loading {selected_table}..."): + df = helpers.get_dbt_qa_table(selected_build, selected_table) + st.dataframe(df, use_container_width=True) + except Exception as e: + st.warning(f"Could not load `{selected_table}`: {e}") diff --git a/apps/qa/src/pages/cscl/helpers.py b/apps/qa/src/pages/cscl/helpers.py index 727d87e896..0de8cabce3 100644 --- a/apps/qa/src/pages/cscl/helpers.py +++ b/apps/qa/src/pages/cscl/helpers.py @@ -3,11 +3,33 @@ from dcpy.connectors.edm import publishing from dcpy.models.connectors.edm.publishing import BuildKey -from dcpy.utils import s3 +from dcpy.utils import postgres, s3 PRODUCT = "db-cscl" PROD_BUCKET = "edm-private" +# dbt QA models in etl_dev_qa/ and the files they relate to (for display grouping) +# Schema = build name (BUILD_ENGINE_SCHEMA is set to the build name in CI) +DBT_QA_TABLES: dict[str, list[str]] = { + "LION DAT": [ + "qa__lion_dat_summary", + "qa__lion_dat_by_row", + "qa__lion_dat_individual_diffs", + ], + "ThinLION": [ + "qa__thinlion_all_comparison", + "qa__thinlion_bronx_comparison", + "qa__thinlion_brooklyn_comparison", + "qa__thinlion_manhattan_comparison", + "qa__thinlion_queens_comparison", + "qa__thinlion_statenisland_comparison", + ], + "RPL": [ + "qa__rpl_order", + "qa__rpl_order_diffs", + ], +} + @st.cache_data(show_spinner=False) def get_builds() -> list[str]: @@ -53,3 +75,19 @@ def get_diff_rows(build: str, filename: str) -> list[str]: } return sorted(dev_lines - prod_lines) + + +def get_pg_client(build: str) -> postgres.PostgresClient: + """Return a PostgresClient scoped to the dbt schema for the given build. + + In CI, BUILD_ENGINE_SCHEMA is set to the build name, so the dbt tables + for build 'nightly_qa' live in schema 'nightly_qa'. + """ + return postgres.PostgresClient(database=PRODUCT, schema=build) + + +@st.cache_data(show_spinner=False) +def get_dbt_qa_table(build: str, table_name: str) -> pd.DataFrame: + """Read a dbt QA model table from Postgres into a DataFrame.""" + client = get_pg_client(build) + return client.read_table_df(table_name)