Source code for decogo.pyomo_problem.subproblem

"""This module implements and manages all sub-problems of Pyomo models"""

import logging
from abc import ABC, abstractmethod

import numpy as np
from pyomo.core import ConcreteModel, Param, RangeSet, Var, ConstraintList, \
    Objective, minimize, maximize
from pyomo.core.expr.visitor import identify_variables, replace_expressions
from pyomo.environ import Binary, Integers, Reals, SolverStatus, \
    TerminationCondition
from pyomo.opt import SolverFactory

logger = logging.getLogger('decogo')


[docs]class PyomoSubProblemBase: """A base class for construction of Pyomo model. Here are implemented methods for creating local linear and nonlinear constraints :param block_model: Block model :type block_model: PyomoBlockModel :param block_id: Block identifier :type block_id: int """
[docs] def __init__(self, block_model, block_id): """Constructor method""" self.block_id = block_id self.block_model = block_model self.model = ConcreteModel() self.model.Y = Var(RangeSet(self.block_model.block_sizes[block_id])) for n in range(1, block_model.sub_models[block_id].block_size + 1): lb = block_model.sub_models[block_id].variables[n - 1].lower_bound self.model.Y[n].value = lb self.model.Y[n].setlb(lb) ub = block_model.sub_models[block_id].variables[n - 1].upper_bound self.model.Y[n].setub(ub) if block_model.sub_models[block_id].variables[ n - 1].type == "Binary": self.model.Y[n].domain = Binary self.model.Y[n].value = 0 elif block_model.sub_models[block_id].variables[ n - 1].type == "Integers": self.model.Y[n].domain = Integers else: self.model.Y[n].domain = Reals self.model.lin_con = ConstraintList() def lin_con_rule(model, k, m): return sum( self.block_model.cuts.local_cuts[k][m].lhs[self.block_id, n] * model.Y[n + 1] for n in range(block_model.sub_models[k].block_size)) for m, local_constr in enumerate(block_model.cuts.local_cuts[block_id]): if local_constr.relation == "<=": self.model.lin_con.add(expr=lin_con_rule(self.model, block_id, m) <= local_constr.rhs) elif local_constr.relation == "=": self.model.lin_con.add(expr=lin_con_rule(self.model, block_id, m) == local_constr.rhs) elif local_constr.relation == ">=": self.model.lin_con.add(expr=lin_con_rule(self.model, block_id, m) >= local_constr.rhs) self.model.nonlin_constr = ConstraintList() for constr in block_model.sub_models[block_id].nonlin_constr: expr = constr.expr.clone() vars_in_expr = identify_variables(expr) substitution_map = {} # map for replacing the objects in the expression for var in vars_in_expr: index = block_model.sub_models[block_id].vars_in_block.index( var.name) substitution_map[id(var)] = self.model.Y[index + 1] new_expr = replace_expressions(expr, substitution_map) self.model.nonlin_constr.add(expr=new_expr)
[docs] def solve(self, solver_name, start_point=None, solver_options=None): """Base method for calling the external solver to solve the model :param solver_name: External solver name :type solver_name: str :param start_point: Starting point for the solver, defaults to ``None`` :type start_point: ndaray or None :param solver_options: Options for external solver, defaults to ``None`` :type solver_options: list :return: Solution point, primal bound, dual bound, flag if the primal \ bound is feasible :rtype: tuple """ if start_point is not None: for i in range(len(self.model.Y)): self.model.Y[i + 1].value = start_point[i] # solving the problem opt = SolverFactory(solver_name) # # set solver options # # options are pairs: (parameter name, value) # if solver_options is not None: # for key, value in solver_options: # opt.options[key] = value # # results = opt.solve(self.model, tee=False) def recursive_solve(opt_recursive, options=None, iter_i=10): solver_error = False try: if options is not None: for key, value in solver_options: opt_recursive.options[key] = value result_recursive = opt_recursive.solve(self.model, tee=False) except ValueError as error: logger.info(error) if iter_i - 2 < 6: logger.info('error: cannot solve') solver_error = True result_recursive = None else: logger.info('solve: max_iter = {0}'.format(iter_i - 2)) result_recursive, solver_error = \ recursive_solve(opt_recursive, options=('max_iter', iter_i - 2), iter_i=iter_i - 2) return result_recursive, solver_error results, error_in_solver = recursive_solve(opt, solver_options) sol_is_feasible = True dual_bound = None primal_bound = None y_new = np.zeros(shape=len(self.model.Y)) if error_in_solver: sol_is_feasible = False else: if results.solver.status == SolverStatus.ok: sol_is_feasible = True elif results.solver.status == SolverStatus.warning: if results.solver.termination_condition == TerminationCondition.infeasible: sol_is_feasible = False logger.info(self.model.name + ' is infeasible') elif results.solver.termination_condition == TerminationCondition.maxIterations: sol_is_feasible = False for i in range(len(self.model.Y)): y_new[i] = self.model.Y[i + 1].value primal_bound = self.model.obj.expr() # by default dual bound is equal to primal bound dual_bound = primal_bound if solver_name == 'scip' and solver_options is not None: log_lines = opt._log.split('\n') for line in log_lines: if line.startswith('Dual Bound'): dual_bound = float(line.split(':')[1].strip()) return y_new, primal_bound, dual_bound, sol_is_feasible
[docs] def add_linear_constraint(self): """Adds new linear constraint""" def lin_con_rule(model, constr): return sum(constr.lhs[self.block_id, n] * model.Y[n + 1] for n in range(self.block_model.block_sizes[self.block_id])) constr = self.block_model.cuts.local_cuts[self.block_id][ -1] # this takes the last one if constr.relation == "<=": self.model.lin_con.add( expr=lin_con_rule(self.model, constr) <= constr.rhs) elif constr.relation == "=": self.model.lin_con.add( expr=lin_con_rule(self.model, constr) == constr.rhs) elif constr.relation == ">=": self.model.lin_con.add( expr=lin_con_rule(self.model, constr) >= constr.rhs)
[docs] def set_nlp(self): """Relaxes integer varaibles to continious""" for i, var_domain in enumerate( self.block_model.sub_models[self.block_id].variables): if var_domain.type == "Integers": self.model.Y[i + 1].domain = Reals elif var_domain.type == "Binary": self.model.Y[i + 1].domain = Reals
[docs] def unset_nlp(self): """Sets to integer type for the variables which are integer in the original formulation""" for i, var_domain in enumerate( self.block_model.sub_models[self.block_id].variables): if var_domain.type == "Integers": self.model.Y[i + 1].domain = Integers elif var_domain.type == "Binary": self.model.Y[i + 1].domain = Binary
[docs] def update_var_lower_bound(self, index): """Updates the lower bound of the variable. The value is taken form the BlockModel :param index: Index (block_id, index) :type index: tuple """ k, i = index self.model.Y[i + 1].setlb( self.block_model.sub_models[k].variables[i].lower_bound)
[docs] def update_var_upper_bound(self, index): """Updates the upper bound of the variable. The value is taken form the BlockModel :param index: Index (block_id, index) :type index: tuple """ k, i = index self.model.Y[i + 1].setub( self.block_model.sub_models[k].variables[i].upper_bound)
[docs] def fix_integers(self, x): """Fixes integer variables with given value :param x: Given array :type x: ndarray """ for i, var in enumerate( self.block_model.sub_models[self.block_id].variables): if var.type == 'Integer' or var.type == 'Binary': self.model.Y[i + 1].value = x[i] self.model.Y[i + 1].fix()
[docs] def unfix_integers(self): """Unfixes all integer variables""" for i, var in enumerate( self.block_model.sub_models[self.block_id].variables): if var.type == 'Integers' or var.type == 'Binary': self.model.Y[i + 1].unfix()
[docs] def add_objective(self): """Sets default objective function, i.e. :math:`c_k^Tx_k`, where :math:`c_k` - partial objective from the original model """ def objective_rule(model): ans_obj = 0 ans_obj += sum( self.block_model.cuts.obj.c[self.block_id, n] * model.Y[n + 1] for n in range(self.block_model.block_sizes[self.block_id])) ans_obj += self.block_model.cuts.obj.const return ans_obj self.model.del_component('obj') self.model.obj = Objective(rule=objective_rule, sense=minimize)
[docs] def set_objective(self, direction): """Sets the objective function with given direction vector :param direction: Given vector :type direction: ndarray """ new_obj_expr = sum(direction[n] * self.model.Y[n + 1] for n in range( self.block_model.sub_models[self.block_id].block_size)) self.model.del_component('obj') self.model.obj = Objective(expr=new_obj_expr)
[docs]class PyomoMinlpSubProblem(PyomoSubProblemBase): """A class for defining the following sub-problem .. math:: \\begin{equation} \\begin{split} \\min \\ &d_k^Ty_k, \\newline &y_k \\in X_k, d_k \\in \\mathbb{R}^{n_k} \\end{split} \\end{equation} :param block_model: Block model :type block_model: PyomoBlockModel :param block_id: Block identifier :type block_id: int """
[docs] def __init__(self, block_model, block_id): """Constructor method""" super().__init__(block_model, block_id)
[docs]class PyomoProjectionSubProblem(PyomoSubProblemBase): """A class for defining a projection sub-problem .. math:: \\begin{equation} \\begin{split} \\min \\ &||y_k-x_k||^2,\\newline &y_k \\in G_k, x_k \\text{ is fixed} \\end{split} \\end{equation} :param block_model: Block model :type block_model: PyomoBlockModel :param block_id: Block identifier :type block_id: int """
[docs] def __init__(self, block_model, block_id): """Constructor method""" super().__init__(block_model, block_id) self.model.X = Param(RangeSet(self.block_model.block_sizes[block_id]), mutable=True, initialize=0) self.model.obj = Objective(expr=self.obj_rule()) self.model.name = 'Projection sub problem'
[docs] def obj_rule(self): """Defines the objective function :return: Objective expression :rtype: Expression """ expr = sum(pow((self.model.Y[n + 1] - self.model.X[n + 1]), 2) for n in range(self.block_model.sub_models[self.block_id].block_size)) return expr
[docs] def set_projection_point(self, point_to_project): """Sets projection point, i.e. :math:`x_k` :param point_to_project: Projection point :type point_to_project: ndarray """ for i in range(len(self.model.X)): self.model.X[i + 1].value = point_to_project[i]
[docs]class PyomoResourceProjectionSubProblem(PyomoSubProblemBase): """A class for defining a projection sub-problem .. math:: \\begin{equation} \\begin{split} \\min \\ &||A_kx_k - w_k||^2,\\newline &y_k \\in G_k, w_k \\text{ is fixed} \\end{split} \\end{equation} :param block_model: Block model :type block_model: PyomoBlockModel :param block_id: Block identifier :type block_id: int """
[docs] def __init__(self, block_model, block_id): """Constructor method""" super().__init__(block_model, block_id) self.model.w = Param(RangeSet(self.block_model.cuts.num_of_global_cuts + 1), mutable=True, initialize=0) self.model.obj = Objective(expr=self.obj_rule()) self.model.name = 'Resource projection sub problem'
[docs] def obj_rule(self): """Defines the objective function :return: Objective expression :rtype: Expression """ expr = 0 for j in range(self.block_model.cuts.num_of_global_cuts + 1): if j == 0: expr += \ (sum(self.block_model.cuts.obj.c[self.block_id, i] * self.model.Y[i + 1] for i in range(self.block_model.block_sizes[self.block_id])) - self.model.w[j + 1]) ** 2 else: expr += \ (sum(self.block_model.cuts.global_cuts[j - 1].lhs[self.block_id, i] * self.model.Y[i + 1] for i in range(self.block_model.block_sizes[self.block_id])) - self.model.w[j + 1]) ** 2 return expr
[docs] def set_projection_point(self, point_to_project): """Sets projection point, i.e. :math:`w_k` :param point_to_project: Projection point :type point_to_project: ndarray """ for i in range(len(self.model.w)): self.model.w[i + 1].value = point_to_project[i]
[docs]class PyomoLineSearchSubProblem(PyomoSubProblemBase): """This class defines line search sub-problem between exterior point :math:`x_k^{ext}` and interior point :math:`x_k^{int}` .. math:: \\begin{equation} \\begin{split} \\max \\ & \\alpha, \\newline &y_k = \\alpha x_k^{ext} + (1 - \\alpha) x_k^{int}, \\newline &y_k \\in X_k \\end{split} \\end{equation} :param block_model: Block model :type block_model: PyomoBlockModel :param block_id: Block identifier :type block_id: int """
[docs] def __init__(self, block_model, block_id): super().__init__(block_model, block_id) self.model.lin_con.deactivate() # variable for finding the point on the line between x_hat and x_star self.model.alpha = Var(bounds=(0, 1), initialize=0, within=Reals) # objective self.model.obj = Objective(expr=self.model.alpha, sense=maximize) self.model.interior_point = Param( RangeSet(self.block_model.block_sizes[block_id]), mutable=True, initialize=0) self.model.exterior_point = Param( RangeSet(self.block_model.block_sizes[block_id]), mutable=True, initialize=0) self.model.line_search_con = ConstraintList() for i in range(self.block_model.block_sizes[self.block_id]): self.model.line_search_con.add( self.model.Y[i + 1] == self.model.alpha * self.model.exterior_point[i + 1] + (1 - self.model.alpha) * self.model.interior_point[i + 1])
[docs] def set_interior_exterior_points(self, exterior_point, interior_point): """Sets values of two endpoints :param exterior_point: Exterior point :type exterior_point: ndarray :param interior_point: Interior point :type interior_point: ndarray """ for i in range(self.block_model.block_sizes[self.block_id]): self.model.exterior_point[i + 1].value = exterior_point[i] self.model.interior_point[i + 1].value = interior_point[i]
[docs] def solve(self, solver_name, start_point=None, solver_options=None): """Solves the subproblem and gets value of :math:`\\alpha`. For more info, see base method :meth:`SubProblemBase.solve` """ y_new, primal_bound, dual_bound, sol_is_feasible = \ super().solve(solver_name, start_point=start_point, solver_options=solver_options) alpha = self.model.alpha.value return alpha, y_new
[docs]class PyomoSubProblems: """A container class for managing all sub-problems :param minlp_sub_problem: Sub-probem with linear objective :type minlp_sub_problem: PyomoMinlpSubProblem :param proj_sub_problem: Pure projection sub-problem :type proj_sub_problem: PyomoProjectionSubProblem :param resource_constrained_sub_problem: Resource constrained sub-problem \ for checking the feasibility of the resources :type resource_constrained_sub_problem: PyomoResourceConstrainedSubProblem :param line_search_sub_problem: Line search sub-problem :type line_search_sub_problem: PyomoLineSearchSubProblem :param block_model: Block model :type block_model: PyomoBlockModel :param block_id: Block identifier :type block_id: int """
[docs] def __init__(self, block_model, block_id): """Constructor method""" self.minlp_sub_problem = PyomoMinlpSubProblem(block_model, block_id) self.proj_sub_problem = PyomoProjectionSubProblem(block_model, block_id) self.line_search_sub_problem = PyomoLineSearchSubProblem(block_model, block_id) self.resource_proj_sub_problem = \ PyomoResourceProjectionSubProblem(block_model, block_id)
[docs] def minlp_solve(self, solver_name, solver_options=None, start_point=None, direction=None, cell=None): """Solves :class:`MinlpSubProblem` :param solver_name: External solver name :type solver_name: str :param solver_options: External solver options, defaults to ``None`` :type solver_options: list or None :param start_point: Staring point for the solver, defaults to ``None`` :type start_point: ndarray or None :param direction: Direction for the objective, defaults to ``None`` :type direction: ndarray or None :param cell: Cell, defaults to ``None`` :type cell: Cell or None :return: Solution point, primal and dual bound :rtype: tuple """ if direction is None: self.minlp_sub_problem.add_objective() else: self.minlp_sub_problem.set_objective(direction) if cell is not None: self.minlp_sub_problem.add_cell_constraints(cell) y_new, primal_bound, dual_bound, sol_is_feasible = \ self.minlp_sub_problem.solve(solver_name, solver_options=solver_options, start_point=start_point) if cell is not None: self.minlp_sub_problem.remove_cell_constraints() return y_new, primal_bound, dual_bound, sol_is_feasible
[docs] def fixed_minlp_solve(self, solver_name, start_point, solver_options=None, direction=None, cell=None): """Solves :class:`MinlpSubProblem` with fixed integer variables :param solver_name: External solver name :type solver_name: str :param start_point: Staring point for the solver, and point which is \ used for fixing the integers :type start_point: ndarray or None :param solver_options: External solver options, defaults to ``None`` :type solver_options: list or None :param direction: Direction for the objective, defaults to ``None`` :type direction: ndarray or None :param cell: Cell, defaults to ``None`` :type cell: Cell or None :return: Solution point, primal and dual bound :rtype: tuple """ if direction is None: self.minlp_sub_problem.add_objective() else: self.minlp_sub_problem.set_objective(direction) if cell is not None: self.minlp_sub_problem.add_cell_constraints(cell) self.minlp_sub_problem.fix_integers(start_point) y_new, primal_bound, dual_bound, sol_is_feasible = \ self.minlp_sub_problem.solve(solver_name, solver_options=solver_options, start_point=start_point) self.minlp_sub_problem.unfix_integers() if cell is not None: self.minlp_sub_problem.remove_cell_constraints() return y_new, primal_bound, sol_is_feasible
[docs] def nlp_proj_solve(self, solver_name, point_to_project, start_point=None): """Solves :class:`ProjSubProblem` :param solver_name: External solver name :type solver_name: str :param point_to_project: Projection point :type point_to_project: ndarray :param start_point: Staring point for the solver, defaults to ``None`` :type start_point: ndarray or None :return: Solution point, primal bound and flag indicating the \ feasibility of primal solution :rtype: tuple """ self.proj_sub_problem.set_projection_point(point_to_project) y_new, primal_bound, dual_bound, sol_is_feasible = \ self.proj_sub_problem.solve(solver_name, start_point=start_point) return y_new, primal_bound, sol_is_feasible
[docs] def resource_proj_solve(self, solver_name, point_to_project, solver_options=None, start_point=None): """Solves :class:`ResourceProjSubProblem` :param solver_name: External solver name :type solver_name: str :param point_to_project: Projection point :type point_to_project: ndarray :param start_point: Staring point for the solver, defaults to ``None`` :type start_point: ndarray or None :return: Solution point, primal bound and flag indicating the \ feasibility of primal solution :rtype: tuple """ self.resource_proj_sub_problem.set_projection_point(point_to_project) y_new, primal_bound, dual_bound, sol_is_feasible = \ self.resource_proj_sub_problem.solve(solver_name, start_point=start_point, solver_options=solver_options) return y_new, primal_bound, sol_is_feasible
[docs] def solve_line_search_subproblem(self, solver_name, exterior_point, interior_point): """Solves :class:`LineSearchSubProblem` :param solver_name: External solver name :type solver_name: str :param exterior_point: Exterior point :type exterior_point: ndarray :param interior_point: Interior point :type interior_point: ndarray :return: value of parameter :math:`\\alpha` and solution point :rtype: tuple """ self.line_search_sub_problem.set_interior_exterior_points( exterior_point, interior_point) alpha, point = self.line_search_sub_problem.solve(solver_name) return alpha, point
[docs] def add_linear_constraint(self): """Adds linear constraints to sub-problems""" self.minlp_sub_problem.add_linear_constraint() self.proj_sub_problem.add_linear_constraint()
[docs] def update_var_lower_bound(self, index): """Updates the lower bound of variable by index :param index: Index of the variable (block_id, index) :type index: tuple """ self.minlp_sub_problem.update_var_lower_bound(index) self.proj_sub_problem.update_var_lower_bound(index) self.line_search_sub_problem.update_var_lower_bound(index)
[docs] def update_var_upper_bound(self, index): """Updates the upper bound of variable by index :param index: Index of the variable (block_id, index) :type index: tuple """ self.minlp_sub_problem.update_var_upper_bound(index) self.proj_sub_problem.update_var_upper_bound(index) self.line_search_sub_problem.update_var_upper_bound(index)