diff --git a/acro/acro_regression.py b/acro/acro_regression.py index 9c5056a..0859e32 100644 --- a/acro/acro_regression.py +++ b/acro/acro_regression.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import re import warnings from inspect import stack from typing import Any @@ -22,6 +23,119 @@ logger = logging.getLogger("acro") +def _get_endog_exog_variables(endog: ArrayLike, exog: ArrayLike) -> list[str]: + """Extract variable names from endog and exog arguments. + + Parameters + ---------- + endog : array_like + The dependent variable (Series or array). + exog : array_like + The independent variables (DataFrame, Series, or array). + + Returns + ------- + list[str] + List of variable names: [dependent, independent1, independent2, ...]. + """ + variables: list[str] = [] + + if hasattr(endog, "name") and endog.name is not None: + variables.append(str(endog.name)) + if hasattr(exog, "columns"): + for col in exog.columns: + if str(col) != "const": + variables.append(str(col)) + elif hasattr(exog, "name") and exog.name is not None: + variables.append(str(exog.name)) + return variables + + +def _split_formula_terms(text: str, delimiters: str = "+") -> list[str]: + """Split a formula string on delimiters, but only outside parentheses. + + Parameters + ---------- + text : str + The string to split. + delimiters : str + Characters to split on (e.g., '+' or ':*'). + + Returns + ------- + list[str] + The split terms. + """ + terms: list[str] = [] + depth = 0 + current: list[str] = [] + for char in text: + if char == "(": + depth += 1 + current.append(char) + elif char == ")": + depth -= 1 + current.append(char) + elif char in delimiters and depth == 0: + terms.append("".join(current)) + current = [] + else: + current.append(char) + terms.append("".join(current)) + return terms + + +def _get_formula_variables(formula: str) -> list[str]: # noqa: C901 + """Extract variable names from a formula string.
+ + Parses formulas like 'y ~ x1 + x2 + x3' to extract variable names. + Handles interaction terms (x1:x2), polynomial terms I(x^2), and + categorical terms C(x), respecting parentheses nesting. + + Parameters + ---------- + formula : str + An R-style formula string, e.g., 'y ~ x1 + x2'. + + Returns + ------- + list[str] + List of variable names: [dependent, independent1, independent2, ...]. + """ + variables: list[str] = [] + parts = formula.split("~") + if len(parts) != 2: + return variables + dep_var = parts[0].strip() + if dep_var: + variables.append(dep_var) + rhs = parts[1].strip() + terms = _split_formula_terms(rhs, "+") + for term in terms: + term = term.strip() + if not term or term == "1": + continue + sub_terms = _split_formula_terms(term, ":*") + for sub in sub_terms: + sub = sub.strip() + if not sub or sub == "1": + continue + sub = re.sub(r"^[IC]\(", "", sub) + sub = re.sub(r"\)$", "", sub) + sub = re.sub(r"\^\d+$", "", sub) + while sub.startswith("(") and sub.endswith(")"): + sub = sub[1:-1] + sub = sub.strip() + if "+" in sub: + for inner in _split_formula_terms(sub, "+"): + inner = inner.strip() + if inner and inner not in variables: + variables.append(inner) + elif sub and sub not in variables: + variables.append(sub) + return variables + + class Regression: """Creates regression models.""" @@ -73,10 +187,11 @@ def ols( results = model.fit() status, summary, dof = self.__check_model_dof("ols", model) tables: list[SimpleTable] = results.summary().tables + vars_used = _get_endog_exog_variables(endog, exog) self.results.add( status=status, output_type="regression", - properties={"method": "ols", "dof": dof}, + properties={"method": "ols", "dof": dof, "variables": vars_used}, sdc={}, command=command, summary=summary, @@ -85,7 +200,7 @@ def ols( ) return results - def olsr( + def olsr( # pylint: disable=keyword-arg-before-vararg self, formula: str, data: Any, @@ -144,10 +259,11 @@ def olsr( results = model.fit() status, summary, dof = 
self.__check_model_dof("olsr", model) tables: list[SimpleTable] = results.summary().tables + vars_used = _get_formula_variables(formula) self.results.add( status=status, output_type="regression", - properties={"method": "olsr", "dof": dof}, + properties={"method": "olsr", "dof": dof, "variables": vars_used}, sdc={}, command=command, summary=summary, @@ -193,10 +309,11 @@ def logit( results = model.fit() status, summary, dof = self.__check_model_dof("logit", model) tables: list[SimpleTable] = results.summary().tables + vars_used = _get_endog_exog_variables(endog, exog) self.results.add( status=status, output_type="regression", - properties={"method": "logit", "dof": dof}, + properties={"method": "logit", "dof": dof, "variables": vars_used}, sdc={}, command=command, summary=summary, @@ -205,7 +322,7 @@ def logit( ) return results - def logitr( + def logitr( # pylint: disable=keyword-arg-before-vararg self, formula: str, data: Any, @@ -264,10 +381,11 @@ def logitr( results = model.fit() status, summary, dof = self.__check_model_dof("logitr", model) tables: list[SimpleTable] = results.summary().tables + vars_used = _get_formula_variables(formula) self.results.add( status=status, output_type="regression", - properties={"method": "logitr", "dof": dof}, + properties={"method": "logitr", "dof": dof, "variables": vars_used}, sdc={}, command=command, summary=summary, @@ -313,10 +431,11 @@ def probit( results = model.fit() status, summary, dof = self.__check_model_dof("probit", model) tables: list[SimpleTable] = results.summary().tables + vars_used = _get_endog_exog_variables(endog, exog) self.results.add( status=status, output_type="regression", - properties={"method": "probit", "dof": dof}, + properties={"method": "probit", "dof": dof, "variables": vars_used}, sdc={}, command=command, summary=summary, @@ -325,7 +444,7 @@ def probit( ) return results - def probitr( + def probitr( # pylint: disable=keyword-arg-before-vararg self, formula: str, data: Any, @@ -384,10 +503,11 @@ 
def probitr( results = model.fit() status, summary, dof = self.__check_model_dof("probitr", model) tables: list[SimpleTable] = results.summary().tables + vars_used = _get_formula_variables(formula) self.results.add( status=status, output_type="regression", - properties={"method": "probitr", "dof": dof}, + properties={"method": "probitr", "dof": dof, "variables": vars_used}, sdc={}, command=command, summary=summary, diff --git a/acro/acro_tables.py b/acro/acro_tables.py index 72ecbe2..e290d0b 100644 --- a/acro/acro_tables.py +++ b/acro/acro_tables.py @@ -1,5 +1,6 @@ """ACRO: Tables functions.""" +# pylint: disable=too-many-lines from __future__ import annotations import logging @@ -72,7 +73,7 @@ def __init__(self, suppress: bool) -> None: self.suppress: bool = suppress self.results: Records = Records() - def crosstab( + def crosstab( # pylint: disable=too-many-arguments,too-many-locals # noqa: C901 self, index: Any, columns: Any, @@ -212,14 +213,30 @@ def crosstab( colnames=colnames, normalize=normalize, ) sdc = get_table_sdc(masks, self.suppress, table) + + vars_used: list[str] = [] + if isinstance(index, pd.Series): + vars_used.append(index.name) + elif isinstance(index, list): + for var in index: + if isinstance(var, pd.Series): + vars_used.append(var.name) + if isinstance(columns, pd.Series): + vars_used.append(columns.name) + elif isinstance(columns, list): + for var in columns: + if isinstance(var, pd.Series): + vars_used.append(var.name) + if values is not None and isinstance(values, pd.Series): + vars_used.append(values.name) # record output self.results.add( status=status, output_type="table", - properties={"method": "crosstab"}, + properties={"method": "crosstab", "variables": vars_used}, sdc=sdc, command=command, summary=summary, @@ -234,7 +250,7 @@ def crosstab( ) return table - def pivot_table( + def pivot_table( # pylint: disable=too-many-arguments,too-many-locals # noqa: C901 self, data: DataFrame, values: Any = None, @@ -422,12 +438,28 @@ def 
pivot_table( observed=observed, sort=sort, ) sdc = get_table_sdc(masks, self.suppress, table) + + vars_used: list[str] = [] + if isinstance(index, list): + vars_used.extend(index) + elif index is not None: + vars_used.append(index) + if isinstance(columns, list): + vars_used.extend(columns) + elif columns is not None: + vars_used.append(columns) + if isinstance(values, list): + vars_used.extend(values) + elif values is not None: + vars_used.append(values) + vars_used = [str(v) for v in vars_used] + # record output self.results.add( status=status, output_type="table", - properties={"method": "pivot_table"}, + properties={"method": "pivot_table", "variables": vars_used}, sdc=sdc, command=command, summary=summary, @@ -442,7 +473,7 @@ def pivot_table( ) return table - def surv_func( + def surv_func( # pylint: disable=too-many-arguments,too-many-locals self, time: Any, status: Any, @@ -541,7 +572,7 @@ def surv_func( return (plot, output_filename) return None - def survival_table( + def survival_table( # pylint: disable=too-many-arguments self, survival_table: DataFrame, safe_table: DataFrame, @@ -566,7 +597,7 @@ def survival_table( ) return survival_table - def survival_plot( + def survival_plot( # pylint: disable=too-many-arguments self, survival_table: DataFrame, survival_func: Any, @@ -617,7 +648,7 @@ def survival_plot( ) return (plot, unique_filename) - def hist( + def hist( # pylint: disable=too-many-arguments,too-many-locals self, data: DataFrame, column: str, @@ -914,7 +945,7 @@ def pie( return unique_filename -def create_crosstab_masks( +def create_crosstab_masks( # pylint: disable=too-many-arguments,too-many-locals index: Any, columns: Any, values: Any, @@ -1365,7 +1396,7 @@ def _align_mask_columns(m: DataFrame, table: DataFrame) -> DataFrame: if table_nlevels == 2 and mask_nlevels == 2: table_top = table.columns.get_level_values(0).unique().tolist() mask_top = m.columns.get_level_values(0).unique().tolist() - if len(mask_top) == 1 and len(table_top) > 1: + 
if mask_top != table_top: n_base = len(table.columns.get_level_values(1).unique()) base_mask = m.iloc[:, :n_base] flat_cols = base_mask.columns.get_level_values(1) @@ -1771,7 +1802,7 @@ def get_index_columns( return index_new, columns_new -def crosstab_with_totals( +def crosstab_with_totals( # pylint: disable=too-many-arguments,too-many-locals masks: dict[str, DataFrame], aggfunc: Any, index: Any, @@ -1907,7 +1938,7 @@ def crosstab_with_totals( return table -def manual_crossstab_with_totals( +def manual_crossstab_with_totals( # pylint: disable=too-many-arguments table: DataFrame, aggfunc: str | list[str] | None, index: Any, diff --git a/acro/record.py b/acro/record.py index 1868cfe..fd15f14 100644 --- a/acro/record.py +++ b/acro/record.py @@ -58,7 +58,7 @@ def load_output(path: str, output: list[str]) -> list[str] | list[DataFrame]: return loaded -class Record: +class Record: # pylint: disable=too-many-instance-attributes """Stores data related to a single output record. Attributes @@ -89,7 +89,7 @@ class Record: Time the record was created in ISO format. """ - def __init__( + def __init__( # pylint: disable=too-many-arguments self, uid: str, status: str, @@ -214,7 +214,7 @@ def __init__(self) -> None: self.results: dict[str, Record] = {} self.output_id: int = 0 - def add( + def add( # pylint: disable=too-many-arguments self, status: str, output_type: str, @@ -430,6 +430,318 @@ def validate_outputs(self) -> None: ) record.exception = input("") + def _extract_table_info( + self, + output: list, + ) -> tuple[list[str], int]: + """Extract variables and total records from table output. + + Parameters + ---------- + output : list + The output to extract information from. + + Returns + ------- + tuple[list[str], int] + A sorted, deduplicated list of variables and the total record count. 
+ """ + variables: list[str] = [] + total_records: int = 0 + + for table in output: + if isinstance(table, DataFrame): + if hasattr(table.index, "names") and any(table.index.names): + variables.extend(str(n) for n in table.index.names if n is not None) + + if hasattr(table.columns, "names") and any(table.columns.names): + variables.extend( + str(n) for n in table.columns.names if n is not None + ) + + try: + # Count non-NaN cells for record count + cell_sum = table.values[~pd.isna(table.values)].sum() + if cell_sum > 0: + total_records = int(cell_sum) + else: + total_records = int(table.shape[0] * table.shape[1]) + except (TypeError, ValueError): # pragma: no cover + pass + + return sorted(list(set(variables))), total_records + + def _extract_regression_info(self, output: list) -> int: + """Extract variables and total records from regression output. + + Parameters + ---------- + output : list + The output to extract information from. + + Returns + ------- + int + Total record count. + """ + total_records: int = 0 + + for table in output: + if isinstance(table, DataFrame): + search_targets = [str(idx).lower() for idx in table.index] + for i, target in enumerate(search_targets): + if "no. observations" in target: + try: + val = table.iloc[i].dropna().iloc[0] + total_records = int(float(val)) + return total_records + except (ValueError, TypeError, IndexError): + pass + + col_targets = [str(col).lower() for col in table.columns] + for i, target in enumerate(col_targets): + if "no. observations" in target: + try: + val_str = table.columns[i + 1] + total_records = int(float(val_str)) + return total_records + except (ValueError, TypeError, IndexError): + pass + + return total_records + + def _mark_diff_risk(self, summary_df: DataFrame) -> DataFrame: + """Mark outputs with differencing risk. + + Differencing risk occurs when multiple tables share the same variables but have different + suppression settings. 
This allows an attacker to infer suppressed values by comparing the outputs. + + Parameters + ---------- + summary_df : DataFrame + The summary DataFrame to update. + + Returns + ------- + DataFrame + Updated summary DataFrame with diff_risk column. + """ + if summary_df.empty: + summary_df["diff_risk"] = pd.Series(dtype=bool) + else: + summary_df["diff_risk"] = False + table_mask = summary_df["type"] == "table" + table_outputs = summary_df.loc[table_mask] + if not table_outputs.empty: + for _, group in table_outputs.groupby("variables"): + if len(group) > 1: + # Check for different suppression settings + suppressions = group["suppression"].unique() + # Risk if same variables with different suppression settings + if len(suppressions) > 1: + summary_df.loc[group.index, "diff_risk"] = True + + return summary_df + + def _extract_all_variables(self) -> list[str]: + """Extract all unique variables across all outputs. + + Returns + ------- + list[str] + Sorted list of unique variable names. + """ + all_variables: set[str] = set() + + for rec in self.results.values(): + if rec.output_type == "custom": + continue + variables = self._get_output_variables(rec) + all_variables.update(variables) + + return sorted(all_variables) + + def _get_output_variables(self, rec: Record) -> list[str]: + """Extract variables from a single record. + + Parameters + ---------- + rec : Record + The record to extract variables from. + + Returns + ------- + list[str] + List of variable names. + """ + variables: list[str] = [] + + if "variables" in rec.properties: + variables = [str(v) for v in rec.properties["variables"]] + elif rec.output_type == "table": + variables, _ = self._extract_table_info(rec.output) + elif rec.output_type == "regression": + variables = self._extract_regression_variables(rec.output) + return variables + + def _build_variable_matrix(self, summary_df: DataFrame) -> DataFrame: + """Build a variable-output matrix showing variable usage. 
+ + Parameters + ---------- + summary_df : DataFrame + The base summary DataFrame. + + Returns + ------- + DataFrame + Summary with binary variable columns added. + """ + all_variables = self._extract_all_variables() + + if not all_variables: + return summary_df + + # Create binary columns for each variable + for var in all_variables: + summary_df[var] = summary_df["variables"].apply( + lambda vars_str, v=var: 1 if v in vars_str.split("; ") else 0 + ) + + return summary_df + + def generate_variable_matrix_table(self) -> DataFrame: + """Generate a clean variable-output matrix table. + + Creates a table with one row per output and one column per variable, + plus an output_type column. Binary values indicate variable usage. + + Returns + ------- + DataFrame + Variable matrix table with columns: output_type, var1, var2, ... + """ + all_variables = self._extract_all_variables() + matrix_rows = [] + + for uid, rec in self.results.items(): + if rec.output_type == "custom": + continue # pragma: no cover + + variables = self._get_output_variables(rec) + + row: dict[str, Any] = {"output_id": uid, "output_type": rec.output_type} + for var in all_variables: + row[var] = 1 if var in variables else 0 + + matrix_rows.append(row) + + return DataFrame(matrix_rows) + + def _extract_regression_variables(self, output: list) -> list[str]: + """Extract dependent and independent variable names from regression output. + + Parameters + ---------- + output : list + The regression output DataFrames. + + Returns + ------- + list[str] + Dependent variable followed by independent variables. 
+ """ + variables: list[str] = [] + if len(output) < 2: + return variables + table0, table1 = output[0], output[1] + if isinstance(table0, DataFrame) and len(table0.columns) > 0: + dep_var = str(table0.columns[0]) + variables.append(dep_var) + if isinstance(table1, DataFrame): + for name in table1.index: + name_str = str(name) + if name_str not in ("const", "Intercept"): + variables.append(name_str) + return variables + + def generate_summary(self) -> DataFrame: + """Generate a summary DataFrame of all outputs in the session. + + Provides output checkers with a high-level overview of all outputs, + including what method was used, what variables are involved, the + total record count, and whether there is a differencing risk. + + Returns + ------- + DataFrame + Summary of all outputs with columns: id, method, variables, status, type, + command, summary, total_records, suppression, + timestamp, diff_risk. + """ + rows = [] + for uid, rec in self.results.items(): + if rec.output_type == "custom": + continue + method = rec.properties.get("method", rec.output_type) + variables: list[str] = self._get_output_variables(rec) + total_records: int = 0 + + if rec.output_type == "table": + _, total_records = self._extract_table_info(rec.output) + elif rec.output_type == "regression": + total_records = self._extract_regression_info(rec.output) + + variables_str = "; ".join(variables) if variables else "" + + suppression: bool = False + if isinstance(rec.sdc, dict) and "summary" in rec.sdc: + suppression = bool(rec.sdc["summary"].get("suppressed", False)) + + rows.append( + { + "id": uid, + "method": method, + "variables": variables_str, + "status": rec.status, + "type": rec.output_type, + "command": rec.command, + "summary": rec.summary, + "total_records": total_records, + "suppression": suppression, + "timestamp": rec.timestamp, + } + ) + + summary_df = DataFrame(rows) + summary_df = self._mark_diff_risk(summary_df) + summary_df = self._build_variable_matrix(summary_df) + + 
return summary_df + + def add_summary_to_results(self) -> None: + """Add the summary DataFrame as a custom output to results. + + This generates a summary of all outputs in the session with metadata + about variables, record counts, and differencing risk. The file is + marked with a clear warning not to release. + """ + summary_df = self.generate_summary() + if summary_df.empty: + return + + os.makedirs("acro_artifacts", exist_ok=True) + # Use explicit filename to indicate this should not be released + summary_path = os.path.normpath( + "acro_artifacts/DO_NOT_RELEASE_session_summary.csv" + ) + summary_df.to_csv(summary_path, index=False) + + self.add_custom( + summary_path, + "WARNING: DO NOT RELEASE - Session summary for output checker use only", + ) + def finalise(self, path: str, ext: str, interactive: bool = False) -> None: """Create a results file for checking. @@ -445,6 +757,7 @@ def finalise(self, path: str, ext: str, interactive: bool = False) -> None: logger.debug("finalise()") if interactive: self.validate_outputs() + self.add_summary_to_results() if ext == "json": self.finalise_json(path) elif ext == "xlsx": @@ -484,7 +797,19 @@ def finalise_json(self, path: str) -> None: for file in files: outputs[key]["files"].append({"name": file, "sdc": val.sdc}) - results: dict[str, str | dict] = {"version": __version__, "results": outputs} + # Generate and include session summary for output checkers + summary_df = self.generate_summary() + session_summary = { + "DO_NOT_RELEASE": True, + "purpose": "Session summary for output checker use only", + "data": json.loads(summary_df.to_json(orient="records")), + } + + results: dict = { + "version": __version__, + "results": outputs, + "session_summary": session_summary, + } filename: str = os.path.normpath(f"{path}/results.json") try: with open(filename, "w", newline="", encoding="utf-8") as handle: @@ -510,7 +835,16 @@ def finalise_excel(self, path: str) -> None: logger.debug("Directory %s created successfully", path) 
except FileExistsError: # pragma: no cover logger.debug("Directory %s already exists", path) - with pd.ExcelWriter(filename, engine="openpyxl") as writer: + with pd.ExcelWriter( # pylint: disable=abstract-class-instantiated + filename, engine="openpyxl" + ) as writer: + # summary sheet + summary_df = self.generate_summary() + if not summary_df.empty: + summary_df.to_excel( + writer, sheet_name="summary", index=False, startrow=0 + ) + # description sheet sheet: list[str] = [] summary: list[str] = [] diff --git a/pyproject.toml b/pyproject.toml index 9c1b46e..9de1ee3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -143,7 +143,6 @@ lint.ignore = [ "acro_stata_parser.py" = ["C901"] "acro/acro_regression.py" = ["B026"] - [tool.ruff.lint.pep8-naming] extend-ignore-names = ["X", "X_train", "X_predict"]