diff --git a/desdeo/problem/__init__.py b/desdeo/problem/__init__.py index 0004649e..2b8e77e8 100644 --- a/desdeo/problem/__init__.py +++ b/desdeo/problem/__init__.py @@ -1,6 +1,7 @@ """Imports available from the desdeo-problem package.""" __all__ = [ + "CVXPYEvaluator", "Constant", "Constraint", "ConstraintTypeEnum", @@ -40,6 +41,7 @@ ] +from .cvxpy_evaluator import CVXPYEvaluator from .evaluator import ( PolarsEvaluator, PolarsEvaluatorModesEnum, diff --git a/desdeo/problem/cvxpy_evaluator.py b/desdeo/problem/cvxpy_evaluator.py new file mode 100644 index 00000000..090297ac --- /dev/null +++ b/desdeo/problem/cvxpy_evaluator.py @@ -0,0 +1,562 @@ +"""Defines an evaluator compatible with the Problem JSON format and transforms it into a CVXPY problem.""" + +import warnings + +import cvxpy as cp +import numpy as np + +from desdeo.problem.json_parser import FormatEnum, MathParser +from desdeo.problem.schema import ( + Constant, + Constraint, + ConstraintTypeEnum, + Objective, + Problem, + ScalarizationFunction, + TensorConstant, + TensorVariable, + Variable, + VariableTypeEnum, +) + + +class CVXPYEvaluatorError(Exception): + """Raised when an error within the CVXPYEvaluator class is encountered.""" + + +class CVXPYEvaluatorWarning(UserWarning): + """Raised when the problem contains features that are poorly supported in cvxpy.""" + + +class CVXPYEvaluator: + """Defines an evaluator that transforms an instance of Problem into a CVXPY problem.""" + + variables: dict[str, cp.Variable] + parameters: dict[str, cp.Parameter] + constraints: dict[str, cp.Constraint] + objective_functions: dict[str, cp.Expression] + scalarizations: dict[str, cp.Expression] + extra_functions: dict[str, cp.Expression] + constants: dict[str, int | float | list[int] | list[float]] + + problem_model: cp.Problem + objective_expr: cp.Expression | None + + def __init__(self, problem: Problem): + """Initialized the evaluator. + + Args: + problem (Problem): the problem to be transformed into a CVXPY problem. + """ + self.objective_functions = {} + self.scalarizations = {} + self.extra_functions = {} + self.constants = {} + self.variables = {} + self.parameters = {} + self.constraints = {} + self.objective_expr = None + self.problem_model = None + + # set the parser + self.parse = MathParser(to_format=FormatEnum.cvxpy).parse + + # Add variables + self.variables = self.init_variables(problem) + + # Add constants as parameters, if any + if problem.constants is not None: + self.parameters, self.constants = self.init_parameters(problem) + + # Add extra expressions, if any + if problem.extra_funcs is not None: + self.extra_functions = self.init_extras(problem) + + # Add objective function expressions + self.objective_functions = self.init_objectives(problem) + + # Add constraints, if any + if problem.constraints is not None: + self.constraints = self.init_constraints(problem) + + # Add scalarization functions, if any + if problem.scalarization_funcs is not None: + self.scalarizations = self.init_scalarizations(problem) + + self.problem = problem + + def init_variables(self, problem: Problem) -> dict[str, cp.Variable]: + """Add variables to the CVXPY problem. + + Args: + problem (Problem): problem from which to extract the variables. + + Raises: + CVXPYEvaluatorError: when a problem in extracting the variables is encountered. + I.e., the variables are of a non supported type. + + Returns: + dict[str, cp.Variable]: the variables for the problem. + """ + variables = {} + for var in problem.variables: + if isinstance(var, Variable): + # handle regular variables + lowerbound = var.lowerbound + upperbound = var.upperbound + + # Set bounds + bounds = None + if lowerbound is not None or upperbound is not None: + lb = lowerbound if lowerbound is not None else -np.inf + ub = upperbound if upperbound is not None else np.inf + bounds = [lb, ub] + + # figure out the variable type + match var.variable_type: + case VariableTypeEnum.integer: + # variable is integer + cv_var = cp.Variable(integer=True, bounds=bounds, name=var.symbol) + case VariableTypeEnum.real: + # variable is real + cv_var = cp.Variable(bounds=bounds, name=var.symbol) + case VariableTypeEnum.binary: + cv_var = cp.Variable(boolean=True, bounds=bounds, name=var.symbol) + case _: + msg = f"Could not figure out the type for variable {var}." + raise CVXPYEvaluatorError(msg) + + variables[var.symbol] = cv_var + + elif isinstance(var, TensorVariable): + # handle tensor variables, i.e., vectors etc.. + lowerbounds = var.get_lowerbound_values() if var.lowerbounds is not None else None + upperbounds = var.get_upperbound_values() if var.upperbounds is not None else None + + # Set bounds + bounds = None + if lowerbounds is not None or upperbounds is not None: + lb = np.array(lowerbounds) if lowerbounds is not None else -np.inf + ub = np.array(upperbounds) if upperbounds is not None else np.inf + bounds = [lb, ub] + + # figure out the variable type + match var.variable_type: + case VariableTypeEnum.integer: + # variable is integer + cv_var = cp.Variable(shape=tuple(var.shape), integer=True, bounds=bounds, name=var.symbol) + case VariableTypeEnum.real: + # variable is real + cv_var = cp.Variable(shape=tuple(var.shape), bounds=bounds, name=var.symbol) + case VariableTypeEnum.binary: + cv_var = cp.Variable(shape=tuple(var.shape), boolean=True, bounds=bounds, name=var.symbol) + case _: + msg = f"Could not figure out the type for variable {var}." + raise CVXPYEvaluatorError(msg) + + variables[var.symbol] = cv_var + + return variables + + def init_parameters( + self, problem: Problem + ) -> tuple[dict[str, cp.Parameter], dict[str, int | float | list[int] | list[float]]]: + """Add constants as CVXPY parameters. + + CVXPY uses parameters for values that can be changed without redefining the problem. + This allows constants to be updated between solves. + + Args: + problem (Problem): problem from which to extract the constants. + + Raises: + CVXPYEvaluatorError: when the domain of a constant cannot be figured out. + + Returns: + tuple: a tuple containing (parameters dict, constants dict). + """ + parameters: dict[str, cp.Parameter] = {} + constants: dict[str, int | float | list[int] | list[float]] = {} + + for con in problem.constants: + if isinstance(con, Constant): + param = cp.Parameter(name=con.symbol, nonneg=con.value >= 0) + param.value = con.value + parameters[con.symbol] = param + constants[con.symbol] = con.value + elif isinstance(con, TensorConstant): + values = con.get_values() + param = cp.Parameter(shape=np.array(values).shape, name=con.symbol) + param.value = np.array(values) + parameters[con.symbol] = param + constants[con.symbol] = values + + return parameters, constants + + def init_extras(self, problem: Problem) -> dict[str, cp.Expression]: + """Add extra function expressions to a CVXPY evaluator. + + Args: + problem (Problem): problem from which the extract the extra function expressions. + + Returns: + dict[str, cp.Expression]: a dict containing the extra function expressions. + """ + extra_functions: dict[str, cp.Expression] = {} + + for extra in problem.extra_funcs: + extra_functions[extra.symbol] = self.parse(extra.func, callback=self.get_expression_by_name) + + return extra_functions + + def init_objectives(self, problem: Problem) -> dict[str, cp.Expression]: + """Add objective function expressions to a CVXPY evaluator. + + Does not yet add any actual CVXPY optimization objectives, only creates a dict containing the + expressions of the objectives. + + Args: + problem (Problem): problem from which to extract the objective function expresions. + + Returns: + dict[str, cp.Expression]: dict containing the objective functions. + """ + objective_functions: dict[str, cp.Expression] = {} + for obj in problem.objectives: + expr = self.parse(obj.func, callback=self.get_expression_by_name) + if isinstance(expr, (int, float)): + warnings.warn( + "One or more of the problem objectives seems to be a constant.", + CVXPYEvaluatorWarning, + stacklevel=2, + ) + + objective_functions[obj.symbol] = expr + + # the obj.symbol_min objectives are used when optimizing and building scalarizations etc... + objective_functions[f"{obj.symbol}_min"] = -expr if obj.maximize else expr + + return objective_functions + + def init_constraints(self, problem: Problem) -> dict[str, cp.Constraint]: + """Add constraint expressions to a CVXPY problem. + + Args: + problem (Problem): the problem from which to extract the constraint function expressions. + + Raises: + CVXPYEvaluatorError: when an unsupported constraint type is encountered. + + Returns: + dict[str, cp.Constraint]: dict of constraints keyed by symbol. + """ + constraints = {} + for cons in problem.constraints: + expr = self.parse(cons.func, callback=self.get_expression_by_name) + + match con_type := cons.cons_type: + case ConstraintTypeEnum.LTE: + # constraints in DESDEO are defined such that they must be less than zero + constraint = expr <= 0 + case ConstraintTypeEnum.EQ: + constraint = expr == 0 + case _: + msg = f"Constraint type of {con_type} not supported. Must be one of {ConstraintTypeEnum}." + raise CVXPYEvaluatorError(msg) + + constraints[cons.symbol] = constraint + + return constraints + + def init_scalarizations(self, problem: Problem) -> dict[str, cp.Expression]: + """Add scalarization expressions to a CVXPY evaluator. + + Scalarizations work identically to objectives, except they are stored in a different + dict in the CVXPYEvaluator. + + Args: + problem (Problem): the problem from which to extract the scalarization function expressions. + + Returns: + dict[str, cp.Expression]: the dict with the scalarization expressions. + """ + scalarizations: dict[str, cp.Expression] = {} + + for scal in problem.scalarization_funcs: + scalarizations[scal.symbol] = self.parse(scal.func, self.get_expression_by_name) + + return scalarizations + + def add_constraint(self, constraint: Constraint) -> cp.Constraint: + """Add a constraint expression to the CVXPY problem. + + Args: + constraint (Constraint): the constraint function expression. + + Raises: + CVXPYEvaluatorError: when an unsupported constraint type is encountered. + + Returns: + cp.Constraint: The CVXPY constraint that was added. + """ + expr = self.parse(constraint.func, self.get_expression_by_name) + + match con_type := constraint.cons_type: + case ConstraintTypeEnum.LTE: + # constraints in DESDEO are defined such that they must be less than zero + cvxpy_constraint = expr <= 0 + case ConstraintTypeEnum.EQ: + cvxpy_constraint = expr == 0 + case _: + msg = f"Constraint type of {con_type} not supported. Must be one of {ConstraintTypeEnum}." + raise CVXPYEvaluatorError(msg) + + self.constraints[constraint.symbol] = cvxpy_constraint + return cvxpy_constraint + + def add_objective(self, obj: Objective): + """Adds an objective function expression to the CVXPY evaluator. + + Does not yet add any actual CVXPY optimization objectives, only adds them to the dict + containing the expressions of the objectives. + + Args: + obj (Objective): the objective function expression to be added. + """ + expr = self.parse(obj.func, self.get_expression_by_name) + if isinstance(expr, (int, float)): + warnings.warn( + "One or more of the problem objectives seems to be a constant.", + CVXPYEvaluatorWarning, + stacklevel=2, + ) + + self.objective_functions[obj.symbol] = expr + + # the obj.symbol_min objectives are used when optimizing and building scalarizations etc... + self.objective_functions[f"{obj.symbol}_min"] = -expr if obj.maximize else expr + + def add_scalarization_function(self, scal: ScalarizationFunction): + """Adds a scalarization expression to the CVXPY evaluator. + + Scalarizations work identically to objectives, except they are stored in a different + dict in the CVXPYEvaluator. + + Args: + scal (ScalarizationFunction): The scalarization function to be added. + """ + self.scalarizations[scal.symbol] = self.parse(scal.func, self.get_expression_by_name) + + def add_variable(self, var: Variable | TensorVariable) -> cp.Variable: + """Add a variable to the CVXPY evaluator. + + Args: + var (Variable | TensorVariable): The definition of the variable to be added. + + Raises: + CVXPYEvaluatorError: when a problem in extracting the variables is encountered. + + Returns: + cp.Variable: the variable that was added. + """ + if isinstance(var, Variable): + # handle regular variables + lowerbound = var.lowerbound + upperbound = var.upperbound + + # Set bounds + bounds = None + if lowerbound is not None or upperbound is not None: + lb = lowerbound if lowerbound is not None else -cp.inf + ub = upperbound if upperbound is not None else cp.inf + bounds = (lb, ub) + + # figure out the variable type + match var.variable_type: + case VariableTypeEnum.integer: + # variable is integer + cv_var = cp.Variable(integer=True, bounds=bounds, name=var.symbol) + case VariableTypeEnum.real: + # variable is real + cv_var = cp.Variable(bounds=bounds, name=var.symbol) + case VariableTypeEnum.binary: + cv_var = cp.Variable(boolean=True, bounds=bounds, name=var.symbol) + case _: + msg = f"Could not figure out the type for variable {var}." + raise CVXPYEvaluatorError(msg) + + elif isinstance(var, TensorVariable): + # handle tensor variables, i.e., vectors etc.. + lowerbounds = var.get_lowerbound_values() if var.lowerbounds is not None else None + upperbounds = var.get_upperbound_values() if var.upperbounds is not None else None + + # Set bounds + bounds = None + if lowerbounds is not None or upperbounds is not None: + lb = np.array(lowerbounds) if lowerbounds is not None else -np.inf + ub = np.array(upperbounds) if upperbounds is not None else np.inf + bounds = (lb, ub) + + # figure out the variable type + match var.variable_type: + case VariableTypeEnum.integer: + # variable is integer + cv_var = cp.Variable(shape=tuple(var.shape), integer=True, bounds=bounds, name=var.symbol) + case VariableTypeEnum.real: + # variable is real + cv_var = cp.Variable(shape=tuple(var.shape), bounds=bounds, name=var.symbol) + case VariableTypeEnum.binary: + cv_var = cp.Variable(shape=tuple(var.shape), boolean=True, bounds=bounds, name=var.symbol) + case _: + msg = f"Could not figure out the type for variable {var}." + raise CVXPYEvaluatorError(msg) + + self.variables[var.symbol] = cv_var + return cv_var + + def get_expression_by_name(self, name: str) -> cp.Expression | np.ndarray | int | float: + """Returns a CVXPY expression corresponding to the name. + + Looks for variables, parameters, objective functions, scalarizations, and extra functions. + + Args: + name (str): The symbol of the expression. + + Returns: + cp.Expression | np.ndarray | int | float: A mathematical expression that CVXPY can use. + """ + if name in self.variables: + expression = self.variables[name] + elif name in self.parameters: + expression = self.parameters[name] + elif name in self.objective_functions: + expression = self.objective_functions[name] + elif name in self.scalarizations: + expression = self.scalarizations[name] + elif name in self.extra_functions: + expression = self.extra_functions[name] + elif name in self.constants: + if isinstance(self.constants[name], list): + expression = np.array(self.constants[name]) + else: + expression = self.constants[name] + else: + msg = f"No expression with name {name} found in the CVXPY model." + raise CVXPYEvaluatorError(msg) + return expression + + def get_values(self) -> dict[str, float | int | bool | list[float] | list[int] | np.ndarray]: + """Get the values from the CVXPY solution in a dict. + + The keys of the dict will be the symbols defined in the problem utilized to initialize the evaluator. + Can only be called after the problem has been solved. + + Returns: + dict: a dict with keys equivalent to the symbols defined in self.problem. + + Raises: + CVXPYEvaluatorError: if the problem has not been solved yet. + """ + if self.problem_model is None or self.problem_model.status not in ["optimal", "optimal_inaccurate"]: + msg = "Problem has not been solved yet or did not achieve optimal status." + raise CVXPYEvaluatorError(msg) + + result_dict = {} + + for var in self.problem.variables: + value = self.variables[var.symbol].value + if value is None: + msg = f"Variable {var.symbol} has not been solved yet." + raise CVXPYEvaluatorError(msg) + result_dict[var.symbol] = value + + for obj in self.problem.objectives: + result_dict[obj.symbol] = self.objective_functions[obj.symbol].value + + if self.problem.constants is not None: + for con in self.problem.constants: + result_dict[con.symbol] = self.constants[con.symbol] + + if self.problem.extra_funcs is not None: + for extra in self.problem.extra_funcs: + result_dict[extra.symbol] = self.extra_functions[extra.symbol].value + + if self.problem.scalarization_funcs is not None: + for scal in self.problem.scalarization_funcs: + result_dict[scal.symbol] = self.scalarizations[scal.symbol].value + + if self.problem.constraints is not None: + for con in self.problem.constraints: + constraint_expr = self.parse(con.func, callback=self.get_expression_by_name) + result_dict[con.symbol] = constraint_expr.value + + return result_dict + + def set_optimization_target(self, target: str): + """Sets an optimization objective for the CVXPY problem. + + Args: + target (str): a str representing a symbol. Needs to match an objective function or scalarization + function already found in the evaluator. + maximize (bool): If true, the target function is maximized instead of minimized. + + Raises: + CVXPYEvaluatorError: the given target was not found in the evaluator. + """ + if target in self.objective_functions: + objective = self.problem.get_objective(symbol=target, copy=False) + maximize = False if objective is None else bool(objective.maximize) + elif target in self.scalarizations: + maximize = False + else: + msg = f"The gurobipy model has no objective or scalarization named {target}." + raise CVXPYEvaluatorError(msg) + + obj_expr = self.get_expression_by_name(target) + + objective = cp.Maximize(obj_expr) if maximize else cp.Minimize(obj_expr) + + self.objective_expr = objective + self.problem_model = cp.Problem(objective, list(self.constraints.values())) + + def solve(self, **kwargs): + """Solve the CVXPY problem. + + Args: + **kwargs: additional arguments to pass to cp.Problem.solve(). + """ + if self.problem_model is None: + msg = "No optimization target has been set. Call set_optimization_target() first." + raise CVXPYEvaluatorError(msg) + + if self.problem_model.is_dcp(): + self.problem_model.solve(**kwargs) + elif self.problem_model.is_dgp(): + self.problem_model.solve(gp=True, **kwargs) + else: + warnings.warn( + "The problem does not appear to be DCP or DGP. CVXPY may not be able to solve it.", + CVXPYEvaluatorWarning, + stacklevel=2, + ) + self.problem_model.solve(**kwargs) + + def update_parameter(self, symbol: str, value: int | float | list[int] | list[float] | np.ndarray): + """Update the value of a parameter (constant). + + Args: + symbol (str): the symbol of the parameter to update. + value: the new value for the parameter. + + Raises: + CVXPYEvaluatorError: if the parameter does not exist. + """ + if symbol not in self.parameters: + msg = f"Parameter {symbol} not found in the evaluator." + raise CVXPYEvaluatorError(msg) + + self.parameters[symbol].value = value + if isinstance(value, (list, np.ndarray)): + self.constants[symbol] = list(value) if isinstance(value, np.ndarray) else value + else: + self.constants[symbol] = value diff --git a/desdeo/problem/gurobipy_evaluator.py b/desdeo/problem/gurobipy_evaluator.py index 968e71bc..6cc25074 100644 --- a/desdeo/problem/gurobipy_evaluator.py +++ b/desdeo/problem/gurobipy_evaluator.py @@ -578,7 +578,12 @@ def set_optimization_target(self, target: str, maximize: bool = False): Raises: GurobipyEvaluatorError: the given target was not an attribute of the gurobipy model. """ - if not ((target in self.objective_functions) or (target in self.scalarizations)): + if target in self.objective_functions: + objective = self.problem.get_objective(symbol=target, copy=False) + maximize = False if objective is None else bool(objective.maximize) + elif target in self.scalarizations: + maximize = False + else: msg = f"The gurobipy model has no objective or scalarization named {target}." raise GurobipyEvaluatorError(msg) diff --git a/desdeo/problem/json_parser.py b/desdeo/problem/json_parser.py index 13ef96f9..35285308 100644 --- a/desdeo/problem/json_parser.py +++ b/desdeo/problem/json_parser.py @@ -4,6 +4,7 @@ from enum import Enum from functools import reduce +import cvxpy as cp import gurobipy as gp import numpy as np import polars as pl @@ -14,6 +15,8 @@ # Mathematical objects in gurobipy can take many types gpexpression = gp.Var | gp.MVar | gp.LinExpr | gp.QuadExpr | gp.MLinExpr | gp.MQuadExpr | gp.GenExpr +# Mathematical objects in cvxpy can take many types +cvxpyexpression = cp.Variable | cp.Expression | cp.Constant | int | float class FormatEnum(str, Enum): @@ -23,6 +26,7 @@ class FormatEnum(str, Enum): pyomo = "pyomo" sympy = "sympy" gurobipy = "gurobipy" + cvxpy = "cvxpy" class ParserError(Exception): @@ -683,6 +687,86 @@ def _gurobipy_random_access(*args): self.MIN: lambda *args: gp.min_(args), } + def _cvxpy_error(): + msg = "The cvxpy model format only supports linear and quadratic expressions." + return lambda x: (_ for _ in ()).throw(ParserError(msg)) + + def _cvxpy_matmul(*args): + """CVXPY matrix multiplication.""" + + def _matmul(a, b): + if isinstance(a, list): + a = np.array(a) + if isinstance(b, list): + b = np.array(b) + if len(np.shape(a @ b)) == 1: + return a @ b + return (a @ b).sum() + + return reduce(_matmul, args) + + def _cvxpy_summation(summand): + """CVXPY matrix summation.""" + + def _sum(summand): + if isinstance(summand, list): + summand = np.array(summand) + return cp.sum(summand) + + return _sum(summand) + + def _cvxpy_random_access(*args): + msg = ( + "Tensor random access with 'At' has not been implemented for the CVXPY parser yet. " + "Feel free to contribute!" + ) + raise NotImplementedError(msg) + + cvxpy_env = { + # Define the operations for the different operators. + # Basic arithmetic operations + self.NEGATE: lambda x: -x, + self.ADD: lambda *args: reduce(lambda x, y: x + y, args), + self.SUB: lambda *args: reduce(lambda x, y: x - y, args), + self.MUL: lambda *args: reduce(lambda x, y: x * y, args), + self.DIV: lambda *args: reduce(lambda x, y: x / y, args), + # Vector and matrix operations + self.MATMUL: _cvxpy_matmul, + self.SUM: _cvxpy_summation, + self.RANDOM_ACCESS: _cvxpy_random_access, + # Exponentiation and logarithms + # CVXPY supports some of these via special functions, but with restrictions + self.EXP: lambda x: cp.exp(x), + self.LN: lambda x: cp.log(x), + self.LB: lambda x: cp.log(x) / np.log(2), + self.LG: lambda x: cp.log(x) / np.log(10), + self.LOP: lambda x: cp.log1p(x), + self.SQRT: lambda x: cp.sqrt(x), + self.SQUARE: lambda x: cp.square(x), + self.POW: lambda x, y: x**y, # may cause errors for non-integer powers + # Trigonometric operations - CVXPY supports these + self.ARCCOS: lambda x: cp.arccos(x), + self.ARCCOSH: lambda x: cp.arccosh(x), + self.ARCSIN: lambda x: cp.arcsin(x), + self.ARCSINH: lambda x: cp.arcsinh(x), + self.ARCTAN: lambda x: cp.arctan(x), + self.ARCTANH: lambda x: cp.arctanh(x), + self.COS: lambda x: cp.cos(x), + self.COSH: lambda x: cp.cosh(x), + self.SIN: lambda x: cp.sin(x), + self.SINH: lambda x: cp.sinh(x), + self.TAN: lambda x: cp.tan(x), + self.TANH: lambda x: cp.tanh(x), + # Rounding operations + self.ABS: lambda x: cp.abs(x), + self.CEIL: lambda x: cp.ceil(x), + self.FLOOR: lambda x: cp.floor(x), + # Other operations + self.RATIONAL: lambda lst: reduce(lambda x, y: x / y, lst), + self.MAX: lambda *args: cp.max(cp.stack(args, axis=0), axis=0), + self.MIN: lambda *args: cp.min(cp.stack(args, axis=0), axis=0), + } + match to_format: case FormatEnum.polars: self.env = polars_env @@ -696,6 +780,9 @@ def _gurobipy_random_access(*args): case FormatEnum.gurobipy: self.env = gurobipy_env self.parse = self._parse_to_gurobipy + case FormatEnum.cvxpy: + self.env = cvxpy_env + self.parse = self._parse_to_cvxpy case _: msg = f"Given target format {to_format} not supported. Must be one of {FormatEnum}." raise ParserError(msg) @@ -913,6 +1000,56 @@ def _parse_to_gurobipy( msg = f"Encountered unsupported type '{type(expr)}' during parsing." raise ParserError(msg) + def _parse_to_cvxpy( + self, expr: list | str | int | float, callback: Callable[[str], cvxpyexpression] + ) -> cvxpyexpression: + """Parses the MathJSON format recursively into a CVXPY expression. + + CVXPY supports a much broader range of expressions compared to gurobipy, including + exponentials, logarithms, and trigonometric functions. However, some operations still have + restrictions due to DCP (Disciplined Convex Programming) rules. + + Args: + expr (list | str | int | float): a list with a Polish notation expression that describes a, e.g., + ["Multiply", ["Sqrt", 2], "x2"] + callback (Callable): A function that can return a CVXPY expression associated with the + correct model when called with symbol str. + + Returns: + Returns a CVXPY expression equivalent to the original expression. + """ + if isinstance(expr, (cp.Variable, cp.Parameter, cp.Expression)): + # Terminal case: cvxpy expression + return expr + if isinstance(expr, str): + # Terminal case: str expression, represent a variable or expression + return callback(expr) + if isinstance(expr, self.literals): + # Terminal case: numeric literal + return expr + + if isinstance(expr, list): + # Extract the operation name + if isinstance(expr[0], str) and expr[0] in self.env: + op_name = expr[0] + # Parse the operands + operands = [self._parse_to_cvxpy(e, callback) for e in expr[1:]] + + while isinstance(operands, list) and len(operands) == 1: + # if the operands have redundant brackets, remove them + operands = operands[0] + + if isinstance(operands, list): + return self.env[op_name](*operands) + + return self.env[op_name](operands) + + # else, assume the list contents are parseable expressions + return [self._parse_to_cvxpy(e, callback) for e in expr] + + msg = f"Encountered unsupported type '{type(expr)}' during parsing." + raise ParserError(msg) + def replace_str(lst: list | str, target: str, sub: list | str | float | int) -> list: """Replace a target in list with a substitution recursively. diff --git a/desdeo/problem/testproblems/simple_problem.py b/desdeo/problem/testproblems/simple_problem.py index bb5bf5a6..58c1370f 100644 --- a/desdeo/problem/testproblems/simple_problem.py +++ b/desdeo/problem/testproblems/simple_problem.py @@ -193,7 +193,7 @@ def simple_linear_test_problem() -> Problem: ) -def simple_constrained_quadratic_tensor_test_problem() -> Problem: +def simple_constrained_quadratic_tensor_test_problem(dqp=False) -> Problem: """Defines a simple constrained quadratic problem with tensor variables, suitable for testing purposes.""" xvar = TensorVariable( name="X", @@ -236,7 +236,7 @@ def simple_constrained_quadratic_tensor_test_problem() -> Problem: obj = Objective( name="f_1", symbol="f_1", - func="-0.5*X@X", # this is equivalent to 0.5*X.T@X, since X is a 1D tensor variable + func="Sum(-0.5*(X**2))" if dqp else "-0.5*X@X", maximize=True, is_linear=False, is_convex=True, diff --git a/desdeo/tools/__init__.py b/desdeo/tools/__init__.py index 9b8f36ff..a102c533 100644 --- a/desdeo/tools/__init__.py +++ b/desdeo/tools/__init__.py @@ -3,6 +3,8 @@ __all__ = [ "BaseSolver", "BonminOptions", + "CVXPYSolver", + "CVXPYSolverOptions", "GurobipySolver", "IpoptOptions", "NevergradGenericOptions", @@ -75,6 +77,10 @@ add_group_stom_agg_diff, add_group_stom_diff, ) +from desdeo.tools.cvxpy_solver_interfaces import ( + CVXPYSolver, + CVXPYSolverOptions, +) from desdeo.tools.gurobipy_solver_interfaces import ( GurobipySolver, PersistentGurobipySolver, diff --git a/desdeo/tools/cvxpy_solver_interfaces.py b/desdeo/tools/cvxpy_solver_interfaces.py new file mode 100644 index 00000000..eac4072f --- /dev/null +++ b/desdeo/tools/cvxpy_solver_interfaces.py @@ -0,0 +1,146 @@ +"""Defines solver interfaces for cvxpy.""" + +from pydantic import BaseModel, Field + +from desdeo.problem import ( + CVXPYEvaluator, + Problem, +) +from desdeo.tools.generics import BaseSolver, SolverResults + + +class CVXPYSolverOptions(BaseModel): + """Defines a pydantic model to store and pass options to the CVXPY solver.""" + + solver: str | None = Field( + description="The solver to use.", + default=None, + ) + solver_path: list[str | tuple[str, dict[str, object]]] | None = Field( + description="The solvers to try with optional arguments, in order of preference.", + default=None, + ) + verbose: bool = Field(description="Overrides the default of hiding solver output.", default=False) + gp: bool = Field(description="Parse the problem as a disciplined geometric program.", default=False) + qcp: bool = Field(description="Parse the problem as a disciplined quasiconvex program.", default=False) + requires_grad: bool = Field( + description=( + "Allow gradient computation with respect to Parameters by calling problem.backward() " + "or problem.derivative() after solving." + ), + default=False, + ) + enforce_dpp: bool = Field( + description=( + "Raise a DPPError for non-DPP problems when solving instead of only warning. " + "Only relevant for problems involving Parameters." + ), + default=False, + ) + ignore_dpp: bool = Field( + description=("Treat DPP problems as non-DPP, which may speed up compilation."), + default=False, + ) + extra_options: dict[str, object] | None = Field( + description="Additional keyword arguments forwarded directly to cvxpy Problem.solve().", + default=None, + ) + + +_default_cvxpy_options = CVXPYSolverOptions() + + +def parse_cvxpy_optimizer_results(problem: Problem, evaluator: CVXPYEvaluator) -> SolverResults: + """Parses results from CVXPYEvaluator's problem into DESDEO SolverResults. + + Args: + problem (Problem): the problem being solved. + evaluator (CVXPYEvaluator): the evaluator utilized to solve the problem. + + Returns: + SolverResults: DESDEO solver results. + """ + results = evaluator.get_values() + + variable_values = {var.symbol: results[var.symbol] for var in problem.variables} + objective_values = {obj.symbol: results[obj.symbol] for obj in problem.objectives} + constraint_values = ( + {con.symbol: results[con.symbol] for con in problem.constraints} if problem.constraints is not None else None + ) + extra_func_values = ( + {extra.symbol: results[extra.symbol] for extra in problem.extra_funcs} + if problem.extra_funcs is not None + else None + ) + scalarization_values = ( + {scal.symbol: results[scal.symbol] for scal in problem.scalarization_funcs} + if problem.scalarization_funcs is not None + else None + ) + lagrange_multipliers = None + + success = evaluator.problem_model.status in {"optimal", "optimal_inaccurate"} + if evaluator.problem_model.status == "optimal": + status = "Optimal solution found." + elif evaluator.problem_model.status == "optimal_inaccurate": + status = "Optimal solution found (inaccurate)." + elif evaluator.problem_model.status == "infeasible": + status = "Problem is infeasible." + elif evaluator.problem_model.status == "unbounded": + status = "Problem is unbounded." + elif evaluator.problem_model.status == "infeasible_inaccurate": + status = "Problem is infeasible (inaccurate)." + elif evaluator.problem_model.status == "unbounded_inaccurate": + status = "Problem is unbounded (inaccurate)." + else: + status = f"Optimization ended with status: {evaluator.problem_model.status}" + msg = f"CVXPY solver status is: '{status}'" + + return SolverResults( + optimal_variables=variable_values, + optimal_objectives=objective_values, + constraint_values=constraint_values, + extra_func_values=extra_func_values, + scalarization_values=scalarization_values, + lagrange_multipliers=lagrange_multipliers, + success=success, + message=msg, + ) + + +class CVXPYSolver(BaseSolver): + """Creates a CVXPY solver that utilizes CVXPY's optimization capabilities.""" + + def __init__(self, problem: Problem, options: CVXPYSolverOptions = _default_cvxpy_options): + """The solver is initialized by supplying a problem and options. + + CVXPY is a Python-embedded modeling language for convex optimization problems, + supporting a broad range of problem types. + + Args: + problem (Problem): the problem to be solved. + options (CVXPYSolverOptions): Pydantic model containing solver options for CVXPY. + For available options see https://www.cvxpy.org/api_reference/cvxpy.problems.html#solve + """ + self.evaluator = CVXPYEvaluator(problem) + self.problem = problem + + options_dict = {k: v for k, v in options.model_dump().items() if v is not None} + extra_options = options_dict.pop("extra_options", None) + if extra_options is not None: + options_dict.update(extra_options) + self.solve_options = options_dict + + def solve(self, target: str) -> SolverResults: + """Solve the problem for the given target. + + Args: + target (str): the symbol of the function to be optimized, and which is + defined in the problem given when initializing the solver. + + Returns: + SolverResults: the results of the optimization. + """ + self.evaluator.set_optimization_target(target) + self.evaluator.solve(**self.solve_options) + return parse_cvxpy_optimizer_results(self.problem, self.evaluator) diff --git a/pyproject.toml b/pyproject.toml index 13eedfa1..dab912ca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ dependencies = [ "pymoo >= 0.6.1.2", "shap >= 0.47.0", "requests >= 2.32.0", - "cvxpy[scip] >= 1.6.4", + "cvxpy[scip]>=1.6.4", "moocore >= 0.1.7", "websockets>=15.0.1", "coco-experiment>=2.8.2", @@ -228,6 +228,7 @@ markers = [ "scalarization: tests related to scalarization functions", "nimbus: tests related to the NIMBUS method", "gurobipy: tests related to the gurobipy solver", + "cvxpy: tests related to the cvxpy solver", "pareto_navigator: tests related to the Pareto Navigator method", "patterns: tests related to the publisher-subscriber pattern", "forest_problem: tests related to the forest problem examples.", diff --git a/tests/test_cvxpy_solver.py b/tests/test_cvxpy_solver.py new file mode 100644 index 00000000..267b40fc --- /dev/null +++ b/tests/test_cvxpy_solver.py @@ -0,0 +1,396 @@ +"""Tests for the cvxpy solver.""" + +import numpy as np +import pytest + +from desdeo.problem.schema import ( + Constant, + Constraint, + ConstraintTypeEnum, + ExtraFunction, + Objective, + Problem, + Variable, +) +from desdeo.problem.testproblems import ( + simple_constrained_quadratic_tensor_test_problem, + simple_knapsack_vectors, + simple_linear_test_problem, +) +from desdeo.tools import CVXPYSolver, CVXPYSolverOptions + + +@pytest.mark.slow +@pytest.mark.cvxpy +def test_cvxpy_solver(): + """Tests the cvxpy solver.""" + problem = simple_linear_test_problem() + solver = CVXPYSolver(problem) + + results = solver.solve("f_1") + + assert results.success + + xs = results.optimal_variables + assert np.isclose(xs["x_1"], 4.2, atol=1e-8) + assert np.isclose(xs["x_2"], 2.1, atol=1e-8) + + +@pytest.mark.slow +@pytest.mark.cvxpy +def test_cvxpy_solver_with_tensors(): + """Test cvxpy solver with a problem with TensorVariables.""" + problem = simple_knapsack_vectors() + solver = CVXPYSolver(problem) + + results = solver.solve("f_1_min") + + assert results.success + xs, ys = results.optimal_variables, results.optimal_objectives + + assert np.allclose(xs["X"], [1.0, 1.0, 0.0, 0.0]) + assert np.isclose(ys["f_1"], 8.0) + assert np.isclose(ys["f_2"], 6.0) + + results = solver.solve("f_2_min") + + assert results.success + xs, ys = results.optimal_variables, results.optimal_objectives + + assert np.allclose(xs["X"], [0.0, 0.0, 1.0, 0.0]) + assert np.isclose(ys["f_1"], 6.0) + assert np.isclose(ys["f_2"], 7.0) + + +@pytest.mark.slow +@pytest.mark.cvxpy +def test_cvxpy_solver_qp_with_tensors(): + """Test cvxpy solver with a quadratic problem with TensorVariables.""" + problem = simple_constrained_quadratic_tensor_test_problem(dqp=True) + solver = CVXPYSolver(problem) + + results = solver.solve("f_1_min") + + assert results.success + xs, ys = results.optimal_variables, results.optimal_objectives + + assert np.allclose(xs["X"], [2 / 3, 2 / 3]) + assert np.isclose(ys["f_1"], -((2 / 3) ** 2)) + + +@pytest.mark.json +@pytest.mark.cvxpy +def test_parse_cvxpy_basic_arithmetic(): + """Test the JSON parser for correctly parsing MathJSON into CVXPY expressions with basic arithmetic.""" + problem = Problem( + name="Test Problem", + description="Test Problem", + variables=[ + Variable(name="x1", symbol="x_1", variable_type="real", lowerbound=0.0, upperbound=10.0), + Variable(name="x2", symbol="x_2", variable_type="real", lowerbound=0.0, upperbound=10.0), + ], + objectives=[ + Objective( + name="Objective 1", + symbol="f_1", + func=["Add", "x_1", "x_2"], + maximize=False, + ), + ], + ) + + solver = CVXPYSolver(problem) + results = solver.solve("f_1") + + assert results.optimal_variables["x_1"] >= -1e-8 # should be near 0 + assert results.optimal_variables["x_2"] >= -1e-8 # should be near 0 + assert np.isclose(results.optimal_objectives["f_1"], 0.0, atol=1e-8) + + +@pytest.mark.json +@pytest.mark.cvxpy +def test_parse_cvxpy_with_constants(): + """Test the JSON parser for correctly parsing CVXPY expressions with constants.""" + problem = Problem( + name="Test Problem with Constants", + description="Test Problem with Constants", + constants=[Constant(name="c1", symbol="c_1", value=5.0)], + variables=[ + Variable(name="x1", symbol="x_1", variable_type="real", lowerbound=0.0, upperbound=10.0), + Variable(name="x2", symbol="x_2", variable_type="real", lowerbound=0.0, upperbound=10.0), + ], + objectives=[ + Objective( + name="Objective 1", + symbol="f_1", + func=["Add", ["Multiply", "c_1", "x_1"], "x_2"], + maximize=False, + ), + ], + ) + + solver = CVXPYSolver(problem) + results = solver.solve("f_1") + + assert results.optimal_variables["x_1"] >= -1e-8 + assert results.optimal_variables["x_2"] >= -1e-8 + # f_1 = 5*x_1 + x_2, minimized -> both should be near 0 + assert np.isclose(results.optimal_objectives["f_1"], 0.0, atol=1e-8) + + +@pytest.mark.json +@pytest.mark.cvxpy +def test_parse_cvxpy_with_constraints(): + """Test the JSON parser for correctly parsing CVXPY expressions with constraints.""" + problem = Problem( + name="Test Problem with Constraints", + description="Test Problem with Constraints", + variables=[ + Variable(name="x1", symbol="x_1", variable_type="real", lowerbound=0.0, upperbound=10.0), + Variable(name="x2", symbol="x_2", variable_type="real", lowerbound=0.0, upperbound=10.0), + ], + objectives=[ + Objective( + name="Objective 1", + symbol="f_1", + func=["Add", "x_1", "x_2"], + maximize=True, + ), + ], + constraints=[ + Constraint( + name="Constraint 1", + symbol="g_1", + cons_type=ConstraintTypeEnum.LTE, + func=["Add", "x_1", "x_2", -5], # x_1 + x_2 <= 5 + ), + ], + ) + + solver = CVXPYSolver(problem) + results = solver.solve("f_1") + + # Maximum of x_1 + x_2 subject to x_1 + x_2 <= 5 is exactly 5 + assert np.isclose(results.optimal_objectives["f_1"], 5.0, atol=1e-6) + # Constraint should be satisfied: x_1 + x_2 - 5 <= 0 + assert np.isclose(results.constraint_values["g_1"], 0.0, atol=1e-6) or results.constraint_values["g_1"] < 1e-5 + + +@pytest.mark.json +@pytest.mark.cvxpy +def test_parse_cvxpy_quadratic_problem(): + """Test the JSON parser for correctly parsing CVXPY quadratic expressions.""" + problem = Problem( + name="Quadratic Test Problem", + description="Quadratic Test Problem", + variables=[ + Variable(name="x1", symbol="x_1", variable_type="real", lowerbound=-10.0, upperbound=10.0), + Variable(name="x2", symbol="x_2", variable_type="real", lowerbound=-10.0, upperbound=10.0), + ], + objectives=[ + Objective( + name="Objective 1", + symbol="f_1", + func=["Add", ["Square", "x_1"], ["Square", "x_2"]], + maximize=False, + ), + ], + ) + + solver = CVXPYSolver(problem) + results = solver.solve("f_1") + + # Minimum should be at origin + assert np.isclose(results.optimal_variables["x_1"], 0.0, atol=1e-6) + assert np.isclose(results.optimal_variables["x_2"], 0.0, atol=1e-6) + assert np.isclose(results.optimal_objectives["f_1"], 0.0, atol=1e-6) + + +@pytest.mark.json +@pytest.mark.cvxpy +def test_parse_cvxpy_multiple_operators(): + """Test the JSON parser for CVXPY with multiple mathematical operators.""" + problem = Problem( + name="Multiple Operators Test", + description="Multiple Operators Test", + constants=[Constant(name="c1", symbol="c_1", value=2.0)], + variables=[ + Variable(name="x1", symbol="x_1", variable_type="real", lowerbound=1.0, upperbound=10.0), + Variable(name="x2", symbol="x_2", variable_type="real", lowerbound=1.0, upperbound=10.0), + ], + objectives=[ + Objective( + name="Objective 1", + symbol="f_1", + func=[ + "Add", + ["Multiply", "c_1", ["Square", "x_1"]], + ["Divide", "x_2", "c_1"], + ], + maximize=False, + ), + ], + ) + + options = CVXPYSolverOptions(ignore_dpp=True) + solver = CVXPYSolver(problem, options=options) + results = solver.solve("f_1") + + # Both variables should be minimized to lower bounds + assert results.optimal_variables["x_1"] >= 1.0 - 1e-6 + assert results.optimal_variables["x_2"] >= 1.0 - 1e-6 + + +@pytest.mark.json +@pytest.mark.cvxpy +def test_parse_cvxpy_equality_constraint(): + """Test the JSON parser for CVXPY with equality constraints.""" + problem = Problem( + name="Equality Constraint Test", + description="Equality Constraint Test", + variables=[ + Variable(name="x1", symbol="x_1", variable_type="real", lowerbound=-10.0, upperbound=10.0), + Variable(name="x2", symbol="x_2", variable_type="real", lowerbound=-10.0, upperbound=10.0), + ], + objectives=[ + Objective( + name="Objective 1", + symbol="f_1", + func=["Add", ["Square", "x_1"], ["Square", "x_2"]], + maximize=False, + ), + ], + constraints=[ + Constraint( + name="Constraint 1", + symbol="g_1", + cons_type=ConstraintTypeEnum.EQ, + func=["Add", "x_1", "x_2", -4], # x_1 + x_2 = 4 + ), + ], + ) + + solver = CVXPYSolver(problem) + results = solver.solve("f_1") + + # Constraint should be satisfied: x_1 + x_2 = 4 + assert np.isclose(results.constraint_values["g_1"], 0.0, atol=1e-5) + # Both variables should be equal for minimum + assert np.isclose(results.optimal_variables["x_1"], results.optimal_variables["x_2"], atol=1e-5) + # Sum should be 4 + assert np.isclose(results.optimal_variables["x_1"] + results.optimal_variables["x_2"], 4.0, atol=1e-5) + + +@pytest.mark.json +@pytest.mark.cvxpy +def test_parse_cvxpy_negation(): + """Test the JSON parser for CVXPY with negation.""" + problem = Problem( + name="Negation Test", + description="Negation Test", + variables=[ + Variable(name="x1", symbol="x_1", variable_type="real", lowerbound=0.0, upperbound=10.0), + Variable(name="x2", symbol="x_2", variable_type="real", lowerbound=0.0, upperbound=10.0), + ], + objectives=[ + Objective( + name="Objective 1", + symbol="f_1", + func=["Add", ["Negate", "x_1"], "x_2"], + maximize=False, + ), + ], + ) + + solver = CVXPYSolver(problem) + results = solver.solve("f_1") + + # f_1 = -x_1 + x_2, minimize -> x_1 should be high, x_2 should be low + assert results.optimal_variables["x_1"] >= 9.9 or np.isclose(results.optimal_variables["x_1"], 10.0, atol=1e-5) + assert results.optimal_variables["x_2"] <= 1e-6 + + +@pytest.mark.json +@pytest.mark.cvxpy +def test_cvxpy_solver_from_problem(extra_functions_problem): + """Test CVXPYSolver with a problem containing extra functions and constraints.""" + solver = CVXPYSolver(extra_functions_problem) + + # Check that variables are initialized in the evaluator + assert "x_1" in solver.evaluator.variables + assert "x_2" in solver.evaluator.variables + + # Check that objectives are initialized in the evaluator + assert "f_1" in solver.evaluator.objective_functions + assert "f_2" in solver.evaluator.objective_functions + + # Check that constraints are initialized in the evaluator + assert "con_1" in solver.evaluator.constraints + + # Check that extra functions are initialized in the evaluator + assert "ef_1" in solver.evaluator.extra_functions + assert "ef_2" in solver.evaluator.extra_functions + + # Check that constants are initialized in the evaluator + assert "c_1" in solver.evaluator.constants + + +@pytest.mark.json +@pytest.mark.cvxpy +def test_cvxpy_solver_solve_and_constraint_evaluation(extra_functions_problem): + """Test that CVXPYSolver correctly evaluates constraints after solving.""" + solver = CVXPYSolver(extra_functions_problem) + results = solver.solve("f_1") + + # Check that constraint values are computed + assert "con_1" in results.constraint_values + # Constraints in DESDEO are defined as f(x) <= 0, so this should be <= 0 (or close) + assert results.constraint_values["con_1"] is not None + + +@pytest.fixture +def extra_functions_problem(): + """Defines a problem with extra functions and constraints for testing.""" + return Problem( + name="Extra functions test problem", + description="Extra functions test problem", + variables=[ + Variable(name="Test 1", symbol="x_1", variable_type="real", lowerbound=0.0, upperbound=10.0), + Variable(name="Test 2", symbol="x_2", variable_type="real", lowerbound=0.0, upperbound=10.0), + ], + constants=[Constant(name="Constant 1", symbol="c_1", value=3.0)], + objectives=[ + Objective( + name="Objective 1", + symbol="f_1", + func=["Add", "x_1", "x_2"], + maximize=False, + ), + Objective( + name="Objective 2", + symbol="f_2", + func=["Multiply", "x_1", "x_2"], + maximize=False, + ), + ], + constraints=[ + Constraint( + name="Constraint 1", + symbol="con_1", + cons_type=ConstraintTypeEnum.LTE, + func=["Add", "x_1", "x_2", -5], # x_1 + x_2 <= 5 + ), + ], + extra_funcs=[ + ExtraFunction( + name="Extra Function 1", + symbol="ef_1", + func=["Multiply", "x_1", "c_1"], # x_1 * 3 + ), + ExtraFunction( + name="Extra Function 2", + symbol="ef_2", + func=["Add", "x_2", "c_1"], # x_2 + 3 + ), + ], + )