diff --git a/TODO.md b/TODO.md index 0ac90eb9..189eab95 100644 --- a/TODO.md +++ b/TODO.md @@ -89,7 +89,8 @@ Deferred items from PR reviews that were not addressed before merge. | `bias_corrected_local_linear`: support `weights=` once survey-design adaptation lands. nprobust's `lprobust` has no weight argument so there is no parity anchor; derivation needed. | `diff_diff/local_linear.py`, `diff_diff/_nprobust_port.py::lprobust` | Phase 1c | Medium | | `bias_corrected_local_linear`: support multi-eval grid (`neval > 1`) with cross-covariance (`covgrid=TRUE` branch of `lprobust.R:253-378`). Not needed for HAD but useful for multi-dose diagnostics. | `diff_diff/_nprobust_port.py::lprobust` | Phase 1c | Low | | Clustered-DGP parity: Phase 1c's DGP 4 uses manual `h=b=0.3` to sidestep an nprobust-internal singleton-cluster bug in `lpbwselect.mse.dpi`'s pilot fits. Once nprobust ships a fix (or we derive one independently), add a clustered-auto-bandwidth parity test. | `benchmarks/R/generate_nprobust_lprobust_golden.R` | Phase 1c | Low | -| `HeterogeneousAdoptionDiD` Phase 2b: multi-period event-study extension (Appendix B.2). `aggregate="event_study"` raises `NotImplementedError` in Phase 2a; Phase 2b will aggregate per-cohort first-differences into an event-study result with `.event_study_effects` / `.event_study_se` result fields. | `diff_diff/had.py`, `tests/test_had.py` | Phase 2a | Medium | +| `HeterogeneousAdoptionDiD` joint cross-horizon covariance on event study: per-horizon SEs use INDEPENDENT sandwiches in Phase 2b (paper-faithful pointwise CIs per Pierce-Schott Figure 2). A follow-up could derive an IF-based stacking of per-horizon scores for joint cross-horizon inference (needed for joint hypothesis tests across event-time horizons). Block-bootstrap is a reasonable alternative. | `diff_diff/had.py::_fit_event_study` | Phase 2b | Low | +| `HeterogeneousAdoptionDiD` event-study staggered-timing beyond last cohort: Phase 2b auto-filters staggered panels to the last cohort per paper Appendix B.2. Earlier-cohort treatment effects are not identified by HAD; redirecting to `ChaisemartinDHaultfoeuille` / `did_multiplegt_dyn` is the paper's prescription. A full staggered HAD would require a different identification path (out of paper scope). | `diff_diff/had.py::_validate_had_panel_event_study` | Phase 2b | Low | | `HeterogeneousAdoptionDiD`: survey-design integration (`survey=SurveyDesign(...)`). Currently raises `NotImplementedError`. Requires Taylor-linearization of the β-scale rescaling and replicate-weight-compatible 2SLS variance on the mass-point path. | `diff_diff/had.py` | Phase 2a | Medium | | `HeterogeneousAdoptionDiD`: `weights=` support. Deferred jointly with survey integration. nprobust's `lprobust` has no weight argument so the nonparametric continuous path needs a derivation; the 2SLS mass-point path needs weighted-sandwich parity. | `diff_diff/had.py` | Phase 2a | Medium | | `HeterogeneousAdoptionDiD` mass-point: `vcov_type in {"hc2", "hc2_bm"}` raises `NotImplementedError` pending a 2SLS-specific leverage derivation. The OLS leverage `x_i' (X'X)^{-1} x_i` is wrong for 2SLS; the correct finite-sample correction uses `x_i' (Z'X)^{-1} (...) (X'Z)^{-1} x_i`. Needs derivation plus an R / Stata (`ivreg2 small robust`) parity anchor. | `diff_diff/had.py::_fit_mass_point_2sls` | Phase 2a | Medium | @@ -97,7 +98,7 @@ Deferred items from PR reviews that were not addressed before merge. | `HeterogeneousAdoptionDiD` Phase 3: `qug_test()`, `stute_test()`, `yatchew_hr_test()` pre-test diagnostics (paper Section 3.3). Composite helper `did_had_pretest_workflow()`. Not part of Phase 2a scope. | `diff_diff/had.py`, new module | Phase 2a | Medium | | `HeterogeneousAdoptionDiD` Phase 4: Pierce-Schott (2016) replication harness; reproduce paper Figure 2 values and Table 1 coverage rates. | `benchmarks/`, `tests/` | Phase 2a | Low | | `HeterogeneousAdoptionDiD` Phase 5: `practitioner_next_steps()` integration, tutorial notebook, and `llms.txt` updates (preserving UTF-8 fingerprint). | `diff_diff/practitioner.py`, `tutorials/`, `diff_diff/guides/` | Phase 2a | Low | -| `HeterogeneousAdoptionDiD` staggered-timing reduction: Phase 2a requires exactly 2 time periods and raises on `>2` periods with or without `first_treat_col`. A "last-cohort subgroup" reduction scheme (slice to max-cohort's 2-period window) could lift this in a targeted follow-up PR before full Phase 2b multi-period aggregation. | `diff_diff/had.py::_validate_had_panel` | Phase 2a | Low | +| `HeterogeneousAdoptionDiD` time-varying dose on event study: Phase 2b REJECTS panels where `D_{g,t}` varies within a unit for `t >= F` (the aggregation uses `D_{g, F}` as the single regressor for all horizons, paper Appendix B.2 constant-dose convention). A follow-up PR could add a time-varying-dose estimator for these panels; current behavior is front-door rejection with a redirect to `ChaisemartinDHaultfoeuille`. | `diff_diff/had.py::_validate_had_panel_event_study` | Phase 2b | Low | | `HeterogeneousAdoptionDiD` repeated-cross-section support: paper Section 2 defines HAD on panel OR repeated cross-section, but Phase 2a is panel-only. RCS inputs (disjoint unit IDs between periods) are rejected by the balanced-panel validator with the generic "unit(s) do not appear in both periods" error. A follow-up PR will add an RCS identification path based on pre/post cell means (rather than unit-level first differences), with its own validator and a distinct `data_mode` / API surface. | `diff_diff/had.py::_validate_had_panel`, `diff_diff/had.py::_aggregate_first_difference` | Phase 2a | Medium | #### Performance diff --git a/diff_diff/__init__.py b/diff_diff/__init__.py index 5be7e97c..7c04bf50 100644 --- a/diff_diff/__init__.py +++ b/diff_diff/__init__.py @@ -57,6 +57,7 @@ ) from diff_diff.had import ( HeterogeneousAdoptionDiD, + HeterogeneousAdoptionDiDEventStudyResults, HeterogeneousAdoptionDiDResults, ) from diff_diff.estimators import ( @@ -436,9 +437,10 @@ # Bias-corrected local-linear (Phase 1c for HeterogeneousAdoptionDiD) "BiasCorrectedFit", "bias_corrected_local_linear", - # HeterogeneousAdoptionDiD (Phase 2a) + # HeterogeneousAdoptionDiD (Phase 2a single-period, Phase 2b event study) "HeterogeneousAdoptionDiD", "HeterogeneousAdoptionDiDResults", + "HeterogeneousAdoptionDiDEventStudyResults", "HAD", # Datasets "load_card_krueger", diff --git a/diff_diff/had.py b/diff_diff/had.py index d9ba428c..ebf1a036 100644 --- a/diff_diff/had.py +++ b/diff_diff/had.py @@ -38,8 +38,13 @@ error via the structural-residual 2SLS sandwich (see ``_fit_mass_point_2sls``). -Phase 2a ships the single-period path only; the multi-period event-study -extension (paper Appendix B.2) is queued for Phase 2b. +Phase 2a ships the single-period WAS estimator (``aggregate="overall"``). +Phase 2b adds the multi-period event-study extension (paper Appendix B.2) +via ``aggregate="event_study"``: per-horizon WAS estimates with pointwise +CIs, including pre-period placebos, reusing the three Phase 2a design +paths on per-horizon first differences anchored at ``Y_{g, F-1}``. +Staggered-timing panels are auto-filtered to the last-treatment cohort +per paper Appendix B.2 prescription. Infrastructure reused from Phase 1: - ``diff_diff.local_linear.bias_corrected_local_linear`` (Phase 1c) @@ -61,7 +66,7 @@ import warnings from dataclasses import dataclass -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple import numpy as np import pandas as pd @@ -76,6 +81,7 @@ __all__ = [ "HeterogeneousAdoptionDiD", "HeterogeneousAdoptionDiDResults", + "HeterogeneousAdoptionDiDEventStudyResults", ] @@ -118,6 +124,53 @@ } +# ============================================================================= +# JSON-serialization helpers +# ============================================================================= + + +def _json_safe_scalar(x: Any) -> Any: + """Coerce a scalar to a JSON-serializable type. + + - NumPy scalars (``np.int64``, ``np.float64``, ``np.bool_``) -> + native Python via ``.item()`` + - ``pd.Timestamp`` / ``pd.Timedelta`` -> ISO 8601 string via + ``.isoformat()`` + - Everything else returned as-is. + + The ``to_dict`` methods use this to keep the returned dict + serializable via ``json.dumps`` regardless of the underlying + pandas/numpy dtype of the time / first_treat columns. + """ + if isinstance(x, (pd.Timestamp, pd.Timedelta)): + return x.isoformat() + if hasattr(x, "item") and callable(getattr(x, "item")): + try: + return x.item() + except (AttributeError, ValueError, TypeError): + return x + return x + + +def _json_safe_filter_info( + filter_info: Optional[Dict[str, Any]], +) -> Optional[Dict[str, Any]]: + """Normalize a ``filter_info`` dict to JSON-safe scalars. + + Returns ``None`` unchanged; otherwise coerces ``F_last`` and each + entry in ``dropped_cohorts`` via :func:`_json_safe_scalar`. Int + counts are cast to ``int`` for stability. + """ + if filter_info is None: + return None + return { + "F_last": _json_safe_scalar(filter_info.get("F_last")), + "n_kept": int(filter_info.get("n_kept", 0)), + "n_dropped": int(filter_info.get("n_dropped", 0)), + "dropped_cohorts": [_json_safe_scalar(c) for c in filter_info.get("dropped_cohorts", [])], + } + + # ============================================================================= # Results dataclass # ============================================================================= @@ -370,6 +423,271 @@ def to_dataframe(self) -> pd.DataFrame: return pd.DataFrame([self.to_dict()]) +@dataclass +class HeterogeneousAdoptionDiDEventStudyResults: + """Event-study results for :class:`HeterogeneousAdoptionDiD` (Phase 2b). + + Per-horizon arrays align with ``event_times`` by index; all per-horizon + arrays have shape ``(n_horizons,)``. The anchor horizon ``e = -1`` + (i.e., ``t = F - 1``) is NOT included because + ``Y_{g, F-1} - Y_{g, F-1} = 0`` trivially and the WAS is not identified + there. + + Per-horizon inference fields (``t_stat``, ``p_value``, ``conf_int_low``, + ``conf_int_high``) are NaN-coupled to the per-horizon ``se`` via + :func:`diff_diff.utils.safe_inference`; ``att`` and ``se`` themselves + are raw estimator outputs from the chosen design path on each + horizon's first differences. + + Design resolution is SHARED across horizons: the design, ``d_lower``, + ``target_parameter``, and ``inference_method`` are single scalars + determined once from the post-period dose distribution ``D_{g, F}`` + (paper Appendix B.2 convention — the dose regressor is invariant + across event-time horizons). + + Attributes + ---------- + event_times : np.ndarray, shape (n_horizons,) + Integer event-time labels ``e = t - F``, sorted ascending. + Excludes ``e = -1`` (the anchor). Post-period horizons have + ``e >= 0``; pre-period placebos have ``e <= -2``. + att : np.ndarray, shape (n_horizons,) + Per-horizon WAS point estimate on the beta-scale (see + :class:`HeterogeneousAdoptionDiDResults.att` for the per-design + formula, applied to ``ΔY_t = Y_{g,t} - Y_{g,F-1}``). + se : np.ndarray, shape (n_horizons,) + Per-horizon standard error on the beta-scale. Each horizon uses + the INDEPENDENT per-period sandwich from the chosen design path + (continuous: CCT-2014 robust divided by ``|den|``; mass-point: + structural-residual 2SLS sandwich). Pointwise CIs only — joint + cross-horizon covariance is not computed in Phase 2b (paper + reports pointwise CIs per Pierce-Schott). + t_stat, p_value : np.ndarray, shape (n_horizons,) + Per-horizon inference triple element. + conf_int_low, conf_int_high : np.ndarray, shape (n_horizons,) + Per-horizon CI endpoints at level ``alpha``. + n_obs_per_horizon : np.ndarray, shape (n_horizons,) + Per-horizon sample size (units contributing at that event time). + In Phase 2b this equals ``n_units`` for every horizon because the + validator rejects NaN in outcome / dose / unit columns upstream; + tracked as a field for future flexibility (e.g., per-period + missingness). + alpha : float + CI level used at fit time (0.05 for a 95% CI). + design : str + Resolved design mode, shared across horizons: + ``"continuous_at_zero"``, ``"continuous_near_d_lower"``, or + ``"mass_point"``. + target_parameter : str + Estimand label: ``"WAS"`` for Design 1' (continuous_at_zero), + ``"WAS_d_lower"`` for the other two. + d_lower : float + Support infimum used for all horizons. ``0.0`` for Design 1'; + ``float(d.min())`` otherwise. + dose_mean : float + ``D_bar = (1/G) * sum(D_{g,F})`` computed on the fit sample (after + the staggered last-cohort filter, if applied). + F : object + First-treatment period label (arbitrary dtype — int, str, + datetime). Identified by the multi-period dose invariant from the + fitted data. + n_units : int + Number of unique units contributing to the fit. After staggered + auto-filter: last-cohort units PLUS never-treated (``first_treat = 0``) + units retained as the untreated-group comparison per paper + Appendix B.2. Only earlier-treated cohorts are dropped. + inference_method : str + ``"analytical_nonparametric"`` (continuous designs) or + ``"analytical_2sls"`` (mass-point). Shared across horizons. + vcov_type : str or None + Effective variance-covariance family used on the mass-point path + (``"classical"``, ``"hc1"``, or ``"cr1"`` when cluster supplied). + ``None`` on the continuous paths (they use CCT-2014 robust SE). + cluster_name : str or None + Column name of the cluster variable when cluster-robust SE is + requested. ``None`` otherwise. + survey_metadata : object or None + Always ``None`` in Phase 2b. Field shape kept for future-compat. + bandwidth_diagnostics : list[BandwidthResult] or None + Per-horizon bandwidth diagnostics on the continuous paths; + ``None`` on the mass-point path. When non-None, aligned with + ``event_times`` by index. + bias_corrected_fit : list[BiasCorrectedFit] or None + Per-horizon bias-corrected fit on the continuous paths; ``None`` + on the mass-point path. When non-None, aligned with + ``event_times`` by index. + filter_info : dict or None + Populated when the staggered-timing last-cohort auto-filter fires. + Keys: ``"F_last"`` (kept cohort label), ``"n_kept"`` (units + retained), ``"n_dropped"`` (units dropped), ``"dropped_cohorts"`` + (list of dropped cohort labels). ``None`` when no filter was + applied. + """ + + # Per-horizon arrays + event_times: np.ndarray + att: np.ndarray + se: np.ndarray + t_stat: np.ndarray + p_value: np.ndarray + conf_int_low: np.ndarray + conf_int_high: np.ndarray + n_obs_per_horizon: np.ndarray + + # Shared metadata + alpha: float + design: str + target_parameter: str + d_lower: float + dose_mean: float + F: Any + n_units: int + inference_method: str + vcov_type: Optional[str] + cluster_name: Optional[str] + survey_metadata: Optional[Any] + + # Per-horizon diagnostics (lists, None on mass-point). + # List entries may be None for horizons where the continuous-path fit + # caught a degenerate bandwidth-selector failure (constant / perfectly- + # linear outcome); att / se for those horizons are NaN as well. + bandwidth_diagnostics: Optional[List[Optional[BandwidthResult]]] + bias_corrected_fit: Optional[List[Optional[BiasCorrectedFit]]] + + # Staggered auto-filter metadata + filter_info: Optional[Dict[str, Any]] + + def __repr__(self) -> str: + return ( + f"HeterogeneousAdoptionDiDEventStudyResults(" + f"n_horizons={len(self.event_times)}, " + f"design={self.design!r}, n_units={self.n_units})" + ) + + def summary(self) -> str: + """Formatted per-horizon summary table.""" + width = 80 + conf_level = int((1 - self.alpha) * 100) + lines = [ + "=" * width, + "HeterogeneousAdoptionDiD Event-Study Results".center(width), + "=" * width, + "", + f"{'Design:':<30} {self.design:>20}", + f"{'Target parameter:':<30} {self.target_parameter:>20}", + f"{'d_lower:':<30} {self.d_lower:>20.6g}", + f"{'D_bar (dose mean):':<30} {self.dose_mean:>20.6g}", + f"{'First-treatment period (F):':<30} {str(self.F):>20}", + f"{'Observations (units):':<30} {self.n_units:>20}", + f"{'Horizons:':<30} {len(self.event_times):>20}", + f"{'Inference method:':<30} {self.inference_method:>20}", + ] + if self.vcov_type is not None: + if self.cluster_name is not None: + label = f"CR1 at {self.cluster_name}" + else: + label = self.vcov_type + lines.append(f"{'Variance:':<30} {label:>20}") + if self.filter_info is not None: + lines.append( + f"{'Last-cohort filter (F_last):':<30} " + f"{str(self.filter_info.get('F_last')):>20}" + ) + lines.append( + f"{' Units kept / dropped:':<30} " + f"{self.filter_info.get('n_kept', 0)} / " + f"{self.filter_info.get('n_dropped', 0):<8}".rjust(51) + ) + lines.extend( + [ + "", + "-" * width, + ( + f"{'Event-time':>10} {'Estimate':>12} {'Std. Err.':>12} " + f"{'t-stat':>10} {'P>|t|':>10} " + f"{str(conf_level) + '% CI':>22}" + ), + "-" * width, + ] + ) + for i, e in enumerate(self.event_times): + ci_str = f"[{self.conf_int_low[i]:.4f}, {self.conf_int_high[i]:.4f}]" + # Default float formatting renders non-finite values as "nan"; + # we do not override this here since the column width is fixed + # and lowercase "nan" is unambiguous. + se_i = self.se[i] + t_i = self.t_stat[i] + p_i = self.p_value[i] + lines.append( + f"{int(e):>10} {self.att[i]:>12.4f} " + f"{se_i:>12.4f} {t_i:>10.3f} {p_i:>10.4f} {ci_str:>22}" + ) + lines.extend( + [ + "-" * width, + "", + "=" * width, + ] + ) + return "\n".join(lines) + + def print_summary(self) -> None: + """Print the summary to stdout.""" + print(self.summary()) + + def to_dict(self) -> Dict[str, Any]: + """Return results as a dict with per-horizon arrays and scalars. + + Per-horizon arrays are converted to Python lists via + ``ndarray.tolist()`` (which unwraps NumPy scalar elements to + native ``int`` / ``float``); scalar fields are coerced to + native Python types via ``_json_safe_scalar`` where relevant + (NumPy scalars -> ``.item()``, pandas ``Timestamp`` -> ISO + string, ``Timedelta`` -> ISO string). The returned dict is + JSON-serializable directly via ``json.dumps``. + """ + return { + "event_times": self.event_times.tolist(), + "att": self.att.tolist(), + "se": self.se.tolist(), + "t_stat": self.t_stat.tolist(), + "p_value": self.p_value.tolist(), + "conf_int_low": self.conf_int_low.tolist(), + "conf_int_high": self.conf_int_high.tolist(), + "n_obs_per_horizon": self.n_obs_per_horizon.tolist(), + "alpha": float(self.alpha), + "design": self.design, + "target_parameter": self.target_parameter, + "d_lower": float(self.d_lower), + "dose_mean": float(self.dose_mean), + "F": _json_safe_scalar(self.F), + "n_units": int(self.n_units), + "inference_method": self.inference_method, + "vcov_type": self.vcov_type, + "cluster_name": self.cluster_name, + "filter_info": _json_safe_filter_info(self.filter_info), + } + + def to_dataframe(self) -> pd.DataFrame: + """Return a tidy per-horizon DataFrame. + + Columns: ``event_time, att, se, t_stat, p_value, conf_int_low, + conf_int_high, n_obs``. One row per event-time horizon. + """ + return pd.DataFrame( + { + "event_time": self.event_times, + "att": self.att, + "se": self.se, + "t_stat": self.t_stat, + "p_value": self.p_value, + "conf_int_low": self.conf_int_low, + "conf_int_high": self.conf_int_high, + "n_obs": self.n_obs_per_horizon, + } + ) + + # ============================================================================= # Panel validation and aggregation # ============================================================================= @@ -430,18 +748,12 @@ def _validate_had_panel( f"period(s) in column {time_col!r}." ) if len(periods_list) > 2: - if first_treat_col is None: - raise ValueError( - f"HAD Phase 2a requires exactly two time periods " - f"(got {len(periods_list)} in {time_col!r}) when " - f"first_treat_col=None. Multi-period / staggered adoption " - f"support is queued for Phase 2b (Appendix B.2 event-study)." - ) raise ValueError( - f"HAD Phase 2a requires exactly two time periods " - f"(got {len(periods_list)} in {time_col!r}). Staggered adoption " - f"reduction (first_treat_col supplied with >2 periods) is " - f"queued for Phase 2b (Appendix B.2 event-study)." + f"HAD with aggregate='overall' requires exactly two time " + f"periods (got {len(periods_list)} in {time_col!r}). For " + f"multi-period panels, pass aggregate='event_study' (paper " + f"Appendix B.2 multi-period event-study extension) which " + f"produces per-event-time WAS estimates." ) # Balanced-panel check: every unit appears exactly once per period. @@ -574,6 +886,444 @@ def _validate_had_panel( return t_pre, t_post +def _validate_had_panel_event_study( + data: pd.DataFrame, + outcome_col: str, + dose_col: str, + time_col: str, + unit_col: str, + first_treat_col: Optional[str], +) -> Tuple[Any, List[Any], List[Any], pd.DataFrame, Optional[Dict[str, Any]]]: + """Validate a HAD panel for multi-period event-study mode (Phase 2b). + + Implements paper Appendix B.2 contract: a common treatment date ``F`` + where ``D_{g,t} = 0`` for all units at ``t < F`` and some units have + ``D_{g,t} > 0`` for ``t >= F``. Requires ``len(periods) > 2`` with at + least one pre-period (``t < F``, all D=0) and at least one post-period + (``t >= F``, some D > 0). + + Staggered-timing handling: when ``first_treat_col`` is supplied and + indicates more than one nonzero cohort, the panel is auto-filtered to + the LAST cohort (``F_last = max(cohorts)``) per paper Appendix B.2 + prescription "did_had may be used only for the last treatment cohort + in a staggered design". A ``UserWarning`` is emitted with drop-counts. + + Parameters + ---------- + data, outcome_col, dose_col, time_col, unit_col, first_treat_col + As in :func:`_validate_had_panel`. + + Returns + ------- + F : period label + First-treatment period (the earliest period where any unit has + ``D > 0`` in the filtered data). + t_pre_list : list + Pre-period labels (``t < F``, all D=0), sorted by natural ordering + on the column dtype. + t_post_list : list + Post-period labels (``t >= F``, some D > 0), sorted. + data_filtered : pd.DataFrame + Input with earlier cohorts (``first_treat`` in ``dropped_cohorts``) + dropped if staggered; never-treated units (``first_treat = 0``) + are RETAINED per paper Appendix B.2's "there must be an untreated + group" requirement. Identical to input when no staggered filter + applies. + filter_info : dict or None + Populated on staggered filter with keys ``F_last`` (kept cohort + label), ``n_kept`` (last-cohort units PLUS never-treated units), + ``n_dropped`` (earlier-cohort units removed), ``dropped_cohorts`` + (list of earlier cohort labels). ``None`` otherwise. + + Raises + ------ + ValueError + On missing columns, NaN in key columns, malformed panel, dose- + invariant violations, or no-treatment detected. + """ + required = [outcome_col, dose_col, time_col, unit_col] + if first_treat_col is not None: + required.append(first_treat_col) + missing = [c for c in required if c not in data.columns] + if missing: + raise ValueError(f"Missing column(s) in data: {missing}. Required: {required}.") + + periods_list = list(data[time_col].unique()) + if len(periods_list) < 3: + raise ValueError( + f"HAD with aggregate='event_study' requires more than two " + f"time periods (got {len(periods_list)} in {time_col!r}). " + f"For two-period panels, pass aggregate='overall' (Phase 2a " + f"single-period WAS)." + ) + + # Ordered-time-type check. Paper Appendix B.2 event-time horizons + # require chronological ordering of periods (anchor at F-1, horizons + # e = t - F relative to F). Phase 2a two-period panels can use the + # dose invariant alone to distinguish pre from post without needing + # chronological order, so string labels ("pre", "post") work there. + # For multi-period event-study, multiple pre-periods all have D=0 + # and multiple post-periods may both have D>0, so dose alone cannot + # recover chronology: we must trust the time column's natural order. + # Raw lexicographic sort on object/string labels silently misorders + # panels like "pre1"/"pre2"/"post1"/"post2" or month-name labels. + # Require an explicitly-ordered time representation. + time_dtype = data[time_col].dtype + if not ( + pd.api.types.is_numeric_dtype(time_dtype) + or pd.api.types.is_datetime64_any_dtype(time_dtype) + or (isinstance(time_dtype, pd.CategoricalDtype) and bool(time_dtype.ordered)) + ): + raise ValueError( + f"HAD aggregate='event_study' requires an ordered time " + f"column. time_col={time_col!r} has dtype={time_dtype!r}, " + f"which has no defined chronological order; raw sort would " + f"fall back to lexicographic ordering and silently misindex " + f"event-time horizons (e.g., 'pre1'/'pre2'/'post1'/'post2' " + f"sorts lexicographically but not chronologically). " + f"Convert time_col to numeric (e.g., integer year), " + f"datetime, or ordered categorical " + f"(``pd.Categorical(..., ordered=True, categories=[...])``) " + f"before calling fit() with aggregate='event_study'." + ) + + # Construct the chronological sort key once, shared across every + # downstream ordering: cohort ranking, pre/post period sorting, and + # contiguity checks. Ordered categoricals use their declared + # category index (``list(categorical)`` strips the ordering and + # falls back to string comparison); numeric / datetime use natural + # Python order. Reused by ``_aggregate_multi_period_first_differences`` + # via a parallel construction in that helper (both read the same + # ``time_dtype``). + if isinstance(time_dtype, pd.CategoricalDtype) and time_dtype.ordered: + _cat_order = {c: i for i, c in enumerate(time_dtype.categories)} + + def _sort_key(x: Any) -> Tuple[bool, Any]: + return (x is None, _cat_order.get(x, len(_cat_order))) + + else: + + def _sort_key(x: Any) -> Tuple[bool, Any]: + return (x is None, x) + + # NaN checks on key columns (before any filter). + for col in [outcome_col, dose_col, unit_col]: + if bool(data[col].isna().any()): + n_nan = int(data[col].isna().sum()) + raise ValueError( + f"{n_nan} NaN value(s) found in column {col!r}. HAD " + f"does not silently drop NaN rows; drop or impute before " + f"calling fit()." + ) + + # Cohort detection and staggered-timing auto-filter. + filter_info: Optional[Dict[str, Any]] = None + data_filtered = data + if first_treat_col is not None: + ft_raw = data[first_treat_col] + if bool(ft_raw.isna().any()): + n_nan = int(ft_raw.isna().sum()) + raise ValueError( + f"first_treat_col={first_treat_col!r} contains " + f"{n_nan} NaN value(s) at the row level. Use 0 for " + f"never-treated units and the treatment-start period " + f"for treated units. Drop or impute any NaN rows " + f"before calling fit()." + ) + # Within-unit constancy check. + ft_per_unit_nunique = data.groupby(unit_col)[first_treat_col].nunique(dropna=False) + if (ft_per_unit_nunique > 1).any(): + n_bad = int((ft_per_unit_nunique > 1).sum()) + raise ValueError( + f"first_treat_col={first_treat_col!r} is not constant " + f"within unit for {n_bad} unit(s). Each unit must have " + f"a single first_treat value across all observed periods." + ) + # Cross-validate first_treat_col against observed first-positive- + # dose period for every unit. A mislabeled cohort column would + # otherwise silently select the wrong cohort as F_last and return + # event-study estimates for the wrong units. Contract: + # - declared first_treat == 0: unit must have D == 0 at all t + # (never-treated) + # - declared first_treat == F_g > 0: unit's first period with + # D > 0 must equal F_g + df_for_check = data.sort_values([unit_col, time_col]) + pos_rows = df_for_check.loc[df_for_check[dose_col] > 0] + actual_first_pos = pos_rows.groupby(unit_col)[time_col].first() + declared_ft = df_for_check.groupby(unit_col)[first_treat_col].first() + n_mismatch = 0 + example_mismatch: Optional[Tuple[Any, Any, Any]] = None + for u, declared in declared_ft.items(): + actual = actual_first_pos.get(u, None) + if declared == 0: + if actual is not None: + n_mismatch += 1 + if example_mismatch is None: + example_mismatch = (u, declared, actual) + else: + if actual is None or actual != declared: + n_mismatch += 1 + if example_mismatch is None: + example_mismatch = (u, declared, actual) + if n_mismatch > 0: + u, declared, actual = example_mismatch # type: ignore[misc] + raise ValueError( + f"first_treat_col={first_treat_col!r} disagrees with the " + f"observed dose path for {n_mismatch} unit(s). Example: " + f"unit={u!r} declares first_treat={declared!r} but the " + f"unit's first period with D>0 is {actual!r} " + f"(None means never-treated). A mislabeled cohort column " + f"would silently select the wrong cohort as F_last in the " + f"last-cohort auto-filter. Fix the first_treat_col values " + f"to equal each unit's first positive-dose period (or 0 " + f"for never-treated) before calling fit()." + ) + # Identify cohorts (nonzero first_treat values). Sort using + # ``_sort_key`` (chronological order from ``time_dtype``), NOT + # raw Python sort: first_treat values are period labels and + # must rank chronologically so ``F_last = cohorts[-1]`` is the + # chronologically latest cohort. Under ordered-categorical time + # labels (e.g. month names), raw Python sort is lexicographic + # and would silently pick the wrong ``F_last``. + ft_unique = list(pd.unique(ft_raw)) + cohorts = sorted( + [v for v in ft_unique if v != 0 and not (isinstance(v, float) and np.isnan(v))], + key=_sort_key, + ) + if len(cohorts) == 0: + raise ValueError( + f"first_treat_col={first_treat_col!r} has no nonzero " + f"cohort values (all units appear never-treated). HAD " + f"requires at least one treated cohort with " + f"first_treat > 0 to identify a WAS effect." + ) + if len(cohorts) > 1: + F_last = cohorts[-1] + dropped_cohorts = cohorts[:-1] + # Filter: keep last-cohort AND never-treated (first_treat == 0). + # Paper Appendix B.2: "in designs with variation in treatment + # timing, there must be an untreated group, at least till the + # period where the last cohort gets treated". Never-treated + # units (first_treat=0) satisfy the dose invariant for every + # period (D=0 throughout) and serve as the untreated-group + # comparison at every pre-period horizon. Keeping them matches + # the paper's "there must be an untreated group" language and + # preserves Design 1' identifiability (boundary at 0) when the + # last-cohort doses are uniformly positive. Only earlier-treated + # cohorts (first_treat in dropped_cohorts) are dropped. + keep_mask = (data[first_treat_col] == F_last) | (data[first_treat_col] == 0) + dropped_unit_ids = set(data.loc[~keep_mask, unit_col].unique()) + kept_unit_ids = set(data.loc[keep_mask, unit_col].unique()) + data_filtered = data.loc[keep_mask].copy() + n_dropped = len(dropped_unit_ids - kept_unit_ids) + n_kept = len(kept_unit_ids) + if n_kept == 0: + raise ValueError( + f"Staggered auto-filter to last cohort " + f"(F_last={F_last!r}) left 0 units. Verify " + f"first_treat_col={first_treat_col!r} contains the " + f"expected cohort labels." + ) + filter_info = { + "F_last": F_last, + "n_kept": n_kept, + "n_dropped": n_dropped, + "dropped_cohorts": dropped_cohorts, + } + warnings.warn( + f"Staggered-timing panel detected: {len(cohorts)} distinct " + f"nonzero cohorts in first_treat_col={first_treat_col!r} " + f"({cohorts!r}). Auto-filtering to the last cohort " + f"(F_last={F_last!r}) plus never-treated units " + f"(first_treat=0): {n_kept} units kept, {n_dropped} " + f"earlier-cohort units dropped (from cohorts " + f"{dropped_cohorts!r}). HAD applies only to the last " + f"treatment cohort in staggered designs (paper Appendix " + f"B.2); never-treated units are retained as the untreated-" + f"group comparison per the paper's \"there must be an " + f'untreated group" requirement. For earlier-cohort ' + f"effects, use ChaisemartinDHaultfoeuille " + f"(did_multiplegt_dyn).", + UserWarning, + stacklevel=3, + ) + # After filter, re-read periods_list (cohort filter may have + # dropped some periods if earlier cohorts contributed uniquely). + periods_list = list(data_filtered[time_col].unique()) + if len(periods_list) < 3: + raise ValueError( + f"After staggered auto-filter to last cohort " + f"(F_last={F_last!r}), only {len(periods_list)} " + f"distinct time periods remain in {time_col!r}. " + f"Event-study requires >2 periods; the filtered " + f"panel is too small. Pass aggregate='overall' on " + f"a two-period subset, or supply data with more " + f"pre- or post-periods for the last cohort." + ) + + # Balanced panel on the (possibly-filtered) data: every unit appears + # exactly once per period. ``observed=True`` tells categorical + # groupby to count only OBSERVED unit-period cells. Without it, a + # time_col with an ordered-categorical dtype carrying extra unused + # category levels (beyond the periods actually present in the data) + # would expand to zero-count cells and the balance check would + # falsely reject valid panels. The rest of the validator is keyed + # to ``periods_list`` (observed unique values) so this stays + # consistent. + counts = data_filtered.groupby([unit_col, time_col], observed=True).size() + if (counts != 1).any(): + n_bad = int((counts != 1).sum()) + raise ValueError( + f"Unbalanced panel: {n_bad} unit-period cells have != 1 " + f"observation. HAD requires a balanced panel (each unit " + f"observed exactly once at each period)." + ) + unit_counts = data_filtered.groupby(unit_col)[time_col].nunique() + incomplete = unit_counts[unit_counts != len(periods_list)] + if len(incomplete) > 0: + raise ValueError( + f"Unbalanced panel: {len(incomplete)} unit(s) do not appear " + f"in all {len(periods_list)} periods. HAD requires a balanced " + f"panel (each unit observed at every period)." + ) + + # Dose-invariant period classification on filtered data. + per_period_nonzero: Dict[Any, int] = {} + for p in periods_list: + p_doses = np.asarray( + data_filtered.loc[data_filtered[time_col] == p, dose_col], dtype=np.float64 + ) + per_period_nonzero[p] = int((p_doses != 0).sum()) + t_pre_list_unsorted = [p for p, nz in per_period_nonzero.items() if nz == 0] + t_post_list_unsorted = [p for p, nz in per_period_nonzero.items() if nz > 0] + + if len(t_pre_list_unsorted) == 0: + stats_str = ", ".join(f"{p!r}: {nz} nonzero" for p, nz in per_period_nonzero.items()) + raise ValueError( + f"HAD requires D_{{g,t}} = 0 for all units in at least one " + f"pre-period. No period in column {time_col!r} has all-zero " + f"dose ({stats_str}). The panel has no identifiable baseline." + ) + if len(t_post_list_unsorted) == 0: + raise ValueError( + f"HAD requires at least one period with nonzero dose for " + f"some unit. All periods in column {time_col!r} have all-" + f"zero dose; there is no treatment to estimate." + ) + + # Sort using the same ``_sort_key`` already constructed for cohorts + # (ordered-categorical uses declared category order; numeric / + # datetime use natural Python order). + t_pre_list = sorted(t_pre_list_unsorted, key=_sort_key) + t_post_list = sorted(t_post_list_unsorted, key=_sort_key) + + # Contiguity check: all pre < all post in the natural ordering. + # The HAD dose invariant requires a single transition from all-zero + # to any-nonzero; interleaved pre/post periods indicate a malformed + # panel (e.g., dose going back to zero after treatment, or mixing + # never-treated units with out-of-order labels). Uses ``_sort_key`` + # so ordered categoricals respect their declared category order. + if t_pre_list and t_post_list: + max_pre = t_pre_list[-1] + min_post = t_post_list[0] + contiguous = _sort_key(max_pre) < _sort_key(min_post) + if not contiguous: + raise ValueError( + f"HAD dose invariant violated: pre-periods (all D=0) " + f"and post-periods (some D>0) are not contiguous. " + f"Pre-periods: {t_pre_list!r}; post-periods: " + f"{t_post_list!r}. The dose sequence must transition " + f"from all-zero to nonzero exactly once. For panels " + f"where dose varies non-monotonically (e.g., reversed " + f"treatment, switching), use " + f"ChaisemartinDHaultfoeuille (did_multiplegt_dyn)." + ) + + F = t_post_list[0] # earliest post-period + + # Post-period nonnegative-dose check on the filtered data. + post_mask = data_filtered[time_col].isin(t_post_list) + post_doses = np.asarray(data_filtered.loc[post_mask, dose_col], dtype=np.float64) + neg_post = post_doses < 0 + if neg_post.any(): + n_neg = int(neg_post.sum()) + min_neg = float(post_doses[neg_post].min()) + raise ValueError( + f"HAD requires D_{{g,t}} >= 0 for all units in post-periods " + f"(paper Section 2 dose definition). {n_neg} unit-period " + f"cell(s) have negative dose at t >= F={F!r} " + f"(min={min_neg!r}). Drop these units or verify the dose " + f"column." + ) + + # Staggered-without-``first_treat_col`` detection. When cohort metadata + # is not supplied, the dose-invariant period classification still + # declares t=F=min-post-period based on "any unit has nonzero dose". + # That silently accepts staggered panels where units have DIFFERENT + # first-positive-dose periods: the later-treated cohorts enter + # ``d_arr`` as zero-dose "controls" at the inferred F, violating + # paper Appendix B.2's last-cohort-only contract. Compute per-unit + # first-positive-dose period directly from the dose path and raise + # if multiple cohorts are present, directing users to pass + # ``first_treat_col`` (which activates the last-cohort auto-filter) + # or to use ChaisemartinDHaultfoeuille for full staggered support. + if first_treat_col is None: + df_sorted = data_filtered.sort_values([unit_col, time_col]) + # For each unit, the first period at which dose > 0. + pos_mask_global = df_sorted[dose_col] > 0 + first_pos_per_unit = df_sorted.loc[pos_mask_global].groupby(unit_col)[time_col].first() + cohort_labels = list(first_pos_per_unit.unique()) + if len(cohort_labels) > 1: + # Sort chronologically via the validated time-column order. + distinct_cohorts = sorted(cohort_labels, key=_sort_key) + raise ValueError( + f"Staggered-timing panel detected (first_treat_col is " + f"None): {len(distinct_cohorts)} distinct first-positive-" + f"dose periods {distinct_cohorts!r} across units. HAD's " + f"last-cohort auto-filter (paper Appendix B.2) only runs " + f"when first_treat_col is supplied so the estimator can " + f"identify cohorts. Pass first_treat_col= to " + f"enable the auto-filter to the last cohort, or use " + f"ChaisemartinDHaultfoeuille (did_multiplegt_dyn) for " + f"full staggered support." + ) + + # Constant post-period dose check. Paper Appendix B.2 assumes + # "once treated, stay treated with the same dose"; the event-study + # aggregation uses ``D_{g, F}`` as the single regressor for every + # event-time horizon. Panels where a unit's dose varies across + # post-periods (e.g., phased adoption, dose changes after F) would + # silently misattribute later-horizon effects to the period-F dose. + # Reject front-door with a redirect to ChaisemartinDHaultfoeuille + # for genuinely time-varying post-treatment doses. + if len(t_post_list) > 1: + post_data = data_filtered.loc[post_mask] + dose_spread_per_unit = post_data.groupby(unit_col)[dose_col].agg( + lambda x: float(x.max() - x.min()) + ) + abs_max_dose = float(np.max(np.abs(post_doses))) if post_doses.size else 0.0 + tol = 1e-12 * max(1.0, abs_max_dose) + bad_mask = dose_spread_per_unit > tol + if bool(bad_mask.any()): + n_bad = int(bad_mask.sum()) + max_spread = float(dose_spread_per_unit.max()) + raise ValueError( + f"HAD event-study requires constant dose within unit for " + f"all post-treatment periods t >= F={F!r}. {n_bad} unit(s) " + f"have time-varying doses across post-periods " + f"{t_post_list!r} (max within-unit spread={max_spread!r}, " + f"tolerance={tol!r}). The aggregation uses D_{{g, F}} as " + f"the single regressor for every event-time horizon " + f"(paper Appendix B.2 constant-dose convention), so " + f"silently accepting time-varying post-treatment doses " + f"would misattribute later-horizon effects. For genuinely " + f"time-varying post-treatment doses use " + f"ChaisemartinDHaultfoeuille (did_multiplegt_dyn)." + ) + + return F, t_pre_list, t_post_list, data_filtered, filter_info + + def _aggregate_first_difference( data: pd.DataFrame, outcome_col: str, @@ -649,6 +1399,140 @@ def _aggregate_first_difference( return d_arr, dy_arr, cluster_arr, unit_ids +def _aggregate_multi_period_first_differences( + data: pd.DataFrame, + outcome_col: str, + dose_col: str, + time_col: str, + unit_col: str, + F: Any, + t_pre_list: List[Any], + t_post_list: List[Any], + cluster_col: Optional[str], +) -> Tuple[np.ndarray, Dict[int, np.ndarray], Optional[np.ndarray], np.ndarray, Any]: + """Reduce a multi-period HAD panel to per-horizon first differences. + + For each period ``t`` other than the anchor ``t_anchor = F - 1`` (the + last pre-period), computes the unit-level first difference + ``ΔY_{g,t} = Y_{g,t} - Y_{g, t_anchor}`` and stores it under the event + time ``e = rank(t) - rank(F)`` where ``rank`` is the natural ordering + on the period column (so ``e = 0`` at ``t = F``, ``e = 1`` at the next + post-period, etc., and ``e <= -2`` for pre-period placebos). + + The single dose regressor is ``D_{g, F}`` (the dose at the first + treatment period), reused for every horizon. Paper Appendix B.2 + convention assumes "once treated, stay treated with same dose"; this + helper uses the period-F dose for every horizon, so time-varying + post-period dose is treated as the realized F-period dose. + + Parameters + ---------- + data : pd.DataFrame + Validated multi-period panel (already passed + ``_validate_had_panel_event_study`` and any staggered filter). + outcome_col, dose_col, time_col, unit_col, cluster_col : str + Column names. + F : period label + First-treatment period (from the validator). + t_pre_list, t_post_list : list + Period labels partitioned by the dose invariant, sorted + ascending. + + Returns + ------- + d_arr : np.ndarray, shape (G,) + Post-period dose ``D_{g, F}`` per unit. + dy_dict : dict[int, np.ndarray] + Maps event time ``e`` to first-difference outcome + ``Y_{g, t} - Y_{g, F-1}`` per unit, for every ``t`` except the + anchor. Keys cover all horizons EXCEPT ``e = -1`` (the anchor + gives ``ΔY = 0`` trivially). + cluster_arr : np.ndarray or None, shape (G,) + Cluster IDs per unit when ``cluster_col`` is supplied. + unit_ids : np.ndarray, shape (G,) + Sorted unit identifiers. + t_anchor : period label + The anchor period used (``F - 1`` in the natural period ordering; + equal to the LAST pre-period). + """ + df = data.sort_values([unit_col, time_col]).reset_index(drop=True) + # Period sort respects ordered categorical dtypes (matches + # ``_validate_had_panel_event_study``). The validator already + # enforces a numeric / datetime / ordered-categorical dtype on the + # event-study path, so ``_sort_key`` lookups are well-defined here. + time_dtype = data[time_col].dtype + if isinstance(time_dtype, pd.CategoricalDtype) and time_dtype.ordered: + _cat_order = {c: i for i, c in enumerate(time_dtype.categories)} + + def _sort_key(x: Any) -> Tuple[bool, Any]: + return (x is None, _cat_order.get(x, len(_cat_order))) + + else: + + def _sort_key(x: Any) -> Tuple[bool, Any]: + return (x is None, x) + + all_periods = sorted(t_pre_list + t_post_list, key=_sort_key) + # Event-time mapping: natural rank of each period relative to F. + F_idx = all_periods.index(F) + period_to_event_time: Dict[Any, int] = {p: (i - F_idx) for i, p in enumerate(all_periods)} + # Anchor = F-1 in natural rank (i.e., the last pre-period). By the + # validator's contiguity guarantee, this IS t_pre_list[-1]. + if not t_pre_list: + raise ValueError( + "Internal error: event-study aggregation called with no " + "pre-periods. _validate_had_panel_event_study should have " + "rejected this upstream." + ) + t_anchor = t_pre_list[-1] + + # Pivot to wide: units x periods. + wide_y = df.pivot(index=unit_col, columns=time_col, values=outcome_col) + wide_y = wide_y.sort_index() + unit_ids = np.asarray(wide_y.index) + + # Dose at F (the single regressor for ALL horizons). + d_at_F = ( + df[df[time_col] == F].set_index(unit_col).sort_index()[dose_col].to_numpy(dtype=np.float64) + ) + + dy_dict: Dict[int, np.ndarray] = {} + anchor_y = wide_y[t_anchor].to_numpy(dtype=np.float64) + for p in all_periods: + if p == t_anchor: + continue # anchor gives ΔY = 0 trivially; skipped by contract + e = period_to_event_time[p] + # e = -1 corresponds to t_anchor, which we've already skipped. + # All other periods get a horizon. The F-1 anchor / e=-1 coincidence + # is preserved: event time -1 means "one period before F", which is + # by definition the anchor. + y_t = wide_y[p].to_numpy(dtype=np.float64) + dy_dict[int(e)] = y_t - anchor_y + + cluster_arr: Optional[np.ndarray] = None + if cluster_col is not None: + if cluster_col not in data.columns: + raise ValueError(f"cluster column {cluster_col!r} not found in data.") + cluster_raw = data[cluster_col] + if bool(cluster_raw.isna().any()): + n_nan = int(cluster_raw.isna().sum()) + raise ValueError( + f"cluster column {cluster_col!r} contains {n_nan} NaN " + f"value(s) at the row level. Silent row dropping is " + f"disabled; drop or impute cluster ids before calling fit()." + ) + cluster_per_unit = df.groupby(unit_col)[cluster_col].nunique(dropna=False) + if (cluster_per_unit > 1).any(): + n_bad = int((cluster_per_unit > 1).sum()) + raise ValueError( + f"cluster column {cluster_col!r} is not constant within " + f"unit for {n_bad} unit(s). Cluster must be unit-level." + ) + cluster_arr = df.groupby(unit_col)[cluster_col].first().sort_index().to_numpy() + + return d_at_F, dy_dict, cluster_arr, unit_ids, t_anchor + + # ============================================================================= # Design auto-detection # ============================================================================= @@ -874,9 +1758,16 @@ class HeterogeneousAdoptionDiD: Weighted-Average-Slope (WAS) estimator with three design-dispatch paths: Design 1' (continuous-at-zero), Design 1 continuous-near- d_lower, and Design 1 mass-point (2SLS sample-average per paper - Section 3.2.4). Phase 2a ships the single-period path only; the - multi-period event-study extension (Appendix B.2) is queued for - Phase 2b. + Section 3.2.4). Two aggregation modes: + + - ``aggregate="overall"`` (Phase 2a, default) returns a single-period + :class:`HeterogeneousAdoptionDiDResults` on a two-period panel. + - ``aggregate="event_study"`` (Phase 2b, paper Appendix B.2) returns + a :class:`HeterogeneousAdoptionDiDEventStudyResults` with per- + event-time WAS estimates on a multi-period panel, using a uniform + ``F-1`` anchor and pointwise CIs per horizon. Staggered-timing + panels auto-filter to the last-treatment cohort plus never-treated + units (paper Appendix B.2 prescription). Parameters ---------- @@ -1118,14 +2009,23 @@ def fit( survey: Any = None, weights: Optional[np.ndarray] = None, ) -> HeterogeneousAdoptionDiDResults: - """Fit the HAD estimator on a two-period panel. - - Phase 2a is **panel-only**: the paper (Section 2) defines HAD on - panel or repeated-cross-section data, but this implementation - requires a balanced two-period panel with a unit identifier so - that unit-level first differences ``ΔY_g = Y_{g,2} - Y_{g,1}`` - can be formed. Repeated-cross-section inputs (disjoint unit IDs - between periods) are rejected by the balanced-panel validator. + """Fit the HAD estimator. + + ``aggregate="overall"`` (default) fits on a two-period panel and + returns a :class:`HeterogeneousAdoptionDiDResults` with the + single-period WAS estimate. ``aggregate="event_study"`` fits on + a multi-period panel (``T > 2``) and returns a + :class:`HeterogeneousAdoptionDiDEventStudyResults` with per- + event-time WAS estimates using a uniform ``F-1`` anchor (paper + Appendix B.2). + + Both the overall and event-study paths are **panel-only**: the paper + (Section 2) defines HAD on panel or repeated-cross-section data, + but this implementation requires a balanced panel with a unit + identifier so that unit-level first differences + ``ΔY_{g,t} = Y_{g,t} - Y_{g,t_anchor}`` can be formed. + Repeated-cross-section inputs (disjoint unit IDs between + periods) are rejected by the balanced-panel validator. Repeated-cross-section support is queued for a follow-up PR (tracked in ``TODO.md``); it requires a separate identification path based on pre/post cell means rather than unit-level @@ -1137,11 +2037,26 @@ def fit( outcome_col, dose_col, time_col, unit_col : str Column names. first_treat_col : str or None - Optional first-treatment column for cross-validation. - Required when the panel has more than two periods (raises - for >2 periods in Phase 2a; Phase 2b handles staggered). - aggregate : {"overall"} - ``"event_study"`` raises ``NotImplementedError`` (Phase 2b). + Optional first-treatment column (the period at which each + unit first receives treatment; ``0`` for never-treated). + Required on the event-study path when the panel has more + than two distinct first-treat values (staggered timing): + the estimator auto-filters to the last-treatment cohort + with a ``UserWarning`` per paper Appendix B.2 prescription. + For common-adoption panels the column is optional; when + omitted, the event-study path infers the first-treatment + period ``F`` from the dose invariant. + aggregate : {"overall", "event_study"} + ``"overall"`` (default): returns a single-period + :class:`HeterogeneousAdoptionDiDResults` (Phase 2a). Requires + exactly two time periods. + ``"event_study"`` (Phase 2b): returns a + :class:`HeterogeneousAdoptionDiDEventStudyResults` with per- + event-time WAS estimates on the multi-period panel (paper + Appendix B.2). Requires more than two time periods. Pointwise + CIs per horizon; joint cross-horizon covariance is deferred + to a follow-up PR. Staggered-timing panels are auto-filtered + to the last-treatment cohort with a ``UserWarning``. survey : SurveyDesign or None Reserved for a follow-up survey-integration PR. Must be ``None`` in Phase 2a. @@ -1152,22 +2067,15 @@ def fit( ------- HeterogeneousAdoptionDiDResults """ - # ---- Phase 2a scaffolding rejections (before any work) ---- + # ---- aggregate / survey / weights scaffolding rejections ---- if aggregate not in _VALID_AGGREGATES: raise ValueError( f"Invalid aggregate={aggregate!r}. Must be one of " f"{_VALID_AGGREGATES}." ) - if aggregate == "event_study": - raise NotImplementedError( - "aggregate='event_study' (multi-period event study per " - "paper Appendix B.2) is queued for Phase 2b. Phase 2a " - "supports aggregate='overall' (single-period WAS) only." - ) if survey is not None: raise NotImplementedError( "survey= support on HeterogeneousAdoptionDiD " - "is queued for a follow-up PR after Phase 2a. Pass " - "survey=None for now." + "is queued for a follow-up PR. Pass survey=None for now." ) if weights is not None: raise NotImplementedError( @@ -1175,6 +2083,23 @@ def fit( "queued for a follow-up PR (paired with survey " "integration). Pass weights=None for now." ) + # Dispatch the event-study path to a dedicated method so the + # single-period path stays unchanged (Phase 2a contract preserved). + # Note: event_study returns HeterogeneousAdoptionDiDEventStudyResults + # (distinct type from the overall path's HeterogeneousAdoptionDiDResults); + # the static return-type annotation reflects the common "overall" case + # to keep Phase 2a call-sites type-clean. Users explicitly passing + # aggregate="event_study" should annotate the result as + # HeterogeneousAdoptionDiDEventStudyResults. + if aggregate == "event_study": + return self._fit_event_study( # type: ignore[return-value] + data=data, + outcome_col=outcome_col, + dose_col=dose_col, + time_col=time_col, + unit_col=unit_col, + first_treat_col=first_treat_col, + ) # ---- Resolve effective fit-time state (local vars only, per # feedback_fit_does_not_mutate_config; do not mutate self.*) ---- @@ -1609,3 +2534,272 @@ def _fit_continuous( se = float(bc_fit.se_robust) / abs(den) return att, se, bc_fit, bc_fit.bandwidth_diagnostics + + # ------------------------------------------------------------------ + # Event-study dispatch (Phase 2b, paper Appendix B.2) + # ------------------------------------------------------------------ + + def _fit_event_study( + self, + data: pd.DataFrame, + outcome_col: str, + dose_col: str, + time_col: str, + unit_col: str, + first_treat_col: Optional[str], + ) -> HeterogeneousAdoptionDiDEventStudyResults: + """Multi-period event-study fit (paper Appendix B.2). + + Delegates to the multi-period panel validator (including staggered + last-cohort auto-filter), aggregates per-horizon first differences + against a common anchor ``Y_{g, F-1}``, resolves the design ONCE + on the period-F dose distribution, and then fits the chosen design + path independently on each event-time horizon's first differences. + + Per-horizon sandwich independence is the paper-faithful convention + (Pierce-Schott Figure 2 pointwise CIs). Joint cross-horizon + covariance is deferred to a follow-up PR. + """ + # ---- Resolve effective fit-time state (local vars only, + # feedback_fit_does_not_mutate_config). ---- + design_arg = self.design + d_lower_arg = self.d_lower + vcov_type_arg = self.vcov_type + robust_arg = self.robust + cluster_arg = self.cluster + + # ---- Validate multi-period panel and apply staggered filter ---- + F, t_pre_list, t_post_list, data_filtered, filter_info = _validate_had_panel_event_study( + data, outcome_col, dose_col, time_col, unit_col, first_treat_col + ) + + # ---- Aggregate to per-horizon first differences ---- + # Cluster extraction is deferred until after design resolution. + d_arr, dy_dict, _, _, _ = _aggregate_multi_period_first_differences( + data_filtered, + outcome_col, + dose_col, + time_col, + unit_col, + F, + t_pre_list, + t_post_list, + None, + ) + + n_units = int(d_arr.shape[0]) + if n_units < 3: + raise ValueError( + f"HAD event-study requires at least 3 units for inference; " + f"got n_units={n_units} after aggregation." + ) + + # ---- Resolve design (once, from D_{g, F} distribution) ---- + if design_arg == "auto": + resolved_design = _detect_design(d_arr) + else: + resolved_design = design_arg + + # ---- Resolve d_lower ---- + if resolved_design == "continuous_at_zero": + if d_lower_arg is not None: + scale = max(1.0, float(np.max(np.abs(d_arr)))) + if abs(float(d_lower_arg)) > 1e-12 * scale: + raise ValueError( + f"design='continuous_at_zero' (Design 1') requires " + f"d_lower == 0 within float tolerance (paper Section " + f"3.2 Design 1' regime). Got d_lower=" + f"{float(d_lower_arg)!r}. For d_lower > 0 use " + f"design='continuous_near_d_lower' or " + f"design='mass_point', or design='auto'." + ) + d_lower_val = 0.0 + elif d_lower_arg is None: + d_lower_val = float(d_arr.min()) + else: + d_lower_val = float(d_lower_arg) + + # ---- Regime-partition guards (mirror Phase 2a) ---- + if resolved_design in ("mass_point", "continuous_near_d_lower"): + scale = max(1.0, float(np.max(np.abs(d_arr)))) + if abs(d_lower_val) <= 1e-12 * scale: + raise ValueError( + f"design={resolved_design!r} requires d_lower > 0 (paper " + f"Section 3.2 reserves the d_lower=0 regime for Design 1' " + f"/ `continuous_at_zero`). Got d_lower={d_lower_val!r}." + ) + + if resolved_design in ("continuous_near_d_lower", "mass_point"): + d_min_orig = float(d_arr.min()) + if d_min_orig > 0: + eps_mp = 1e-12 * max(1.0, abs(d_min_orig)) + at_d_min_mask_orig = np.abs(d_arr - d_min_orig) <= eps_mp + modal_fraction_orig = float(np.mean(at_d_min_mask_orig)) + if ( + resolved_design == "continuous_near_d_lower" + and modal_fraction_orig > _MASS_POINT_THRESHOLD + ): + raise ValueError( + f"design='continuous_near_d_lower' cannot be used on a " + f"mass-point sample (modal fraction {modal_fraction_orig:.4f} " + f"at d.min()={d_min_orig!r} exceeds the " + f"{_MASS_POINT_THRESHOLD:.2f} threshold)." + ) + if resolved_design == "mass_point" and modal_fraction_orig <= _MASS_POINT_THRESHOLD: + raise ValueError( + f"design='mass_point' requires a modal mass at d.min() " + f"exceeding the {_MASS_POINT_THRESHOLD:.2f} threshold " + f"(paper Section 3.2.4). Got modal fraction " + f"{modal_fraction_orig:.4f} at d.min()={d_min_orig!r}." + ) + + if resolved_design in ("mass_point", "continuous_near_d_lower") and d_lower_arg is not None: + d_min = float(d_arr.min()) + tol = 1e-12 * max(1.0, abs(d_min)) + if abs(d_lower_val - d_min) > tol: + raise ValueError( + f"design={resolved_design!r} requires d_lower to equal " + f"the support infimum float(d.min())={d_min!r}; got " + f"d_lower={d_lower_val!r}." + ) + d_lower_val = d_min # snap + + dose_mean = float(d_arr.mean()) + + # ---- Extract cluster IDs on mass-point path only ---- + cluster_arr: Optional[np.ndarray] = None + if resolved_design == "mass_point" and cluster_arg is not None: + _, _, cluster_arr, _, _ = _aggregate_multi_period_first_differences( + data_filtered, + outcome_col, + dose_col, + time_col, + unit_col, + F, + t_pre_list, + t_post_list, + cluster_arg, + ) + + # ---- One-time warnings (per fit call, not per horizon) ---- + if resolved_design in ("continuous_near_d_lower", "mass_point"): + warnings.warn( + f"design={resolved_design!r} (Design 1, d_lower > 0) requires " + f"Assumption 6 from de Chaisemartin et al. (2026) for point " + f"identification of WAS_{{d_lower}}, or Assumption 5 for " + f"sign identification only. Neither is testable via " + f"pre-trends.", + UserWarning, + stacklevel=3, + ) + + if resolved_design in ("continuous_at_zero", "continuous_near_d_lower"): + if vcov_type_arg is not None: + warnings.warn( + f"vcov_type={vcov_type_arg!r} is ignored on the " + f"'{resolved_design}' path (continuous designs use the " + f"CCT-2014 robust SE from Phase 1c).", + UserWarning, + stacklevel=3, + ) + if robust_arg: + warnings.warn( + f"robust=True is ignored on the '{resolved_design}' " f"path.", + UserWarning, + stacklevel=3, + ) + if cluster_arg is not None: + warnings.warn( + f"cluster={cluster_arg!r} is ignored on the " + f"'{resolved_design}' path in Phase 2b (estimator-" + f"level cluster threading on the nonparametric path " + f"is queued for a follow-up PR).", + UserWarning, + stacklevel=3, + ) + + # ---- Resolve vcov label for mass-point ---- + if resolved_design == "mass_point": + if vcov_type_arg is None: + vcov_requested = "hc1" if robust_arg else "classical" + else: + vcov_requested = vcov_type_arg.lower() + inference_method = "analytical_2sls" + vcov_label: Optional[str] = "cr1" if cluster_arg is not None else vcov_requested + cluster_label: Optional[str] = cluster_arg if cluster_arg is not None else None + else: + vcov_requested = "" + inference_method = "analytical_nonparametric" + vcov_label = None + cluster_label = None + + # ---- Per-horizon loop ---- + event_times_sorted = sorted(dy_dict.keys()) + n_horizons = len(event_times_sorted) + att_arr = np.full(n_horizons, np.nan, dtype=np.float64) + se_arr = np.full(n_horizons, np.nan, dtype=np.float64) + t_arr = np.full(n_horizons, np.nan, dtype=np.float64) + p_arr = np.full(n_horizons, np.nan, dtype=np.float64) + ci_lo_arr = np.full(n_horizons, np.nan, dtype=np.float64) + ci_hi_arr = np.full(n_horizons, np.nan, dtype=np.float64) + n_obs_arr = np.full(n_horizons, n_units, dtype=np.int64) + + # Collect per-horizon diagnostics on continuous paths. Entries may be + # None for horizons where ``_fit_continuous`` caught a degenerate + # bandwidth-selector failure (constant/perfectly-linear outcome). + bc_fits: Optional[List[Optional[BiasCorrectedFit]]] = ( + [] if resolved_design in ("continuous_at_zero", "continuous_near_d_lower") else None + ) + bw_diags: Optional[List[Optional[BandwidthResult]]] = ( + [] if resolved_design in ("continuous_at_zero", "continuous_near_d_lower") else None + ) + + for i, e in enumerate(event_times_sorted): + dy_e = dy_dict[e] + if resolved_design in ("continuous_at_zero", "continuous_near_d_lower"): + att_e, se_e, bc_fit_e, bw_diag_e = self._fit_continuous( + d_arr, dy_e, resolved_design, d_lower_val + ) + if bc_fits is not None: + bc_fits.append(bc_fit_e) + if bw_diags is not None: + bw_diags.append(bw_diag_e) + elif resolved_design == "mass_point": + att_e, se_e = _fit_mass_point_2sls( + d_arr, dy_e, d_lower_val, cluster_arr, vcov_requested + ) + else: + raise ValueError(f"Internal error: unhandled design={resolved_design!r}.") + + t_stat_e, p_value_e, conf_int_e = safe_inference(att_e, se_e, alpha=float(self.alpha)) + att_arr[i] = float(att_e) + se_arr[i] = float(se_e) + t_arr[i] = float(t_stat_e) + p_arr[i] = float(p_value_e) + ci_lo_arr[i] = float(conf_int_e[0]) + ci_hi_arr[i] = float(conf_int_e[1]) + + return HeterogeneousAdoptionDiDEventStudyResults( + event_times=np.asarray(event_times_sorted, dtype=np.int64), + att=att_arr, + se=se_arr, + t_stat=t_arr, + p_value=p_arr, + conf_int_low=ci_lo_arr, + conf_int_high=ci_hi_arr, + n_obs_per_horizon=n_obs_arr, + alpha=float(self.alpha), + design=resolved_design, + target_parameter=_TARGET_PARAMETER[resolved_design], + d_lower=d_lower_val, + dose_mean=dose_mean, + F=F, + n_units=n_units, + inference_method=inference_method, + vcov_type=vcov_label, + cluster_name=cluster_label, + survey_metadata=None, + bandwidth_diagnostics=bw_diags, + bias_corrected_fit=bc_fits, + filter_info=filter_info, + ) diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index b7aed604..ab40ce3d 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -2248,7 +2248,7 @@ so `δ_0` is recovered by OLS of `ΔY` on `X` and `D_2 * X`; Average Slope is `( - **Extensive-margin effects**: ruled out by Assumption 3. If a jump `Y_2(0) ≠ Y_2(0+)` is suspected, the target parameter and estimator are not appropriate. - **Partial identification of WAS_{d̲}**: only identified up to a positive constant offset `≤ ε` by the bound in Equation 22 (Jensen inequality argument in Appendix C.3). - **Density at boundary**: Assumption 4 requires `f_{D_2}(0) > 0`. This is a non-trivial assumption since 0 is on the boundary of `Supp(D_2)`. -- **Variation in treatment timing**: Appendix B.2 - "in designs with variation in treatment timing, there must be an untreated group, at least till the period where the last cohort gets treated." The implementation errors (hard fail, not warning) on this configuration and redirects users to `ChaisemartinDHaultfoeuille`. +- **Variation in treatment timing**: Appendix B.2 - "in designs with variation in treatment timing, there must be an untreated group, at least till the period where the last cohort gets treated." In Phase 2b (`aggregate="event_study"`) the implementation auto-filters to the last-treatment cohort plus never-treated units with a `UserWarning` when `first_treat_col` is supplied (see Phase 2b last-cohort filter note below); when `first_treat_col` is omitted the estimator detects multiple first-positive-dose cohorts from the dose path and raises a front-door `ValueError` directing users to pass `first_treat_col` or use `ChaisemartinDHaultfoeuille`. - **Mechanical zero at reference period under linear trends (Footnote 13, main text p. 31)**: with industry/unit-specific linear trends, the pre-trends estimator is mechanically zero in the second-to-last pre-period (the slope anchor year). Practical consequence: that year is not an informative placebo check. *Algorithm (Design 1' nonparametric - summarized from Section 3.1.3-3.1.4 and Equations 7-8):* @@ -2322,14 +2322,19 @@ Shipped as `did_had_pretest_workflow()` and surfaced via `practitioner_next_step - [x] Phase 1c: Bias-corrected CI (Equation 8) with `nprobust` parity. Public wrapper `diff_diff.bias_corrected_local_linear` returns `BiasCorrectedFit` with μ̂-scale point estimate, robust SE, and bias-corrected 95% CI `[tau.bc ± z_{1-α/2} * se.rb]`. The β-scale rescaling from Equation 8, `(1/G) Σ D_{g,2}`, is applied by Phase 2's `HeterogeneousAdoptionDiD.fit()`. Parity against `nprobust::lprobust(..., bwselect="mse-dpi")` is asserted at `atol=1e-12` on `tau_cl`/`tau_bc`/`se_cl`/`se_rb`/`ci_low`/`ci_high` across the three unclustered golden DGPs (DGP 1 and DGP 3 typically land closer to `1e-13`). The Python wrapper computes its own `z_{1-α/2}` via `scipy.stats.norm.ppf` inside `safe_inference()`; R's `qnorm` value is stored in the golden JSON for audit, and the parity harness compares Python's CI bounds to R's pre-computed CI bounds so any residual drift is purely the floating-point arithmetic in `tau.bc ± z * se.rb`, not a critical-value disagreement. The clustered DGP achieves bit-parity (`atol=1e-14`) when cluster IDs are in first-appearance order; otherwise BLAS reduction ordering can drift to `atol=1e-10`. Generator: `benchmarks/R/generate_nprobust_lprobust_golden.R`. **Note:** The wrapper matches nprobust's `rho=1` default (`b = h` in auto mode), so Phase 1b's separately-computed `b_mse` is surfaced via `bandwidth_diagnostics.b_mse` but not applied. **Note (public-API surface restriction):** Phase 1c restricts the public wrapper's `vce` parameter to `"nn"`; hc0/hc1/hc2/hc3 raise `NotImplementedError` and are queued for Phase 2+ pending dedicated R parity goldens. The port-level `diff_diff._nprobust_port.lprobust` still accepts all five vce modes (matching R's `nprobust::lprobust` signature) for callers who need the broader surface and accept that the hc-mode variance path — which reuses p-fit hat-matrix leverage for the q-fit residual in R (lprobust.R:229-241) — has not been separately parity-tested. **Note (Phase 1c internal bug workaround):** The clustered golden DGP 4 uses manual `h=b=0.3` to sidestep an nprobust-internal singleton-cluster shape bug in `lprobust.vce` fired by the mse-dpi pilot fits; the Python port has no equivalent bug. - [x] Phase 2a: `HeterogeneousAdoptionDiD` class with separate code paths for Design 1' (`continuous_at_zero`), Design 1 continuous-near-`d̲` (`continuous_near_d_lower`), and Design 1 mass-point. Continuous paths compose Phase 1c's `bias_corrected_local_linear` and form the beta-scale WAS estimate `β̂ = (mean(ΔY) - τ̂_bc) / den` where `τ̂_bc` is the bias-corrected local-linear estimate of the boundary limit `lim_{d↓d̲} E[ΔY | D_2 ≤ d]` and `den = E[D_2]` for Design 1' (paper Theorem 1 / Equation 3 identification; Equation 7 sample estimator) or `den = E[D_2 - d̲]` for Design 1 (paper Theorem 3 / Equation 11, `WAS_{d̲}` under Assumption 6). Mass-point path uses a sample-average 2SLS estimator with instrument `1{D_{g,2} > d̲}` (paper Section 3.2.4). - [x] Phase 2a: `design="auto"` detection rule (`min_g D_{g,2} < 0.01 · median_g D_{g,2}` → continuous_at_zero; modal-min fraction > 2% → mass_point; else continuous_near_lower). Implemented as strict first-match in `diff_diff.had._detect_design`; when `d.min() == 0` exactly, resolves `continuous_at_zero` unconditionally (modal-min check runs only when `d.min() > 0`). Edge case covered: 3% at `D=0` + 97% `Uniform(0.5, 1)` resolves to `continuous_at_zero`, matching the paper-endorsed Design 1' handling of small-share-of-treated samples. -- [x] Phase 2a: Panel validator (`diff_diff.had._validate_had_panel`) verifies `D_{g,1} = 0` for all units, rejects negative post-period doses (`D_{g,2} < 0`) front-door on the original (unshifted) scale, rejects `>2` time periods (staggered reduction queued for Phase 2b), and rejects unbalanced panels and NaN in outcome/dose/unit columns. Both Design 1 paths (`continuous_near_d_lower` and `mass_point`) additionally require `d_lower == float(d.min())` within float tolerance; mismatched overrides raise with a pointer to the unsupported (LATE-like / off-support) estimand. +- [x] Phase 2a: Panel validator (`diff_diff.had._validate_had_panel`) verifies `D_{g,1} = 0` for all units, rejects negative post-period doses (`D_{g,2} < 0`) front-door on the original (unshifted) scale, rejects `>2` time periods on the `aggregate="overall"` path (multi-period panels must use `aggregate="event_study"`, Phase 2b), and rejects unbalanced panels and NaN in outcome/dose/unit columns. Both Design 1 paths (`continuous_near_d_lower` and `mass_point`) additionally require `d_lower == float(d.min())` within float tolerance; mismatched overrides raise with a pointer to the unsupported (LATE-like / off-support) estimand. - [x] Phase 2a: NaN-propagation tests covering constant-y, degenerate-mass-point, and single-cluster-CR1 inputs. The guaranteed NaN coupling is on the DOWNSTREAM triple (`t_stat`, `p_value`, `conf_int`) via the `safe_inference()` gate, which returns NaN on all three whenever `se` is non-finite, zero, or negative. `att` and `se` themselves are raw estimator outputs: on constant-y / no-dose-variation / divide-by-zero the fit paths return `(att=nan, se=nan)` so all five fields move to NaN together; on the degenerate single-cluster CR1 configuration on the mass-point path, `_fit_mass_point_2sls` returns `(att=beta_hat, se=nan)` - `att` is finite (Wald-IV is well defined) while `se` is NaN, so the downstream triple is NaN while `att` remains the raw 2SLS coefficient. The `assert_nan_inference` fixture in `tests/conftest.py` checks the downstream triple against this contract without requiring `att` to be NaN. - **Note (mass-point SE):** Standard errors on the mass-point path use the structural-residual 2SLS sandwich `[Z'X]^{-1} · Ω · [Z'X]^{-T}` with `Ω` built from the structural residuals `u = ΔY - α̂ - β̂·D` (not the reduced-form residuals from an OLS-on-indicator shortcut). Supported: `classical`, `hc1`, and CR1 (cluster-robust) when `cluster=` is supplied. `hc2` and `hc2_bm` raise `NotImplementedError` pending a 2SLS-specific leverage derivation (the OLS leverage `x_i' (X'X)^{-1} x_i` is wrong for 2SLS; the correct finite-sample correction depends on `(Z'X)^{-1}` rather than `(X'X)^{-1}`) plus a dedicated R parity anchor. Queued for the follow-up PR. - **Note (Design 1 identification):** `continuous_near_d_lower` and `mass_point` fits emit a `UserWarning` surfacing that `WAS_{d̲}` identification requires Assumption 6 (or Assumption 5 for sign identification only) beyond parallel trends, and that neither is testable via pre-trends. `continuous_at_zero` (Design 1', Assumption 3 only) does not emit this warning. - **Note (CI endpoints):** Because the continuous-path `att` is `(mean(ΔY) - τ̂_bc) / den`, the beta-scale CI endpoints reverse relative to the Phase 1c boundary-limit CI: `CI_lower(β̂) = (mean(ΔY) - CI_upper(τ̂_bc)) / den` and `CI_upper(β̂) = (mean(ΔY) - CI_lower(τ̂_bc)) / den`. The `HeterogeneousAdoptionDiD.fit()` implementation computes `att ± z · se` directly via `safe_inference`, which handles the reversal naturally from the transformed point estimate. - - **Note (Phase 2a scope):** Ships single-period only. `aggregate="event_study"` (Appendix B.2 multi-period extension) raises `NotImplementedError` pointing to Phase 2b. `survey=` and `weights=` kwargs raise `NotImplementedError` pointing to the follow-up survey-integration PR. - - **Note (Phase 2a panel-only):** The paper (Section 2) defines HAD on *panel or repeated cross-section* data, but Phase 2a ships a panel-only implementation: `HeterogeneousAdoptionDiD.fit()` requires a balanced two-period panel with a unit identifier so that unit-level first differences ΔY_g = Y_{g,2} - Y_{g,1} can be formed. Repeated-cross-section inputs (disjoint unit IDs between periods) are rejected by the existing balanced-panel validator with the "unit(s) do not appear in both periods" error. Repeated-cross-section support is queued for a follow-up PR (tracked in `TODO.md`); it will need a separate identification path based on pre/post cell means rather than unit-level differences. -- [ ] Phase 2b: Multi-period event-study extension (Appendix B.2). Will lift the `aggregate="event_study"` scaffolding in `HeterogeneousAdoptionDiD.fit()` and aggregate per-cohort first-differences into an event-study result. + - **Note (Phase 2a/2b scope):** Phase 2a ships the single-period `aggregate="overall"` path; Phase 2b lifts `aggregate="event_study"` (Appendix B.2 multi-period extension) which returns a `HeterogeneousAdoptionDiDEventStudyResults` with per-event-time WAS estimates and pointwise CIs. `survey=` and `weights=` kwargs raise `NotImplementedError` pointing to the follow-up survey-integration PR. + - **Note (panel-only):** The paper (Section 2) defines HAD on *panel or repeated cross-section* data, but both the overall and event-study paths ship a panel-only implementation: `HeterogeneousAdoptionDiD.fit()` requires a balanced panel with a unit identifier so that unit-level first differences `ΔY_{g,t} = Y_{g,t} - Y_{g,t_anchor}` can be formed. Repeated-cross-section inputs (disjoint unit IDs between periods) are rejected by the balanced-panel validator. RCS support is queued for a follow-up PR (tracked in `TODO.md`); it will need a separate identification path based on pre/post cell means rather than unit-level differences. +- [x] Phase 2b: Multi-period event-study extension (Appendix B.2). `aggregate="event_study"` produces per-event-time WAS estimates using a uniform `F-1` baseline (`ΔY_{g,t} = Y_{g,t} - Y_{g,F-1}` for every horizon), reusing the three Phase 2a design paths on per-horizon first differences. Pre-period placebos included for `e <= -2` (the anchor `e = -1` is skipped since `ΔY = 0` trivially). Post-period estimates for `e >= 0`. The joint Stute test (Equation 18) across pre-periods is a SEPARATE diagnostic deferred to Phase 3 (pre-test diagnostics). + - **Note (Phase 2b last-cohort filter):** When `first_treat_col` indicates more than one nonzero cohort, the panel is auto-filtered to the last-treatment cohort (`F_last = max(cohorts)`) **plus never-treated units** (`first_treat = 0`), with a `UserWarning` naming kept/dropped unit counts and dropped cohort labels. Paper Appendix B.2 is explicit that HAD "may be used only for the LAST treatment cohort in a staggered design"; the auto-filter implements this prescription, retaining never-treated units per the paper's "there must be an untreated group, at least till the period where the last cohort gets treated" requirement. Only earlier-cohort units (with `first_treat > 0` and `< F_last`) are dropped — never-treated units satisfy the dose invariant at every period (`D = 0` throughout) and preserve Design 1' identifiability (boundary at `0`) when last-cohort doses are uniformly positive. When `first_treat_col` is omitted on a >2-period panel, the validator infers each unit's first-positive-dose period from the dose path; if multiple distinct first-positive-dose cohorts are detected, the estimator raises a front-door `ValueError` directing users to pass `first_treat_col` (which activates the auto-filter) or use `ChaisemartinDHaultfoeuille` for full staggered support — there is no silent acceptance of staggered panels without cohort metadata. Common-adoption panels (single first-positive-dose cohort, or only never-treated + one cohort) pass through unchanged with `F` inferred from the dose invariant, and require dose contiguity (pre-periods < post-periods in natural ordering). Non-contiguous dose sequences (e.g., reverse treatment) raise with a pointer to `ChaisemartinDHaultfoeuille`. + - **Note (Phase 2b constant-dose requirement):** The event-study aggregation uses `D_{g, F}` (first-treatment-period dose) as the single regressor for every event-time horizon, per paper Appendix B.2's "once treated, stay treated with the same dose" convention. The validator REJECTS panels where a unit has time-varying dose across post-treatment periods (`D_{g, t} != D_{g, F}` for any `t >= F` within-unit, beyond float tolerance) with a front-door `ValueError`, directing users with genuinely time-varying post-treatment doses to `ChaisemartinDHaultfoeuille` (`did_multiplegt_dyn`). Silent acceptance would misattribute later-horizon treatment-effect heterogeneity to the period-F dose. A follow-up PR could implement a time-varying-dose estimator; tracked in `TODO.md`. + - **Note (Phase 2b per-horizon SE):** Each event-time horizon uses an INDEPENDENT sandwich computed on that horizon's first differences: continuous paths use the CCT-2014 robust SE from Phase 1c divided by `|den|`; mass-point path uses the structural-residual 2SLS sandwich from Phase 2a. This produces pointwise CIs per horizon, matching the paper's Pierce-Schott application (Section 5.2, Figure 2: "nonparametric pointwise CIs"). Joint cross-horizon covariance (IF-based stacking or block bootstrap) is NOT computed — the paper does not derive it and all reported CIs are pointwise. Follow-up PRs may add joint covariance for cross-horizon hypothesis tests; current tracking in `TODO.md`. + - **Note (Phase 2b baseline convention):** All event-time horizons use a uniform `F-1` anchor: `ΔY_{g,t} = Y_{g,t} - Y_{g,F-1}` for every `t`. This is consistent with the paper's Garrett-et-al. application (Section 5.1: "outcome `Y_{g,t} - Y_{g,2001}`" where `F = 2002`), simplifies event-time indexing (`e = t - F` so `e = -1` is the anchor, skipped), and keeps the implementation symmetric for pre- and post-period horizons. The paper review text's asymmetric "`Y_{g,t} - Y_{g,1}` for pre" / "`Y_{g,t} - Y_{g,F-1}` for post" phrasing is covered by the uniform convention since both give the same placebo interpretation under parallel trends (the paper's own applications use the uniform anchor). + - **Note (Phase 2b result class):** `aggregate="event_study"` returns a new `HeterogeneousAdoptionDiDEventStudyResults` dataclass (distinct from the single-period `HeterogeneousAdoptionDiDResults`) with per-horizon arrays (`event_times`, `att`, `se`, `t_stat`, `p_value`, `conf_int_low`, `conf_int_high`, `n_obs_per_horizon`) and shared metadata. `to_dataframe()` returns a tidy per-horizon DataFrame; `to_dict()` returns a dict with list-of-per-horizon fields. The static return-type annotation on `fit()` is `HeterogeneousAdoptionDiDResults` (the common case); callers passing `aggregate="event_study"` should annotate their variable as `HeterogeneousAdoptionDiDEventStudyResults` for type checkers. - [ ] Phase 3: `qug_test()` (`T = D_{2,(1)} / (D_{2,(2)} - D_{2,(1)})`, rejection `{T > 1/α - 1}`). - [ ] Phase 3: `stute_test()` Cramér-von Mises with Mammen wild bootstrap. - [ ] Phase 3: `yatchew_hr_test()` heteroskedasticity-robust linearity test. diff --git a/docs/methodology/papers/dechaisemartin-2026-review.md b/docs/methodology/papers/dechaisemartin-2026-review.md index 8017b244..0dce9765 100644 --- a/docs/methodology/papers/dechaisemartin-2026-review.md +++ b/docs/methodology/papers/dechaisemartin-2026-review.md @@ -189,7 +189,8 @@ Alternative to Stute when `G` is large or heteroskedasticity is suspected. - [ ] Warnings for staggered treatment timing (direct users to existing `ChaisemartinDHaultfoeuille` in diff-diff). - [ ] Warnings for extensive-margin effects / positive mass of untreated (not fatal; suggests running existing DiD). - [ ] Documentation of non-testability of Assumptions 5 and 6. -- [ ] Multi-period event-study extension (Appendix B.2) with joint Stute test across post-periods. +- [x] Multi-period event-study extension (Appendix B.2). **Phase 2b implementation (2026-04):** `aggregate="event_study"` returns per-event-time WAS estimates using uniform `F-1` anchor. Staggered timing auto-filtered to last cohort with `UserWarning` per Appendix B.2 prescription. Pointwise CIs per horizon (no joint cross-horizon covariance; matches paper's Pierce-Schott Figure 2). Pre-period placebos at `e <= -2`; the anchor `e = -1` is skipped since `ΔY = 0` there by construction. +- [ ] Joint Stute test (Equation 18) across pre-periods. Deferred to Phase 3 (pre-test diagnostics). --- diff --git a/tests/test_had.py b/tests/test_had.py index 99d4b5cb..733d473a 100644 --- a/tests/test_had.py +++ b/tests/test_had.py @@ -32,6 +32,7 @@ import inspect import warnings +from typing import Any, cast import numpy as np import pandas as pd @@ -39,11 +40,14 @@ from diff_diff.had import ( HeterogeneousAdoptionDiD, + HeterogeneousAdoptionDiDEventStudyResults, HeterogeneousAdoptionDiDResults, _aggregate_first_difference, + _aggregate_multi_period_first_differences, _detect_design, _fit_mass_point_2sls, _validate_had_panel, + _validate_had_panel_event_study, ) from diff_diff.local_linear import bias_corrected_local_linear from tests.conftest import assert_nan_inference @@ -933,11 +937,13 @@ def test_set_params_rollback_on_invalid_alpha(self): class TestScaffoldingRejections: - def test_aggregate_event_study_raises(self): + def test_aggregate_event_study_on_two_period_panel_raises(self): + """Event-study mode requires T > 2 (Phase 2b). A T=2 panel should + raise a helpful ValueError pointing to ``aggregate='overall'``.""" d, dy = _dgp_continuous_at_zero(200, seed=0) panel = _make_panel(d, dy) est = HeterogeneousAdoptionDiD() - with pytest.raises(NotImplementedError, match="2b"): + with pytest.raises(ValueError, match="more than two"): est.fit( panel, "outcome", @@ -2043,3 +2049,1186 @@ def test_to_dict_shows_effective_family(self): result_dict = r.to_dict() assert result_dict["vcov_type"] == "cr1" assert result_dict["cluster_name"] == "state" + + +# ============================================================================= +# Phase 2b: Multi-period event-study extension (paper Appendix B.2) +# ============================================================================= + + +def _make_multi_period_panel( + d_at_F, + n_periods=5, + F=3, + seed=0, + pre_trend=0.0, + beta=0.3, + sigma=0.1, + first_treat=None, + extra_cols=None, +): + """Build a balanced multi-period HAD panel with common adoption at F. + + ``D_{g,t} = 0`` for ``t < F``; ``D_{g,t} = d_at_F[g]`` for ``t >= F``. + ``Y_{g,t} = alpha_g + pre_trend * t + beta * D_{g,t} * 1{t >= F} + eps``. + + Parameters + ---------- + d_at_F : np.ndarray, shape (G,) + Unit-level treatment dose realized at period F. + n_periods : int + Total number of periods; periods indexed 1..n_periods. + F : int + First treatment period (1 <= F <= n_periods). + seed : int + RNG seed for outcome noise. + pre_trend : float + Deterministic linear trend (identical across units; zero under the + paper's parallel-trends assumption). + beta : float + True treatment-effect coefficient on dose. + sigma : float + Outcome noise SD. + first_treat : np.ndarray or None, shape (G,) + Optional unit-level first-treatment labels (``0`` for + never-treated). If provided, written to a ``first_treat`` + column; used for staggered-timing tests. + extra_cols : dict or None + Additional unit-constant columns. + """ + rng = np.random.default_rng(seed) + G = len(d_at_F) + units = np.arange(G) + periods = np.arange(1, n_periods + 1) + alpha_g = 0.5 * rng.standard_normal(G) + rows = [] + for g in units: + d_g = float(d_at_F[g]) + # Preserve None in first_treat to support NaN-injection tests; cast + # valid ints only. + if first_treat is not None: + ft_raw = first_treat[g] + ft_g: Any = ft_raw if ft_raw is None else int(ft_raw) + else: + ft_g = 0 if d_g == 0 else F + for t in periods: + if first_treat is not None: + if ft_g is None or ft_g == 0 or t < ft_g: + dose = 0.0 + else: + dose = d_g + else: + dose = d_g if t >= F else 0.0 + eps = sigma * rng.standard_normal() + outcome = alpha_g[g] + pre_trend * t + beta * dose + eps + row = {"unit": g, "period": t, "dose": dose, "outcome": outcome} + if first_treat is not None: + row["first_treat"] = ft_g + rows.append(row) + df = pd.DataFrame(rows) + if extra_cols is not None: + for col, vals in extra_cols.items(): + df = df.merge(pd.DataFrame({"unit": units, col: vals}), on="unit", how="left") + return df + + +def _fit_es(est, *args, **kwargs) -> HeterogeneousAdoptionDiDEventStudyResults: + """Fit and return a narrowed event-study result type. + + The public ``fit()`` is annotated to return the single-period + ``HeterogeneousAdoptionDiDResults`` (the common case). When + ``aggregate="event_study"`` is requested, the runtime return is + ``HeterogeneousAdoptionDiDEventStudyResults``; this helper narrows the + type for the test body via ``typing.cast``. + """ + kwargs.setdefault("aggregate", "event_study") + result = est.fit(*args, **kwargs) + return cast(HeterogeneousAdoptionDiDEventStudyResults, result) + + +class TestEventStudySmoke: + """Smoke tests: the three design paths produce finite event-study results.""" + + def test_continuous_at_zero_smoke(self): + rng = np.random.default_rng(0) + G = 400 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + result = cast( + HeterogeneousAdoptionDiDEventStudyResults, + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ), + ) + assert isinstance(result, HeterogeneousAdoptionDiDEventStudyResults) + assert result.design == "continuous_at_zero" + assert result.F == 3 + assert result.n_units == G + # Post-period horizons should all produce finite att (noise-level); + # pre-period placebo at e=-2 may hit a degenerate fit (zero-variance + # baseline outcomes) and return NaN, that's acceptable. + post_mask = result.event_times >= 0 + assert np.all(np.isfinite(result.att[post_mask])) + + def test_continuous_near_d_lower_smoke(self): + rng = np.random.default_rng(1) + G = 400 + u = rng.beta(2, 2, G) + d = 0.1 + 0.9 * u + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=2) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + result = cast( + HeterogeneousAdoptionDiDEventStudyResults, + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ), + ) + assert result.design == "continuous_near_d_lower" + assert result.target_parameter == "WAS_d_lower" + post_mask = result.event_times >= 0 + assert np.all(np.isfinite(result.att[post_mask])) + + def test_mass_point_smoke(self): + rng = np.random.default_rng(2) + G = 400 + mass_n = int(0.3 * G) + d = np.concatenate([np.full(mass_n, 0.5), rng.uniform(0.5, 1.0, G - mass_n)]) + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=3) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + result = cast( + HeterogeneousAdoptionDiDEventStudyResults, + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ), + ) + assert result.design == "mass_point" + assert result.inference_method == "analytical_2sls" + assert result.bandwidth_diagnostics is None + assert result.bias_corrected_fit is None + post_mask = result.event_times >= 0 + assert np.all(np.isfinite(result.att[post_mask])) + assert np.all(np.isfinite(result.se[post_mask])) + + +class TestEventStudyBaselineConvention: + """e = -1 (anchor) must NOT appear in event_times.""" + + def test_anchor_not_in_event_times(self): + rng = np.random.default_rng(0) + G = 300 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + result = cast( + HeterogeneousAdoptionDiDEventStudyResults, + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ), + ) + assert -1 not in result.event_times.tolist() + + def test_post_horizons_start_at_zero(self): + rng = np.random.default_rng(0) + G = 300 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + result = cast( + HeterogeneousAdoptionDiDEventStudyResults, + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ), + ) + # F=3, n_periods=5 -> periods 1..5, F-1=2 is anchor. + # e = t-F for t in {1,2,3,4,5} -> {-2,-1,0,1,2}; -1 skipped. + assert result.event_times.tolist() == [-2, 0, 1, 2] + + +class TestEventStudyDesignResolution: + """Design / d_lower / target_parameter are SCALARS shared across horizons.""" + + def test_design_is_scalar(self): + rng = np.random.default_rng(0) + G = 300 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + result = cast( + HeterogeneousAdoptionDiDEventStudyResults, + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ), + ) + assert isinstance(result.design, str) + assert isinstance(result.d_lower, float) + assert isinstance(result.target_parameter, str) + assert isinstance(result.inference_method, str) + + +class TestEventStudyStaggeredFilter: + """Auto-filter to last cohort + UserWarning per paper Appendix B.2.""" + + def _staggered_panel(self, seed=0): + rng = np.random.default_rng(seed) + G = 300 + # Three cohorts: 0 (never), 3, 5. Last cohort = 5. + ft_draw = rng.integers(0, 3, G) + ft = np.array([0, 3, 5])[ft_draw] + d = np.where(ft == 0, 0.0, rng.uniform(0.1, 1.0, G)) + # d[0] zeroed only if first_treat is 0; otherwise keep realized dose + panel = _make_multi_period_panel(d, n_periods=6, F=5, seed=seed + 1, first_treat=ft) + return panel, ft + + def test_staggered_filter_warning(self): + panel, _ = self._staggered_panel(seed=0) + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + HeterogeneousAdoptionDiD(design="auto").fit( + panel, + "outcome", + "dose", + "period", + "unit", + first_treat_col="first_treat", + aggregate="event_study", + ) + filter_warnings = [msg for msg in w if "Staggered" in str(msg.message)] + assert len(filter_warnings) == 1 + + def test_staggered_filter_info_populated(self): + panel, ft = self._staggered_panel(seed=0) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + result = HeterogeneousAdoptionDiD(design="auto").fit( + panel, + "outcome", + "dose", + "period", + "unit", + first_treat_col="first_treat", + aggregate="event_study", + ) + assert result.filter_info is not None + assert result.filter_info["F_last"] == 5 + # n_kept = last-cohort units + never-treated units (both retained). + n_kept_expected = int(((ft == 5) | (ft == 0)).sum()) + assert result.filter_info["n_kept"] == n_kept_expected + # n_dropped = earlier cohorts only (never-treated are kept). + n_dropped_expected = int((ft == 3).sum()) + assert result.filter_info["n_dropped"] == n_dropped_expected + assert 3 in result.filter_info["dropped_cohorts"] + # Never-treated cohort (0) is NOT in dropped_cohorts. + assert 0 not in result.filter_info["dropped_cohorts"] + + def test_staggered_filter_keeps_last_cohort_and_never_treated(self): + panel, ft = self._staggered_panel(seed=0) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + result = HeterogeneousAdoptionDiD(design="auto").fit( + panel, + "outcome", + "dose", + "period", + "unit", + first_treat_col="first_treat", + aggregate="event_study", + ) + # Paper Appendix B.2: staggered HAD applies to last cohort + keeps + # never-treated as the "untreated group" comparison. Earlier cohorts + # (first_treat=3) are dropped; never-treated (first_treat=0) AND + # last-cohort (first_treat=5) are retained. + n_kept_expected = int(((ft == 5) | (ft == 0)).sum()) + assert result.n_units == n_kept_expected + assert result.F == 5 + + def test_staggered_filter_retains_never_treated_units(self): + """Explicit sample-composition test: after staggered filter, kept + units are the union of last-cohort and never-treated. + + This pins the paper Appendix B.2 contract: "there must be an + untreated group, at least till the period where the last cohort + gets treated". Earlier cohorts are dropped; never-treated are NOT. + """ + panel, ft = self._staggered_panel(seed=1) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + HeterogeneousAdoptionDiD(design="auto").fit( + panel, + "outcome", + "dose", + "period", + "unit", + first_treat_col="first_treat", + aggregate="event_study", + ) + # The fit ran successfully with never-treated retained. Verify + # directly: the validator returns data_filtered with expected + # composition. + F, t_pre, t_post, data_filtered, filter_info = _validate_had_panel_event_study( + panel, "outcome", "dose", "period", "unit", "first_treat" + ) + kept_ft_values = set(data_filtered["first_treat"].unique().tolist()) + # Should contain exactly {0, F_last=5}; NOT earlier cohort 3. + assert kept_ft_values == {0, 5} + + def test_no_filter_on_single_cohort(self): + """Panel with one nonzero cohort (plus never-treated): no filter.""" + rng = np.random.default_rng(0) + G = 200 + ft = rng.choice([0, 3], size=G, p=[0.5, 0.5]) + d = np.where(ft == 0, 0.0, rng.uniform(0.1, 1.0, G)) + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1, first_treat=ft) + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + result = HeterogeneousAdoptionDiD(design="auto").fit( + panel, + "outcome", + "dose", + "period", + "unit", + first_treat_col="first_treat", + aggregate="event_study", + ) + filter_warnings = [msg for msg in w if "Staggered" in str(msg.message)] + assert len(filter_warnings) == 0 + assert result.filter_info is None + + +class TestEventStudyPerHorizonSEIndependence: + """Each horizon's SE matches Phase 2a SE on the two-period subset. + + Proves the per-horizon independence contract: the event-study path + computes per-event-time estimates identically to what Phase 2a would + produce on a (F-1, t) two-period subset. + """ + + def test_mass_point_per_horizon_matches_phase_2a(self): + rng = np.random.default_rng(0) + G = 300 + mass_n = int(0.3 * G) + d = np.concatenate([np.full(mass_n, 0.5), rng.uniform(0.5, 1.0, G - mass_n)]) + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + # Event-study fit. + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + es_result = HeterogeneousAdoptionDiD(design="mass_point").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + assert isinstance(es_result, HeterogeneousAdoptionDiDEventStudyResults) + # Phase 2a fit on each post-period (F-1, t) two-period subset. + # Skip pre-period horizons since Phase 2a would reject the pre-pre + # subset (both periods all-zero dose). + F = 3 + for i, e in enumerate(es_result.event_times): + if int(e) < 0: + continue # pre-period comparisons not applicable + t_target = F + int(e) + subset = panel[panel["period"].isin([F - 1, t_target])].copy() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + overall_result = HeterogeneousAdoptionDiD(design="mass_point").fit( + subset, "outcome", "dose", "period", "unit" + ) + np.testing.assert_allclose(es_result.att[i], overall_result.att, atol=1e-12, rtol=1e-12) + np.testing.assert_allclose(es_result.se[i], overall_result.se, atol=1e-12, rtol=1e-12) + + def test_continuous_at_zero_per_horizon_matches_phase_2a(self): + rng = np.random.default_rng(1) + G = 300 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=2) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + es_result = HeterogeneousAdoptionDiD(design="continuous_at_zero").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + assert isinstance(es_result, HeterogeneousAdoptionDiDEventStudyResults) + # Skip pre-period horizons since Phase 2a would reject the pre-pre + # subset (both periods all-zero dose). + F = 3 + for i, e in enumerate(es_result.event_times): + if int(e) < 0: + continue + t_target = F + int(e) + subset = panel[panel["period"].isin([F - 1, t_target])].copy() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + overall_result = HeterogeneousAdoptionDiD(design="continuous_at_zero").fit( + subset, "outcome", "dose", "period", "unit" + ) + # Match if both finite; if both NaN (degenerate bandwidth + # selector on this horizon), skip assertion. + if np.isfinite(es_result.att[i]) and np.isfinite(overall_result.att): + np.testing.assert_allclose( + es_result.att[i], overall_result.att, atol=1e-12, rtol=1e-12 + ) + np.testing.assert_allclose( + es_result.se[i], overall_result.se, atol=1e-12, rtol=1e-12 + ) + else: + assert np.isnan(es_result.att[i]) + assert np.isnan(overall_result.att) + + +class TestEventStudyAggregateMatrix: + """2x2 period/aggregate matrix: reciprocal rejections.""" + + def test_T2_event_study_raises(self): + d = np.linspace(0.0, 1.0, 100) + dy = 0.3 * d + 0.01 * np.random.default_rng(0).standard_normal(100) + panel = _make_panel(d, dy) + with pytest.raises(ValueError, match="more than two"): + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + + def test_T_gt_2_overall_raises(self): + rng = np.random.default_rng(0) + G = 100 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + with pytest.raises(ValueError, match="aggregate='event_study'"): + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="overall" + ) + + def test_invalid_aggregate_raises(self): + d = np.linspace(0.0, 1.0, 100) + dy = 0.3 * d + panel = _make_panel(d, dy) + with pytest.raises(ValueError, match="Invalid aggregate"): + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="garbage" + ) + + +class TestEventStudyPlacebos: + """Pre-period placebos: near 0 under no pre-trend; detectable under pre-trend.""" + + def test_no_pre_trend_placebos_near_zero(self): + rng = np.random.default_rng(0) + G = 500 + d = rng.uniform(0.1, 1.0, G) # mass-point-free + d[0] = 0.0 # Design 1' + panel = _make_multi_period_panel( + d, n_periods=6, F=4, seed=1, pre_trend=0.0, beta=0.3, sigma=0.05 + ) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + result = cast( + HeterogeneousAdoptionDiDEventStudyResults, + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ), + ) + pre_mask = result.event_times <= -2 + # Placebo estimates should be near 0 (noise band). + pre_atts = result.att[pre_mask] + pre_atts_finite = pre_atts[np.isfinite(pre_atts)] + if len(pre_atts_finite) > 0: + # Generous band — the DGP has unit FEs that wash out in first-diff + # but the placebo samples shrink noise-level; tolerance 0.3. + assert np.all(np.abs(pre_atts_finite) < 0.3) + + +class TestEventStudyResultMethods: + """to_dataframe / to_dict / summary produce well-shaped outputs.""" + + def _fit(self): + rng = np.random.default_rng(0) + G = 200 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + return HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + + def test_to_dataframe_shape(self): + result = self._fit() + df = result.to_dataframe() + assert len(df) == len(result.event_times) + assert set(df.columns) == { + "event_time", + "att", + "se", + "t_stat", + "p_value", + "conf_int_low", + "conf_int_high", + "n_obs", + } + + def test_to_dict_shape(self): + result = self._fit() + d = result.to_dict() + assert "event_times" in d + assert "att" in d + assert len(d["event_times"]) == len(result.event_times) + assert len(d["att"]) == len(result.att) + assert d["design"] == result.design + assert d["F"] == result.F + + def test_to_dict_json_serializable(self): + """``to_dict()`` output must be JSON-serializable via ``json.dumps``. + + Covers CI reviewer round 5 P2: previously the per-horizon arrays + contained numpy scalars that tripped ``json.dumps``. + """ + import json + + result = self._fit() + d = result.to_dict() + # Should not raise. + payload = json.dumps(d) + assert isinstance(payload, str) + # Round-trip: values should parse back as native Python types. + parsed = json.loads(payload) + assert isinstance(parsed["event_times"], list) + assert isinstance(parsed["event_times"][0], int) + assert isinstance(parsed["att"][0], float) + assert isinstance(parsed["alpha"], float) + assert isinstance(parsed["n_units"], int) + + def test_summary_renders(self): + result = self._fit() + summary = result.summary() + assert "HeterogeneousAdoptionDiD Event-Study Results" in summary + assert result.design in summary + + def test_repr(self): + result = self._fit() + rep = repr(result) + assert "HeterogeneousAdoptionDiDEventStudyResults" in rep + assert f"n_horizons={len(result.event_times)}" in rep + + +class TestEventStudyPanelContract: + """Panel-contract guards for event-study mode.""" + + def test_rcs_rejected(self): + """Repeated-cross-section inputs (disjoint unit ids) rejected.""" + rng = np.random.default_rng(0) + n_periods = 4 + rows = [] + for t in range(1, n_periods + 1): + for u in range(50): + unit_id = u + t * 1000 # disjoint IDs per period + dose = 0.0 if t < 3 else rng.uniform(0.1, 1.0) + rows.append( + { + "unit": unit_id, + "period": t, + "dose": dose, + "outcome": rng.standard_normal(), + } + ) + panel = pd.DataFrame(rows) + with pytest.raises(ValueError, match="Unbalanced panel"): + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + + def test_non_contiguous_dose_rejected(self): + """Pre/post periods interleaved (dose reversal) raises.""" + G = 100 + rows = [] + rng = np.random.default_rng(0) + d_post = rng.uniform(0.1, 1.0, G) + for g in range(G): + # Weird panel: t=1 all-zero, t=2 treated, t=3 all-zero (reverse!) + for t, dose in [(1, 0.0), (2, d_post[g]), (3, 0.0)]: + rows.append( + { + "unit": g, + "period": t, + "dose": dose, + "outcome": rng.standard_normal(), + } + ) + panel = pd.DataFrame(rows) + with pytest.raises(ValueError, match="not contiguous"): + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + + def test_nan_in_outcome_rejected(self): + rng = np.random.default_rng(0) + G = 100 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + panel.loc[0, "outcome"] = np.nan + with pytest.raises(ValueError, match="NaN"): + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + + def test_nan_in_first_treat_col_rejected(self): + rng = np.random.default_rng(0) + G = 100 + d = rng.uniform(0.0, 1.0, G) + ft = np.where(d > 0, 3, 0).astype(object) + ft[5] = None # type: ignore[call-overload] + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1, first_treat=ft) + with pytest.raises(ValueError, match="NaN"): + HeterogeneousAdoptionDiD(design="auto").fit( + panel, + "outcome", + "dose", + "period", + "unit", + first_treat_col="first_treat", + aggregate="event_study", + ) + + def test_no_pre_period_rejected(self): + """All periods nonzero dose -> no pre-period to anchor on.""" + rng = np.random.default_rng(0) + G = 100 + rows = [] + d_g = rng.uniform(0.1, 1.0, G) + for g in range(G): + for t in range(1, 5): + rows.append( + { + "unit": g, + "period": t, + "dose": d_g[g], # dose always nonzero! + "outcome": rng.standard_normal(), + } + ) + panel = pd.DataFrame(rows) + with pytest.raises(ValueError, match="all-zero dose|pre-period"): + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + + def test_time_varying_post_F_dose_rejected(self): + """Within-unit dose variation across post-periods raises. + + Paper Appendix B.2 assumes "once treated, stay treated with the + same dose"; the aggregation uses ``D_{g, F}`` as the single + regressor for every horizon. Silent acceptance of time-varying + post-treatment doses would misattribute later-horizon effects. + Covers CI reviewer round 1 P0: `_aggregate_multi_period_first_differences` + would otherwise use period-F dose for all horizons. + """ + rng = np.random.default_rng(0) + G = 50 + rows = [] + for g in range(G): + d_F = float(rng.uniform(0.1, 0.5)) + d_F_plus_1 = d_F + 0.3 # time-varying: dose changes after F + for t in range(1, 6): + if t < 3: + dose = 0.0 + elif t == 3: + dose = d_F + else: + dose = d_F_plus_1 # different from d_F + rows.append( + { + "unit": g, + "period": t, + "dose": dose, + "outcome": rng.standard_normal(), + } + ) + panel = pd.DataFrame(rows) + with pytest.raises(ValueError, match="constant dose|time-varying"): + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + + def test_staggered_ordered_categorical_chooses_chronological_last(self): + """Staggered filter uses chronological (not lexicographic) last. + + Constructs an ordered-categorical time column where lexicographic + and chronological orderings disagree. With category order + ``["q1", "q2", "q3", "q10"]``, chronological last is ``"q10"`` + but lexicographic last is ``"q3"``. If cohorts are ``{"q2", "q10"}``, + a raw-sort implementation would pick ``F_last = "q2"`` (lex-max + of the two strings); the fixed version must pick ``F_last = "q10"``. + + Covers CI reviewer round 3 P0: cohort sorting must use + chronological order from ``time_dtype``, not raw Python sort. + """ + rng = np.random.default_rng(0) + G = 80 + periods = ["q1", "q2", "q3", "q10"] + cat_dtype = pd.CategoricalDtype(categories=periods, ordered=True) + # Half of units treated at q2 (cohort 1), half at q10 (cohort 2). + rows = [] + for g in range(G): + F_g = "q2" if g < G // 2 else "q10" + d_g = float(rng.uniform(0.1, 1.0)) + for p in periods: + # Dose = d_g once the period >= F_g in chronological order. + chrono_g = periods.index(F_g) + chrono_p = periods.index(p) + dose = d_g if chrono_p >= chrono_g else 0.0 + rows.append( + { + "unit": g, + "period": p, + "dose": dose, + "outcome": rng.standard_normal(), + "first_treat": F_g, + } + ) + panel = pd.DataFrame(rows) + panel["period"] = panel["period"].astype(cat_dtype) + panel["first_treat"] = panel["first_treat"].astype(cat_dtype) + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + result = HeterogeneousAdoptionDiD(design="auto").fit( + panel, + "outcome", + "dose", + "period", + "unit", + first_treat_col="first_treat", + aggregate="event_study", + ) + + # Chronological last cohort = "q10", not lexicographic last ("q3" + # is not even a cohort here; lex last of the two cohorts would + # be "q2" since "q10" < "q2" lexicographically). + assert result.filter_info is not None + assert result.filter_info["F_last"] == "q10" + assert result.F == "q10" + # q2-cohort units (G/2) are dropped; q10-cohort units (G/2) + # retained. + assert result.n_units == G // 2 + # Dropped cohorts should list "q2". + assert "q2" in result.filter_info["dropped_cohorts"] + + def test_first_treat_col_mismatch_with_dose_raises(self): + """first_treat_col disagreeing with observed dose path must raise. + + A mislabeled cohort column would otherwise silently select the + wrong cohort as F_last in the last-cohort auto-filter and + produce event-study estimates for the wrong units. Covers CI + reviewer round 2 P1. + """ + rng = np.random.default_rng(0) + G = 40 + rows = [] + for g in range(G): + # Actual first-positive-dose period: t=3 for half, t=5 for half. + F_actual = 3 if g < G // 2 else 5 + # But deliberately mislabel: swap the first_treat labels so + # G/2 units declare 5 when actual is 3, and vice versa. + F_declared = 5 if g < G // 2 else 3 + d_g = float(rng.uniform(0.1, 1.0)) + for t in range(1, 7): + dose = d_g if t >= F_actual else 0.0 + rows.append( + { + "unit": g, + "period": t, + "dose": dose, + "outcome": rng.standard_normal(), + "first_treat": F_declared, + } + ) + panel = pd.DataFrame(rows) + with pytest.raises(ValueError, match="disagrees with the observed dose"): + HeterogeneousAdoptionDiD(design="auto").fit( + panel, + "outcome", + "dose", + "period", + "unit", + first_treat_col="first_treat", + aggregate="event_study", + ) + + def test_unordered_string_time_col_rejected(self): + """Object/string time columns raise on event-study path. + + Raw sort on arbitrary string labels is lexicographic, not + chronological (e.g., 'pre1'/'pre2'/'post1'/'post2' would map + to wrong event-time horizons). Covers CI reviewer round 2 P1. + """ + rng = np.random.default_rng(0) + G = 50 + rows = [] + d_post = rng.uniform(0.0, 1.0, G) + d_post[0] = 0.0 + for g in range(G): + for label, dose in [ + ("pre1", 0.0), + ("pre2", 0.0), + ("post1", d_post[g]), + ("post2", d_post[g]), + ]: + rows.append( + { + "unit": g, + "period": label, # object dtype + "dose": dose, + "outcome": rng.standard_normal(), + } + ) + panel = pd.DataFrame(rows) + with pytest.raises(ValueError, match="ordered time column|dtype"): + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + + def test_ordered_categorical_with_unused_levels_accepted(self): + """Ordered categorical with extra unused category levels fits. + + Covers CI reviewer round 4 P1: the balanced-panel check must + use ``observed=True`` on categorical groupby so unused category + levels don't expand to zero-count cells and falsely trip the + balance guard. + """ + rng = np.random.default_rng(0) + G = 40 + # Observed periods: pre1, pre2, post1, post2 + # Declared categories: ALSO include pre0 (unused) and post3 (unused) + all_categories = ["pre0", "pre1", "pre2", "post1", "post2", "post3"] + observed = ["pre1", "pre2", "post1", "post2"] + cat_dtype = pd.CategoricalDtype(categories=all_categories, ordered=True) + rows = [] + d_post = rng.uniform(0.1, 1.0, G) + d_post[0] = 0.0 + for g in range(G): + for label in observed: + dose = d_post[g] if label in ("post1", "post2") else 0.0 + rows.append( + { + "unit": g, + "period": label, + "dose": dose, + "outcome": rng.standard_normal(), + } + ) + panel = pd.DataFrame(rows) + panel["period"] = panel["period"].astype(cat_dtype) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + result = HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + # F should be post1 (first observed post-period); event_times + # should be [-2, 0, 1] (e=-1 for anchor pre2 is skipped). + assert result.F == "post1" + assert result.event_times.tolist() == [-2, 0, 1] + assert result.n_units == G + + def test_ordered_categorical_time_col_accepted(self): + """Ordered categorical time dtype passes the ordered-time check.""" + rng = np.random.default_rng(0) + G = 50 + labels = ["pre1", "pre2", "post1", "post2"] + cat_dtype = pd.CategoricalDtype(categories=labels, ordered=True) + rows = [] + d_post = rng.uniform(0.1, 1.0, G) + d_post[0] = 0.0 + for g in range(G): + for label, dose in [ + ("pre1", 0.0), + ("pre2", 0.0), + ("post1", d_post[g]), + ("post2", d_post[g]), + ]: + rows.append( + { + "unit": g, + "period": label, + "dose": dose, + "outcome": rng.standard_normal(), + } + ) + panel = pd.DataFrame(rows) + panel["period"] = panel["period"].astype(cat_dtype) + # Should fit without raising the ordered-time error. + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + result = HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + # post1 is F; e=-2 (pre1) and e=0 (post1), e=1 (post2) expected. + assert result.F == "post1" + + def test_staggered_without_first_treat_col_rejected(self): + """Multi-cohort panel without first_treat_col raises (not silent). + + Without cohort metadata, the dose-invariant period classification + would silently treat later-cohort units as zero-dose "controls" + at the inferred F, violating Appendix B.2's last-cohort-only + contract. Covers CI reviewer round 1 P1. + """ + rng = np.random.default_rng(0) + G = 100 + rows = [] + for g in range(G): + # Assign cohort: half treat at t=3, half at t=5. + F_g = 3 if g < G // 2 else 5 + d_g = float(rng.uniform(0.1, 1.0)) + for t in range(1, 7): + dose = d_g if t >= F_g else 0.0 + rows.append( + { + "unit": g, + "period": t, + "dose": dose, + "outcome": rng.standard_normal(), + } + ) + panel = pd.DataFrame(rows) + with pytest.raises(ValueError, match="Staggered-timing|first_treat_col"): + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + + +class TestEventStudyGuardsPreserved: + """Phase 2a policy guards fire on the event-study path too.""" + + def test_continuous_at_zero_nonzero_d_lower_raises(self): + rng = np.random.default_rng(0) + G = 200 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + est = HeterogeneousAdoptionDiD(design="continuous_at_zero", d_lower=0.3) + with pytest.raises(ValueError, match="d_lower == 0"): + est.fit(panel, "outcome", "dose", "period", "unit", aggregate="event_study") + + def test_mass_point_d_lower_zero_raises(self): + rng = np.random.default_rng(0) + G = 200 + d = rng.uniform(0.5, 1.0, G) + # Use mass-point design explicitly with d_lower=0 (invalid regime) + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + est = HeterogeneousAdoptionDiD(design="mass_point", d_lower=0.0) + with pytest.raises(ValueError, match="d_lower > 0"): + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + est.fit(panel, "outcome", "dose", "period", "unit", aggregate="event_study") + + def test_continuous_near_rejects_mass_point_sample(self): + rng = np.random.default_rng(0) + G = 200 + mass_n = int(0.3 * G) + d = np.concatenate([np.full(mass_n, 0.5), rng.uniform(0.5, 1.0, G - mass_n)]) + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + est = HeterogeneousAdoptionDiD(design="continuous_near_d_lower") + with pytest.raises(ValueError, match="mass-point sample"): + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + est.fit(panel, "outcome", "dose", "period", "unit", aggregate="event_study") + + def test_mass_point_rejects_continuous_sample(self): + rng = np.random.default_rng(0) + G = 200 + d = rng.uniform(0.1, 1.0, G) # no mass point + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + est = HeterogeneousAdoptionDiD(design="mass_point") + with pytest.raises(ValueError, match="modal mass"): + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + est.fit(panel, "outcome", "dose", "period", "unit", aggregate="event_study") + + +class TestEventStudyNaNPropagation: + """NaN contract: degenerate fits produce NaN triple via safe_inference.""" + + def test_constant_y_nan_inference(self): + rng = np.random.default_rng(0) + G = 200 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + # Overwrite outcome with a constant: ΔY = 0 everywhere → degenerate + panel["outcome"] = 1.0 + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + result = cast( + HeterogeneousAdoptionDiDEventStudyResults, + HeterogeneousAdoptionDiD(design="auto").fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ), + ) + # All per-horizon inference triples should be NaN when fit is degenerate. + assert np.all(np.isnan(result.t_stat)) + assert np.all(np.isnan(result.p_value)) + assert np.all(np.isnan(result.conf_int_low)) + assert np.all(np.isnan(result.conf_int_high)) + + +class TestEventStudySklearnCompat: + """sklearn contract on the event-study path: clone round-trip, idempotence.""" + + def test_fit_does_not_mutate_design(self): + rng = np.random.default_rng(0) + G = 200 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + est = HeterogeneousAdoptionDiD(design="auto") + est.fit(panel, "outcome", "dose", "period", "unit", aggregate="event_study") + assert est.design == "auto" # raw preserved + + def test_fit_is_idempotent(self): + rng = np.random.default_rng(0) + G = 200 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + est = HeterogeneousAdoptionDiD(design="auto") + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + r1 = est.fit(panel, "outcome", "dose", "period", "unit", aggregate="event_study") + r2 = est.fit(panel, "outcome", "dose", "period", "unit", aggregate="event_study") + np.testing.assert_allclose(r1.att, r2.att, atol=1e-14, rtol=0.0) + np.testing.assert_allclose(r1.se, r2.se, atol=1e-14, rtol=0.0) + + def test_sklearn_clone_round_trip(self): + sklearn_base = pytest.importorskip("sklearn.base") + rng = np.random.default_rng(0) + G = 200 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + est = HeterogeneousAdoptionDiD(design="auto", alpha=0.1) + cloned = sklearn_base.clone(est) + assert cloned.design == "auto" + assert cloned.alpha == 0.1 + with warnings.catch_warnings(): + warnings.simplefilter("ignore", UserWarning) + r_orig = est.fit(panel, "outcome", "dose", "period", "unit", aggregate="event_study") + r_clone = cloned.fit( + panel, "outcome", "dose", "period", "unit", aggregate="event_study" + ) + np.testing.assert_allclose(r_orig.att, r_clone.att, atol=1e-14, rtol=0.0) + + +class TestEventStudyWarnings: + """Continuous-path warnings on event-study mode (vcov/robust/cluster ignored).""" + + def _panel(self): + rng = np.random.default_rng(0) + G = 200 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + return _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + + def test_vcov_type_ignored_on_continuous(self): + panel = self._panel() + est = HeterogeneousAdoptionDiD(design="auto", vcov_type="classical") + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + est.fit(panel, "outcome", "dose", "period", "unit", aggregate="event_study") + vcov_warnings = [ + msg for msg in w if "vcov_type" in str(msg.message) and "ignored" in str(msg.message) + ] + assert len(vcov_warnings) == 1 # ONE per fit, not per horizon + + def test_cluster_ignored_on_continuous(self): + panel = self._panel() + panel["state"] = panel["unit"] % 20 + est = HeterogeneousAdoptionDiD(design="auto", cluster="state") + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + est.fit(panel, "outcome", "dose", "period", "unit", aggregate="event_study") + cluster_warnings = [ + msg for msg in w if "cluster=" in str(msg.message) and "ignored" in str(msg.message) + ] + assert len(cluster_warnings) == 1 + + +class TestEventStudyValidator: + """Direct tests for ``_validate_had_panel_event_study``.""" + + def test_too_few_periods_raises(self): + d = np.array([0.0, 0.5, 0.8]) + dy = np.array([0.1, 0.2, 0.3]) + panel = _make_panel(d, dy, periods=(1, 2)) + with pytest.raises(ValueError, match="more than two"): + _validate_had_panel_event_study(panel, "outcome", "dose", "period", "unit", None) + + def test_infers_F_from_dose_invariant(self): + rng = np.random.default_rng(0) + G = 100 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + F, t_pre, t_post, data, filter_info = _validate_had_panel_event_study( + panel, "outcome", "dose", "period", "unit", None + ) + assert F == 3 + assert t_pre == [1, 2] + assert t_post == [3, 4, 5] + assert filter_info is None + + def test_empty_cohorts_raises(self): + """All first_treat values are 0 (never-treated).""" + G = 50 + rng = np.random.default_rng(0) + rows = [] + for g in range(G): + for t in range(1, 5): + rows.append( + { + "unit": g, + "period": t, + "dose": 0.0, + "outcome": rng.standard_normal(), + "first_treat": 0, + } + ) + panel = pd.DataFrame(rows) + with pytest.raises(ValueError, match="no nonzero cohort|all-zero dose"): + _validate_had_panel_event_study( + panel, "outcome", "dose", "period", "unit", "first_treat" + ) + + +class TestEventStudyAggregator: + """Direct tests for ``_aggregate_multi_period_first_differences``.""" + + def test_anchor_not_in_dy_dict(self): + rng = np.random.default_rng(0) + G = 50 + d = rng.uniform(0.0, 1.0, G) + d[0] = 0.0 + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + F, t_pre, t_post, data, _ = _validate_had_panel_event_study( + panel, "outcome", "dose", "period", "unit", None + ) + d_arr, dy_dict, _, _, t_anchor = _aggregate_multi_period_first_differences( + data, "outcome", "dose", "period", "unit", F, t_pre, t_post, None + ) + assert t_anchor == 2 # F - 1 + assert -1 not in dy_dict + # Horizons: e in {-2, 0, 1, 2} + assert set(dy_dict.keys()) == {-2, 0, 1, 2} + + def test_dose_regressor_uses_period_F(self): + rng = np.random.default_rng(0) + G = 30 + d = rng.uniform(0.1, 1.0, G) + panel = _make_multi_period_panel(d, n_periods=5, F=3, seed=1) + F, t_pre, t_post, data, _ = _validate_had_panel_event_study( + panel, "outcome", "dose", "period", "unit", None + ) + d_arr, _, _, unit_ids, _ = _aggregate_multi_period_first_differences( + data, "outcome", "dose", "period", "unit", F, t_pre, t_post, None + ) + # d_arr should be the period-F dose, unit-aligned + expected = panel[panel["period"] == F].sort_values("unit")["dose"].to_numpy() + np.testing.assert_allclose(d_arr, expected, atol=0.0, rtol=0.0)