From ca2acbf05fb163de71f1cdab4cb9cf0c76c28ad3 Mon Sep 17 00:00:00 2001 From: GongJr0 Date: Sun, 5 Oct 2025 01:13:00 +0300 Subject: [PATCH 1/2] Refactor lag significance functions and config handling Renamed internal bootstrapped significance functions for clarity, added config-driven wrappers, and improved result DataFrame construction. This refactor centralizes configuration resolution and streamlines selection logic for both statsmodels and numba engines. --- CandleNet/autoreg/lag_utils.py | 297 +++++++++++++++++-------------- CandleNet/cache/synergy_cache.py | 2 +- 2 files changed, 164 insertions(+), 135 deletions(-) diff --git a/CandleNet/autoreg/lag_utils.py b/CandleNet/autoreg/lag_utils.py index 62b8e7e..ff286a9 100644 --- a/CandleNet/autoreg/lag_utils.py +++ b/CandleNet/autoreg/lag_utils.py @@ -314,7 +314,86 @@ def _wilson_interval( return lo, hi -def bootstrapped_significance( +def _auto_block_len(n: int) -> int: + """ + Select an automatic block length for block bootstrap based on the series length. + + Parameters: + n (int): Number of observations in the time series. + + Returns: + block_len (int): Recommended block length, equal to the greater of 5 and the floor of the cube root of `n`. + """ + return max(5, int(n ** (1 / 3))) + + +def _resolve_lag_cfg(params: LagConfig, n: int) -> dict: + # max_lag tested + """ + Resolve a LagConfig mapping into concrete numeric parameters used for lag testing and bootstrapping. + + This converts potentially symbolic or "auto" entries in `params` into integer values appropriate for + a series of length `n`, applying sensible bounds and heuristics where needed. + + Parameters: + params (LagConfig): Configuration mapping containing keys: + - "maxLag": maximum lag to consider (may be numeric or "auto"-like value). + - "hacBandwidth": Newey–West/HAC bandwidth or "auto". + - "blockLen": circular block bootstrap block length or "auto". + - "bootstrapSamples": number of bootstrap replicates or "auto". + - "maxLagsSelected": cap on number of selected lags or "auto". + - "minBootstrapSamples", "minLagsSelected": minimums used when resolving "auto". + n (int): Length of the time series; used to clamp and derive data-dependent defaults. + + Returns: + dict: A mapping with integer-valued keys: + - "max_lag": selected max lag (clamped to at least 1 and at most n-2). + - "bandwidth": resolved HAC bandwidth as an int. + - "block_len": resolved block length for CBB as an int. + - "B": number of bootstrap replicates as an int. + - "max_selected": maximum number of lags to retain as an int (>= 0). + """ + max_lag = int(params["maxLag"]) + max_lag = max(1, min(max_lag, n - 2)) + + # bandwidth + bw = params["hacBandwidth"] + if isinstance(bw, str) and bw == "auto": + bw = _auto_nw_bandwidth(n) + + # block length + bl = params["blockLen"] + if isinstance(bl, str) and bl == "auto": + bl = _auto_block_len(n) + + # bootstrap samples + B = params["bootstrapSamples"] + if isinstance(B, str) and B == "auto": + # heuristic: proportional to tested lags, capped + B = max(params["minBootstrapSamples"], min(300, 20 * max_lag)) + + # max lags selected + msel_cfg = params["maxLagsSelected"] + if isinstance(msel_cfg, str) and msel_cfg == "auto": + msel = max(params["minLagsSelected"], min(5, max_lag)) + else: + msel = msel_cfg + + assert _is_int_like(msel) and msel >= 0, ( + f"Unsupported maxLagsSelected: {msel_cfg}. " + f"Must be a non-negative integer or 'auto'." + ) + + return { + "max_lag": max_lag, + "bandwidth": int(bw), + "block_len": int(bl), + "B": int(B), + "max_selected": int(msel), + } + + +def _bootstrapped_significance( y: pd.Series, max_lag: int = 20, B: int = 200, @@ -478,11 +557,11 @@ def bootstrapped_significance( out["selected"] = out["stable"] & (out["p_base"] < alpha) return out.sort_values( - ["selected", "freq", "top_freq"], ascending=[False, False, False] + by=["selected", "p_base", "freq"], ascending=[False, True, False] ) -def fast_bootstrapped_significance( +def _fast_bootstrapped_significance( y: np.ndarray | pd.Series, *, max_lag: int, @@ -499,30 +578,31 @@ def fast_bootstrapped_significance( rng: np.random.Generator | None = None, ) -> pd.DataFrame: """ - Drop-in replacement for bootstrapped_significance that uses the fast kernel. + Replacement for bootstrapped_significance that uses the fast kernel. Returns a DataFrame indexed by lag with columns: ['p_base','reject_base_fdr','freq','stable','trials','top_freq','n','beta','t'] """ y = np.asarray(y, dtype=np.float64).ravel() n = y.size if n < 10: - # empty result with expected schema idx = pd.Index(range(1, max_lag + 1), name="lag") return pd.DataFrame( + { + "freq": np.nan, + "top_freq": np.nan, + "p_base": np.nan, + "t_base": np.nan, + "decided": False, + "trials": 0.0, + "p_fdr": np.nan, + "reject_base_fdr": False, + "stable": False, + "selected": False, + "n": np.nan, + "beta": np.nan, + }, index=idx, - columns=[ - "p_base", - "reject_base_fdr", - "freq", - "stable", - "trials", - "top_freq", - "n", - "beta", - "t", - ], - dtype=float, - ).fillna(np.nan) + ) L = int(bandwidth) @@ -530,8 +610,10 @@ def fast_bootstrapped_significance( beta0, t0, n0 = _ols_hac_beta_t_vectorized(y, max_lag, L) p0 = 2.0 * norm.sf(np.abs(t0)) reject_fdr = np.zeros_like(p0, dtype=bool) + p_fdr = np.full_like(p0, np.nan, dtype=np.float64) + if use_fdr_end: - reject_fdr, _, _, _ = multipletests(p0, alpha=alpha, method="fdr_bh") + reject_fdr, p_fdr, _, _ = multipletests(p0, alpha=alpha, method="fdr_bh") # --- bootstrap (counts only) Xb, _ = cbb_sample(y, B=B, block_len=block_len, rng=rng) # shape (B, n) @@ -586,101 +668,85 @@ def fast_bootstrapped_significance( # --- Build the DF once (shape identical to the slow path) idx = pd.Index(range(1, max_lag + 1), name="lag") + selected = stable_flag & (reject_fdr if use_fdr_end else (p0 < alpha)) out = pd.DataFrame( { + "freq": freq, + "top_freq": top_f, "p_base": p0, + "t_base": t0, + "decided": decided, + "trials": trials, + "p_fdr": p_fdr, "reject_base_fdr": reject_fdr, - "freq": freq, "stable": stable_flag, - "trials": trials, - "top_freq": top_f, + "selected": selected, "n": n0.astype(np.float64), "beta": beta0, - "t": t0, }, index=idx, ) - return out - - -def _auto_block_len(n: int) -> int: - """ - Select an automatic block length for block bootstrap based on the series length. - - Parameters: - n (int): Number of observations in the time series. - - Returns: - block_len (int): Recommended block length, equal to the greater of 5 and the floor of the cube root of `n`. - """ - return max(5, int(n ** (1 / 3))) - - -def _resolve_lag_cfg(params: LagConfig, n: int) -> dict: - # max_lag tested - """ - Resolve a LagConfig mapping into concrete numeric parameters used for lag testing and bootstrapping. - - This converts potentially symbolic or "auto" entries in `params` into integer values appropriate for - a series of length `n`, applying sensible bounds and heuristics where needed. - Parameters: - params (LagConfig): Configuration mapping containing keys: - - "maxLag": maximum lag to consider (may be numeric or "auto"-like value). - - "hacBandwidth": Newey–West/HAC bandwidth or "auto". - - "blockLen": circular block bootstrap block length or "auto". - - "bootstrapSamples": number of bootstrap replicates or "auto". - - "maxLagsSelected": cap on number of selected lags or "auto". - - "minBootstrapSamples", "minLagsSelected": minimums used when resolving "auto". - n (int): Length of the time series; used to clamp and derive data-dependent defaults. + return out.sort_values( + by=["selected", "p_base", "freq"], ascending=[False, True, False] + ) - Returns: - dict: A mapping with integer-valued keys: - - "max_lag": selected max lag (clamped to at least 1 and at most n-2). - - "bandwidth": resolved HAC bandwidth as an int. - - "block_len": resolved block length for CBB as an int. - - "B": number of bootstrap replicates as an int. - - "max_selected": maximum number of lags to retain as an int (>= 0). - """ - max_lag = int(params["maxLag"]) - max_lag = max(1, min(max_lag, n - 2)) - # bandwidth - bw = params["hacBandwidth"] - if isinstance(bw, str) and bw == "auto": - bw = _auto_nw_bandwidth(n) +def bootstrapped_from_config(y: pd.Series) -> pd.DataFrame: + params = lag_config() + n = len(y) + rand_seed = params.get("randomSeed") + rng = np.random.default_rng(rand_seed) - # block length - bl = params["blockLen"] - if isinstance(bl, str) and bl == "auto": - bl = _auto_block_len(n) + r = _resolve_lag_cfg(params, n) + return _bootstrapped_significance( + y, + max_lag=r["max_lag"], + B=r["B"], + block_len=r["block_len"], + bandwidth=( + params["hacBandwidth"] + if params["hacBandwidth"] != "auto" + else r["bandwidth"] + ), + alpha=params["sigLevel"], + use_fdr_end=(params["selectionMethod"] == "fdrAdjusted"), + min_freq=params["stabilityFreq"] if params["requireStability"] else 0.0, + early_stop=params["earlyStop"], + b_min=params["minBootstrapSamples"], + check_every=params["stabilityCheckEvery"], + conf=params["stabilityConfidence"], + rng=rng, + ) - # bootstrap samples - B = params["bootstrapSamples"] - if isinstance(B, str) and B == "auto": - # heuristic: proportional to tested lags, capped - B = max(params["minBootstrapSamples"], min(300, 20 * max_lag)) - # max lags selected - msel_cfg = params["maxLagsSelected"] - if isinstance(msel_cfg, str) and msel_cfg == "auto": - msel = max(params["minLagsSelected"], min(5, max_lag)) - else: - msel = msel_cfg +def fast_bootstrapped_from_config(y: pd.Series) -> pd.DataFrame: + params = lag_config() + n = len(y) + rand_seed = params.get("randomSeed") + rng = np.random.default_rng(rand_seed) - assert _is_int_like(msel) and msel >= 0, ( - f"Unsupported maxLagsSelected: {msel_cfg}. " - f"Must be a non-negative integer or 'auto'." + r = _resolve_lag_cfg(params, n) + return _fast_bootstrapped_significance( + y, + max_lag=r["max_lag"], + B=r["B"], + block_len=r["block_len"], + bandwidth=( + params["hacBandwidth"] + if params["hacBandwidth"] != "auto" + else r["bandwidth"] + ), + alpha=params["sigLevel"], + use_fdr_end=(params["selectionMethod"] == "fdrAdjusted"), + min_freq=params["stabilityFreq"] if params["requireStability"] else 0.0, + early_stop=params["earlyStop"], + b_min=params["minBootstrapSamples"], + check_every=params["stabilityCheckEvery"], + conf=params["stabilityConfidence"], + rng=rng, ) - return { - "max_lag": max_lag, - "bandwidth": int(bw), - "block_len": int(bl), - "B": int(B), - "max_selected": int(msel), - } - def select_lags( y: pd.Series, @@ -710,46 +776,9 @@ def select_lags( r = _resolve_lag_cfg(params, n) # run the test if engine == "statsmodels": - res = bootstrapped_significance( - y, - max_lag=r["max_lag"], - B=r["B"], - block_len=r["block_len"], - bandwidth=( - params["hacBandwidth"] - if params["hacBandwidth"] != "auto" - else r["bandwidth"] - ), - alpha=params["sigLevel"], - use_fdr_end=(params["selectionMethod"] == "fdrAdjusted"), - min_freq=params["stabilityFreq"] if params["requireStability"] else 0.0, - early_stop=params["earlyStop"], - b_min=params["minBootstrapSamples"], - check_every=params["stabilityCheckEvery"], - conf=params["stabilityConfidence"], - rng=rng, - ) - + res = bootstrapped_from_config(y) elif engine == "numba": - res = fast_bootstrapped_significance( - y, - max_lag=r["max_lag"], - B=r["B"], - block_len=r["block_len"], - bandwidth=( - params["hacBandwidth"] - if params["hacBandwidth"] != "auto" - else r["bandwidth"] - ), - alpha=params["sigLevel"], - use_fdr_end=(params["selectionMethod"] == "fdrAdjusted"), - min_freq=params["stabilityFreq"] if params["requireStability"] else 0.0, - early_stop=params["earlyStop"], - b_min=params["minBootstrapSamples"], - check_every=params["stabilityCheckEvery"], - conf=params["stabilityConfidence"], - rng=rng, - ) + res = fast_bootstrapped_from_config(y) else: raise ValueError(f"Unsupported engine: {engine}") diff --git a/CandleNet/cache/synergy_cache.py b/CandleNet/cache/synergy_cache.py index 6026cb9..47df6d3 100644 --- a/CandleNet/cache/synergy_cache.py +++ b/CandleNet/cache/synergy_cache.py @@ -176,7 +176,7 @@ def fetch( def delete(self, sectors: str) -> None: con = self.check_con() - query = f"""DELETE FROM {self.TABLE_NAME} WHERE sectors = ?;""" + query = f"""DELETE FROM {self.TABLE_NAME} WHERE sectors_id = ?;""" con.execute(query, (sectors,)) self._log( LogType.EVENT, From 474f172c898ff0bdab466d1ffc2be7a6d51a3b73 Mon Sep 17 00:00:00 2001 From: GongJr0 Date: Sun, 5 Oct 2025 01:22:01 +0300 Subject: [PATCH 2/2] Refactor block length and cache deletion functions Replaces _auto_block_len with _infer_block_len in lag_utils.py and updates parameter naming in CorrCache.delete for clarity. These changes improve code readability and maintain consistency in function usage and logging. --- CandleNet/autoreg/lag_utils.py | 15 +-------------- CandleNet/cache/synergy_cache.py | 6 +++--- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/CandleNet/autoreg/lag_utils.py b/CandleNet/autoreg/lag_utils.py index ff286a9..1709ecd 100644 --- a/CandleNet/autoreg/lag_utils.py +++ b/CandleNet/autoreg/lag_utils.py @@ -314,19 +314,6 @@ def _wilson_interval( return lo, hi -def _auto_block_len(n: int) -> int: - """ - Select an automatic block length for block bootstrap based on the series length. - - Parameters: - n (int): Number of observations in the time series. - - Returns: - block_len (int): Recommended block length, equal to the greater of 5 and the floor of the cube root of `n`. - """ - return max(5, int(n ** (1 / 3))) - - def _resolve_lag_cfg(params: LagConfig, n: int) -> dict: # max_lag tested """ @@ -364,7 +351,7 @@ def _resolve_lag_cfg(params: LagConfig, n: int) -> dict: # block length bl = params["blockLen"] if isinstance(bl, str) and bl == "auto": - bl = _auto_block_len(n) + bl = _infer_block_len(n) # bootstrap samples B = params["bootstrapSamples"] diff --git a/CandleNet/cache/synergy_cache.py b/CandleNet/cache/synergy_cache.py index 47df6d3..23da5d2 100644 --- a/CandleNet/cache/synergy_cache.py +++ b/CandleNet/cache/synergy_cache.py @@ -173,16 +173,16 @@ def fetch( ) return Codec.dec_arrow(data) - def delete(self, sectors: str) -> None: + def delete(self, sectors_id: str) -> None: con = self.check_con() query = f"""DELETE FROM {self.TABLE_NAME} WHERE sectors_id = ?;""" - con.execute(query, (sectors,)) + con.execute(query, (sectors_id,)) self._log( LogType.EVENT, OriginType.USER, CallerType.CACHE, - f"Deleted cache entry for sectors: {sectors}.", + f"Deleted cache entry for sectors: {sectors_id}.", ) def clear(self) -> None: