From ba0648a1baf131f978bebc520ce706053cc43aa0 Mon Sep 17 00:00:00 2001
From: John Halloran
Date: Sat, 26 Apr 2025 19:06:31 -0400
Subject: [PATCH 1/2] Cleaned up looping behavior and aligned some params with
 scikit-learn.

---
 src/diffpy/snmf/main.py       |   1 +
 src/diffpy/snmf/snmf_class.py | 346 ++++++++++++++++------------------
 2 files changed, 167 insertions(+), 180 deletions(-)

diff --git a/src/diffpy/snmf/main.py b/src/diffpy/snmf/main.py
index fe2fff2d..7afacf6b 100644
--- a/src/diffpy/snmf/main.py
+++ b/src/diffpy/snmf/main.py
@@ -21,6 +21,7 @@
 print("Initial Guess (Y0):\n", df_Y0, "\n")
 """
+
 my_model = snmf_class.SNMFOptimizer(MM=MM, Y0=Y0, X0=X0, A=A0, components=2)
 print("Done")
 # print(f"My final guess for X: {my_model.X}")
diff --git a/src/diffpy/snmf/snmf_class.py b/src/diffpy/snmf/snmf_class.py
index f49d1e3b..84831132 100644
--- a/src/diffpy/snmf/snmf_class.py
+++ b/src/diffpy/snmf/snmf_class.py
@@ -1,11 +1,10 @@
-import cvxpy as cp
 import numpy as np
 from scipy.optimize import minimize
-from scipy.sparse import block_diag, coo_matrix, diags
+from scipy.sparse import coo_matrix, csc_matrix, diags
 
 
 class SNMFOptimizer:
-    def __init__(self, MM, Y0=None, X0=None, A=None, rho=1e12, eta=610, maxiter=300, components=None):
+    def __init__(self, MM, Y0=None, X0=None, A=None, rho=1e12, eta=610, max_iter=500, tol=5e-7, components=None):
         print("Initializing SNMF Optimizer")
         self.MM = MM
         self.X0 = X0
@@ -13,7 +12,6 @@ def __init__(self, MM, Y0=None, X0=None, A=None, rho=1e12, eta=610, maxiter=300,
         self.A = A
         self.rho = rho
         self.eta = eta
-        self.maxiter = maxiter
         # Capture matrix dimensions
         self.N, self.M = MM.shape
         self.num_updates = 0
@@ -23,7 +21,7 @@ def __init__(self, MM, Y0=None, X0=None, A=None, rho=1e12, eta=610, maxiter=300,
                 raise ValueError("Must provide either Y0 or a number of components.")
             else:
                 self.K = components
-                self.Y0 = np.random.beta(a=2.5, b=1.5, size=(self.K, self.M))
+                self.Y0 = np.random.beta(a=2.5, b=1.5, size=(self.K, self.M))  # This is untested
         else:
             self.K = Y0.shape[0]
 
@@ -31,21 +29,15 @@ def __init__(self, MM, Y0=None, X0=None, A=None, rho=1e12, eta=610, maxiter=300,
         if self.A is None:
             self.A = np.ones((self.K, self.M)) + np.random.randn(self.K, self.M) * 1e-3  # Small perturbation
         if self.X0 is None:
-            self.X0 = np.random.rand(self.N, self.K)  # Ensures values in [0,1], no need for clipping
+            self.X0 = np.random.rand(self.N, self.K)  # Ensures values in [0,1]
 
         # Initialize solution matrices to be iterated on
         self.X = np.maximum(0, self.X0)
         self.Y = np.maximum(0, self.Y0)
         # Second-order spline: Tridiagonal (-2 on diagonal, 1 on sub/superdiagonals)
-        # TODO re-add the option to have a first-order spline
        self.P = 0.25 * diags([1, -2, 1], offsets=[0, 1, 2], shape=(self.M - 2, self.M))
         self.PP = self.P.T @ self.P
-        PPPP = block_diag([self.PP] * self.K, format="csr")
-        # Generate interleaved index sequence
-        seq = np.arange(self.M * self.K).reshape(self.K, self.M).T.ravel()
-        # Reorder rows and columns of PPPP (blocks interleaved instead of stacked)
-        self.PPPP = PPPP[seq, :][:, seq]
 
         # Set up residual matrix, objective function, and history
         self.R = self.get_residual_matrix()
@@ -54,7 +46,7 @@ def __init__(self, MM, Y0=None, X0=None, A=None, rho=1e12, eta=610, maxiter=300,
         self.objective_history = [self.objective_function]
 
         # Set up tracking variables for updateX()
-        self.preX = self.X.copy()  # Previously stored X (like X0 for now)
+        self.preX = None
         self.GraX = np.zeros_like(self.X)  # Gradient of X (zeros for now)
         self.preGraX = np.zeros_like(self.X)  # Previous gradient of X (zeros for now)
@@ -66,9 +58,8 @@ def __init__(self, MM, Y0=None, X0=None, A=None, rho=1e12, eta=610, maxiter=300,
         )
 
         # Main optimization loop
-        for outiter in range(self.maxiter):
-            self.outiter = outiter
-            self.outer_loop()
+        for outer_iter in range(max_iter):
+            self.optimize_loop()
             # Print diagnostics
             regularization_term = 0.5 * rho * np.linalg.norm(self.P @ self.A.T, "fro") ** 2
             sparsity_term = eta * np.sum(np.sqrt(self.X))  # Square root penalty
@@ -76,17 +67,15 @@ def __init__(self, MM, Y0=None, X0=None, A=None, rho=1e12, eta=610, maxiter=300,
                 f"Num_updates: {self.num_updates}, "
                 f"Obj fun: {self.objective_function:.5e}, "
                 f"Obj - reg/sparse: {self.objective_function - regularization_term - sparsity_term:.5e}, "
-                f"Iter: {self.outiter}"
+                f"Iter: {outer_iter}"
             )
 
             # Convergence check: Stop if diffun is small and at least 20 iterations have passed
-            # MATLAB uses 1e-6 but also gets faster convergence, so this makes up that difference
-            print(self.objective_difference, " < ", self.objective_function * 5e-7)
-            if self.objective_difference < self.objective_function * 5e-7 and outiter >= 20:
+            print(self.objective_difference, " < ", self.objective_function * tol)
+            if self.objective_difference < self.objective_function * tol and outer_iter >= 20:
                 break
 
         # Normalize our results
-        # TODO make this much cleaner
         Y_row_max = np.max(self.Y, axis=1, keepdims=True)
         self.Y = self.Y / Y_row_max
         A_row_max = np.max(self.A, axis=1, keepdims=True)
@@ -94,58 +83,44 @@ def __init__(self, MM, Y0=None, X0=None, A=None, rho=1e12, eta=610, maxiter=300,
         # loop to normalize X
         # effectively just re-running class with non-normalized X, normalized Y/A as inputs, then only update X
         # reset difference trackers and initialize
-        self.preX = self.X.copy()  # Previously stored X (like X0 for now)
+        self.preX = None
         self.GraX = np.zeros_like(self.X)  # Gradient of X (zeros for now)
         self.preGraX = np.zeros_like(self.X)  # Previous gradient of X (zeros for now)
         self.R = self.get_residual_matrix()
         self.objective_function = self.get_objective_function()
         self.objective_difference = None
         self.objective_history = [self.objective_function]
-        self.outiter = 0
-        self.iter = 0
-        for outiter in range(100):
-            if iter == 1:
-                self.iter = 1  # So step size can adapt without an inner loop
+        for norm_iter in range(100):
             self.updateX()
             self.R = self.get_residual_matrix()
             self.objective_function = self.get_objective_function()
             print(f"Objective function after normX: {self.objective_function:.5e}")
             self.objective_history.append(self.objective_function)
             self.objective_difference = self.objective_history[-2] - self.objective_history[-1]
-            if self.objective_difference < self.objective_function * 5e-7 and outiter >= 20:
+            if self.objective_difference < self.objective_function * tol and norm_iter >= 20:
                 break
         # end of normalization (and program)
-        # note that objective function does not fully recover after normalization
-        # it is still higher than pre-normalization, but that is okay and matches MATLAB
+        # note that the objective function may not fully recover after normalization; this is expected
         print("Finished optimization.")
 
-    def outer_loop(self):
-        # This inner loop runs up to four times per outer loop, making updates to X, Y
-        for iter in range(4):
-            self.iter = iter
-            self.preGraX = self.GraX.copy()
-            self.updateX()
-            self.num_updates += 1
-            self.R = self.get_residual_matrix()
-            self.objective_function = self.get_objective_function()
-            print(f"Objective function after updateX: {self.objective_function:.5e}")
-            self.objective_history.append(self.objective_function)
-            if self.outiter == 0 and self.iter == 0:
-                self.objective_difference = self.objective_history[-1] - self.objective_function
-
-        # Now we update Y
-        self.updateY2()
-        self.num_updates += 1
-        self.R = self.get_residual_matrix()
-        self.objective_function = self.get_objective_function()
-        print(f"Objective function after updateY2: {self.objective_function:.5e}")
-        self.objective_history.append(self.objective_function)
+    def optimize_loop(self):
+        self.preGraX = self.GraX.copy()
+        self.updateX()
+        self.num_updates += 1
+        self.R = self.get_residual_matrix()
+        self.objective_function = self.get_objective_function()
+        print(f"Objective function after updateX: {self.objective_function:.5e}")
+        self.objective_history.append(self.objective_function)
+        if self.objective_difference is None:
+            self.objective_difference = self.objective_history[-1] - self.objective_function
 
-        # Check whether to break out early
-        # TODO this condition has not been tested, and may have issues
-        if len(self.objective_history) >= 3:  # Ensure at least 3 values exist
-            if self.objective_history[-3] - self.objective_function < self.objective_difference * 1e-3:
-                break  # Stop if improvement is too small
+        # Now we update Y
+        self.updateY2()
+        self.num_updates += 1
+        self.R = self.get_residual_matrix()
+        self.objective_function = self.get_objective_function()
+        print(f"Objective function after updateY2: {self.objective_function:.5e}")
+        self.objective_history.append(self.objective_function)
 
         self.updateA2()
 
@@ -156,11 +131,10 @@ def outer_loop(self):
         self.objective_history.append(self.objective_function)
         self.objective_difference = self.objective_history[-2] - self.objective_history[-1]
 
-    def apply_interpolation(self, a, x):
+    def apply_interpolation(self, a, x, return_derivatives=False):
         """
         Applies an interpolation-based transformation to `x` based on scaling `a`.
-        Also computes first (`Tx`) and second (`Hx`) derivatives.
-        This replicates MATLAB-style behavior without explicit reshaping.
+        Can also compute the first (`Tx`) and second (`Hx`) derivatives.
""" N = len(x) @@ -187,15 +161,20 @@ def apply_interpolation(self, a, x): Ax_tail = np.full((N - len(idx_int), Ax.shape[1]), Ax[-1, :]) Ax = np.vstack([Ax, Ax_tail]) - # Compute first derivative (Tx) - di = -idx_frac / a - Tx = x[idx_int] * (-di) + x[np.minimum(idx_int + 1, N - 1)] * di - Tx = np.vstack([Tx, np.zeros((N - len(idx_int), Tx.shape[1]))]) + if return_derivatives: + # Compute first derivative (Tx) + di = -idx_frac / a + Tx = x[idx_int] * (-di) + x[np.minimum(idx_int + 1, N - 1)] * di + Tx = np.vstack([Tx, np.zeros((N - len(idx_int), Tx.shape[1]))]) - # Compute second derivative (Hx) - ddi = -di / a + idx_frac * a**-2 - Hx = x[idx_int] * (-ddi) + x[np.minimum(idx_int + 1, N - 1)] * ddi - Hx = np.vstack([Hx, np.zeros((N - len(idx_int), Hx.shape[1]))]) + # Compute second derivative (Hx) + ddi = -di / a + idx_frac * a**-2 + Hx = x[idx_int] * (-ddi) + x[np.minimum(idx_int + 1, N - 1)] * ddi + Hx = np.vstack([Hx, np.zeros((N - len(idx_int), Hx.shape[1]))]) + else: + # Make placeholders + Tx = np.empty(Ax.shape) + Hx = np.empty(Ax.shape) return Ax, Tx, Hx @@ -211,7 +190,7 @@ def get_residual_matrix(self, X=None, Y=None, A=None): R = -self.MM.copy() # Compute transformed X for all (k, m) pairs for k in range(Y.shape[0]): # K - Ax, _, _ = self.apply_interpolation(A[k, :], X[:, k]) # Only use Ax + Ax, _, _ = self.apply_interpolation(A[k, :], X[:, k]) # Only calculate Ax R += Y[k, :] * Ax # Element-wise scaling and sum return R @@ -227,7 +206,7 @@ def get_objective_function(self, R=None, A=None): function = residual_term + regularization_term + sparsity_term return function - def apply_interpolation_matrix(self, X=None, Y=None, A=None): + def apply_interpolation_matrix(self, X=None, Y=None, A=None, return_derivatives=False): """ Applies an interpolation-based transformation to the matrix `X` using `A`, weighted by `Y`. Optionally computes first (`Tx`) and second (`Hx`) derivatives. @@ -286,15 +265,20 @@ def apply_interpolation_matrix(self, X=None, Y=None, A=None): Ax2 = XI1 * (1 - iI) + XI2 * iI Ax = Ax2 * YY # Apply weighting - # Compute first derivative (MATLAB: Tx2=XI1.*(-di)+XI2.*di; Tx=Tx2.*YY) - di = -ii * AA - Tx2 = XI1 * (-di) + XI2 * di - Tx = Tx2 * YY + if return_derivatives: + # Compute first derivative (MATLAB: Tx2=XI1.*(-di)+XI2.*di; Tx=Tx2.*YY) + di = -ii * AA + Tx2 = XI1 * (-di) + XI2 * di + Tx = Tx2 * YY - # Compute second derivative (MATLAB: Hx2=XI1.*(-ddi)+XI2.*ddi; Hx=Hx2.*YY) - ddi = -di * AA * 2 - Hx2 = XI1 * (-ddi) + XI2 * ddi - Hx = Hx2 * YY + # Compute second derivative (MATLAB: Hx2=XI1.*(-ddi)+XI2.*ddi; Hx=Hx2.*YY) + ddi = -di * AA * 2 + Hx2 = XI1 * (-ddi) + XI2 * ddi + Hx = Hx2 * YY + else: + shape = Ax.shape + Tx = np.empty(shape) + Hx = np.empty(shape) return Ax, Tx, Hx @@ -359,55 +343,113 @@ def apply_transformation_matrix(self, A=None, Y=None, R=None): return AT - def solve_mkr_box(self, T, m): + def solve_quadratic_two(self, T, m): """ - Solves the quadratic program for updating y in stretched NMF: + Solves the quadratic program for updating y in stretched NMF using scipy.optimize: min J(y) = 0.5 * y^T Q y + d^T y subject to: 0 ≤ y ≤ 1 + Uses the 'trust-constr' solver with the analytical gradient and Hessian. + Parameters: - - T: (N, K) matrix computed from getAfun(A(k,m), X(:,k)) - - MM_col: (N,) column of MM for the corresponding m + - T: (N, K) ndarray + Matrix computed from getAfun(A(k, m), X[:, k]). + - m: int + Index of the current column in MM. 
 
         Returns:
-        - y: (K,) optimal solution
+        - y: (K,) ndarray
+            Optimal solution for y, clipped to ensure non-negativity.
+        """
+        MM_col = self.MM[:, m]
+
+        Q = T.T @ T
+        d = -T.T @ MM_col
+
+        K = Q.shape[0]
+
+        reg_factor = 1e-8 * np.linalg.norm(Q, ord="fro")
+        Q += np.eye(K) * reg_factor
+
+        # Objective function
+        def objective(y):
+            return 0.5 * y @ Q @ y + d @ y
+
+        # Gradient (first derivative)
+        def grad(y):
+            return Q @ y + d
+
+        # Hessian (constant for quadratic problems)
+        def hess(y):
+            return csc_matrix(Q)  # sparse format for efficiency
+
+        bounds = [(0, 1)] * K
+        y0 = np.clip(-np.linalg.solve(Q + np.eye(K) * 1e-5, d), 0, 1)
+
+        result = minimize(
+            objective, y0, method="trust-constr", jac=grad, hess=hess, bounds=bounds, options={"verbose": 0}
+        )
+
+        return np.maximum(result.x, 0)
+
+    def solve_quadratic_program(self, T, m):
         """
+        Solves the quadratic program for updating y in stretched NMF using scipy.optimize:
+
+        min J(y) = 0.5 * y^T Q y + d^T y
+        subject to: 0 ≤ y ≤ 1
+
+        This uses scipy's L-BFGS-B algorithm, which supports bound
+        constraints and is compatible with scikit-learn dependencies.
+        Something like trust-constr could be worth trying as well.
+
+        Parameters:
+        - T: (N, K) ndarray
+            Matrix computed from getAfun(A(k, m), X[:, k]).
+        - m: int
+            Index of the current column in MM.
+
+        Returns:
+        - y: (K,) ndarray
+            Optimal solution for y, clipped to ensure non-negativity.
+        """
         MM_col = self.MM[:, m]
 
         # Compute Q and d
-        Q = T.T @ T  # Gram matrix (K x K)
-        d = -T.T @ MM_col  # Linear term (K,)
+        Q = T.T @ T
+        d = -T.T @ MM_col
 
-        K = Q.shape[0]  # Number of variables
+        K = Q.shape[0]
 
         # Regularize Q to ensure positive semi-definiteness
-        reg_factor = 1e-8 * np.linalg.norm(Q, ord="fro")  # Adaptive regularization, original was fixed
+        reg_factor = 1e-8 * np.linalg.norm(Q, ord="fro")
         Q += np.eye(K) * reg_factor
 
-        # Define optimization variable
-        y = cp.Variable(K)
+        # Define the objective function: 0.5 * y^T Q y + d^T y
+        def objective(y):
+            return 0.5 * y @ Q @ y + d @ y
 
-        # Define quadratic objective
-        objective = cp.Minimize(0.5 * cp.quad_form(y, Q) + d.T @ y)
+        # Gradient of the objective
+        def grad(y):
+            return Q @ y + d
 
-        # Define constraints (0 ≤ y ≤ 1)
-        constraints = [y >= 0, y <= 1]
+        # Bounds for each variable: 0 ≤ y_i ≤ 1
+        bounds = [(0, 1) for _ in range(K)]
 
-        # Solve using a QP solver
-        prob = cp.Problem(objective, constraints)
-        prob.solve(solver=cp.OSQP, verbose=False)
+        # Initial guess (can also use np.random.rand(K) or something more informed)
+        y0 = np.clip(-np.linalg.solve(Q + np.eye(K) * 1e-5, d), 0, 1)
 
-        # Get the solution
-        return np.maximum(y.value, 0)  # Ensure non-negative values in case of solver tolerance issues
+        result = minimize(objective, y0, method="L-BFGS-B", jac=grad, bounds=bounds)
+
+        return np.maximum(result.x, 0)
 
     def updateX(self):
         """
         Updates `X` using gradient-based optimization with adaptive step size L.
""" # Compute `AX` using the interpolation function - AX, _, _ = self.apply_interpolation_matrix() # Discard the other two outputs + AX, _, _ = self.apply_interpolation_matrix() # Skip the other two outputs # Compute RA and RR intermediate_RA = AX.flatten(order="F").reshape((self.N * self.M, self.K), order="F") RA = intermediate_RA.sum(axis=1).reshape((self.N, self.M), order="F") @@ -418,7 +460,7 @@ def updateX(self): # Compute initial step size `L0` L0 = np.linalg.eigvalsh(self.Y.T @ self.Y).max() * np.max([self.A.max(), 1 / self.A.min()]) # Compute adaptive step size `L` - if self.outiter == 0 and self.iter == 0: + if self.preX is None: L = L0 else: num = np.sum((self.GraX - self.preGraX) * (self.X - self.preX)) # Element-wise multiplication @@ -433,8 +475,7 @@ def updateX(self): while True: # iterate updating X x_step = self.preX - self.GraX / L # Solve x^3 + p*x + q = 0 for the largest real root - # off the shelf solver did not work element-wise for matrices - self.X = np.square(rooth(-x_step, self.eta / (2 * L))) + self.X = np.square(cubic_largest_real_root(-x_step, self.eta / (2 * L))) # Mask values that should be set to zero mask = self.X**2 * L / 2 - L * self.X * x_step + self.eta * np.sqrt(self.X) < 0 self.X = mask * self.X @@ -465,10 +506,12 @@ def updateY2(self): # Populate T using apply_interpolation for k in range(K): - T[:, k] = self.apply_interpolation(self.A[k, m], self.X[:, k])[0].squeeze() + T[:, k] = self.apply_interpolation(self.A[k, m], self.X[:, k], return_derivatives=True)[ + 0 + ].squeeze() - # Solve quadratic problem for y using solve_mkr_box - y = self.solve_mkr_box(T=T, m=m) + # Solve quadratic problem for y + y = self.solve_quadratic_program(T=T, m=m) # Update Y self.Y[:, m] = y @@ -479,7 +522,6 @@ def regularize_function(self, A=None): Returns: - fun: Objective function value (scalar) - gra: Gradient (same shape as A) - - hes: Hessian matrix (M*K, M*K) """ if A is None: A = self.A @@ -489,7 +531,7 @@ def regularize_function(self, A=None): N = self.N # Compute interpolated matrices - AX, TX, HX = self.apply_interpolation_matrix(A=A) + AX, TX, HX = self.apply_interpolation_matrix(A=A, return_derivatives=True) # Compute residual intermediate_RA = AX.flatten(order="F").reshape((N * M, K), order="F") @@ -504,52 +546,7 @@ def regularize_function(self, A=None): der_reshaped = np.asarray(tiled_derivative).reshape((M, K), order="F") gra = der_reshaped.T + self.rho * A @ self.P.T @ self.P - # Compute Hessian - hess = np.zeros((K * M, K * M)) - - for m in range(M): - Tx = TX[:, m + M * np.arange(K)] # shape (N, K) - - TxT_Tx = Tx.T @ Tx - for k1 in range(K): - for k2 in range(K): - i = k1 * M + m - j = k2 * M + m - hess[i, j] += TxT_Tx[k1, k2] - - # Add diagonal - hess += np.diag(np.sum(HX * np.tile(RA, (1, K)), axis=0).reshape((M, K)).T.flatten(order="C")) - - # Add PPPP - hess += self.rho * self.PPPP - - # Final cleanup (optional but good practice) - hess = (hess + hess.T) / 2 - hess = np.asarray(hess, dtype=np.float64) - - """ - hess = np.zeros((M * K, M * K)) - - for m in range(M): - Tx = TX[:, m + M * np.arange(K)] # Now using all rows - hess[m * K : (m + 1) * K, m * K : (m + 1) * K] = Tx.T @ Tx - - hess = ( - hess - + spdiags( - np.reshape( - np.reshape(np.sum(HX * np.tile(RA, (1, K)), axis=0), (M, K), order='F').T, - (M * K,), - ), - 0, # Diagonal index - M * K, # Number of rows - M * K, # Number of columns - ).toarray() - + self.rho * self.PPPP - ) - """ - - return fun, gra, hess + return fun, gra def updateA2(self): """ @@ -562,9 +559,9 @@ def updateA2(self): # 
Define the optimization function def objective(A_vec): A_matrix = A_vec.reshape(self.A.shape) # Reshape back to matrix form - fun, gra, hess = self.regularize_function(A_matrix) + fun, gra = self.regularize_function(A_matrix) gra = gra.flatten() - return fun, gra, hess + return fun, gra # Optimization constraints: lower bound 0.1, no upper bound bounds = [(0.1, None)] * A_initial.size # Equivalent to 0.1 * ones(K, M) @@ -575,7 +572,6 @@ def objective(A_vec): x0=A_initial, # Initial guess method="trust-constr", # Equivalent to 'trust-region-reflective' jac=lambda A_vec: objective(A_vec)[1], # Gradient - # hess=lambda A_vec: objective(A_vec)[2], # Hessian bounds=bounds, # Lower bounds on A ) @@ -583,34 +579,24 @@ def objective(A_vec): self.A = result.x.reshape(self.A.shape) -def rooth(p, q): +def cubic_largest_real_root(p, q): """ - Solves x^3 + p*x + q = 0 element-wise for matrices, returning the largest real root. + Vectorized solver for x^3 + p*x + q = 0. + Returns the largest real root element-wise. """ - # Handle special case where q == 0 - y = np.where(q == 0, np.maximum(0, -p) ** 0.5, np.zeros_like(p)) # q=0 case - - # Compute discriminant + # calculate the discriminant delta = (q / 2) ** 2 + (p / 3) ** 3 + sqrt_delta = np.sqrt(np.abs(delta)) - # Compute square root of delta safely - d = np.where(delta >= 0, np.sqrt(delta), np.sqrt(np.abs(delta)) * 1j) - # TODO: this line causes a warning but results seem correct - - # Compute cube roots safely - a1 = (-q / 2 + d) ** (1 / 3) - a2 = (-q / 2 - d) ** (1 / 3) - - # Compute cube roots of unity - w = (np.sqrt(3) * 1j - 1) / 2 - - # Compute the three possible roots (element-wise) - y1 = a1 + a2 - y2 = w * a1 + w**2 * a2 - y3 = w**2 * a1 + w * a2 + # When delta >= 0: one real root + A = np.cbrt(-q / 2 + sqrt_delta) + B = np.cbrt(-q / 2 - sqrt_delta) + root1 = A + B - # Take the largest real root element-wise when delta < 0 - real_roots = np.stack([np.real(y1), np.real(y2), np.real(y3)], axis=0) - y = np.max(real_roots, axis=0) * (delta < 0) # Keep only real roots when delta < 0 + # When delta < 0: three real roots, use trigonometric method + phi = np.arccos(-q / (2 * np.sqrt(-((p / 3) ** 3) + 1e-12))) + r = 2 * np.sqrt(-p / 3) + root2 = r * np.cos(phi / 3) - return y + # Choose correct root depending on sign of delta + return np.where(delta >= 0, root1, root2) From 5e3740f16a85d075d12b68f220b8d6b2b0ee4609 Mon Sep 17 00:00:00 2001 From: John Halloran Date: Mon, 28 Apr 2025 11:03:40 -0400 Subject: [PATCH 2/2] Combined quadratic solvers --- src/diffpy/snmf/snmf_class.py | 80 +++++++---------------------------- 1 file changed, 15 insertions(+), 65 deletions(-) diff --git a/src/diffpy/snmf/snmf_class.py b/src/diffpy/snmf/snmf_class.py index 84831132..9d58e5f2 100644 --- a/src/diffpy/snmf/snmf_class.py +++ b/src/diffpy/snmf/snmf_class.py @@ -343,7 +343,7 @@ def apply_transformation_matrix(self, A=None, Y=None, R=None): return AT - def solve_quadratic_two(self, T, m): + def solve_quadratic_program(self, T, m, alg="trust-constr"): """ Solves the quadratic program for updating y in stretched NMF using scipy.optimize: @@ -351,6 +351,8 @@ def solve_quadratic_two(self, T, m): subject to: 0 ≤ y ≤ 1 Uses the 'trust-constr' solver with the analytical gradient and Hessian. + Alternatively, can use scipy's L-BFGS-B algorithm, which supports bound + constraints. Parameters: - T: (N, K) ndarray @@ -363,84 +365,32 @@ def solve_quadratic_two(self, T, m): Optimal solution for y, clipped to ensure non-negativity. 
""" MM_col = self.MM[:, m] - - Q = T.T @ T - d = -T.T @ MM_col - - K = Q.shape[0] - - reg_factor = 1e-8 * np.linalg.norm(Q, ord="fro") - Q += np.eye(K) * reg_factor - - # Objective function - def objective(y): - return 0.5 * y @ Q @ y + d @ y - - # Gradient (first derivative) - def grad(y): - return Q @ y + d - - # Hessian (constant for quadratic problems) - def hess(y): - return csc_matrix(Q) # sparse format for efficiency - - bounds = [(0, 1)] * K - y0 = np.clip(-np.linalg.solve(Q + np.eye(K) * 1e-5, d), 0, 1) - - result = minimize( - objective, y0, method="trust-constr", jac=grad, hess=hess, bounds=bounds, options={"verbose": 0} - ) - - return np.maximum(result.x, 0) - - def solve_quadratic_program(self, T, m): - """ - Solves the quadratic program for updating y in stretched NMF using scipy.optimize: - - min J(y) = 0.5 * y^T Q y + d^T y - subject to: 0 ≤ y ≤ 1 - - This uses scipy's L-BFGS-B algorithm,which supports bound - constraints and is compatible with scikit-learn dependencies. - Something like trust-constr could be worth trying as well. - - Parameters: - - T: (N, K) ndarray - Matrix computed from getAfun(A(k, m), X[:, k]). - - m: int - Index of the current column in MM. - - Returns: - - y: (K,) ndarray - Optimal solution for y, clipped to ensure non-negativity. - """ - MM_col = self.MM[:, m] - - # Compute Q and d Q = T.T @ T d = -T.T @ MM_col - K = Q.shape[0] - - # Regularize Q to ensure positive semi-definiteness reg_factor = 1e-8 * np.linalg.norm(Q, ord="fro") Q += np.eye(K) * reg_factor - # Define the objective function: 0.5 * y^T Q y + d^T y def objective(y): return 0.5 * y @ Q @ y + d @ y - # Gradient of the objective def grad(y): return Q @ y + d - # Bounds for each variable: 0 ≤ y_i ≤ 1 - bounds = [(0, 1) for _ in range(K)] + if alg == "trust-constr": - # Initial guess (can also use np.random.rand(K) or something more informed) - y0 = np.clip(-np.linalg.solve(Q + np.eye(K) * 1e-5, d), 0, 1) + def hess(y): + return csc_matrix(Q) # sparse format for efficiency - result = minimize(objective, y0, method="L-BFGS-B", jac=grad, bounds=bounds) + bounds = [(0, 1)] * K + y0 = np.clip(-np.linalg.solve(Q + np.eye(K) * 1e-5, d), 0, 1) + result = minimize( + objective, y0, method="trust-constr", jac=grad, hess=hess, bounds=bounds, options={"verbose": 0} + ) + elif alg == "L-BFGS-B": + bounds = [(0, 1) for _ in range(K)] # per-variable bounds + y0 = np.clip(-np.linalg.solve(Q + np.eye(K) * 1e-5, d), 0, 1) # Initial guess + result = minimize(objective, y0, method="L-BFGS-B", jac=grad, bounds=bounds) return np.maximum(result.x, 0)