diff --git a/BRIEFING.md b/BRIEFING.md new file mode 100644 index 00000000..1b12bcbf --- /dev/null +++ b/BRIEFING.md @@ -0,0 +1,83 @@ +# SDID Practitioner Validation Tooling - Briefing + +## Problem + +A data scientist runs `SyntheticDiD`, gets an ATT and a p-value, and then +faces the question: *should I trust this estimate?* The library gives them the +point estimate and inference, but the validation workflow - the steps between +"I got a number" and "I'm confident enough to present this" - is largely +left to the practitioner to assemble from scratch. + +The standard validation workflow for synthetic control methods is well +understood in the econometrics literature (Arkhangelsky et al. 2021, +Abadie et al. 2010, Abadie 2021). The pieces include pre-treatment fit +assessment, weight diagnostics, placebo/falsification tests, sensitivity +analysis, and cross-estimator comparison. Our library provides some of the +raw ingredients (pre-treatment RMSE, weight dicts, placebo effects array) +but doesn't connect them into an accessible diagnostic workflow. + +The gap is most visible in `practitioner.py`, where `_handle_synthetic` +recommends in-time placebos and leave-one-out analysis but provides only +comment-only pseudo-code. A practitioner following that guidance hits a wall. + +## Current state + +What we have today: + +- `results.pre_treatment_fit` (RMSE) with a warning when it exceeds the + treated pre-period SD +- `results.get_unit_weights_df()` and `results.get_time_weights_df()` +- Three variance methods: placebo (default), bootstrap, and jackknife (just + landed in v3.1.1) +- `results.placebo_effects` - stores per-iteration estimates for all three + variance methods, but for jackknife these are positional LOO estimates + with no unit labels +- `results.summary()` shows top-5 unit weights and count of non-trivial weights +- `practitioner.py` guidance that names the right steps but can't point to + runnable code for most of them + +What the practitioner must currently build themselves: + +- Mapping jackknife LOO estimates back to unit identities to answer "which + unit, when dropped, changes my estimate the most?" +- In-time placebo tests (re-estimate with a fake treatment date) +- Any weight concentration metric beyond eyeballing the sorted list +- Any sense of whether their RMSE is "bad enough to worry about" beyond + the binary warning +- Regularization sensitivity (does the ATT change if I perturb zeta?) +- Pre-treatment trajectory data for plotting (the Y matrices are internal + to `fit()` and not returned) + +## Context from prior discussion + +The jackknife work created an interesting opportunity. The delete-one-re-estimate +loop already runs for SE computation. The per-unit ATT estimates are stored in +`results.placebo_effects`. The missing piece is a presentation layer that maps +those estimates to unit identities and surfaces the diagnostic interpretation +(which units are influential, how stable is the estimate to unit composition). + +More broadly, the validation gaps fall into two categories: + +1. **Low-marginal-cost additions** - things where the computation already + exists and we just need to expose or label it (LOO diagnostic from + jackknife, weight concentration metrics, trajectory data extraction) + +2. 
**New functionality** - things that require new estimation loops or + helpers (in-time placebo, regularization sensitivity sweep) + +The practitioner guidance in `practitioner.py` should evolve alongside any +new tooling so that the recommended steps point to real, runnable code paths. + +## What "done" looks like + +A practitioner using SyntheticDiD should be able to follow a credible +validation workflow using library-provided tools and guidance, without +needing to reverse-engineer internals or write substantial boilerplate. +The validation steps recognized in the literature should either be directly +supported or have clear, concrete guidance for how to perform them with +the library's API. + +This is not about adding visualization or plotting (that's a separate +concern). It's about making the computational and diagnostic building +blocks accessible and well-documented through the results API and +practitioner guidance. diff --git a/diff_diff/practitioner.py b/diff_diff/practitioner.py index 04a70a71..c58a75f6 100644 --- a/diff_diff/practitioner.py +++ b/diff_diff/practitioner.py @@ -505,35 +505,74 @@ def _handle_synthetic(results: Any): steps = [ _step( baker_step=6, - label="Check pre-treatment fit quality", + label="Check pre-treatment fit and weight concentration", why=( "Synthetic DiD relies on pre-treatment fit to construct " - "weights. Poor fit suggests the synthetic control may not " - "approximate the counterfactual well." + "weights. Poor fit or highly concentrated unit weights " + "suggest the synthetic control may not approximate the " + "counterfactual well." ), code=( - "# Check pre-treatment fit and unit weight concentration:\n" "print(f'Pre-treatment fit (RMSE): {results.pre_treatment_fit:.4f}')\n" - "# Highly concentrated weights suggest fragile estimates" + "concentration = results.get_weight_concentration()\n" + "print(f\"Effective N: {concentration['effective_n']:.1f}\")\n" + "print(f\"Top-5 weight share: {concentration['top_k_share']:.2%}\")" ), step_name="sensitivity", ), _step( baker_step=6, - label="In-time or in-space placebo", + label="In-time placebo", why=( - "Test robustness by re-estimating on a placebo treatment " - "period (in-time) or excluding treated units one at a time " - "(leave-one-out). These are the natural falsification " - "checks for synthetic control methods." + "Re-estimate on shifted fake treatment dates in the " + "pre-period. A credible design yields near-zero placebo " + "ATTs — departures signal that something is being picked " + "up pre-treatment, weakening the causal interpretation." ), code=( - "# In-time placebo: re-estimate with a fake treatment date\n" - "# Leave-one-out: drop each treated unit and re-estimate" + "placebo_df = results.in_time_placebo()\n" + "print(placebo_df)" ), priority="medium", step_name="sensitivity", ), + _step( + baker_step=6, + label="Leave-one-out influence (jackknife)", + why=( + "If the estimate is driven by a single unit, robustness " + "is weak. Fit with variance_method='jackknife' and inspect " + "which units move the ATT the most." 
+ ), + code=( + "# Requires variance_method='jackknife' AND enough support for LOO\n" + "# (n_treated >= 2 and >= 2 effective-weight controls).\n" + "if getattr(results, '_loo_unit_ids', None) is not None:\n" + " loo_df = results.get_loo_effects_df()\n" + " print(loo_df.head(10))\n" + "else:\n" + " print('LOO not available - re-fit with '\n" + " 'variance_method=\"jackknife\" and ensure >=2 treated units '\n" + " 'with positive effective support.')" + ), + priority="medium", + step_name="sensitivity", + ), + _step( + baker_step=6, + label="Regularization sensitivity (zeta_omega)", + why=( + "The unit-weight regularization is auto-selected from " + "data. Show whether the ATT moves materially across a " + "grid of values to gauge robustness to this choice." + ), + code=( + "sens_df = results.sensitivity_to_zeta_omega()\n" + "print(sens_df)" + ), + priority="low", + step_name="sensitivity", + ), _step( baker_step=8, label="Compare with staggered estimators (CS, SA)", diff --git a/diff_diff/results.py b/diff_diff/results.py index 7129173d..504a84d0 100644 --- a/diff_diff/results.py +++ b/diff_diff/results.py @@ -649,6 +649,40 @@ def significance_stars(self) -> str: return _get_significance_stars(self.avg_p_value) +@dataclass +class _SyntheticDiDFitSnapshot: + """Internal snapshot of fit() state retained on SyntheticDiDResults for + post-hoc diagnostic methods (in-time placebo, regularization sensitivity). + + Not part of the public API. Arrays are marked read-only after + construction to prevent accidental mutation by diagnostic methods. + Memory scales as O(n_units * n_periods). + """ + + Y_pre_control: np.ndarray + Y_post_control: np.ndarray + Y_pre_treated: np.ndarray + Y_post_treated: np.ndarray + control_unit_ids: List[Any] + treated_unit_ids: List[Any] + pre_periods: List[Any] + post_periods: List[Any] + w_control: Optional[np.ndarray] = None + w_treated: Optional[np.ndarray] = None + + def __post_init__(self): + for arr in ( + self.Y_pre_control, + self.Y_post_control, + self.Y_pre_treated, + self.Y_post_treated, + ): + arr.setflags(write=False) + for arr in (self.w_control, self.w_treated): + if arr is not None: + arr.setflags(write=False) + + @dataclass class SyntheticDiDResults: """ @@ -677,6 +711,9 @@ class SyntheticDiDResults: Number of control units/observations. unit_weights : dict Dictionary mapping control unit IDs to their synthetic weights. + When survey weights are used, these are the composed effective + weights (omega_eff = raw Frank-Wolfe * survey, renormalized) that + were applied to produce the ATT, not the raw Frank-Wolfe solution. time_weights : dict Dictionary mapping pre-treatment periods to their time weights. pre_periods : list @@ -690,6 +727,23 @@ class SyntheticDiDResults: (for "placebo"), bootstrap ATT estimates (for "bootstrap"), or leave-one-out estimates (for "jackknife"). The ``variance_method`` field disambiguates the contents. + synthetic_pre_trajectory : np.ndarray, optional + Synthetic control trajectory in pre-treatment periods, shape + ``(n_pre,)``. Equal to ``Y_pre_control @ omega_eff`` where + ``omega_eff`` is the composed effective weight vector. + synthetic_post_trajectory : np.ndarray, optional + Synthetic control trajectory in post-treatment periods, shape + ``(n_post,)``. + treated_pre_trajectory : np.ndarray, optional + Treated-unit mean trajectory in pre-treatment periods, shape + ``(n_pre,)``. Survey-weighted when the fit used survey weights. 
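+        Equal to the Frank-Wolfe target trajectory that the unit
+        weights were fit against.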
+ treated_post_trajectory : np.ndarray, optional + Treated-unit mean trajectory in post-treatment periods, shape + ``(n_post,)``. + time_weights_array : np.ndarray, optional + The Frank-Wolfe time weights as a 1-D array (same values as the + ``time_weights`` dict but order-stable and usable for re-estimation + by sensitivity methods). Shape ``(n_pre,)``. """ att: float @@ -714,6 +768,27 @@ class SyntheticDiDResults: n_bootstrap: Optional[int] = field(default=None) # Survey design metadata (SurveyMetadata instance from diff_diff.survey) survey_metadata: Optional[Any] = field(default=None) + # Trajectory data for plotting / custom fit metrics + synthetic_pre_trajectory: Optional[np.ndarray] = field(default=None) + synthetic_post_trajectory: Optional[np.ndarray] = field(default=None) + treated_pre_trajectory: Optional[np.ndarray] = field(default=None) + treated_post_trajectory: Optional[np.ndarray] = field(default=None) + # Frank-Wolfe time weights as ndarray, needed by sensitivity_to_zeta_omega + # which holds time weights fixed + time_weights_array: Optional[np.ndarray] = field(default=None) + + # Internal diagnostic state attached by SyntheticDiD.fit() after + # construction as plain instance attributes (NOT dataclass fields). Kept + # out of the field list deliberately so dataclass-recursive serializers + # (dataclasses.asdict, dataclasses.fields, dataclasses.replace) cannot + # reach the retained panel state or unit IDs. See __post_init__ for + # None-initialization; SyntheticDiD.fit() populates. + def __post_init__(self): + # Plain attributes rather than dataclass fields so asdict()-style + # recursion cannot serialize internal panel state. + self._loo_unit_ids: Optional[List[Any]] = None + self._loo_roles: Optional[List[str]] = None + self._fit_snapshot: Optional[_SyntheticDiDFitSnapshot] = None def __repr__(self) -> str: """Concise string representation.""" @@ -724,6 +799,23 @@ def __repr__(self) -> str: f"p={self.p_value:.4f})" ) + def __getstate__(self) -> Dict[str, Any]: + """Exclude the internal fit snapshot from pickling. + + The snapshot retains outcome matrices, unit IDs, and survey weights + to support post-hoc diagnostics (`in_time_placebo`, + `sensitivity_to_zeta_omega`). Serialization would otherwise carry + that panel state to wherever the pickle is sent, which is a privacy + hazard for survey-weighted or sensitive fits. + + Unpickled results keep the public fields (ATT, weights, trajectories, + etc.); calling a diagnostic method that needs the snapshot raises a + ValueError directing the user to re-fit. + """ + state = self.__dict__.copy() + state["_fit_snapshot"] = None + return state + @property def coef_var(self) -> float: """Coefficient of variation: SE / |ATT|. NaN when ATT is 0 or SE non-finite.""" @@ -912,6 +1004,453 @@ def get_time_weights_df(self) -> pd.DataFrame: [{"period": period, "weight": weight} for period, weight in self.time_weights.items()] ) + def get_loo_effects_df(self) -> pd.DataFrame: + """ + Per-unit leave-one-out ATT from the jackknife variance pass. + + Requires ``variance_method='jackknife'``; raises ValueError otherwise. + + The underlying values come from the jackknife loops in + ``SyntheticDiD._jackknife_se``: control LOO estimates fill the + first ``n_control`` positions (in the order of the control units + seen by fit), then treated LOO estimates fill the next + ``n_treated`` positions. This method joins those estimates back + to user-facing unit identities. 
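+
+        For example, with control units ``[A, B, C]`` and treated units
+        ``[T1, T2]``, positions 0-2 of ``placebo_effects`` hold the
+        control-LOO estimates for A, B, C, and positions 3-4 hold the
+        treated-LOO estimates for T1, T2.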
+ + ``att_loo`` is NaN when the fit hit the zero-sum weight guard for + that unit (survey weights composed to zero once the unit was + dropped). ``delta_from_full`` propagates NaN in that case. + + Returns + ------- + pd.DataFrame + Columns: + - ``unit`` — user's unit ID + - ``role`` — ``'control'`` or ``'treated'`` + - ``att_loo`` — ATT with this unit dropped + - ``delta_from_full`` — ``att_loo - self.att`` + Sorted by ``|delta_from_full|`` descending, NaN rows at the end. + """ + if self.variance_method != "jackknife": + raise ValueError( + "get_loo_effects_df() requires variance_method='jackknife'. " + f"This result used variance_method='{self.variance_method}'. " + "Re-fit with SyntheticDiD(variance_method='jackknife') to " + "obtain per-unit leave-one-out estimates." + ) + if ( + self._loo_unit_ids is None + or self._loo_roles is None + or self.placebo_effects is None + ): + raise ValueError( + "Leave-one-out estimates are unavailable (jackknife returned " + "NaN or an empty array). See prior warnings from fit() for the " + "cause (e.g., single treated unit, all weight on one control)." + ) + + att_loo = np.asarray(self.placebo_effects, dtype=float) + delta = att_loo - self.att + df = pd.DataFrame( + { + "unit": self._loo_unit_ids, + "role": self._loo_roles, + "att_loo": att_loo, + "delta_from_full": delta, + } + ) + # Sort by |delta| descending. NaN rows sort to the end so the most + # influential real units appear first. + df["_abs_delta"] = df["delta_from_full"].abs() + df = df.sort_values( + by="_abs_delta", ascending=False, na_position="last" + ).drop(columns="_abs_delta") + df = df.reset_index(drop=True) + return df + + def in_time_placebo( + self, + fake_treatment_periods: Optional[List[Any]] = None, + zeta_omega_override: Optional[float] = None, + zeta_lambda_override: Optional[float] = None, + ) -> pd.DataFrame: + """ + Re-estimate the ATT on shifted fake treatment periods within the + original pre-treatment window. + + A credible placebo should produce near-zero ATTs at every shifted + date. Departures from zero signal that whatever the estimator + picked up at the real treatment date is also present pre-treatment, + weakening the causal interpretation. + + The post-treatment data is never used — only the pre-window is + re-sliced. Regularization reuses ``self.zeta_omega`` and + ``self.zeta_lambda`` from the original fit (R ``synthdid`` + convention), unless overrides are supplied. + + Parameters + ---------- + fake_treatment_periods : list, optional + Explicit pre-period values to test. If ``None`` (default), + sweeps every feasible pre-period — every P in ``pre_periods`` + whose position ``i`` satisfies ``i >= 2`` (so at least 2 + pre-fake periods remain for weight estimation) and + ``i <= n_pre - 1`` (so at least 1 post-fake period exists). + Values not in ``pre_periods`` raise ValueError (a value in + ``post_periods`` is explicitly not a placebo). + zeta_omega_override : float, optional + Override ``self.zeta_omega`` for the refit. Default reuses + the original. + zeta_lambda_override : float, optional + Override ``self.zeta_lambda`` for the refit. + + Returns + ------- + pd.DataFrame + Columns: + - ``fake_treatment_period`` — the shifted date + - ``att`` — placebo ATT (ideally near 0) + - ``pre_fit_rmse`` — RMSE on the fake pre-window + - ``n_pre_fake`` — periods before the fake date + - ``n_post_fake`` — periods from the fake date onward + NaN is emitted only for dimensional infeasibility. 
Frank-Wolfe + does not expose a mid-solver non-convergence signal; inspect + ``pre_fit_rmse`` for poor refit quality. + """ + if self._fit_snapshot is None: + raise ValueError( + "in_time_placebo() requires the fit snapshot on the results " + "object. This result appears to have been loaded from " + "serialization (which excludes the snapshot) or was produced " + "by an older estimator version. Re-fit to enable." + ) + from diff_diff.utils import ( + compute_sdid_estimator, + compute_sdid_unit_weights, + compute_time_weights, + ) + + snap = self._fit_snapshot + pre_periods = snap.pre_periods + n_pre = len(pre_periods) + zeta_omega = ( + zeta_omega_override if zeta_omega_override is not None else self.zeta_omega + ) + zeta_lambda = ( + zeta_lambda_override if zeta_lambda_override is not None else self.zeta_lambda + ) + if zeta_omega is None or zeta_lambda is None: + raise ValueError( + "in_time_placebo() needs zeta_omega and zeta_lambda from the " + "original fit. Expected on the results object but found None." + ) + noise_level = self.noise_level if self.noise_level is not None else 0.0 + min_decrease = 1e-5 * noise_level if noise_level > 0 else 1e-5 + + # Build the list of (fake_period, position) pairs to iterate. + period_to_idx = {p: i for i, p in enumerate(pre_periods)} + if fake_treatment_periods is None: + positions = list(range(2, n_pre)) + fake_list = [(pre_periods[i], i) for i in positions] + else: + fake_list = [] + for p in fake_treatment_periods: + if p in snap.post_periods: + raise ValueError( + f"fake_treatment_period={p!r} is in post_periods; a real " + "treatment date is not a placebo. Choose a value from " + "pre_periods." + ) + if p not in period_to_idx: + raise ValueError( + f"fake_treatment_period={p!r} not found in pre_periods " + f"({pre_periods!r})." + ) + fake_list.append((p, period_to_idx[p])) + + columns = [ + "fake_treatment_period", + "att", + "pre_fit_rmse", + "n_pre_fake", + "n_post_fake", + ] + if not fake_list: + return pd.DataFrame(columns=columns) + + rows: List[Dict[str, Any]] = [] + for fake_period, i in fake_list: + n_pre_fake = i + n_post_fake = n_pre - i + row: Dict[str, Any] = { + "fake_treatment_period": fake_period, + "att": float("nan"), + "pre_fit_rmse": float("nan"), + "n_pre_fake": n_pre_fake, + "n_post_fake": n_post_fake, + } + # Dimensional infeasibility: Frank-Wolfe needs >=2 pre-fake + # periods for unit weights; the estimator needs >=1 post-fake. 
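+            # (n_post_fake >= 1 always holds for values drawn from
+            # pre_periods, so only explicit fake periods at positions
+            # 0 or 1 reach this branch; the default sweep starts at i=2.)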
+ if n_pre_fake < 2 or n_post_fake < 1: + rows.append(row) + continue + + Y_pre_c = snap.Y_pre_control[:i, :] + Y_post_c = snap.Y_pre_control[i:, :] + Y_pre_t = snap.Y_pre_treated[:i, :] + Y_post_t = snap.Y_pre_treated[i:, :] + + if snap.w_treated is not None: + w_t = snap.w_treated + y_pre_t_mean = np.average(Y_pre_t, axis=1, weights=w_t) + y_post_t_mean = np.average(Y_post_t, axis=1, weights=w_t) + else: + y_pre_t_mean = np.mean(Y_pre_t, axis=1) + y_post_t_mean = np.mean(Y_post_t, axis=1) + + omega_fake = compute_sdid_unit_weights( + Y_pre_c, + y_pre_t_mean, + zeta_omega=zeta_omega, + min_decrease=min_decrease, + ) + lambda_fake = compute_time_weights( + Y_pre_c, + Y_post_c, + zeta_lambda=zeta_lambda, + min_decrease=min_decrease, + ) + + if snap.w_control is not None: + omega_eff_fake = omega_fake * snap.w_control + denom = omega_eff_fake.sum() + if denom == 0: + rows.append(row) + continue + omega_eff_fake = omega_eff_fake / denom + else: + omega_eff_fake = omega_fake + + att_fake = compute_sdid_estimator( + Y_pre_c, + Y_post_c, + y_pre_t_mean, + y_post_t_mean, + omega_eff_fake, + lambda_fake, + ) + synthetic_pre_fake = Y_pre_c @ omega_eff_fake + pre_fit = float( + np.sqrt(np.mean((y_pre_t_mean - synthetic_pre_fake) ** 2)) + ) + row["att"] = float(att_fake) + row["pre_fit_rmse"] = pre_fit + rows.append(row) + + return pd.DataFrame(rows) + + def sensitivity_to_zeta_omega( + self, + zeta_grid: Optional[List[float]] = None, + multipliers: Tuple[float, ...] = (0.25, 0.5, 1.0, 2.0, 4.0), + ) -> pd.DataFrame: + """ + Re-estimate the ATT across a grid of ``zeta_omega`` values to show + how sensitive the estimate is to the unit-weight regularization. + + The Frank-Wolfe time weights computed during the original fit are + held fixed here — this method isolates sensitivity to + ``zeta_omega`` specifically. ``zeta_lambda`` and the time weights + are not re-fit. + + Parameters + ---------- + zeta_grid : list of float, optional + Absolute ``zeta_omega`` values to evaluate. If ``None`` + (default), uses ``multipliers * self.zeta_omega`` — i.e. a + 5-point grid by default, spanning 16x from the smallest to + the largest multiplier and symmetric in log space around 1.0. + multipliers : tuple of float, default ``(0.25, 0.5, 1.0, 2.0, 4.0)`` + Multipliers on ``self.zeta_omega``. Ignored when + ``zeta_grid`` is supplied. + + Returns + ------- + pd.DataFrame + Columns: + - ``zeta_omega`` — the regularization value evaluated + - ``att`` — resulting ATT + - ``pre_fit_rmse`` — RMSE on the original pre-period + - ``max_unit_weight`` — max element of the composed + ``omega_eff`` (sensitivity indicator: close to 1 means + near-one-hot solutions; close to ``1/n_control`` means + near-uniform) + - ``effective_n`` — ``1 / sum(omega_eff**2)`` + + Notes + ----- + Extreme ``zeta_omega``: very small values push weights toward + sparse one-hot solutions (few controls dominate); very large + values push toward uniform weighting. The ``pre_fit_rmse`` column + exposes the tradeoff. + """ + if self._fit_snapshot is None: + raise ValueError( + "sensitivity_to_zeta_omega() requires the fit snapshot on the " + "results object. This result appears to have been loaded from " + "serialization (which excludes the snapshot) or was produced " + "by an older estimator version. Re-fit to enable." + ) + if self.time_weights_array is None: + raise ValueError( + "sensitivity_to_zeta_omega() needs the original time weights " + "array. Expected on the results object but found None. Re-fit " + "to populate." 
+ ) + from diff_diff.utils import compute_sdid_estimator, compute_sdid_unit_weights + + snap = self._fit_snapshot + if zeta_grid is None: + if self.zeta_omega is None: + raise ValueError( + "Cannot build default zeta_grid: self.zeta_omega is None. " + "Supply zeta_grid explicitly." + ) + zeta_values: List[float] = [float(m * self.zeta_omega) for m in multipliers] + else: + zeta_values = [float(z) for z in zeta_grid] + + noise_level = self.noise_level if self.noise_level is not None else 0.0 + min_decrease = 1e-5 * noise_level if noise_level > 0 else 1e-5 + + if snap.w_treated is not None: + y_pre_t_mean = np.average( + snap.Y_pre_treated, axis=1, weights=snap.w_treated + ) + y_post_t_mean = np.average( + snap.Y_post_treated, axis=1, weights=snap.w_treated + ) + else: + y_pre_t_mean = np.mean(snap.Y_pre_treated, axis=1) + y_post_t_mean = np.mean(snap.Y_post_treated, axis=1) + + columns = [ + "zeta_omega", + "att", + "pre_fit_rmse", + "max_unit_weight", + "effective_n", + ] + if not zeta_values: + return pd.DataFrame(columns=columns) + + time_weights = np.asarray(self.time_weights_array, dtype=float) + rows: List[Dict[str, Any]] = [] + for z in zeta_values: + omega_fake = compute_sdid_unit_weights( + snap.Y_pre_control, + y_pre_t_mean, + zeta_omega=z, + min_decrease=min_decrease, + ) + if snap.w_control is not None: + omega_eff = omega_fake * snap.w_control + denom = omega_eff.sum() + if denom == 0: + rows.append( + { + "zeta_omega": z, + "att": float("nan"), + "pre_fit_rmse": float("nan"), + "max_unit_weight": float("nan"), + "effective_n": float("nan"), + } + ) + continue + omega_eff = omega_eff / denom + else: + omega_eff = omega_fake + + att = compute_sdid_estimator( + snap.Y_pre_control, + snap.Y_post_control, + y_pre_t_mean, + y_post_t_mean, + omega_eff, + time_weights, + ) + synthetic_pre = snap.Y_pre_control @ omega_eff + pre_fit = float(np.sqrt(np.mean((y_pre_t_mean - synthetic_pre) ** 2))) + herf = float(np.sum(omega_eff ** 2)) + rows.append( + { + "zeta_omega": z, + "att": float(att), + "pre_fit_rmse": pre_fit, + "max_unit_weight": float(np.max(omega_eff)), + "effective_n": float("nan") if herf == 0 else 1.0 / herf, + } + ) + return pd.DataFrame(rows, columns=columns) + + def get_weight_concentration(self, top_k: int = 5) -> Dict[str, Any]: + """ + Concentration metrics for the control unit weights. + + Operates on ``self.unit_weights``, which for survey-weighted fits + stores the composed effective weights + (``omega_eff = raw_omega * w_control``, renormalized to sum to 1) + that were applied to produce the ATT. For non-survey fits the + values equal the raw Frank-Wolfe solution. Either way, the + concentration reflects the distribution actually used by the + estimator. + + Parameters + ---------- + top_k : int, default 5 + Number of largest weights to sum for ``top_k_share``. Must be + non-negative. Clamped to the available number of control units. + + Returns + ------- + dict + Keys: + - ``effective_n`` — ``1 / sum(w**2)``, inverse Herfindahl + - ``herfindahl`` — ``sum(w**2)`` + - ``top_k_share`` — sum of the ``top_k`` largest weights + - ``top_k`` — the (possibly clamped) value used + + Raises + ------ + ValueError + If ``top_k`` is negative. + """ + if top_k < 0: + raise ValueError( + f"top_k must be non-negative (got {top_k})." 
+ ) + weights = np.asarray(list(self.unit_weights.values()), dtype=float) + if weights.size == 0: + return { + "effective_n": float("nan"), + "herfindahl": float("nan"), + "top_k_share": float("nan"), + "top_k": 0, + } + herfindahl = float(np.sum(weights ** 2)) + effective_n = float("nan") if herfindahl == 0 else 1.0 / herfindahl + k = min(int(top_k), weights.size) + if k <= 0: + top_k_share = 0.0 + else: + top_k_share = float(np.sort(weights)[-k:].sum()) + return { + "effective_n": effective_n, + "herfindahl": herfindahl, + "top_k_share": top_k_share, + "top_k": k, + } + @property def is_significant(self) -> bool: """Check if the ATT is statistically significant at the alpha level.""" diff --git a/diff_diff/synthetic_did.py b/diff_diff/synthetic_did.py index e11bfeeb..ca8baf07 100644 --- a/diff_diff/synthetic_did.py +++ b/diff_diff/synthetic_did.py @@ -11,7 +11,7 @@ from diff_diff.estimators import DifferenceInDifferences from diff_diff.linalg import solve_ols -from diff_diff.results import SyntheticDiDResults +from diff_diff.results import SyntheticDiDResults, _SyntheticDiDFitSnapshot from diff_diff.utils import ( _compute_regularization, _sum_normalize, @@ -480,9 +480,13 @@ def fit( # type: ignore[override] time_weights, ) - # Compute pre-treatment fit (RMSE) using composed weights - synthetic_pre = Y_pre_control @ omega_eff - pre_fit_rmse = np.sqrt(np.mean((Y_pre_treated_mean - synthetic_pre) ** 2)) + # Compute pre-treatment fit (RMSE) using composed weights. + # Retain trajectories on results for plotting / custom diagnostics. + synthetic_pre_trajectory = Y_pre_control @ omega_eff + synthetic_post_trajectory = Y_post_control @ omega_eff + pre_fit_rmse = np.sqrt( + np.mean((Y_pre_treated_mean - synthetic_pre_trajectory) ** 2) + ) # Warn if pre-treatment fit is poor (Registry requirement). # Threshold: 1× SD of treated pre-treatment outcomes — a natural baseline @@ -502,6 +506,10 @@ def fit( # type: ignore[override] stacklevel=2, ) + # Treated-unit trajectories (the pre/post means already computed above). + treated_pre_trajectory = Y_pre_treated_mean + treated_post_trajectory = Y_post_treated_mean + # Compute standard errors based on variance_method if self.variance_method == "bootstrap": se, bootstrap_estimates = self._bootstrap_se( @@ -570,7 +578,47 @@ def fit( # type: ignore[override] unit_weights_dict = {unit_id: w for unit_id, w in zip(control_units, omega_eff)} time_weights_dict = {period: w for period, w in zip(pre_periods, time_weights)} - # Store results + # Jackknife LOO ID/role arrays parallel to placebo_effects positions + # (first n_control entries are control-LOO, next n_treated are treated-LOO; + # see _jackknife_se docstring). + loo_unit_ids: Optional[List[Any]] + loo_roles: Optional[List[str]] + if inference_method == "jackknife" and len(placebo_effects) > 0: + loo_unit_ids = list(control_units) + list(treated_units) + loo_roles = ["control"] * len(control_units) + ["treated"] * len(treated_units) + else: + loo_unit_ids = None + loo_roles = None + + fit_snapshot = _SyntheticDiDFitSnapshot( + Y_pre_control=Y_pre_control, + Y_post_control=Y_post_control, + Y_pre_treated=Y_pre_treated, + Y_post_treated=Y_post_treated, + control_unit_ids=list(control_units), + treated_unit_ids=list(treated_units), + pre_periods=list(pre_periods), + post_periods=list(post_periods), + w_control=w_control, + w_treated=w_treated, + ) + + # Freeze the public diagnostic arrays so mutation via the results + # object cannot silently invalidate later diagnostic calls. 
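+        # (NumPy raises "assignment destination is read-only" on write.)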
+ for _arr in ( + synthetic_pre_trajectory, + synthetic_post_trajectory, + treated_pre_trajectory, + treated_post_trajectory, + time_weights, + ): + _arr.setflags(write=False) + + # Store results. Internal diagnostic state (_loo_*, _fit_snapshot) + # is attached as plain attributes after construction so that + # dataclass-recursive serializers (dataclasses.asdict, + # dataclasses.fields, dataclasses.replace) cannot reach retained + # panel state or unit IDs. self.results_ = SyntheticDiDResults( att=att, se=se, @@ -593,7 +641,15 @@ def fit( # type: ignore[override] placebo_effects=placebo_effects if len(placebo_effects) > 0 else None, n_bootstrap=self.n_bootstrap if inference_method == "bootstrap" else None, survey_metadata=survey_metadata, + synthetic_pre_trajectory=synthetic_pre_trajectory, + synthetic_post_trajectory=synthetic_post_trajectory, + treated_pre_trajectory=treated_pre_trajectory, + treated_post_trajectory=treated_post_trajectory, + time_weights_array=time_weights, ) + self.results_._loo_unit_ids = loo_unit_ids + self.results_._loo_roles = loo_roles + self.results_._fit_snapshot = fit_snapshot self._unit_weights = unit_weights self._time_weights = time_weights diff --git a/docs/doc-deps.yaml b/docs/doc-deps.yaml index b61a86a9..b0732453 100644 --- a/docs/doc-deps.yaml +++ b/docs/doc-deps.yaml @@ -566,6 +566,10 @@ sources: type: user_guide - path: docs/llms.txt type: user_guide + - path: docs/methodology/REGISTRY.md + section: "SyntheticDiD" + type: methodology + note: "SyntheticDiDResults hosts validation diagnostics (LOO, weight concentration, in-time placebo, zeta sensitivity)" diff_diff/bootstrap_utils.py: drift_risk: medium diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index 278fd1ce..e14513ed 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -1513,6 +1513,18 @@ Convergence criterion: stop when objective decrease < min_decrease² (default mi - **Jackknife with survey weights**: Guards on effective positive support (omega * w_control > 0 and w_treated > 0) after composition, not raw FW counts. Returns NaN SE if fewer than 2 effective controls or 2 positive-weight treated units. Per-iteration zero-sum guards return NaN for individual LOO iterations when remaining composed weights sum to zero. - **Note:** Survey support: weights, strata, PSU, and FPC are all supported. Full-design surveys use Rao-Wu rescaled bootstrap (Phase 6); non-bootstrap variance methods (`variance_method="placebo"` or `"jackknife"`) require weights-only (strata/PSU/FPC require bootstrap). Both sides weighted per WLS regression interpretation: treated-side means are survey-weighted (Frank-Wolfe target and ATT formula); control-side synthetic weights are composed with survey weights post-optimization (ω_eff = ω * w_co, renormalized). Frank-Wolfe optimization itself is unweighted — survey importance enters after trajectory-matching. Covariate residualization uses WLS with survey weights. Placebo, jackknife, and bootstrap SE preserve survey weights on both sides. +*Validation diagnostics (post-fit methods on `SyntheticDiDResults`):* + +- **Trajectories** (`synthetic_pre_trajectory`, `synthetic_post_trajectory`, `treated_pre_trajectory`, `treated_post_trajectory`): retained on results to support plotting and custom fit metrics. `synthetic_pre_trajectory = Y_pre_control @ ω_eff`; `treated_pre_trajectory` is the survey-weighted treated mean (matches the Frank-Wolfe target). 
`pre_treatment_fit` is recoverable as `RMSE(treated_pre_trajectory, synthetic_pre_trajectory)`. +- **`get_loo_effects_df()`**: user-facing join of the jackknife leave-one-out pseudo-values (stored in `placebo_effects`) to the underlying unit identities. First `n_control` positions map to `control_unit_ids`, next `n_treated` to `treated_unit_ids` — positional ordering that mirrors `_jackknife_se`. `att_loo` is NaN when the zero-sum composed-weight guard fired for that unit; `delta_from_full = att_loo - att`. Requires `variance_method='jackknife'`; raises `ValueError` otherwise. +- **`get_weight_concentration(top_k=5)`**: returns `effective_n = 1/Σω²` (inverse Herfindahl), `herfindahl`, `top_k_share`, `top_k`. Operates on `self.unit_weights` which stores the composed `ω_eff`; for survey-weighted fits the metrics reflect the population-weighted concentration, not the raw Frank-Wolfe solution. +- **`in_time_placebo(fake_treatment_periods=None, zeta_omega_override=None, zeta_lambda_override=None)`**: re-slices the pre-window at each fake treatment period and re-fits both ω and λ via Frank-Wolfe. Default sweeps every feasible pre-period (position index `i ≥ 2` so ≥2 pre-fake periods remain for weight estimation, `i ≤ n_pre - 1` so ≥1 post-fake period exists). Credible designs produce near-zero placebo ATTs; departures indicate pre-treatment dynamics the estimator is picking up. + - **Note:** Regularization reuses `self.zeta_omega` / `self.zeta_lambda` from the original fit (matches R `synthdid` convention of treating regularization as a property of the fit). `*_override` re-fits with new values. + - **Note:** Infeasibility-only NaN — the method emits NaN for dimensional infeasibility (e.g., survey composition producing zero weight sum on the fake window); Frank-Wolfe non-convergence is not detectable mid-solver, so `pre_fit_rmse` is the user-facing signal for poor refit quality. Passing a `fake_treatment_period` in `post_periods` raises `ValueError` (not a placebo). +- **`sensitivity_to_zeta_omega(zeta_grid=None, multipliers=(0.25, 0.5, 1.0, 2.0, 4.0))`**: re-fits ω at each zeta value on the original pre-window. Default grid is `multipliers * self.zeta_omega` — a 5-point grid spanning 16x from smallest to largest multiplier, symmetric in log space around 1.0. Returns `att`, `pre_fit_rmse`, `max_unit_weight`, `effective_n` per row. + - **Note:** Time weights are held fixed at the original Frank-Wolfe output (`self.time_weights_array`), not re-fit. This isolates sensitivity to `zeta_omega` specifically; sensitivity to `zeta_lambda` is not currently exposed. + - **Note:** At `multiplier=1.0` (or `zeta_grid` containing `self.zeta_omega`), the ATT reproduces `self.att` to machine precision with the same seeded draw. 
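+
+A minimal end-to-end sketch of the validation pass (the panel `df` and its column names here are illustrative, not a library requirement):
+
+```python
+from diff_diff.synthetic_did import SyntheticDiD
+
+sdid = SyntheticDiD(variance_method="jackknife", seed=42)
+results = sdid.fit(df, outcome="outcome", treatment="treated",
+                   unit="unit", time="period")
+
+print(f"Pre-treatment RMSE: {results.pre_treatment_fit:.4f}")
+print(results.get_weight_concentration())        # effective_n, herfindahl, top_k_share
+print(results.get_loo_effects_df().head(10))     # most influential units first
+print(results.in_time_placebo())                 # near-zero placebo ATTs expected
+print(results.sensitivity_to_zeta_omega())       # ATT across the default zeta grid
+```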
+ **Reference implementation(s):** - R: `synthdid::synthdid_estimate()` (Arkhangelsky et al.'s official package) - Key R functions matched: `sc.weight.fw()` (Frank-Wolfe), `sparsify_function` (sparsification), `vcov.synthdid_estimate()` (variance) diff --git a/tests/test_methodology_sdid.py b/tests/test_methodology_sdid.py index e4b3fd38..37ad91f6 100644 --- a/tests/test_methodology_sdid.py +++ b/tests/test_methodology_sdid.py @@ -1474,3 +1474,629 @@ def test_empty_post_raises(self): with pytest.raises(ValueError, match="Y_post_control has no rows"): compute_time_weights(Y_pre, Y_post, zeta_lambda=0.0) + + +# ============================================================================= +# Validation Diagnostics: Fit Snapshot, Trajectories, LOO, Concentration, +# In-Time Placebo, Regularization Sensitivity +# ============================================================================= + + +class TestFitSnapshot: + """Verify the private _fit_snapshot bundle carries the right state.""" + + def test_shapes_and_ordering(self): + df = _make_panel(n_control=10, n_treated=2, n_pre=5, n_post=3, seed=42) + sdid = SyntheticDiD(variance_method="jackknife", seed=42) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period", + post_periods=list(range(5, 8))) + snap = res._fit_snapshot + assert snap is not None + assert snap.Y_pre_control.shape == (5, 10) + assert snap.Y_post_control.shape == (3, 10) + assert snap.Y_pre_treated.shape == (5, 2) + assert snap.Y_post_treated.shape == (3, 2) + assert len(snap.control_unit_ids) == 10 + assert len(snap.treated_unit_ids) == 2 + assert len(snap.pre_periods) == 5 + assert len(snap.post_periods) == 3 + # No unit appears in both sides + assert set(snap.control_unit_ids).isdisjoint(set(snap.treated_unit_ids)) + + def test_matrices_are_read_only(self): + df = _make_panel(seed=7) + sdid = SyntheticDiD(variance_method="jackknife", seed=7) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + snap = res._fit_snapshot + assert not snap.Y_pre_control.flags.writeable + assert not snap.Y_post_control.flags.writeable + assert not snap.Y_pre_treated.flags.writeable + assert not snap.Y_post_treated.flags.writeable + with pytest.raises(ValueError): + snap.Y_pre_control[0, 0] = -999.0 + + +class TestTrajectories: + """Synthetic and treated trajectories are exposed on results.""" + + def test_synthetic_pre_matches_weighted_sum(self): + df = _make_panel(seed=11) + sdid = SyntheticDiD(variance_method="jackknife", seed=11) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + omega_eff = np.array( + [res.unit_weights[u] for u in res._fit_snapshot.control_unit_ids] + ) + expected = res._fit_snapshot.Y_pre_control @ omega_eff + assert np.allclose(res.synthetic_pre_trajectory, expected, atol=1e-12) + + def test_treated_trajectory_matches_mean(self): + df = _make_panel(seed=13) + sdid = SyntheticDiD(variance_method="jackknife", seed=13) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + assert np.allclose( + res.treated_pre_trajectory, + np.mean(res._fit_snapshot.Y_pre_treated, axis=1), + atol=1e-12, + ) + assert np.allclose( + res.treated_post_trajectory, + np.mean(res._fit_snapshot.Y_post_treated, axis=1), + atol=1e-12, + ) + + def test_treated_trajectory_survey_weighted(self): + """Under survey weights, treated trajectories should equal the + survey-weighted mean per the registry; the synthetic trajectory + should match Y_pre_control 
@ omega_eff using composed weights.""" + from diff_diff.survey import SurveyDesign + + df = _make_panel(n_control=10, n_treated=3, n_pre=5, n_post=3, seed=83) + w_by_unit = {u: 1.0 for u in df["unit"].unique()} + w_by_unit[df["unit"].iloc[-1]] = 5.0 # skew one treated unit + df = df.assign(weight=df["unit"].map(w_by_unit)) + sdid = SyntheticDiD(variance_method="placebo", n_bootstrap=50, seed=83) + res = sdid.fit( + df, outcome="outcome", treatment="treated", + unit="unit", time="period", + survey_design=SurveyDesign(weights="weight", weight_type="pweight"), + ) + snap = res._fit_snapshot + w_t = snap.w_treated + expected_pre = np.average(snap.Y_pre_treated, axis=1, weights=w_t) + expected_post = np.average(snap.Y_post_treated, axis=1, weights=w_t) + assert np.allclose(res.treated_pre_trajectory, expected_pre, atol=1e-12) + assert np.allclose(res.treated_post_trajectory, expected_post, atol=1e-12) + + omega_eff = np.array( + [res.unit_weights[u] for u in snap.control_unit_ids] + ) + expected_synth_pre = snap.Y_pre_control @ omega_eff + assert np.allclose( + res.synthetic_pre_trajectory, expected_synth_pre, atol=1e-12 + ) + + def test_public_trajectory_arrays_are_read_only(self): + df = _make_panel(seed=87) + sdid = SyntheticDiD(variance_method="jackknife", seed=87) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + for arr in ( + res.synthetic_pre_trajectory, + res.synthetic_post_trajectory, + res.treated_pre_trajectory, + res.treated_post_trajectory, + res.time_weights_array, + ): + assert not arr.flags.writeable + + def test_pre_fit_rmse_recoverable(self): + df = _make_panel(seed=17) + sdid = SyntheticDiD(variance_method="jackknife", seed=17) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + rmse = float( + np.sqrt( + np.mean( + (res.treated_pre_trajectory - res.synthetic_pre_trajectory) ** 2 + ) + ) + ) + assert abs(rmse - res.pre_treatment_fit) < 1e-10 + + +class TestLooEffectsDf: + """get_loo_effects_df joins jackknife pseudo-values to unit identities.""" + + def _fit_jackknife(self, seed=42, **kwargs): + df = _make_panel(n_control=10, n_treated=3, n_pre=5, n_post=3, + seed=seed, **kwargs) + sdid = SyntheticDiD(variance_method="jackknife", seed=seed) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period", + post_periods=list(range(5, 8))) + return res + + def test_columns_and_roles(self): + res = self._fit_jackknife() + loo = res.get_loo_effects_df() + assert list(loo.columns) == ["unit", "role", "att_loo", "delta_from_full"] + assert set(loo["role"]) == {"control", "treated"} + assert (loo["role"] == "control").sum() == res.n_control + assert (loo["role"] == "treated").sum() == res.n_treated + assert set(loo["unit"]) == set(res.unit_weights) | set( + res._fit_snapshot.treated_unit_ids + ) + + def test_delta_from_full_matches_math(self): + res = self._fit_jackknife() + loo = res.get_loo_effects_df() + deltas = loo["att_loo"] - res.att + assert np.allclose(loo["delta_from_full"], deltas, equal_nan=True) + + def test_sorted_by_absolute_delta_descending(self): + res = self._fit_jackknife() + loo = res.get_loo_effects_df().dropna(subset=["delta_from_full"]) + abs_delta = loo["delta_from_full"].abs().to_numpy() + assert np.all(abs_delta[:-1] >= abs_delta[1:]) + + def test_placebo_raises_value_error(self): + df = _make_panel(seed=21) + sdid = SyntheticDiD(variance_method="placebo", n_bootstrap=50, seed=21) + res = sdid.fit(df, outcome="outcome", treatment="treated", + 
unit="unit", time="period") + with pytest.raises(ValueError, match="variance_method='jackknife'"): + res.get_loo_effects_df() + + def test_bootstrap_raises_value_error(self): + df = _make_panel(seed=23) + sdid = SyntheticDiD(variance_method="bootstrap", n_bootstrap=50, seed=23) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + with pytest.raises(ValueError, match="variance_method='jackknife'"): + res.get_loo_effects_df() + + def test_positional_mapping_matches_placebo_effects(self): + """First n_control positions in placebo_effects map to control_units, + next n_treated map to treated_units.""" + res = self._fit_jackknife() + pe = res.placebo_effects + ids = res._loo_unit_ids + roles = res._loo_roles + assert list(ids[: res.n_control]) == res._fit_snapshot.control_unit_ids + assert list(ids[res.n_control :]) == res._fit_snapshot.treated_unit_ids + assert roles[: res.n_control].count("control") == res.n_control + assert roles[res.n_control :].count("treated") == res.n_treated + # The DataFrame values equal placebo_effects values (up to row permutation) + loo = res.get_loo_effects_df() + assert np.allclose( + sorted(loo["att_loo"].dropna().to_numpy()), + sorted(pe[np.isfinite(pe)]), + atol=1e-12, + ) + + +class TestWeightConcentration: + """get_weight_concentration returns correct concentration metrics.""" + + def test_equal_weights_yield_effective_n_equal_to_count(self): + """With uniform weights 1/n, effective_n = n.""" + # Manually construct a results object with uniform weights + from diff_diff.results import SyntheticDiDResults + + uniform = {f"u{i}": 0.1 for i in range(10)} + res = SyntheticDiDResults( + att=0.0, se=0.0, t_stat=0.0, p_value=1.0, conf_int=(0.0, 0.0), + n_obs=100, n_treated=1, n_control=10, + unit_weights=uniform, time_weights={}, + pre_periods=[], post_periods=[], + ) + c = res.get_weight_concentration() + assert abs(c["effective_n"] - 10.0) < 1e-10 + assert abs(c["herfindahl"] - 0.1) < 1e-10 # 10 * 0.01 + assert c["top_k"] == 5 + assert abs(c["top_k_share"] - 0.5) < 1e-10 + + def test_single_unit_yields_effective_n_one(self): + from diff_diff.results import SyntheticDiDResults + + res = SyntheticDiDResults( + att=0.0, se=0.0, t_stat=0.0, p_value=1.0, conf_int=(0.0, 0.0), + n_obs=10, n_treated=1, n_control=1, + unit_weights={"only": 1.0}, time_weights={}, + pre_periods=[], post_periods=[], + ) + c = res.get_weight_concentration() + assert abs(c["effective_n"] - 1.0) < 1e-10 + assert abs(c["herfindahl"] - 1.0) < 1e-10 + + def test_top_k_clamped_to_n(self): + from diff_diff.results import SyntheticDiDResults + + res = SyntheticDiDResults( + att=0.0, se=0.0, t_stat=0.0, p_value=1.0, conf_int=(0.0, 0.0), + n_obs=30, n_treated=1, n_control=3, + unit_weights={"a": 0.5, "b": 0.3, "c": 0.2}, time_weights={}, + pre_periods=[], post_periods=[], + ) + c = res.get_weight_concentration(top_k=10) + assert c["top_k"] == 3 + assert abs(c["top_k_share"] - 1.0) < 1e-10 + + def test_negative_top_k_raises(self): + from diff_diff.results import SyntheticDiDResults + + res = SyntheticDiDResults( + att=0.0, se=0.0, t_stat=0.0, p_value=1.0, conf_int=(0.0, 0.0), + n_obs=30, n_treated=1, n_control=3, + unit_weights={"a": 0.5, "b": 0.3, "c": 0.2}, time_weights={}, + pre_periods=[], post_periods=[], + ) + with pytest.raises(ValueError, match="top_k must be non-negative"): + res.get_weight_concentration(top_k=-1) + + def test_uses_composed_weights_under_survey(self): + """Metrics come from self.unit_weights which stores composed ω_eff + for survey fits.""" 
+        from diff_diff.survey import SurveyDesign
+
+        df = _make_panel(n_control=8, n_treated=2, n_pre=4, n_post=2, seed=29)
+        # Add survey weights heavily favoring one control unit
+        w_by_unit = {u: 1.0 for u in df["unit"].unique()}
+        w_by_unit[0] = 20.0  # control unit 0 gets a heavy weight
+        df = df.assign(weight=df["unit"].map(w_by_unit))
+        sdid = SyntheticDiD(variance_method="placebo", n_bootstrap=50, seed=29)
+        res = sdid.fit(
+            df, outcome="outcome", treatment="treated",
+            unit="unit", time="period",
+            survey_design=SurveyDesign(weights="weight", weight_type="pweight"),
+        )
+        c = res.get_weight_concentration()
+        # Composed metrics reflect unit_weights dict exactly
+        weights = np.array(list(res.unit_weights.values()))
+        expected_effective_n = 1.0 / np.sum(weights ** 2)
+        assert abs(c["effective_n"] - expected_effective_n) < 1e-10
+
+
+class TestInTimePlacebo:
+    """in_time_placebo re-estimates on shifted fake dates."""
+
+    def test_default_sweep_shape(self):
+        df = _make_panel(n_control=10, n_treated=2, n_pre=8, n_post=3,
+                         att=0.0, seed=31)
+        sdid = SyntheticDiD(variance_method="jackknife", seed=31)
+        res = sdid.fit(df, outcome="outcome", treatment="treated",
+                       unit="unit", time="period",
+                       post_periods=list(range(8, 11)))
+        placebo = res.in_time_placebo()
+        # Feasible positions: i in [2, n_pre - 1] = [2, 7] -> 6 rows
+        assert len(placebo) == 6
+        assert list(placebo.columns) == [
+            "fake_treatment_period", "att", "pre_fit_rmse",
+            "n_pre_fake", "n_post_fake",
+        ]
+
+    def test_no_effect_dgp_gives_small_placebo_atts(self):
+        """On a clean no-effect DGP, placebo ATTs should be small vs the
+        DGP's noise level."""
+        df = _make_panel(n_control=20, n_treated=3, n_pre=8, n_post=3,
+                         att=0.0, seed=33)
+        sdid = SyntheticDiD(variance_method="jackknife", seed=33)
+        res = sdid.fit(df, outcome="outcome", treatment="treated",
+                       unit="unit", time="period",
+                       post_periods=list(range(8, 11)))
+        placebo = res.in_time_placebo()
+        median_abs = float(placebo["att"].abs().median())
+        # Noise sd in _make_panel is 0.5; ATTs should be well under that at
+        # the median. Loose threshold to stay stable under seed variation.
+ assert median_abs < 1.0 + + def test_explicit_list_overrides_sweep(self): + df = _make_panel(n_control=10, n_treated=2, n_pre=8, n_post=3, seed=37) + sdid = SyntheticDiD(variance_method="jackknife", seed=37) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period", + post_periods=list(range(8, 11))) + placebo = res.in_time_placebo(fake_treatment_periods=[4, 6]) + assert len(placebo) == 2 + assert set(placebo["fake_treatment_period"]) == {4, 6} + + def test_post_period_raises(self): + df = _make_panel(n_control=10, n_treated=2, n_pre=6, n_post=3, seed=41) + sdid = SyntheticDiD(variance_method="jackknife", seed=41) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period", + post_periods=list(range(6, 9))) + with pytest.raises(ValueError, match="post_periods"): + res.in_time_placebo(fake_treatment_periods=[6]) + + def test_missing_period_raises(self): + df = _make_panel(n_control=10, n_treated=2, n_pre=6, n_post=3, seed=43) + sdid = SyntheticDiD(variance_method="jackknife", seed=43) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period", + post_periods=list(range(6, 9))) + with pytest.raises(ValueError, match="not found in pre_periods"): + res.in_time_placebo(fake_treatment_periods=[999]) + + def test_empty_default_sweep_preserves_schema(self): + """When n_pre < 3, the default sweep is empty. The DataFrame must + still carry the documented columns.""" + df = _make_panel(n_control=10, n_treated=2, n_pre=2, n_post=3, seed=50) + sdid = SyntheticDiD(variance_method="jackknife", seed=50) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period", + post_periods=list(range(2, 5))) + placebo = res.in_time_placebo() + assert len(placebo) == 0 + assert list(placebo.columns) == [ + "fake_treatment_period", "att", "pre_fit_rmse", + "n_pre_fake", "n_post_fake", + ] + + def test_zeta_override_changes_result(self): + df = _make_panel(n_control=10, n_treated=2, n_pre=8, n_post=3, seed=47) + sdid = SyntheticDiD(variance_method="jackknife", seed=47) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period", + post_periods=list(range(8, 11))) + default_ = res.in_time_placebo(fake_treatment_periods=[5]) + override_ = res.in_time_placebo( + fake_treatment_periods=[5], + zeta_omega_override=res.zeta_omega * 100, + ) + # Different regularization should change at least one of att/rmse + assert not np.isclose( + default_["pre_fit_rmse"].iloc[0], + override_["pre_fit_rmse"].iloc[0], + atol=1e-8, + ) + + +class TestSensitivityToZetaOmega: + """sensitivity_to_zeta_omega sweeps regularization values.""" + + def test_multiplier_one_reproduces_original_att(self): + df = _make_panel(n_control=12, n_treated=2, n_pre=6, n_post=3, + att=2.0, seed=53) + sdid = SyntheticDiD(variance_method="jackknife", seed=53) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period", + post_periods=list(range(6, 9))) + sens = res.sensitivity_to_zeta_omega() + # Row where zeta_omega == self.zeta_omega (multiplier 1.0) + match = sens.loc[np.isclose(sens["zeta_omega"], res.zeta_omega)] + assert len(match) == 1 + assert abs(match["att"].iloc[0] - res.att) < 1e-10 + + def test_grid_length_matches_default_multipliers(self): + df = _make_panel(seed=59) + sdid = SyntheticDiD(variance_method="jackknife", seed=59) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + sens = res.sensitivity_to_zeta_omega() + assert 
len(sens) == 5 + + def test_default_grid_values_match_documented_multipliers(self): + """Assert the exact documented default grid values, not just length, + so the contract cannot drift unnoticed.""" + df = _make_panel(seed=60) + sdid = SyntheticDiD(variance_method="jackknife", seed=60) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + sens = res.sensitivity_to_zeta_omega() + expected = np.array([0.25, 0.5, 1.0, 2.0, 4.0]) * res.zeta_omega + assert np.allclose(sens["zeta_omega"].to_numpy(), expected, atol=1e-12) + + def test_explicit_grid_overrides_multipliers(self): + df = _make_panel(seed=61) + sdid = SyntheticDiD(variance_method="jackknife", seed=61) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + sens = res.sensitivity_to_zeta_omega(zeta_grid=[0.1, 1.0, 10.0]) + assert len(sens) == 3 + assert np.allclose(sens["zeta_omega"], [0.1, 1.0, 10.0]) + + def test_effective_n_grows_with_zeta(self): + """Higher regularization pushes weights toward uniform, so effective_n + should be monotone non-decreasing across the default grid.""" + df = _make_panel(n_control=15, n_treated=2, n_pre=8, n_post=3, seed=67) + sdid = SyntheticDiD(variance_method="jackknife", seed=67) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period", + post_periods=list(range(8, 11))) + sens = res.sensitivity_to_zeta_omega() + eff_n = sens["effective_n"].to_numpy() + # Allow tiny wobble from solver tolerance + assert np.all(np.diff(eff_n) >= -1e-6) + + def test_columns_shape(self): + df = _make_panel(seed=71) + sdid = SyntheticDiD(variance_method="jackknife", seed=71) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + sens = res.sensitivity_to_zeta_omega() + assert list(sens.columns) == [ + "zeta_omega", "att", "pre_fit_rmse", + "max_unit_weight", "effective_n", + ] + + def test_empty_zeta_grid_preserves_schema(self): + df = _make_panel(seed=91) + sdid = SyntheticDiD(variance_method="jackknife", seed=91) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + sens = res.sensitivity_to_zeta_omega(zeta_grid=[]) + assert sens.shape == (0, 5) + assert list(sens.columns) == [ + "zeta_omega", "att", "pre_fit_rmse", + "max_unit_weight", "effective_n", + ] + + def test_empty_multipliers_preserves_schema(self): + df = _make_panel(seed=93) + sdid = SyntheticDiD(variance_method="jackknife", seed=93) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + sens = res.sensitivity_to_zeta_omega(multipliers=()) + assert sens.shape == (0, 5) + assert list(sens.columns) == [ + "zeta_omega", "att", "pre_fit_rmse", + "max_unit_weight", "effective_n", + ] + + +class TestPractitionerSdidReferences: + """_handle_synthetic in practitioner.py references real callables.""" + + def test_snippets_reference_existing_methods(self): + """Each code snippet in _handle_synthetic() should reference methods + that actually exist on SyntheticDiDResults.""" + from diff_diff.practitioner import _handle_synthetic + from diff_diff.results import SyntheticDiDResults + + df = _make_panel(seed=73) + sdid = SyntheticDiD(variance_method="jackknife", seed=73) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + steps, _ = _handle_synthetic(res) + expected_methods = [ + "get_weight_concentration", + "in_time_placebo", + "get_loo_effects_df", + "sensitivity_to_zeta_omega", + ] + joined_code = 
"\n".join(step["code"] for step in steps) + for method in expected_methods: + assert method in joined_code, ( + f"Expected {method}() referenced in practitioner guidance" + ) + assert hasattr(SyntheticDiDResults, method), ( + f"Practitioner guidance references {method}() but it's not " + "on SyntheticDiDResults" + ) + + def test_snippets_parse_as_python(self): + """Each snippet in _handle_synthetic should be syntactically valid.""" + import ast + from diff_diff.practitioner import _handle_synthetic + + df = _make_panel(seed=77) + sdid = SyntheticDiD(variance_method="jackknife", seed=77) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + steps, _ = _handle_synthetic(res) + for step in steps: + ast.parse(step["code"]) + + def test_jackknife_loo_snippet_handles_unavailable_loo(self): + """When variance_method='jackknife' but LOO is unavailable + (e.g., n_treated=1 returns empty jackknife array), the LOO snippet + should degrade gracefully instead of raising.""" + from diff_diff.practitioner import _handle_synthetic + + df = _make_panel(n_control=10, n_treated=1, n_pre=5, n_post=3, seed=97) + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + sdid = SyntheticDiD(variance_method="jackknife", seed=97) + res = sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period", + post_periods=list(range(5, 8))) + assert res.variance_method == "jackknife" + assert res._loo_unit_ids is None # LOO intentionally unavailable + + steps, _ = _handle_synthetic(res) + loo_snippet = next( + s["code"] for s in steps if "get_loo_effects_df" in s["code"] + ) + # Executing the snippet against this result must not raise. + import io + import contextlib + captured = io.StringIO() + with contextlib.redirect_stdout(captured): + exec(loo_snippet, {"results": res}) + assert "LOO not available" in captured.getvalue() + + +class TestSyntheticDiDResultsPickle: + """Pickle round-trip drops the fit snapshot; diagnostic methods raise + with the documented recovery message.""" + + def _fit(self, seed=101): + df = _make_panel(seed=seed) + sdid = SyntheticDiD(variance_method="jackknife", seed=seed) + return sdid.fit(df, outcome="outcome", treatment="treated", + unit="unit", time="period") + + def test_snapshot_dropped_on_pickle(self): + import pickle + + res = self._fit() + assert res._fit_snapshot is not None # present pre-pickle + + restored = pickle.loads(pickle.dumps(res)) + assert restored._fit_snapshot is None + # Public fields survive + assert restored.att == res.att + assert restored.se == res.se + assert np.allclose( + restored.synthetic_pre_trajectory, res.synthetic_pre_trajectory + ) + + def test_in_time_placebo_raises_after_pickle(self): + import pickle + + res = self._fit(seed=103) + restored = pickle.loads(pickle.dumps(res)) + with pytest.raises(ValueError, match="fit snapshot"): + restored.in_time_placebo() + + def test_sensitivity_raises_after_pickle(self): + import pickle + + res = self._fit(seed=105) + restored = pickle.loads(pickle.dumps(res)) + with pytest.raises(ValueError, match="fit snapshot"): + restored.sensitivity_to_zeta_omega() + + def test_live_instance_snapshot_untouched_by_getstate(self): + """__getstate__ must not mutate the live object's snapshot — + only the returned state dict carries the nulled field.""" + res = self._fit(seed=107) + snap_before = res._fit_snapshot + assert snap_before is not None + _ = res.__getstate__() + # Live instance unchanged after __getstate__ call + assert res._fit_snapshot is snap_before + # 
Diagnostics still work in the live session + _ = res.in_time_placebo(fake_treatment_periods=[2]) + + def test_snapshot_excluded_from_dataclass_fields(self): + """_fit_snapshot and _loo_* must be plain instance attributes, not + dataclass fields, so dataclass-recursive serializers (asdict, + fields, replace) cannot reach retained panel state.""" + import dataclasses + + res = self._fit(seed=109) + field_names = {f.name for f in dataclasses.fields(res)} + assert "_fit_snapshot" not in field_names + assert "_loo_unit_ids" not in field_names + assert "_loo_roles" not in field_names + + def test_asdict_excludes_internal_diagnostic_state(self): + """dataclasses.asdict() must not recurse into the retained panel + snapshot or the LOO unit ID arrays.""" + import dataclasses + + res = self._fit(seed=111) + assert res._fit_snapshot is not None # live instance retains it + d = dataclasses.asdict(res) + assert "_fit_snapshot" not in d + assert "_loo_unit_ids" not in d + assert "_loo_roles" not in d
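+
+    def test_loo_effects_available_after_pickle(self):
+        """Only _fit_snapshot is nulled by __getstate__; the _loo_*
+        attributes stay in __dict__ and survive the round trip, so
+        get_loo_effects_df() keeps working on a restored result.
+        (Sketch: assumes the default _make_panel yields >=2 treated
+        units, as the jackknife fits above do.)"""
+        import pickle
+
+        res = self._fit(seed=113)
+        assert res._loo_unit_ids is not None  # LOO available pre-pickle
+        restored = pickle.loads(pickle.dumps(res))
+        loo = restored.get_loo_effects_df()
+        assert list(loo.columns) == ["unit", "role", "att_loo", "delta_from_full"]
+        assert len(loo) == restored.n_control + restored.n_treated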