diff --git a/.gitignore b/.gitignore index 4c9132fad..353f67f09 100644 --- a/.gitignore +++ b/.gitignore @@ -122,3 +122,4 @@ docs/source/auto_examples/ docs/source/examples/mydask.png dask-worker-space +.coverage diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 803007aa0..6fd28e258 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,4 +13,3 @@ repos: rev: v4.3.21 hooks: - id: isort - diff --git a/LICENSE.txt b/LICENSE.txt index 927f3c0ff..82c2fb457 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -25,4 +25,4 @@ SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF -THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file +THE POSSIBILITY OF SUCH DAMAGE. diff --git a/ci/environment-3.7.yaml b/ci/environment-3.7.yaml index 77b0b59cc..60a664067 100644 --- a/ci/environment-3.7.yaml +++ b/ci/environment-3.7.yaml @@ -14,7 +14,7 @@ dependencies: - multipledispatch >=0.4.9 - mypy - numba - - numpy >=1.16.3 + - numpy >=1.17.0 - numpydoc - packaging - pandas diff --git a/ci/environment-docs.yaml b/ci/environment-docs.yaml index 2dc8b1ada..0473710ac 100644 --- a/ci/environment-docs.yaml +++ b/ci/environment-docs.yaml @@ -36,6 +36,7 @@ dependencies: - tornado - toolz - xgboost + - dask-xgboost - zict - pip - dask diff --git a/dask_ml/_compat.py b/dask_ml/_compat.py index 960584a98..1c45c025e 100644 --- a/dask_ml/_compat.py +++ b/dask_ml/_compat.py @@ -1,4 +1,5 @@ import contextlib +import importlib import os from collections.abc import Mapping # noqa from typing import Any, List, Optional, Union @@ -19,6 +20,7 @@ SK_024 = SK_VERSION >= packaging.version.parse("0.24.0.dev0") DASK_240 = DASK_VERSION >= packaging.version.parse("2.4.0") DASK_2130 = DASK_VERSION >= packaging.version.parse("2.13.0") +DASK_2200 = DASK_VERSION > packaging.version.parse("2.19.0") # TODO: update to >= DISTRIBUTED_2_5_0 = DISTRIBUTED_VERSION > packaging.version.parse("2.5.0") DISTRIBUTED_2_11_0 = DISTRIBUTED_VERSION > packaging.version.parse("2.10.0") # dev WINDOWS = os.name == "nt" @@ -40,6 +42,15 @@ def check_is_fitted(est, attributes: Optional[Union[str, List[str]]] = None): return sklearn.utils.validation.check_is_fitted(est, *args) +def _import_sparse(): + try: + return importlib.import_module("sparse") + except ImportError: + raise ImportError( + "This requires the optional 'sparse' library. Please install 'sparse'." + ) + + def _check_multimetric_scoring(estimator, scoring=None): from sklearn.metrics._scorer import _check_multimetric_scoring diff --git a/dask_ml/_utils.py b/dask_ml/_utils.py index caa9f0063..10b7eb53a 100644 --- a/dask_ml/_utils.py +++ b/dask_ml/_utils.py @@ -5,6 +5,14 @@ from sklearn.base import BaseEstimator +def is_sparse(x): + try: + from sparse import SparseArray + except ImportError: + return False + return isinstance(x, SparseArray) + + def copy_learned_attributes(from_estimator, to_estimator): attrs = {k: v for k, v in vars(from_estimator).items() if k.endswith("_")} diff --git a/dask_ml/datasets.py b/dask_ml/datasets.py index 6ea1bd60d..62e78f668 100644 --- a/dask_ml/datasets.py +++ b/dask_ml/datasets.py @@ -10,6 +10,8 @@ import dask_ml.utils +from . import _compat + def _check_axis_partitioning(chunks, n_features): c = chunks[1][0] @@ -30,6 +32,7 @@ def make_counts( scale=1.0, chunks=100, random_state=None, + is_sparse=False, ): """ Generate a dummy dataset for modeling count data. @@ -72,6 +75,11 @@ def make_counts( z0 = X[:, informative_idx].dot(beta[informative_idx]) rate = da.exp(z0) y = rng.poisson(rate, size=1, chunks=(chunks,)) + + if is_sparse: + sparse = _compat._import_sparse() + X = X.map_blocks(sparse.COO) + return X, y @@ -218,6 +226,7 @@ def make_regression( coef=False, random_state=None, chunks=None, + is_sparse=False, ): """ Generate a random regression problem. @@ -334,6 +343,10 @@ def make_regression( y_big = y_big.squeeze() + if is_sparse: + sparse = _compat._import_sparse() + X_big = X_big.map_blocks(sparse.COO) + if return_coef: return X_big, y_big, coef else: @@ -357,6 +370,7 @@ def make_classification( shuffle=True, random_state=None, chunks=None, + is_sparse=False, ): chunks = da.core.normalize_chunks(chunks, (n_samples, n_features)) _check_axis_partitioning(chunks, n_features) @@ -378,9 +392,16 @@ def make_classification( y = rng.random(z0.shape, chunks=chunks[0]) < 1 / (1 + da.exp(-z0)) y = y.astype(int) + if is_sparse: + sparse = _compat._import_sparse() + X = X.map_blocks(sparse.COO) + return X, y +make_poisson = make_counts + + def random_date(start, end): delta = end - start int_delta = (delta.days * 24 * 60 * 60) + delta.seconds diff --git a/dask_ml/linear_model/algorithms.py b/dask_ml/linear_model/algorithms.py new file mode 100644 index 000000000..32a20e852 --- /dev/null +++ b/dask_ml/linear_model/algorithms.py @@ -0,0 +1,519 @@ +"""Optimization algorithms for solving minimizaiton problems. +""" + +from __future__ import absolute_import, division, print_function + +import functools + +import dask +import dask.array as da +import numpy as np +from dask import compute, delayed, persist +from distributed import get_client +from scipy.optimize import fmin_l_bfgs_b + +from dask_ml.linear_model.families import Logistic +from dask_ml.linear_model.regularizers import Regularizer +from dask_ml.linear_model.utils import dot, normalize, scatter_array + + +def compute_stepsize_dask( + beta, + step, + Xbeta, + Xstep, + y, + curr_val, + family=Logistic, + stepSize=1.0, + armijoMult=0.1, + backtrackMult=0.1, +): + """Compute the optimal stepsize + + Parameters + ---------- + beta : array-like + step : float + XBeta : array-like + Xstep : float + y : array-like + curr_val : float + famlily : Family, optional + stepSize : float, optional + armijoMult : float, optional + backtrackMult : float, optional + + Returns + ------- + stepSize : float + beta : array-like + xBeta : array-like + func : callable + """ + + loglike = family.loglike + beta, step, Xbeta, Xstep, y, curr_val = persist( + beta, step, Xbeta, Xstep, y, curr_val + ) + obeta, oXbeta = beta, Xbeta + (step,) = compute(step) + steplen = (step ** 2).sum() + lf = curr_val + func = 0 + for ii in range(100): + beta = obeta - stepSize * step + if ii and (beta == obeta).all(): + stepSize = 0 + break + + Xbeta = oXbeta - stepSize * Xstep + func = loglike(Xbeta, y) + Xbeta, func = persist(Xbeta, func) + + df = lf - compute(func)[0] + if df >= armijoMult * stepSize * steplen: + break + stepSize *= backtrackMult + + return stepSize, beta, Xbeta, func + + +@normalize +def gradient_descent(X, y, max_iter=100, tol=1e-14, family=Logistic, **kwargs): + """ + Michael Grant's implementation of Gradient Descent. + + Parameters + ---------- + X : array-like, shape (n_samples, n_features) + y : array-like, shape (n_samples,) + max_iter : int + maximum number of iterations to attempt before declaring + failure to converge + tol : float + Maximum allowed change from prior iteration required to + declare convergence + family : Family + + Returns + ------- + beta : array-like, shape (n_features,) + n_iter : number of iterations executed + """ + loglike, gradient = family.loglike, family.gradient + n, p = X.shape + firstBacktrackMult = 0.1 + nextBacktrackMult = 0.5 + armijoMult = 0.1 + stepGrowth = 1.25 + stepSize = 1.0 + recalcRate = 10 + backtrackMult = firstBacktrackMult + beta = np.zeros(p) + n_iter = 0 + + for k in range(max_iter): + # how necessary is this recalculation? + if k % recalcRate == 0: + Xbeta = X.dot(beta) + func = loglike(Xbeta, y) + + grad = gradient(Xbeta, X, y) + Xgradient = X.dot(grad) + + # backtracking line search + lf = func + stepSize, _, _, func = compute_stepsize_dask( + beta, + grad, + Xbeta, + Xgradient, + y, + func, + family=family, + backtrackMult=backtrackMult, + armijoMult=armijoMult, + stepSize=stepSize, + ) + + beta, stepSize, Xbeta, lf, func, grad, Xgradient = persist( + beta, stepSize, Xbeta, lf, func, grad, Xgradient + ) + + stepSize, lf, func, grad = compute(stepSize, lf, func, grad) + + beta = ( + beta - stepSize * grad + ) # tiny bit of repeat work here to avoid communication + Xbeta = Xbeta - stepSize * Xgradient + + n_iter += 1 + + if stepSize == 0: + break + + df = lf - func + df /= max(func, lf) + + if df < tol: + break + stepSize *= stepGrowth + backtrackMult = nextBacktrackMult + + return beta, n_iter + + +@normalize +def newton(X, y, max_iter=50, tol=1e-8, family=Logistic, rcond=None, **kwargs): + """Newton's Method for Logistic Regression. + + Parameters + ---------- + X : array-like, shape (n_samples, n_features) + y : array-like, shape (n_samples,) + max_iter : int + maximum number of iterations to attempt before declaring + failure to converge + tol : float + Maximum allowed change from prior iteration required to + declare convergence + family : Family + + Returns + ------- + beta : array-like, shape (n_features,) + n_iter : number of iterations executed + """ + gradient, hessian = family.gradient, family.hessian + n, p = X.shape + beta = np.zeros(p) # always init to zeros? + Xbeta = dot(X, beta) + + n_iter = 0 + converged = False + + while not converged: + beta_old = beta + + # should this use map_blocks()? + hess = hessian(Xbeta, X) + grad = gradient(Xbeta, X, y) + + hess, grad = da.compute(hess, grad) + + # should this be dask or numpy? + # currently uses Python 3 specific syntax + step, _, _, _ = np.linalg.lstsq(hess, grad, rcond=rcond) + beta = beta_old - step + + # should change this criterion + coef_change = np.absolute(beta_old - beta) + n_iter += 1 + converged = (not np.any(coef_change > tol)) or (n_iter >= max_iter) + + if not converged: + Xbeta = dot(X, beta) # numpy -> dask converstion of beta + + return beta, n_iter + + +@normalize +def admm( + X, + y, + regularizer="l1", + lamduh=0.1, + rho=1, + over_relax=1, + max_iter=250, + abstol=1e-4, + reltol=1e-2, + family=Logistic, + **kwargs +): + """ + Alternating Direction Method of Multipliers + + Parameters + ---------- + X : array-like, shape (n_samples, n_features) + y : array-like, shape (n_samples,) + regularizer : str or Regularizer + lamduh : float + rho : float + over_relax : FLOAT + max_iter : int + maximum number of iterations to attempt before declaring + failure to converge + abstol, reltol : float + family : Family + + Returns + ------- + beta : array-like, shape (n_features,) + n_iter : number of iterations executed + """ + pointwise_loss = family.pointwise_loss + pointwise_gradient = family.pointwise_gradient + regularizer = Regularizer.get(regularizer) + + def create_local_gradient(func): + @functools.wraps(func) + def wrapped(beta, X, y, z, u, rho): + return func(beta, X, y) + rho * (beta - z + u) + + return wrapped + + def create_local_f(func): + @functools.wraps(func) + def wrapped(beta, X, y, z, u, rho): + return func(beta, X, y) + (rho / 2) * np.dot(beta - z + u, beta - z + u) + + return wrapped + + f = create_local_f(pointwise_loss) + fprime = create_local_gradient(pointwise_gradient) + + nchunks = getattr(X, "npartitions", 1) + # nchunks = X.npartitions + (n, p) = X.shape + # XD = X.to_delayed().flatten().tolist() + # yD = y.to_delayed().flatten().tolist() + if isinstance(X, da.Array): + XD = X.rechunk((None, X.shape[-1])).to_delayed().flatten().tolist() + else: + XD = [X] + if isinstance(y, da.Array): + yD = y.rechunk((None, y.shape[-1])).to_delayed().flatten().tolist() + else: + yD = [y] + + z = np.zeros(p) + u = np.array([np.zeros(p) for i in range(nchunks)]) + betas = np.array([np.ones(p) for i in range(nchunks)]) + + n_iter = 0 + + for k in range(max_iter): + + # x-update step + new_betas = [ + delayed(local_update)(xx, yy, bb, z, uu, rho, f=f, fprime=fprime) + for xx, yy, bb, uu in zip(XD, yD, betas, u) + ] + new_betas = np.array(da.compute(*new_betas)) + + beta_hat = over_relax * new_betas + (1 - over_relax) * z + + # z-update step + zold = z.copy() + ztilde = np.mean(beta_hat + np.array(u), axis=0) + z = regularizer.proximal_operator(ztilde, lamduh / (rho * nchunks)) + + # u-update step + u += beta_hat - z + + # check for convergence + primal_res = np.linalg.norm(new_betas - z) + dual_res = np.linalg.norm(rho * (z - zold)) + + eps_pri = np.sqrt(p * nchunks) * abstol + reltol * np.maximum( + np.linalg.norm(new_betas), np.sqrt(nchunks) * np.linalg.norm(z) + ) + eps_dual = np.sqrt(p * nchunks) * abstol + reltol * np.linalg.norm(rho * u) + + n_iter += 1 + + if primal_res < eps_pri and dual_res < eps_dual: + break + + return z, n_iter + + +def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b): + + beta = beta.ravel() + u = u.ravel() + z = z.ravel() + solver_args = (X, y, z, u, rho) + beta, f, d = solver( + f, beta, fprime=fprime, args=solver_args, maxiter=200, maxfun=250 + ) + + return beta + + +@normalize +def lbfgs( + X, + y, + regularizer=None, + lamduh=1.0, + max_iter=100, + tol=1e-4, + family=Logistic, + verbose=False, + **kwargs +): + """L-BFGS solver using scipy.optimize implementation + + Parameters + ---------- + X : array-like, shape (n_samples, n_features) + y : array-like, shape (n_samples,) + regularizer : str or Regularizer + lamduh : float + max_iter : int + maximum number of iterations to attempt before declaring + failure to converge + tol : float + Maximum allowed change from prior iteration required to + declare convergence + family : Family + verbose : bool, default False + whether to print diagnostic information during convergence + + Returns + ------- + beta : array-like, shape (n_features,) + n_iter : number of iterations executed + """ + try: + dask_distributed_client = get_client() + except ValueError: + dask_distributed_client = None + + pointwise_loss = family.pointwise_loss + pointwise_gradient = family.pointwise_gradient + if regularizer is not None: + regularizer = Regularizer.get(regularizer) + pointwise_loss = regularizer.add_reg_f(pointwise_loss, lamduh) + pointwise_gradient = regularizer.add_reg_grad(pointwise_gradient, lamduh) + + n, p = X.shape + beta0 = np.zeros(p) + + def compute_loss_grad(beta, X, y): + scatter_beta = ( + scatter_array(beta, dask_distributed_client) + if dask_distributed_client + else beta + ) + loss_fn = pointwise_loss(scatter_beta, X, y) + gradient_fn = pointwise_gradient(scatter_beta, X, y) + loss, gradient = compute(loss_fn, gradient_fn) + return loss, gradient.copy() + + with dask.config.set(fuse_ave_width=0): # optimizations slows this down + beta, loss, info = fmin_l_bfgs_b( + compute_loss_grad, + beta0, + fprime=None, + args=(X, y), + iprint=(verbose > 0) - 1, + pgtol=tol, + maxiter=max_iter, + ) + + return beta, info["nit"] + + +@normalize +def proximal_grad( + X, + y, + regularizer="l1", + lamduh=0.1, + family=Logistic, + max_iter=100, + tol=1e-8, + **kwargs +): + """ + Proximal Gradient Method + + Parameters + ---------- + X : array-like, shape (n_samples, n_features) + y : array-like, shape (n_samples,) + regularizer : str or Regularizer + lamduh : float + max_iter : int + maximum number of iterations to attempt before declaring + failure to converge + tol : float + Maximum allowed change from prior iteration required to + declare convergence + family : Family + + Returns + ------- + beta : array-like, shape (n_features,) + n_iter : number of iterations executed + """ + n, p = X.shape + firstBacktrackMult = 0.1 + nextBacktrackMult = 0.5 + stepGrowth = 1.25 + stepSize = 1.0 + recalcRate = 10 + backtrackMult = firstBacktrackMult + beta = np.zeros(p) + regularizer = Regularizer.get(regularizer) + + n_iter = 0 + + for k in range(max_iter): + # Compute the gradient + if k % recalcRate == 0: + Xbeta = X.dot(beta) + func = family.loglike(Xbeta, y) + + gradient = family.gradient(Xbeta, X, y) + + Xbeta, func, gradient = persist(Xbeta, func, gradient) + + obeta = beta + + n_iter += 1 + + # Compute the step size + lf = func + for ii in range(100): + beta = regularizer.proximal_operator( + obeta - stepSize * gradient, stepSize * lamduh + ) + Xbeta = X.dot(beta) + + Xbeta, beta = persist(Xbeta, beta) + + func = family.loglike(Xbeta, y) + func = persist(func)[0] + func = compute(func)[0] + df = lf - func + if df > 0: + break + stepSize *= backtrackMult + if stepSize == 0: + break + df /= max(func, lf) + if df < tol: + break + stepSize *= stepGrowth + backtrackMult = nextBacktrackMult + + # L2-regularization returned a dask-array + try: + return beta.compute(), n_iter + except AttributeError: + return beta, n_iter + + +_solvers = { + "admm": admm, + "gradient_descent": gradient_descent, + "newton": newton, + "lbfgs": lbfgs, + "proximal_grad": proximal_grad, +} diff --git a/dask_ml/linear_model/families.py b/dask_ml/linear_model/families.py new file mode 100644 index 000000000..abb700f78 --- /dev/null +++ b/dask_ml/linear_model/families.py @@ -0,0 +1,123 @@ +from __future__ import absolute_import, division, print_function + +from .utils import dot, exp, log1p, sigmoid + + +class Logistic(object): + """ Implements methods for `Logistic regression`_, + + Useful for classifying binary outcomes. + + .. _Logistic regression: https://en.wikipedia.org/wiki/Logistic_regression + """ + + @staticmethod + def loglike(Xbeta, y): + """ + Evaluate the logistic loglikeliehood + + Parameters + ---------- + Xbeta : array, shape (n_samples, n_features) + y : array, shape (n_samples) + """ + enXbeta = exp(-Xbeta) + return (Xbeta + log1p(enXbeta)).sum() - dot(y, Xbeta) + + @staticmethod + def pointwise_loss(beta, X, y): + """Logistic Loss, evaluated point-wise.""" + beta, y = beta.ravel(), y.ravel() + Xbeta = X.dot(beta) + return Logistic.loglike(Xbeta, y) + + @staticmethod + def pointwise_gradient(beta, X, y): + """Logistic gradient, evaluated point-wise.""" + beta, y = beta.ravel(), y.ravel() + Xbeta = X.dot(beta) + return Logistic.gradient(Xbeta, X, y) + + @staticmethod + def gradient(Xbeta, X, y): + """Logistic gradient""" + p = sigmoid(Xbeta) + return dot(X.T, p - y) + + @staticmethod + def hessian(Xbeta, X): + """Logistic hessian""" + p = sigmoid(Xbeta) + return dot(p * (1 - p) * X.T, X) + + +class Normal(object): + """ Implements methods for `Linear regression`_, + + Useful for modeling continuous outcomes. + + .. _Linear regression: https://en.wikipedia.org/wiki/Linear_regression + """ + + @staticmethod + def loglike(Xbeta, y): + return ((y - Xbeta) ** 2).sum() + + @staticmethod + def pointwise_loss(beta, X, y): + beta, y = beta.ravel(), y.ravel() + Xbeta = X.dot(beta) + return Normal.loglike(Xbeta, y) + + @staticmethod + def pointwise_gradient(beta, X, y): + beta, y = beta.ravel(), y.ravel() + Xbeta = X.dot(beta) + return Normal.gradient(Xbeta, X, y) + + @staticmethod + def gradient(Xbeta, X, y): + return 2 * dot(X.T, Xbeta) - 2 * dot(X.T, y) + + @staticmethod + def hessian(Xbeta, X): + return 2 * dot(X.T, X) + + +class Poisson(object): + """ + This implements `Poisson regression`_ + + Useful for modelling count data. + + .. _Poisson regression: https://en.wikipedia.org/wiki/Poisson_regression + """ + + @staticmethod + def loglike(Xbeta, y): + eXbeta = exp(Xbeta) + yXbeta = y * Xbeta + return (eXbeta - yXbeta).sum() + + @staticmethod + def pointwise_loss(beta, X, y): + beta, y = beta.ravel(), y.ravel() + Xbeta = X.dot(beta) + return Poisson.loglike(Xbeta, y) + + @staticmethod + def pointwise_gradient(beta, X, y): + beta, y = beta.ravel(), y.ravel() + Xbeta = X.dot(beta) + return Poisson.gradient(Xbeta, X, y) + + @staticmethod + def gradient(Xbeta, X, y): + eXbeta = exp(Xbeta) + return dot(X.T, eXbeta - y) + + @staticmethod + def hessian(Xbeta, X): + eXbeta = exp(Xbeta) + x_diag_eXbeta = eXbeta[:, None] * X + return dot(X.T, x_diag_eXbeta) diff --git a/dask_ml/linear_model/glm.py b/dask_ml/linear_model/glm.py index 951eba1b8..b7f4391bd 100644 --- a/dask_ml/linear_model/glm.py +++ b/dask_ml/linear_model/glm.py @@ -2,19 +2,12 @@ """Generalized Linear Models for large datasets.""" import textwrap -from dask_glm import algorithms, families -from dask_glm.utils import ( - accuracy_score, - add_intercept, - dot, - exp, - poisson_deviance, - sigmoid, -) +from dask_glm.utils import add_intercept, dot, exp, poisson_deviance, sigmoid from sklearn.base import BaseEstimator -from ..metrics import r2_score +from ..metrics import accuracy_score, r2_score from ..utils import check_array +from . import algorithms, families _base_doc = textwrap.dedent( """\ diff --git a/dask_ml/linear_model/regularizers.py b/dask_ml/linear_model/regularizers.py new file mode 100644 index 000000000..7baeb959d --- /dev/null +++ b/dask_ml/linear_model/regularizers.py @@ -0,0 +1,227 @@ +from __future__ import absolute_import, division, print_function + +import numpy as np + + +class Regularizer(object): + """Abstract base class for regularization object. + + Defines the set of methods required to create a new regularization object. This includes + the regularization functions itself and its gradient, hessian, and proximal operator. + """ + + name = "_base" + + def f(self, beta): + """Regularization function. + + Parameters + ---------- + beta : array, shape (n_features,) + + Returns + ------- + result : float + """ + raise NotImplementedError + + def gradient(self, beta): + """Gradient of regularization function. + + Parameters + ---------- + beta : array, shape ``(n_features,)`` + + Returns + ------- + gradient : array, shape ``(n_features,)`` + """ + raise NotImplementedError + + def hessian(self, beta): + """Hessian of regularization function. + + Parameters + ---------- + beta : array, shape ``(n_features,)`` + + Returns + ------- + hessian : array, shape ``(n_features, n_features)`` + """ + raise NotImplementedError + + def proximal_operator(self, beta, t): + """Proximal operator for regularization function. + + Parameters + ---------- + beta : array, shape ``(n_features,)`` + t : float # TODO: is that right? + + Returns + ------- + proximal_operator : array, shape ``(n_features,)`` + """ + raise NotImplementedError + + def add_reg_f(self, f, lam): + """Add regularization function to other function. + + Parameters + ---------- + f : callable + Function taking ``beta`` and ``*args`` + lam : float + regularization constant + + Returns + ------- + wrapped : callable + function taking ``beta`` and ``*args`` + """ + + def wrapped(beta, *args): + return f(beta, *args) + lam * self.f(beta) + + return wrapped + + def add_reg_grad(self, grad, lam): + """Add regularization gradient to other gradient function. + + Parameters + ---------- + grad : callable + Function taking ``beta`` and ``*args`` + lam : float + regularization constant + + Returns + ------- + wrapped : callable + function taking ``beta`` and ``*args`` + """ + + def wrapped(beta, *args): + return grad(beta, *args) + lam * self.gradient(beta) + + return wrapped + + def add_reg_hessian(self, hess, lam): + """Add regularization hessian to other hessian function. + + Parameters + ---------- + hess : callable + Function taking ``beta`` and ``*args`` + lam : float + regularization constant + + Returns + ------- + wrapped : callable + function taking ``beta`` and ``*args`` + """ + + def wrapped(beta, *args): + return hess(beta, *args) + lam * self.hessian(beta) + + return wrapped + + @classmethod + def get(cls, obj): + """Get the concrete instance for the name ``obj``. + + Parameters + ---------- + obj : Regularizer or str + Valid instances of ``Regularizer`` are passed through. + Strings are looked up according to ``obj.name`` and a + new instance is created + + Returns + ------- + obj : Regularizer + """ + if isinstance(obj, cls): + return obj + elif isinstance(obj, str): + return {o.name: o for o in cls.__subclasses__()}[obj]() + raise TypeError("Not a valid regularizer object.") + + +class L2(Regularizer): + """L2 regularization.""" + + name = "l2" + + def f(self, beta): + return (beta ** 2).sum() / 2 + + def gradient(self, beta): + return beta + + def hessian(self, beta): + return np.eye(len(beta)) + + def proximal_operator(self, beta, t): + return 1 / (1 + t) * beta + + +class L1(Regularizer): + """L1 regularization.""" + + name = "l1" + + def f(self, beta): + return (np.abs(beta)).sum() + + def gradient(self, beta): + if np.any(np.isclose(beta, 0)): + raise ValueError("l1 norm is not differentiable at 0!") + else: + return np.sign(beta) + + def hessian(self, beta): + if np.any(np.isclose(beta, 0)): + raise ValueError("l1 norm is not twice differentiable at 0!") + return np.zeros((beta.shape[0], beta.shape[0])) + + def proximal_operator(self, beta, t): + z = np.maximum(0, beta - t) - np.maximum(0, -beta - t) + return z + + +class ElasticNet(Regularizer): + """Elastic net regularization.""" + + name = "elastic_net" + + def __init__(self, weight=0.5): + self.weight = weight + self.l1 = L1() + self.l2 = L2() + + def _weighted(self, left, right): + return self.weight * left + (1 - self.weight) * right + + def f(self, beta): + return self._weighted(self.l1.f(beta), self.l2.f(beta)) + + def gradient(self, beta): + return self._weighted(self.l1.gradient(beta), self.l2.gradient(beta)) + + def hessian(self, beta): + return self._weighted(self.l1.hessian(beta), self.l2.hessian(beta)) + + def proximal_operator(self, beta, t): + """See notebooks/ElasticNetProximalOperatorDerivation.ipynb for derivation.""" + g = self.weight * t + + @np.vectorize + def func(b): + if b <= g: + return 0 + return (b - g * np.sign(b)) / (t - g + 1) + + return beta diff --git a/dask_ml/linear_model/utils.py b/dask_ml/linear_model/utils.py index 8efb5784d..e434a701f 100644 --- a/dask_ml/linear_model/utils.py +++ b/dask_ml/linear_model/utils.py @@ -1,12 +1,38 @@ """ """ +import inspect +import sys +from functools import wraps + import dask.array as da import dask.dataframe as dd import numpy as np from multipledispatch import dispatch +from .._utils import is_sparse -@dispatch(dd._Frame) + +@dispatch(object) +def exp(A): + return A.exp() + + +@dispatch(float) # noqa: F811 +def exp(A): + return np.exp(A) + + +@dispatch(np.ndarray) # noqa: F811 +def exp(A): + return np.exp(A) + + +@dispatch(da.Array) # noqa: F811 +def exp(A): + return da.exp(A) + + +@dispatch(dd._Frame) # noqa: F811 def exp(A): return da.exp(A) @@ -33,7 +59,11 @@ def add_intercept(X): def _add_intercept(x): ones = np.ones((x.shape[0], 1), dtype=x.dtype) - return np.concatenate([ones, x], axis=1) + + if is_sparse(x): + ones = sparse.COO(ones) + + return np.concatenate([x, ones], axis=1) @dispatch(da.Array) # noqa: F811 @@ -59,3 +89,160 @@ def add_intercept(X): # noqa: F811 if "intercept" in columns: raise ValueError("'intercept' column already in 'X'") return X.assign(intercept=1)[["intercept"] + list(columns)] + + +def make_y(X, beta=np.array([1.5, -3]), chunks=2): + z0 = X.dot(beta) + y = da.random.random(z0.shape, chunks=z0.chunks) < sigmoid(z0) + return y + + +def sigmoid(x): + """Sigmoid function of x.""" + return 1 / (1 + exp(-x)) + + +@dispatch(object) # noqa: F811 +def absolute(A): + return abs(A) + + +@dispatch(np.ndarray) # noqa: F811 +def absolute(A): + return np.absolute(A) + + +@dispatch(da.Array) # noqa: F811 +def absolute(A): + return da.absolute(A) + + +@dispatch(object) # noqa: F811 +def sign(A): + return A.sign() + + +@dispatch(np.ndarray) # noqa: F811 +def sign(A): + return np.sign(A) + + +@dispatch(da.Array) # noqa: F811 +def sign(A): + return da.sign(A) + + +@dispatch(object) # noqa: F811 +def log1p(A): + return A.log1p() + + +@dispatch(np.ndarray) # noqa: F811 +def log1p(A): + return np.log1p(A) + + +@dispatch(da.Array) # noqa: F811 +def log1p(A): + return da.log1p(A) + + +@dispatch(object, object) +def dot(A, B): + x = max([A, B], key=lambda x: getattr(x, "__array_priority__", 0)) + module = package_of(x) + return module.dot(A, B) + + +@dispatch(da.Array, np.ndarray) # noqa: F811 +def dot(A, B): + B = da.from_array(B, chunks=B.shape) + return da.dot(A, B) + + +@dispatch(np.ndarray, da.Array) # noqa: F811 +def dot(A, B): + A = da.from_array(A, chunks=A.shape) + return da.dot(A, B) + + +@dispatch(np.ndarray, np.ndarray) # noqa: F811 +def dot(A, B): + return np.dot(A, B) + + +@dispatch(da.Array, da.Array) # noqa: F811 +def dot(A, B): + return da.dot(A, B) + + +def normalize(algo): + @wraps(algo) + def normalize_inputs(X, y, *args, **kwargs): + normalize = kwargs.pop("normalize", True) + if normalize: + mean, std = da.compute(X.mean(axis=0), X.std(axis=0)) + mean, std = mean.copy(), std.copy() # in case they are read-only + intercept_idx = np.where(std == 0) + if len(intercept_idx[0]) > 1: + raise ValueError("Multiple constant columns detected!") + mean[intercept_idx] = 0 + std[intercept_idx] = 1 + mean = mean if len(intercept_idx[0]) else np.zeros(mean.shape) + Xn = (X - mean) / std + out, n_iter = algo(Xn, y, *args, **kwargs) + out = out.copy() + i_adj = np.sum(out * mean / std) + out[intercept_idx] -= i_adj + return out / std, n_iter + else: + return algo(X, y, *args, **kwargs) + + return normalize_inputs + + +def package_of(obj): + """ Return package containing object's definition + Or return None if not found + """ + # http://stackoverflow.com/questions/43462701/get-package-of-python-object/43462865#43462865 + mod = inspect.getmodule(obj) + if not mod: + return + base, _sep, _stem = mod.__name__.partition(".") + return sys.modules[base] + + +def scatter_array(arr, dask_client): + """Scatter a large numpy array into workers + Return the equivalent dask array + """ + future_arr = dask_client.scatter(arr) + return da.from_delayed(future_arr, shape=arr.shape, dtype=arr.dtype) + + +def is_dask_array_sparse(X): + """ + Check using _meta if a dask array contains sparse arrays + """ + try: + import sparse + except ImportError: + return False + + return isinstance(X._meta, sparse.SparseArray) + + +try: + import sparse +except ImportError: + pass +else: + + @dispatch(sparse.COO) # noqa: F811 + def exp(x): + return np.exp(x.todense()) + + @dispatch(sparse.SparseArray) # noqa: F811 + def add_intercept(X): + return sparse.concatenate([X, sparse.COO(np.ones((X.shape[0], 1)))], axis=1) diff --git a/dask_ml/metrics/__init__.py b/dask_ml/metrics/__init__.py index db8cd8d12..6ba14df98 100644 --- a/dask_ml/metrics/__init__.py +++ b/dask_ml/metrics/__init__.py @@ -1,8 +1,23 @@ -from .classification import accuracy_score, log_loss # noqa -from .pairwise import ( # noqa +from .classification import accuracy_score, log_loss, poisson_deviance +from .pairwise import ( euclidean_distances, pairwise_distances, pairwise_distances_argmin_min, ) -from .regression import mean_absolute_error, mean_squared_error, r2_score # noqa -from .scorer import SCORERS, check_scoring, get_scorer # noqa +from .regression import mean_absolute_error, mean_squared_error, r2_score +from .scorer import SCORERS, check_scoring, get_scorer + +__all__ = [ + "accuracy_score", + "log_loss", + "poisson_deviance", + "euclidean_distances", + "pairwise_distances", + "pairwise_distances_argmin_min", + "mean_absolute_error", + "mean_squared_error", + "r2_score", + "SCORERS", + "check_scoring", + "get_scorer", +] diff --git a/dask_ml/metrics/classification.py b/dask_ml/metrics/classification.py index d5f113f81..58b95766e 100644 --- a/dask_ml/metrics/classification.py +++ b/dask_ml/metrics/classification.py @@ -151,3 +151,7 @@ def log_loss( log_loss.__doc__ = getattr(sklearn.metrics.log_loss, "__doc__") + + +def poisson_deviance(y_true, y_pred): + return 2 * (y_true * np.log1p(y_true / y_pred) - (y_true - y_pred)).sum() diff --git a/docs/source/conf.py b/docs/source/conf.py index e0bc15acf..4b20a9abb 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -51,7 +51,6 @@ "sklearn": ("http://scikit-learn.org/stable/", None), "dask": ("https://docs.dask.org/en/latest/", None), "distributed": ("https://distributed.dask.org/en/latest/", None), - "dask_glm": ("http://dask-glm.readthedocs.io/en/latest/", None), } numpydoc_class_members_toctree = False diff --git a/docs/source/glm.rst b/docs/source/glm.rst deleted file mode 100644 index 79b04f813..000000000 --- a/docs/source/glm.rst +++ /dev/null @@ -1,56 +0,0 @@ -Generalized Linear Models -========================= - -.. currentmodule:: dask_ml.linear_model - -.. autosummary:: - LinearRegression - LogisticRegression - PoissonRegression - -Generalized linear models are a broad class of commonly used models. These -implementations scale well out to large datasets either on a single machine or -distributed cluster. They can be powered by a variety of optimization -algorithms and use a variety of regularizers. - -These follow the scikit-learn estimator API, and so can be dropped into -existing routines like grid search and pipelines, but are implemented -externally with new, scalable algorithms and so can consume distributed dask -arrays and dataframes rather than just single-machine NumPy and Pandas arrays -and dataframes. - -Example -------- - -.. ipython:: python - - from dask_ml.linear_model import LogisticRegression - from dask_ml.datasets import make_classification - X, y = make_classification(chunks=50) - lr = LogisticRegression() - lr.fit(X, y) - - -Algorithms ----------- - -.. currentmodule:: dask_glm.algorithms - -.. autosummary:: - admm - gradient_descent - lbfgs - newton - proximal_grad - - -Regularizers ------------- - -.. currentmodule:: dask_glm.regularizers - -.. autosummary:: - ElasticNet - L1 - L2 - Regularizer diff --git a/docs/source/modules/api.rst b/docs/source/modules/api.rst index 27d66d80a..52510dd8a 100644 --- a/docs/source/modules/api.rst +++ b/docs/source/modules/api.rst @@ -77,6 +77,13 @@ provides the following: :mod:`dask_ml.linear_model`: Generalized Linear Models ====================================================== +**Estimators** + +Generalized linear models are a broad class of commonly used models. These +implementations scale well out to large datasets either on a single machine or +distributed cluster. They can be powered by a variety of optimization +algorithms and use a variety of regularizers. + .. automodule:: dask_ml.linear_model :no-members: :no-inherited-members: @@ -91,6 +98,42 @@ provides the following: linear_model.LogisticRegression linear_model.PoissonRegression +.. _api.families: + +**Families** + +.. automodule:: dask_ml.linear_model..families + :members: + +.. _api.algorithms: + +**Algorithms** + +.. automodule:: dask_ml.linear_model.algorithms + :members: + +.. _api.regularizers.available: + +Available ``Regularizers`` +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +These regularizers are included with dask-ml. + +.. automodule:: dask_ml.linear_model.regularizers + :members: + :exclude-members: Regularizer + +.. _api.regularizers.interface: + +``Regularizer`` Interface +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Users wishing to implement their own regularizer should +satisfy this interface. + +.. autoclass:: dask_ml.linear_model.regularizers.Regularizer + :members: + :mod:`dask_ml.wrappers`: Meta-Estimators ======================================== diff --git a/setup.py b/setup.py index 7151f38bd..82ce11841 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,6 @@ "pandas>=0.23.4", "scikit-learn>=0.23", "scipy", - "dask-glm>=0.2.0", "multipledispatch>=0.4.9", "packaging", ] diff --git a/tests/linear_model/glm/__init__.py b/tests/linear_model/glm/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/linear_model/glm/test_admm.py b/tests/linear_model/glm/test_admm.py new file mode 100644 index 000000000..3fe4922b2 --- /dev/null +++ b/tests/linear_model/glm/test_admm.py @@ -0,0 +1,64 @@ +import dask.array as da +import numpy as np +import pytest +from dask import persist + +from dask_ml.linear_model.algorithms import admm, local_update +from dask_ml.linear_model.families import Logistic, Normal +from dask_ml.linear_model.regularizers import L1 +from dask_ml.linear_model.utils import make_y + + +@pytest.mark.parametrize("N", [1000, 10000]) +@pytest.mark.parametrize( + "beta", + [ + np.array([-1.5, 3]), + np.array([35, 2, 0, -3.2]), + np.array([-1e-2, 1e-4, 1.0, 2e-3, -1.2]), + ], +) +@pytest.mark.parametrize("family", [Logistic, Normal]) +def test_local_update(N, beta, family): + M = beta.shape[0] + X = np.random.random((N, M)) + y = np.random.random(N) > 0.4 + u = np.zeros(M) + z = np.random.random(M) + rho = 1e7 + + def create_local_gradient(func): + def wrapped(beta, X, y, z, u, rho): + return func(beta, X, y) + rho * (beta - z + u) + + return wrapped + + def create_local_f(func): + def wrapped(beta, X, y, z, u, rho): + return func(beta, X, y) + (rho / 2) * np.dot(beta - z + u, beta - z + u) + + return wrapped + + f = create_local_f(family.pointwise_loss) + fprime = create_local_gradient(family.pointwise_gradient) + + result = local_update(X, y, beta, z, u, rho, f=f, fprime=fprime) + + assert np.allclose(result, z, atol=2e-3) + + +@pytest.mark.parametrize("N", [1000, 10000]) +@pytest.mark.parametrize("nchunks", [5, 10]) +@pytest.mark.parametrize("p", [1, 5, 10]) +@pytest.mark.skip(reason="Slow and dubious value") +def test_admm_with_large_lamduh(N, p, nchunks): + max_iter = 500 + X = da.random.random((N, p), chunks=(N // nchunks, p)) + beta = np.random.random(p) + y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,)) + + X, y = persist(X, y) + z, n_iter = admm(X, y, regularizer=L1(), lamduh=1e5, rho=20, max_iter=max_iter) + + assert np.allclose(z, np.zeros(p), atol=1e-4) + assert n_iter > 0 and n_iter <= max_iter diff --git a/tests/linear_model/glm/test_algos_families.py b/tests/linear_model/glm/test_algos_families.py new file mode 100644 index 000000000..c8882b953 --- /dev/null +++ b/tests/linear_model/glm/test_algos_families.py @@ -0,0 +1,185 @@ +import dask +import dask.array as da +import dask.multiprocessing +import numpy as np +import pytest +from dask import persist + +from dask_ml.linear_model.algorithms import ( + admm, + gradient_descent, + lbfgs, + newton, + proximal_grad, +) +from dask_ml.linear_model.families import Logistic, Normal, Poisson +from dask_ml.linear_model.regularizers import Regularizer +from dask_ml.linear_model.utils import make_y, sigmoid + + +def add_l1(f, lam): + def wrapped(beta, X, y): + return f(beta, X, y) + lam * (np.abs(beta)).sum() + + return wrapped + + +def make_intercept_data(N, p, seed=20009): + """Given the desired number of observations (N) and + the desired number of variables (p), creates + random logistic data to test on.""" + + # set the seeds + da.random.seed(seed) + np.random.seed(seed) + + X = np.random.random((N, p + 1)) + col_sums = X.sum(axis=0) + X = X / col_sums[None, :] + X[:, p] = 1 + X = da.from_array(X, chunks=(N / 5, p + 1)) + y = make_y(X, beta=np.random.random(p + 1)) + + return X, y + + +@pytest.mark.parametrize("opt", [lbfgs, newton, gradient_descent]) +@pytest.mark.parametrize( + "N, p, seed,", [(100, 2, 20009), (250, 12, 90210), (95, 6, 70605)] +) +def test_methods(N, p, seed, opt): + X, y = make_intercept_data(N, p, seed=seed) + + coefs, _ = opt(X, y) + p = sigmoid(X.dot(coefs).compute()) + + y_sum = y.compute().sum() + p_sum = p.sum() + assert np.isclose(y_sum, p_sum, atol=1e-1) + + +@pytest.mark.parametrize( + "func,kwargs", + [ + (newton, {"tol": 1e-5, "max_iter": 50}), + (lbfgs, {"tol": 1e-8, "max_iter": 100}), + (gradient_descent, {"tol": 1e-7, "max_iter": 100}), + ], +) +@pytest.mark.parametrize("N", [1000]) +@pytest.mark.parametrize("nchunks", [1, 10]) +@pytest.mark.parametrize("family", [Logistic, Normal, Poisson]) +def test_basic_unreg_descent(func, kwargs, N, nchunks, family): + beta = np.random.normal(size=2) + M = len(beta) + X = da.random.random((N, M), chunks=(N // nchunks, M)) + y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,)) + + X, y = persist(X, y) + + result, n_iter = func(X, y, family=family, **kwargs) + test_vec = np.random.normal(size=2) + + opt = family.pointwise_loss(result, X, y).compute() + test_val = family.pointwise_loss(test_vec, X, y).compute() + + max_iter = kwargs["max_iter"] + assert n_iter > 0 and n_iter <= max_iter + assert opt < test_val + + +@pytest.mark.parametrize( + "func,kwargs", + [ + (admm, {"abstol": 1e-4, "max_iter": 250}), + (proximal_grad, {"tol": 1e-7, "max_iter": 100}), + ], +) +@pytest.mark.parametrize("N", [1000]) +@pytest.mark.parametrize("nchunks", [1, 10]) +@pytest.mark.parametrize("family", [Logistic, Normal, Poisson]) +@pytest.mark.parametrize("lam", [0.01, 1.2, 4.05]) +@pytest.mark.parametrize("reg", [r() for r in Regularizer.__subclasses__()]) +def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg): + beta = np.random.normal(size=2) + M = len(beta) + X = da.random.random((N, M), chunks=(N // nchunks, M)) + y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,)) + + X, y = persist(X, y) + + result, n_iter = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs) + test_vec = np.random.normal(size=2) + + f = reg.add_reg_f(family.pointwise_loss, lam) + + opt = f(result, X, y).compute() + test_val = f(test_vec, X, y).compute() + + max_iter = kwargs["max_iter"] + assert n_iter > 0 and n_iter <= max_iter + assert opt < test_val + + +@pytest.mark.parametrize( + "func,kwargs", + [ + (admm, {"max_iter": 2}), + (proximal_grad, {"max_iter": 2}), + (newton, {"max_iter": 2}), + (gradient_descent, {"max_iter": 2}), + ], +) +@pytest.mark.parametrize("scheduler", ["synchronous", "threading"]) +def test_determinism(func, kwargs, scheduler): + X, y = make_intercept_data(100, 10) + + with dask.config.set(scheduler=scheduler): + a, n_iter_a = func(X, y, **kwargs) + b, n_iter_b = func(X, y, **kwargs) + + max_iter = kwargs["max_iter"] + assert n_iter_a > 0 and n_iter_a <= max_iter + assert n_iter_b > 0 and n_iter_b <= max_iter + assert (a == b).all() + + +try: + from distributed import Client + from distributed.utils_test import cluster, loop # noqa +except ImportError: + pass +else: + + @pytest.mark.parametrize( + "func,kwargs", + [ + (admm, {"max_iter": 2}), + (proximal_grad, {"max_iter": 2}), + (newton, {"max_iter": 2}), + (gradient_descent, {"max_iter": 2}), + ], + ) + def test_determinism_distributed(func, kwargs, loop): + with cluster() as (s, [a, b]): + with Client(s["address"], loop=loop): + X, y = make_intercept_data(1000, 10) + + a, n_iter_a = func(X, y, **kwargs) + b, n_iter_b = func(X, y, **kwargs) + + max_iter = kwargs["max_iter"] + assert n_iter_a > 0 and n_iter_a <= max_iter + assert n_iter_b > 0 and n_iter_b <= max_iter + assert (a == b).all() + + def broadcast_lbfgs_weight(): + with cluster() as (s, [a, b]): + with Client(s["address"], loop=loop) as c: + X, y = make_intercept_data(1000, 10) + coefs = lbfgs(X, y, dask_distributed_client=c) + p = sigmoid(X.dot(coefs).compute()) + + y_sum = y.compute().sum() + p_sum = p.sum() + assert np.isclose(y_sum, p_sum, atol=1e-1) diff --git a/tests/linear_model/glm/test_regularizers.py b/tests/linear_model/glm/test_regularizers.py new file mode 100644 index 000000000..76ab15e3a --- /dev/null +++ b/tests/linear_model/glm/test_regularizers.py @@ -0,0 +1,185 @@ +import numpy as np +import numpy.testing as npt +import pytest + +from dask_ml.linear_model import regularizers as regs + + +@pytest.mark.parametrize( + "func,args", + [("f", [0]), ("gradient", [0]), ("hessian", [0]), ("proximal_operator", [0, 1])], +) +def test_base_class_raises_notimplementederror(func, args): + with pytest.raises(NotImplementedError): + getattr(regs.Regularizer(), func)(*args) + + +class FooRegularizer(regs.Regularizer): + def f(self, beta): + return beta + 1 + + def gradient(self, beta): + return beta + 1 + + def hessian(self, beta): + return beta + 1 + + +@pytest.mark.parametrize("func", ["add_reg_f", "add_reg_grad", "add_reg_hessian"]) +def test_add_reg_funcs(func): + def foo(x): + return x ** 2 + + new_func = getattr(FooRegularizer(), func)(foo, 1) + assert callable(new_func) + assert new_func(2) == 7 + + +def test_regularizer_get_passes_through_instance(): + x = FooRegularizer() + assert regs.Regularizer.get(x) == x + + +def test_regularizer_get_unnamed_raises(): + with pytest.raises(KeyError): + regs.Regularizer.get("foo") + + +def test_regularizer_gets_from_name(): + class Foo(regs.Regularizer): + name = "foo" + + assert isinstance(regs.Regularizer.get("foo"), Foo) + + +@pytest.mark.parametrize( + "beta,expected", [(np.array([0, 0, 0]), 0), (np.array([1, 2, 3]), 7)] +) +def test_l2_function(beta, expected): + assert regs.L2().f(beta) == expected + + +@pytest.mark.parametrize("beta", [np.array([0, 0, 0]), np.array([1, 2, 3])]) +def test_l2_gradient(beta): + npt.assert_array_equal(regs.L2().gradient(beta), beta) + + +@pytest.mark.parametrize("beta", [np.array([0, 0, 0]), np.array([1, 2, 3])]) +def test_l2_hessian(beta): + npt.assert_array_equal(regs.L2().hessian(beta), np.eye(len(beta))) + + +@pytest.mark.parametrize( + "beta,expected", + [ + (np.array([0, 0, 0]), np.array([0, 0, 0])), + (np.array([1, 2, 3]), np.array([0.5, 1, 1.5])), + ], +) +def test_l2_proximal_operator(beta, expected): + npt.assert_array_equal(regs.L2().proximal_operator(beta, 1), expected) + + +@pytest.mark.parametrize( + "beta,expected", [(np.array([0, 0, 0]), 0), (np.array([-1, 2, 3]), 6)] +) +def test_l1_function(beta, expected): + assert regs.L1().f(beta) == expected + + +@pytest.mark.parametrize( + "beta,expected", + [ + (np.array([1, 2, 3]), np.array([1, 1, 1])), + (np.array([-1, 2, 3]), np.array([-1, 1, 1])), + ], +) +def test_l1_gradient(beta, expected): + npt.assert_array_equal(regs.L1().gradient(beta), expected) + + +@pytest.mark.parametrize( + "beta", + [np.array([0.00000001, 1, 2]), np.array([-0.00000001, 1, 2]), np.array([0, 0, 0])], +) +def test_l1_gradient_raises_near_zero(beta): + with pytest.raises(ValueError): + regs.L1().gradient(beta) + + +def test_l1_hessian(): + npt.assert_array_equal( + regs.L1().hessian(np.array([1, 2])), np.array([[0, 0], [0, 0]]) + ) + + +def test_l1_hessian_raises(): + with pytest.raises(ValueError): + regs.L1().hessian(np.array([0, 0, 0])) + + +@pytest.mark.parametrize( + "beta,expected", + [ + (np.array([0, 0, 0]), np.array([0, 0, 0])), + (np.array([1, 2, 3]), np.array([0, 1, 2])), + ], +) +def test_l1_proximal_operator(beta, expected): + npt.assert_array_equal(regs.L1().proximal_operator(beta, 1), expected) + + +@pytest.mark.parametrize( + "beta,expected", [(np.array([0, 0, 0]), 0), (np.array([1, 2, 3]), 6.5)] +) +def test_elastic_net_function(beta, expected): + assert regs.ElasticNet().f(beta) == expected + + +def test_elastic_net_function_zero_weight_is_l2(): + beta = np.array([1, 2, 3]) + assert regs.ElasticNet(weight=0).f(beta) == regs.L2().f(beta) + + +def test_elastic_net_function_zero_weight_is_l1(): + beta = np.array([1, 2, 3]) + assert regs.ElasticNet(weight=1).f(beta) == regs.L1().f(beta) + + +def test_elastic_net_gradient(): + beta = np.array([1, 2, 3]) + npt.assert_array_equal( + regs.ElasticNet(weight=0.5).gradient(beta), np.array([1, 1.5, 2]) + ) + + +def test_elastic_net_gradient_zero_weight_is_l2(): + beta = np.array([1, 2, 3]) + npt.assert_array_equal( + regs.ElasticNet(weight=0).gradient(beta), regs.L2().gradient(beta) + ) + + +def test_elastic_net_gradient_zero_weight_is_l1(): + beta = np.array([1, 2, 3]) + npt.assert_array_equal( + regs.ElasticNet(weight=1).gradient(beta), regs.L1().gradient(beta) + ) + + +def test_elastic_net_hessian(): + beta = np.array([1, 2, 3]) + npt.assert_array_equal( + regs.ElasticNet(weight=0.5).hessian(beta), + np.eye(len(beta)) * regs.ElasticNet().weight, + ) + + +def test_elastic_net_hessian_raises(): + with pytest.raises(ValueError): + regs.ElasticNet(weight=0.5).hessian(np.array([0, 1, 2])) + + +def test_elastic_net_proximal_operator(): + beta = np.array([1, 2, 3]) + npt.assert_array_equal(regs.ElasticNet(weight=0.5).proximal_operator(beta, 1), beta) diff --git a/tests/linear_model/glm/test_utils.py b/tests/linear_model/glm/test_utils.py new file mode 100644 index 000000000..2d9629db0 --- /dev/null +++ b/tests/linear_model/glm/test_utils.py @@ -0,0 +1,116 @@ +import dask.array as da +import numpy as np +import pytest +from dask.array.utils import assert_eq + +from dask_ml.linear_model import utils + + +@utils.normalize +def do_nothing(X, y): + return np.array([0.0, 1.0, 2.0]), 1 + + +def test_normalize_normalizes(): + X = da.from_array(np.array([[1, 0, 0], [1, 2, 2]]), chunks=(2, 3)) + y = da.from_array(np.array([0, 1, 0]), chunks=(3,)) + res, _ = do_nothing(X, y) + np.testing.assert_equal(res, np.array([-3.0, 1.0, 2.0])) + + +def test_normalize_doesnt_normalize(): + X = da.from_array(np.array([[1, 0, 0], [1, 2, 2]]), chunks=(2, 3)) + y = da.from_array(np.array([0, 1, 0]), chunks=(3,)) + res, _ = do_nothing(X, y, normalize=False) + np.testing.assert_equal(res, np.array([0, 1, 2])) + + +def test_normalize_normalizes_if_intercept_not_present(): + X = da.from_array(np.array([[1, 0, 0], [3, 9.0, 2]]), chunks=(2, 3)) + y = da.from_array(np.array([0, 1, 0]), chunks=(3,)) + res, _ = do_nothing(X, y) + np.testing.assert_equal(res, np.array([0, 1 / 4.5, 2])) + + +def test_normalize_raises_if_multiple_constants(): + X = da.from_array(np.array([[1, 2, 3], [1, 2, 3]]), chunks=(2, 3)) + y = da.from_array(np.array([0, 1, 0]), chunks=(3,)) + with pytest.raises(ValueError): + do_nothing(X, y) + + +def test_add_intercept(): + X = np.zeros((4, 4)) + result = utils.add_intercept(X) + expected = np.array( + [[0, 0, 0, 0, 1], [0, 0, 0, 0, 1], [0, 0, 0, 0, 1], [0, 0, 0, 0, 1]], + dtype=X.dtype, + ) + assert_eq(result, expected) + + +def test_add_intercept_dask(): + X = da.from_array(np.zeros((4, 4)), chunks=(2, 4)) + result = utils.add_intercept(X) + expected = da.from_array( + np.array( + [[0, 0, 0, 0, 1], [0, 0, 0, 0, 1], [0, 0, 0, 0, 1], [0, 0, 0, 0, 1]], + dtype=X.dtype, + ), + chunks=2, + ) + assert_eq(result, expected) + + +def test_add_intercept_sparse(): + sparse = pytest.importorskip("sparse") + X = sparse.COO(np.zeros((4, 4))) + result = utils.add_intercept(X) + expected = sparse.COO( + np.array( + [[0, 0, 0, 0, 1], [0, 0, 0, 0, 1], [0, 0, 0, 0, 1], [0, 0, 0, 0, 1]], + dtype=X.dtype, + ) + ) + assert (result == expected).all() + + +def test_add_intercept_sparse_dask(): + sparse = pytest.importorskip("sparse") + X = da.from_array(sparse.COO(np.zeros((4, 4))), chunks=(2, 4)) + result = utils.add_intercept(X) + expected = da.from_array( + sparse.COO( + np.array( + [[0, 0, 0, 0, 1], [0, 0, 0, 0, 1], [0, 0, 0, 0, 1], [0, 0, 0, 0, 1]], + dtype=X.dtype, + ) + ), + chunks=2, + ) + assert_eq(result, expected) + + +def test_sparse(): + sparse = pytest.importorskip("sparse") + x = sparse.COO({(0, 0): 1, (1, 2): 2, (2, 1): 3}) + y = x.todense() + assert np.sum(x) == np.sum(x.todense()) + for func in [utils.sigmoid, utils.exp]: + assert (func(x) == func(y)).all() + + +def test_dask_array_is_sparse(): + sparse = pytest.importorskip("sparse") + assert utils.is_dask_array_sparse(da.from_array(sparse.COO([], [], shape=(10, 10)))) + assert utils.is_dask_array_sparse(da.from_array(sparse.eye(10))) + assert not utils.is_dask_array_sparse(da.from_array(np.eye(10))) + + +@pytest.mark.xfail( + reason="dask does not forward DOK in _meta " + "(https://github.com/pydata/sparse/issues/292)" +) +def test_dok_dask_array_is_sparse(): + sparse = pytest.importorskip("sparse") + assert utils.is_dask_array_sparse(da.from_array(sparse.DOK((10, 10)))) diff --git a/tests/linear_model/test_glm.py b/tests/linear_model/test_glm.py index 35e3a353a..2e7f346a9 100644 --- a/tests/linear_model/test_glm.py +++ b/tests/linear_model/test_glm.py @@ -4,11 +4,11 @@ import pandas as pd import pytest from dask.dataframe.utils import assert_eq -from dask_glm.regularizers import Regularizer from sklearn.pipeline import make_pipeline from dask_ml.datasets import make_classification, make_counts, make_regression from dask_ml.linear_model import LinearRegression, LogisticRegression, PoissonRegression +from dask_ml.linear_model.regularizers import Regularizer from dask_ml.linear_model.utils import add_intercept from dask_ml.model_selection import GridSearchCV @@ -25,7 +25,7 @@ def regularizer(request): return request.param -class DoNothingTransformer: +class DoNothingTransformer(object): def fit(self, X, y=None): return self @@ -51,32 +51,21 @@ def test_pr_init(solver): @pytest.mark.parametrize("fit_intercept", [True, False]) -def test_fit(fit_intercept, solver): - X, y = make_classification(n_samples=100, n_features=5, chunks=50) +@pytest.mark.parametrize("is_sparse", [True, False]) +def test_fit(fit_intercept, is_sparse): + X, y = make_classification( + n_samples=100, n_features=5, chunks=10, is_sparse=is_sparse + ) lr = LogisticRegression(fit_intercept=fit_intercept) lr.fit(X, y) lr.predict(X) lr.predict_proba(X) -@pytest.mark.parametrize( - "solver", ["admm", "newton", "lbfgs", "proximal_grad", "gradient_descent"] -) -def test_fit_solver(solver): - import dask_glm - from distutils.version import LooseVersion - - if LooseVersion(dask_glm.__version__) <= "0.2.0": - pytest.skip("FutureWarning for dask config.") - - X, y = make_classification(n_samples=100, n_features=5, chunks=50) - lr = LogisticRegression(solver=solver) - lr.fit(X, y) - - @pytest.mark.parametrize("fit_intercept", [True, False]) -def test_lm(fit_intercept): - X, y = make_regression(n_samples=100, n_features=5, chunks=50) +@pytest.mark.parametrize("is_sparse", [True, False]) +def test_lm(fit_intercept, is_sparse): + X, y = make_regression(n_samples=100, n_features=5, chunks=10, is_sparse=is_sparse) lr = LinearRegression(fit_intercept=fit_intercept) lr.fit(X, y) lr.predict(X) @@ -85,8 +74,9 @@ def test_lm(fit_intercept): @pytest.mark.parametrize("fit_intercept", [True, False]) -def test_big(fit_intercept): - X, y = make_classification(chunks=50) +@pytest.mark.parametrize("is_sparse", [True, False]) +def test_big(fit_intercept, is_sparse): + X, y = make_classification(chunks=50, is_sparse=is_sparse) lr = LogisticRegression(fit_intercept=fit_intercept) lr.fit(X, y) lr.predict(X) @@ -96,9 +86,11 @@ def test_big(fit_intercept): @pytest.mark.parametrize("fit_intercept", [True, False]) -def test_poisson_fit(fit_intercept): - X, y = make_counts(n_samples=100, chunks=500) - pr = PoissonRegression(fit_intercept=fit_intercept) +@pytest.mark.parametrize("is_sparse", [True, False]) +def test_poisson_fit(fit_intercept, is_sparse): + # XXX: this seems to take forever to converge. Setting a low max_iter for now. + X, y = make_counts(chunks=200, is_sparse=is_sparse) + pr = PoissonRegression(fit_intercept=fit_intercept, max_iter=5) pr.fit(X, y) pr.predict(X) pr.get_deviance(X, y) @@ -107,14 +99,15 @@ def test_poisson_fit(fit_intercept): def test_in_pipeline(): - X, y = make_classification(n_samples=100, n_features=5, chunks=50) + X, y = make_classification(n_samples=100, n_features=5, chunks=10) pipe = make_pipeline(DoNothingTransformer(), LogisticRegression()) pipe.fit(X, y) +@pytest.mark.xfail(reason="GridSearch dask objects") def test_gridsearch(): - X, y = make_classification(n_samples=100, n_features=5, chunks=50) - grid = {"logisticregression__C": [1000, 100, 10, 2]} + X, y = make_classification(n_samples=100, n_features=5, chunks=10) + grid = {"logisticregression__lamduh": [0.001, 0.01, 0.1, 0.5]} pipe = make_pipeline(DoNothingTransformer(), LogisticRegression()) search = GridSearchCV(pipe, grid, cv=3) search.fit(X, y) diff --git a/tests/preprocessing/test_encoders.py b/tests/preprocessing/test_encoders.py index 430100b1d..94b468af5 100644 --- a/tests/preprocessing/test_encoders.py +++ b/tests/preprocessing/test_encoders.py @@ -8,7 +8,7 @@ import sklearn.preprocessing import dask_ml.preprocessing -from dask_ml._compat import DASK_240, PANDAS_VERSION +from dask_ml._compat import DASK_240, DASK_2200, PANDAS_VERSION from dask_ml.utils import assert_estimator_equal X = np.array([["a"], ["a"], ["b"], ["c"]]) @@ -21,7 +21,8 @@ @pytest.mark.parametrize("method", ["fit", "fit_transform"]) @pytest.mark.parametrize("categories", ["auto", [["a", "b", "c"]]]) @pytest.mark.xfail( - condition=DASK_240, reason="https://github.com/dask/dask/issues/5008" + condition=(DASK_240 and not DASK_2200), + reason="https://github.com/dask/dask/issues/5008", ) def test_basic_array(sparse, method, categories): a = sklearn.preprocessing.OneHotEncoder(categories=categories, sparse=sparse) @@ -157,7 +158,8 @@ def test_unknown_category_transform(): @pytest.mark.xfail( - condition=DASK_240, reason="https://github.com/dask/dask/issues/5008" + condition=(DASK_240 and not DASK_2200), + reason="https://github.com/dask/dask/issues/5008", ) def test_unknown_category_transform_array(): x2 = da.from_array(np.array([["a"], ["b"], ["c"], ["d"]]), chunks=2) diff --git a/tests/test_kmeans.py b/tests/test_kmeans.py index c8fa3d789..9ef88ae8c 100644 --- a/tests/test_kmeans.py +++ b/tests/test_kmeans.py @@ -55,6 +55,7 @@ def test_fit_raises(): class TestKMeans: + @pytest.mark.filterwarnings("ignore:no implementation found") def test_basic(self, Xl_blobs_easy): X, _ = Xl_blobs_easy @@ -162,6 +163,7 @@ def test_inputs(self, X): km.fit(X) km.transform(X) + @pytest.mark.filterwarnings("ignore:no implementation found") def test_dtypes(self): X = da.random.uniform(size=(100, 2), chunks=(50, 2)) X2 = X.astype("f4") diff --git a/tests/test_parallel_post_fit.py b/tests/test_parallel_post_fit.py index 406d2c8e7..3e548e34d 100644 --- a/tests/test_parallel_post_fit.py +++ b/tests/test_parallel_post_fit.py @@ -81,6 +81,7 @@ def test_predict(kind): @pytest.mark.parametrize("kind", ["numpy", "dask.dataframe", "dask.array"]) +@pytest.mark.filterwarnings("ignore:no implementation found") def test_transform(kind): X, y = make_classification(chunks=100) @@ -130,6 +131,7 @@ def test_multiclass(): assert_eq_ar(result, expected) +@pytest.mark.filterwarnings("ignore:no implementation found") def test_auto_rechunk(): clf = ParallelPostFit(GradientBoostingClassifier()) X, y = make_classification(n_samples=1000, n_features=20, chunks=100) diff --git a/tests/test_pca.py b/tests/test_pca.py index 532b008c7..a36528fc0 100644 --- a/tests/test_pca.py +++ b/tests/test_pca.py @@ -113,6 +113,7 @@ def test_no_empty_slice_warning(): assert len(w) == 0 +@pytest.mark.filterwarnings("ignore:no implementation found") def test_whitening(): # Check that PCA output has unit-variance rng = np.random.RandomState(0) @@ -324,6 +325,7 @@ def test_pca_check_projection(): assert_almost_equal(np.abs(Yt[0][0]), 1.0, 1) +@pytest.mark.filterwarnings("ignore:no implementation found") def test_pca_inverse(): # Test that the projection of data can be inverted rng = np.random.RandomState(0) @@ -389,6 +391,7 @@ def test_n_components_none(): assert pca.n_components_ == min(data.shape) +@pytest.mark.filterwarnings("ignore:no implementation found") def test_randomized_pca_check_projection(): # Test that the projection by randomized PCA on dense data is correct rng = np.random.RandomState(0) @@ -534,6 +537,7 @@ def test_infer_dim_by_explained_variance(): assert pca.n_components_ == 2 +@pytest.mark.filterwarnings("ignore:no implementation found") def test_pca_score(): # Test that probabilistic PCA scoring yields a reasonable score n, p = 1000, 3 @@ -585,6 +589,7 @@ def test_pca_score3(): assert ll.argmax() == 1 +@pytest.mark.filterwarnings("ignore:no implementation found") def test_pca_score_with_different_solvers(): digits = datasets.load_digits() X_digits = digits.data diff --git a/tests/test_spectral_clustering.py b/tests/test_spectral_clustering.py index e07c5c38c..f1f9f327a 100644 --- a/tests/test_spectral_clustering.py +++ b/tests/test_spectral_clustering.py @@ -28,6 +28,7 @@ def test_basic(as_ndarray, persist_embedding): @pytest.mark.parametrize( "assign_labels", [sklearn.cluster.KMeans(n_init=2), "sklearn-kmeans"] ) +@pytest.mark.filterwarnings("ignore:no implementation found") def test_sklearn_kmeans(assign_labels): sc = SpectralClustering( n_components=25,