Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions apps/qa/src/pages/cscl/cscl.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,50 @@ def cscl():
),
},
)

st.subheader("Diff Row Viewer")
files_with_diffs = diffs_summary[diffs_summary["Has diffs"]]["File name"].tolist()
if not files_with_diffs:
st.info("No files with diffs.")
return

selected_file = st.selectbox(
"Select a file to view diff rows",
options=files_with_diffs,
)

if selected_file:
with st.spinner(f"Loading diff rows for {selected_file}..."):
diff_rows = helpers.get_diff_rows(selected_build, selected_file)
st.caption(
f"{len(diff_rows)} rows present in dev but not in prod (prod version {prod_version})"
)
st.text_area(
label="Diff rows",
value="\n".join(diff_rows),
height=400,
label_visibility="collapsed",
)

st.subheader("Field-Level Diffs (dbt QA Models)")
st.markdown(
body="""
These tables are generated by dbt models in `models/etl_dev_qa/` and provide
field-level comparisons between dev and prod outputs.
Not all output files have a corresponding dbt model.
"""
)
for group, tables in helpers.DBT_QA_TABLES.items():
with st.expander(group):
selected_table = st.selectbox(
"Select a table",
options=tables,
key=f"dbt_table_{group}",
)
if selected_table:
try:
with st.spinner(f"Loading {selected_table}..."):
df = helpers.get_dbt_qa_table(selected_build, selected_table)
st.dataframe(df, use_container_width=True)
except Exception as e:
st.warning(f"Could not load `{selected_table}`: {e}")
74 changes: 72 additions & 2 deletions apps/qa/src/pages/cscl/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,33 @@
import streamlit as st

from dcpy.connectors.edm import publishing
from dcpy.models.connectors.edm.publishing import BuildKey
from dcpy.utils import postgres, s3

PRODUCT = "db-cscl"  # publishing product key for CSCL builds
PROD_BUCKET = "edm-private"  # S3 bucket holding production CSCL output files

# dbt QA models in etl_dev_qa/ and the files they relate to (for display grouping)
# Schema = build name (BUILD_ENGINE_SCHEMA is set to the build name in CI)
# Keys are display group names shown in the QA app; values are dbt table names.
DBT_QA_TABLES: dict[str, list[str]] = {
    "LION DAT": [
        "qa__lion_dat_summary",
        "qa__lion_dat_by_row",
        "qa__lion_dat_individual_diffs",
    ],
    "ThinLION": [
        "qa__thinlion_all_comparison",
        "qa__thinlion_bronx_comparison",
        "qa__thinlion_brooklyn_comparison",
        "qa__thinlion_manhattan_comparison",
        "qa__thinlion_queens_comparison",
        "qa__thinlion_statenisland_comparison",
    ],
    "RPL": [
        "qa__rpl_order",
        "qa__rpl_order_diffs",
    ],
}


@st.cache_data(show_spinner=False)
Expand All @@ -13,11 +38,56 @@ def get_builds() -> list[str]:

@st.cache_data(show_spinner=False)
def get_build_version(build: str) -> str:
    """Return the version string recorded in the build's publishing metadata.

    Cached per build name so repeated UI interactions don't re-hit S3.
    """
    # NOTE: removed a leftover duplicate assignment (`publishing.BuildKey(...)`)
    # that was immediately overwritten — dead store from a refactor.
    product_key = BuildKey(PRODUCT, build)
    return publishing.get_build_metadata(product_key).version


@st.cache_data(show_spinner=False)
def get_diffs_summary(build: str) -> pd.DataFrame:
    """Load the per-file diffs summary CSV published with the given build.

    Cached per build name so repeated UI interactions don't re-hit S3.
    """
    # NOTE: removed a leftover duplicate assignment (`publishing.BuildKey(...)`)
    # that was immediately overwritten — dead store from a refactor.
    product_key = BuildKey(PRODUCT, build)
    return publishing.read_csv(product_key, "validation_output/diffs_summary.csv")


@st.cache_data(show_spinner=False)
def get_diff_rows(build: str, filename: str) -> list[str]:
    """Stream dev and prod copies of *filename* from S3 and return lines present
    in dev but not in prod.

    Equivalent to comm -23 <(sort dev) <(sort prod).
    Cached so repeated selections don't re-fetch from S3.
    """
    build_key = BuildKey(PRODUCT, build)
    prod_version = get_build_version(build)

    dev_lines = _non_blank_lines(publishing.get_file(build_key, filename))
    # FIX: the prod S3 key must include the file name being compared —
    # previously the placeholder was dropped, so every file hit the same key.
    prod_lines = _non_blank_lines(
        s3.get_file_as_stream(PROD_BUCKET, f"cscl_etl/{prod_version}/{filename}")
    )

    return sorted(dev_lines - prod_lines)


def _non_blank_lines(buffer) -> set[str]:
    """Decode a latin-1 byte stream and return its non-blank lines as a set.

    CSCL source files are latin-1 encoded; blank/whitespace-only lines are
    ignored so they never show up as spurious diffs.
    """
    return {
        line
        for line in buffer.read().decode("latin-1").splitlines()
        if line.strip()
    }


def get_pg_client(build: str) -> postgres.PostgresClient:
    """Build a Postgres client pointed at the dbt schema for *build*.

    CI sets BUILD_ENGINE_SCHEMA to the build name, so the dbt tables for a
    build named 'nightly_qa' live in the 'nightly_qa' schema.
    """
    schema_name = build  # dbt schema name mirrors the build name
    return postgres.PostgresClient(database=PRODUCT, schema=schema_name)


@st.cache_data(show_spinner=False)
def get_dbt_qa_table(build: str, table_name: str) -> pd.DataFrame:
    """Fetch a single dbt QA model table from Postgres as a DataFrame.

    Cached per (build, table) pair so re-selecting a table in the UI
    doesn't re-query the database.
    """
    return get_pg_client(build).read_table_df(table_name)
125 changes: 125 additions & 0 deletions products/cscl/poc_validation/run_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""
Streams CSCL build outputs and production files from S3 and computes diffs — without
downloading input files to disk or re-running the full build.

Run from the products/cscl directory:
python poc_validation/run_validation.py --build <build_name>

Diff results are written to output/validation_output/ (one file per CSCL output file)
along with a validation_summary.csv. No input files are written to disk.

The prod version is read from the build metadata by default; override with --prod-version.
"""

import csv
from pathlib import Path

import typer

from dcpy.connectors.edm import publishing
from dcpy.models.connectors.edm.publishing import BuildKey
from dcpy.utils import s3

PRODUCT = "db-cscl"  # publishing product key for CSCL builds
PROD_BUCKET = "edm-private"  # S3 bucket holding production CSCL files
VALIDATION_OUTPUT_DIR = Path("output/validation_output")  # per-file diff results + summary CSV

# Files present in the build S3 folder that are not CSCL output files
# (build bookkeeping artifacts to skip when enumerating outputs to validate)
BUILD_METADATA_FILES = {
    "build_metadata.json",
    "log.csv",
    "output.zip",
    "recipe.lock.yml",
    "source_data_versions.csv",
}

# CLI entry point; completion disabled since this is a one-shot validation script.
app = typer.Typer(add_completion=False)


def _compute_file_diff(
    build_key: BuildKey, filename: str, prod_version: str
) -> tuple[list[str], int]:
    """Stream both copies of *filename* from S3 and return (diff_rows, prod_row_count).

    diff_rows contains lines present in the dev build but not in prod,
    equivalent to comm -23 <(sort dev) <(sort prod).
    """
    dev_lines = _non_blank_lines(publishing.get_file(build_key, filename))
    # FIX: the prod S3 key must include the file name being compared —
    # previously the placeholder was dropped, so every file hit the same key.
    prod_lines = _non_blank_lines(
        s3.get_file_as_stream(PROD_BUCKET, f"cscl_etl/{prod_version}/{filename}")
    )

    return sorted(dev_lines - prod_lines), len(prod_lines)


def _non_blank_lines(buffer) -> set[str]:
    """Decode a latin-1 byte stream and return its non-blank lines as a set.

    CSCL source files are latin-1 encoded; blank/whitespace-only lines are
    dropped so they never register as diffs.
    """
    return {
        line
        for line in buffer.read().decode("latin-1").splitlines()
        if line.strip()
    }


@app.command()
def run(
    build: str = typer.Option(..., "--build", "-b", help="Build name, e.g. nightly_qa"),
    prod_version: str | None = typer.Option(
        None,
        "--prod-version",
        "-p",
        help="Prod CSCL version to compare against, e.g. 25a. Defaults to version in build metadata.",
    ),
) -> None:
    """Stream build outputs and prod files from S3 and compute diffs without using disk."""
    build_key = BuildKey(PRODUCT, build)

    # Default the prod version to whatever the build's own metadata recorded.
    if prod_version is None:
        print(f"Reading prod version from build metadata for '{build}'...")
        prod_version = publishing.get_build_metadata(build_key).version
    print(f"Prod version: {prod_version}")

    # Only top-level files that are actual CSCL outputs — skip nested paths
    # and build bookkeeping files.
    all_filenames = publishing.get_filenames(build_key)
    cscl_output_filenames = sorted(
        f for f in all_filenames if "/" not in f and f not in BUILD_METADATA_FILES
    )
    print(f"\nValidating {len(cscl_output_filenames)} files...")

    VALIDATION_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    summary_rows = []
    total_records = 0
    total_mismatched = 0

    for filename in cscl_output_filenames:
        # FIX: the file name was dropped from this message, which printed a
        # literal placeholder for every file.
        print(f"Validating {filename}")
        diff_rows, prod_row_count = _compute_file_diff(
            build_key, filename, prod_version
        )
        n_mismatched = len(diff_rows)
        print(f" Total records: {prod_row_count}")
        print(f" Mismatched records: {n_mismatched}")

        total_records += prod_row_count
        total_mismatched += n_mismatched

        # One diff file per output file, written with the source data's encoding.
        (VALIDATION_OUTPUT_DIR / filename).write_text(
            "\n".join(diff_rows), encoding="latin-1"
        )
        summary_rows.append((filename, prod_row_count, n_mismatched))

    # Summary CSV consumed downstream by summarize_diffs.py.
    summary_path = VALIDATION_OUTPUT_DIR / "validation_summary.csv"
    with summary_path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["filename", "prod_row_count", "mismatched_rows"])
        writer.writerows(summary_rows)

    print("\nComparison complete!")
    print(f"Total records: {total_records}")
    print(f"Mismatched records: {total_mismatched}")
    print(f"Summary written to {summary_path}")


if __name__ == "__main__":
    app()
30 changes: 19 additions & 11 deletions products/cscl/poc_validation/summarize_diffs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,15 @@

VALIDATION_DIR = Path("output/validation_output")  # per-file diff outputs from the validation run
OUTPUT_PATH = VALIDATION_DIR / "diffs_summary.csv"  # summary CSV this script writes

# Column headers for the summary CSV.
# NOTE(review): the CSV writer appears to derive fieldnames from the row dicts
# elsewhere in this file — confirm whether this constant is still referenced.
FIELDNAMES = [
    "Group",
    "File name",
    "Table name",
    "Name",
    "Has diffs",
    "# of rows with diffs",
    "DE Note",
]
# Per-file prod row counts produced by run_validation.py / validate_outputs.sh.
VALIDATION_SUMMARY_PATH = VALIDATION_DIR / "validation_summary.csv"

# Files to exclude from the summary
EXCLUDE = {
    "recipe.lock.yml",
    "source_data_versions.csv",
    "build_metadata.json",
    "diffs_summary.csv",
    "validation_summary.csv",
    "log.csv",
}

Expand Down Expand Up @@ -73,17 +65,29 @@ def count_diff_rows(path: Path) -> int:
return sum(1 for line in text.splitlines() if line.strip())


def load_prod_row_counts() -> dict[str, int]:
    """Map each validated filename to its prod row count.

    Reads validation_summary.csv (written by the validation run), keyed on the
    'filename' column with 'prod_row_count' parsed as an int.
    """
    counts: dict[str, int] = {}
    with VALIDATION_SUMMARY_PATH.open(newline="") as f:
        for record in csv.DictReader(f):
            counts[record["filename"]] = int(record["prod_row_count"])
    return counts


def main() -> None:
filename_to_table = build_filename_to_table_name()
export_name_to_group = build_export_name_to_group()
prod_row_counts = load_prod_row_counts()

files = sorted(
p for p in VALIDATION_DIR.iterdir() if p.is_file() and p.name not in EXCLUDE
)

rows = []
for path in files:
prod_row_count = prod_row_counts.get(path.name)
diff_count = count_diff_rows(path)
percennt_of_rows_with_diffs = (
(diff_count / prod_row_count) if prod_row_count else None
)
table_name = filename_to_table.get(path.name, "")
group_name = export_name_to_group.get(table_name, "")
display_name = " ".join([group_name, path.name.split(".")[0]]).strip()
Expand All @@ -94,15 +98,19 @@ def main() -> None:
"Table name": table_name,
"Name": display_name,
"Has diffs": diff_count > 0,
"# of rows in prod": prod_row_count,
"# of rows with diffs": diff_count,
"% of rows with diffs": f"{percennt_of_rows_with_diffs:.2%}"
if percennt_of_rows_with_diffs is not None
else None,
"DE Note": "",
}
)

rows.sort(key=lambda r: r["Group"])

with OUTPUT_PATH.open("w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=FIELDNAMES)
writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()) if rows else [])
writer.writeheader()
writer.writerows(rows)

Expand Down
30 changes: 23 additions & 7 deletions products/cscl/poc_validation/validate_outputs.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
#!/bin/bash

# Expects two folders in current directory
# output - contains outputs of build
# .data/prod - contains "production" 25a (or whatever version) for comparison
mkdir output/validation_output
# Compares dev build output files against production files and writes per-file diff results.
#
# NOTE: This script is a legacy local tool. The primary method for generating diffs is
# poc_validation/run_validation.py, which streams files from S3 without requiring local
# copies and is used by both the nightly build and the QA app.
# Use this script only if you have local copies of both output/ and .data/prod/ already.
#
# For each file in output/, performs a line-level comparison against the matching file in
# .data/prod/ and writes the mismatched (dev-only) rows to output/validation_output/<filename>.
# Also writes a summary CSV (validation_summary.csv) with per-file prod row counts and
# mismatched row counts.
#
# Expects two folders in the current directory:
# output/ - contains outputs of the current dev build
# .data/prod/ - contains the production files to compare against
mkdir -p output/validation_output

csv_file="output/validation_output/validation_summary.csv"
echo "filename,prod_row_count,mismatched_rows" > "$csv_file"

total_records=0
total_mismatched=0
Expand All @@ -14,9 +29,9 @@ for filepath in output/*; do
fi
echo "Validating $file"

n_records="$(cat output/$file | wc -l | awk '{print $1}')"
echo "Total records: $n_records"
total_records=$(($total_records + $n_records))
prod_row_count="$(cat .data/prod/$file | wc -l | awk '{print $1}')"
echo "Total records: $prod_row_count"
total_records=$(($total_records + $prod_row_count))
mismatched_rows=$(comm -23 <(sort output/$file) <(sort .data/prod/$file))

if [ -z "$mismatched_rows" ]; then
Expand All @@ -28,6 +43,7 @@ for filepath in output/*; do
total_mismatched=$(($total_mismatched + $n_mismatched))

echo -e "$mismatched_rows" > output/validation_output/$file
echo "$file,$prod_row_count,$n_mismatched" >> "$csv_file"
echo ""
done

Expand Down
Loading