7 changes: 7 additions & 0 deletions ffx/core/build_strategy.py
@@ -26,6 +26,9 @@ def __init__(self, approach):
# eps -- Length of the path. eps=1e-3 means that alpha_min / alpha_max
# = 1e-3.
self._eps = 1e-70

# Optimization strategy: 'pathwise' (original) or 'cv' (improved)
self._optimization_strategy = 'cv'

# will use all if 'nonlin1', else []
self.all_nonlin_ops = [OP_ABS, OP_LOG10]
@@ -63,3 +66,7 @@ def l1_ratio(self):
@property
def num_alphas(self):
return self._num_alphas

@property
def optimization_strategy(self):
return self._optimization_strategy
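
Since the diff adds only a getter, flipping the strategy on a live instance means assigning the private attribute. A minimal sketch, assuming the class here is the build strategy whose __init__(self, approach) appears above (the class name and constructor argument are assumptions, not part of the diff):

ss = FFXBuildStrategy(approach)  # hypothetical class/argument names
ss._optimization_strategy = 'pathwise'  # no public setter; assign the private attribute
assert ss.optimization_strategy == 'pathwise'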
163 changes: 157 additions & 6 deletions ffx/core/model_factories.py
@@ -21,6 +21,7 @@
)
from .models import ConstantModel, FFXModel
from .utils import ElasticNetWithTimeout, nmse, nondominated_indices_2d, y_is_poor
from sklearn.linear_model import ElasticNetCV, LassoCV


class MultiFFXModelFactory:
@@ -444,13 +445,23 @@ def _pathwiseLearn(
verbose=False,
**fit_params,
):
"""Adapted from enet_path() in sklearn.linear_model.
http://scikit-learn.sourceforge.net/modules/linear_model.html
Compute Elastic-Net path with coordinate descent.
Returns list of model (or None if failure)."""
"""Adaptive pathwise learning with CV or traditional approach.

Uses cross-validation-based optimization when ss.optimization_strategy == 'cv'
(better convergence in fewer iterations); falls back to the traditional
pathwise approach when ss.optimization_strategy == 'pathwise'.
"""
if verbose:
print(" Pathwise learn: begin. max_num_bases=%d" % max_num_bases)
max_iter = 5000 # default 5000. magic number.
print(" Pathwise learn: begin. max_num_bases=%d, strategy=%s" %
(max_num_bases, ss.optimization_strategy))

# Use improved CV-based optimization if enabled
if ss.optimization_strategy == 'cv':
return self._cvBasedLearn(
ss, varnames, bases, X_orig_regress, y_orig,
max_num_bases, target_nmse, verbose, **fit_params
)
max_iter = 10000 # increased from 5000 to help convergence

# Condition X and y:
# -"unbias" = rescale so that (mean=0, stddev=1) -- subtract each row's
@@ -496,6 +507,7 @@ def _pathwiseLearn(
l1_ratio=ss.l1_ratio,
fit_intercept=False,
max_iter=max_iter,
tol=1e-2, # looser tolerance; pairs with the raised max_iter above
**fit_params,
)
try:
@@ -579,6 +591,145 @@ def _pathwiseLearn(
print(" Pathwise learn: done")
return models

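The two knobs changed in this hunk, max_iter=10000 and tol=1e-2, can be exercised in isolation. A minimal runnable sketch on synthetic data (all names below are illustrative, not part of the diff):

import numpy as np
from sklearn.linear_model import ElasticNet

rng = np.random.default_rng(0)
X = np.asfortranarray(rng.normal(size=(200, 10)))
y = X[:, 0] - 0.5 * X[:, 3] + 0.01 * rng.normal(size=200)

# More iterations plus a looser tolerance trade a little coefficient
# precision for far fewer ConvergenceWarnings on ill-conditioned bases
enet = ElasticNet(alpha=0.1, l1_ratio=0.95, fit_intercept=False,
max_iter=10000, tol=1e-2)
enet.fit(X, y)
print(np.nonzero(enet.coef_)[0])  # indices of the surviving bases
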
def _cvBasedLearn(
self,
ss,
varnames,
bases,
X_orig_regress,
y_orig,
max_num_bases,
target_nmse,
verbose=False,
**fit_params,
):
"""Improved optimization using cross-validation for better convergence.

This method replaces the traditional pathwise approach with:
1. Cross-validation for automatic alpha selection
2. Adaptive tolerance based on data characteristics
3. Fewer iterations with better convergence
4. Less aggressive L1 ratio for better stability
"""
if verbose:
print(" CV-based learn: begin. max_num_bases=%d" % max_num_bases)

# Condition X and y (same as original method)
(X_unbiased, y_unbiased, X_avgs, X_stds, y_avg, y_std) = self._unbiasedXy(
X_orig_regress, y_orig
)
X_unbiased = numpy.asfortranarray(X_unbiased)
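# Fortran (column-major) layout speeds up sklearn's coordinate descent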

# Compute adaptive tolerance based on data characteristics
data_scale = numpy.std(y_unbiased)
n_samples, n_features = X_unbiased.shape
condition_est = min(1e6, n_features * 10) # Simple condition number estimate
adaptive_tol = max(1e-6, min(1e-2, 1e-4 * data_scale * numpy.sqrt(condition_est / 100)))
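# Since _unbiasedXy standardizes y (stddev ~= 1), data_scale is ~1 in
# practice, so the tolerance is driven mainly by the n_features heuristic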

if verbose:
print(f" CV-based learn: adaptive tolerance = {adaptive_tol:.2e}")

# Use a less aggressive L1 ratio for better convergence; keep a pure-L1
# request (l1_ratio >= 0.99) intact so the LassoCV branch below is reachable
adjusted_l1_ratio = ss.l1_ratio if ss.l1_ratio >= 0.99 else min(0.7, ss.l1_ratio)
if verbose and adjusted_l1_ratio != ss.l1_ratio:
print(f" CV-based learn: adjusted l1_ratio from {ss.l1_ratio} to {adjusted_l1_ratio}")

# Unlike enet_path, the CV estimators refit per fold, so a Gram matrix
# precomputed on the full data would be wrong; drop any such entry
fit_params.pop("precompute", None)

# Create alpha range - fewer, smarter alphas.
# alpha_max = max|X'y| / (n_samples * l1_ratio) is the smallest alpha
# at which every coefficient is driven to zero
vals = numpy.dot(X_unbiased.T, y_unbiased)
vals = [val for val in vals if not numpy.isnan(val)]
if vals:
alpha_max = numpy.max(numpy.abs(vals)) / (n_samples * adjusted_l1_ratio)
else:
alpha_max = 1.0

# 50 log-spaced alphas spanning four decades below alpha_max; this covers
# the transition from all-zero to dense coefficient vectors
alphas = numpy.logspace(numpy.log10(alpha_max * 1e-4), numpy.log10(alpha_max), 50)

start_time = time.time()
try:
# Choose CV method based on L1 ratio
if adjusted_l1_ratio >= 0.99:
# Use LassoCV for pure L1 case
cv_model = LassoCV(
alphas=alphas,
cv=5,
tol=adaptive_tol,
max_iter=10000,
fit_intercept=False
)
else:
# Use ElasticNetCV for mixed case
cv_model = ElasticNetCV(
alphas=alphas,
l1_ratio=adjusted_l1_ratio,
cv=5,
tol=adaptive_tol,
max_iter=10000,
fit_intercept=False
)
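# Both estimators select alpha by 5-fold cross-validated MSE, then refit
# once on the full data at the chosen alpha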

if verbose:
print(f" CV-based learn: fitting with {len(alphas)} alphas")

cv_model.fit(X_unbiased, y_unbiased)

# Get coefficients and create model
cur_unbiased_coefs = cv_model.coef_.copy()
if cur_unbiased_coefs.ndim == 0: # scalar coef when there is a single feature
cur_unbiased_coefs = cur_unbiased_coefs.reshape((1,))

# Check if we meet stopping criteria
num_bases = len(numpy.nonzero(cur_unbiased_coefs)[0])
nmse_unbiased = nmse(
numpy.dot(cur_unbiased_coefs, X_unbiased.T),
y_unbiased,
min(y_unbiased),
max(y_unbiased),
)

if verbose:
print(f" CV-based learn: best alpha = {cv_model.alpha_:.6e}")
print(f" CV-based learn: num_bases = {num_bases}, nmse = {nmse_unbiased:.6f}")
print(f" CV-based learn: time = {time.time() - start_time:.2f}s")

# Early stopping checks
if numpy.isinf(nmse_unbiased):
if verbose:
print(" CV-based learn: Early stop because nmse is inf")
return None

if nmse_unbiased < target_nmse:
if verbose:
print(" CV-based learn: Target NMSE achieved")

if num_bases > max_num_bases:
if verbose:
print(f" CV-based learn: Warning: {num_bases} bases > max {max_num_bases}")

# Convert to FFX model format
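# The leading 0.0 fills the offset slot (fit_intercept=False above);
# _rebiasCoefs maps standardized coefficients back to the original scale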
coefs = self._rebiasCoefs(
[0.0] + list(cur_unbiased_coefs), X_stds, X_avgs, y_std, y_avg
)
(coefs_n, bases_n, coefs_d, bases_d) = self._allocateToNumerDenom(
ss, bases, coefs
)
model = FFXModel(coefs_n, bases_n, coefs_d, bases_d, varnames)

if verbose:
print(" CV-based learn: done")

return [model]

except Exception as e:
if verbose:
print(f" CV-based learn failed: {e}")
return None

def _allocateToNumerDenom(self, ss, bases, coefs):
"""Prune out nonzero coefficients/bases. Allocate to numerator vs. denominator."""
if ss.include_denominator():
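
For reference, the core of the CV path above in isolation: explicit log-spaced alphas, 5-fold selection, no intercept. A self-contained sketch with synthetic data (names illustrative, not part of the diff):

import numpy as np
from sklearn.linear_model import ElasticNetCV

rng = np.random.default_rng(1)
X = rng.normal(size=(150, 20))
y = 2.0 * X[:, 2] - X[:, 7] + 0.05 * rng.normal(size=150)

# CV picks alpha by mean 5-fold MSE, then refits once on all the data
alpha_max = np.max(np.abs(X.T @ y)) / (len(y) * 0.7)
alphas = np.logspace(np.log10(alpha_max * 1e-4), np.log10(alpha_max), 50)
cv_model = ElasticNetCV(alphas=alphas, l1_ratio=0.7, cv=5,
fit_intercept=False, max_iter=10000)
cv_model.fit(X, y)
print(cv_model.alpha_, np.count_nonzero(cv_model.coef_))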