Source code for decogo.problem.approx_data

"""This module stores approximation data generated during the solution process.
"""

import copy
import logging

import numpy as np

from decogo.model.constraints import LinearConstraint
from decogo.util.block_vector import BlockVector

logger = logging.getLogger('decogo')


[docs]class ApproxData: """A wrapper class for storing approximation data, i.e. inner points, linearization cuts, valid compact cuts and cells. :param block_model: Block model :type block_model: BlockModel :param inner_points: Stores the inner points :type inner_points: InnerPoints :param linearization_cuts: Stores linearization cuts (for OA algorithm) :type linearization_cuts: LinearizationCuts :param compact_cuts: Stores valid compact cuts :type compact_cuts: CompactCuts """
[docs] def __init__(self, block_model): """Constructor method""" # self.block_model = block_model self.inner_points = InnerPoints(block_model) self.linearization_cuts = LinearizationCuts(block_model) self.compact_cuts = CompactCuts(block_model)
[docs] def add_linearization_cuts(self, y, eps, x=None, block_id=None): """Adds linearization cuts by calling :meth:LinearizationCuts.add_linearization_cuts""" return self.linearization_cuts.add_linearization_cuts(y, eps, x=x, block_id=block_id)
[docs] def add_inner_point(self, block_id, point, min_inner_point_distance=None): """Adds inner point by calling :meth:`InnerPoints.add_point`""" return self.inner_points.add_point(block_id, point, min_inner_point_distance)
[docs] def get_min_inner_point(self, block_id, direction): """Gets inner point with respect to the minimum value computed with some direction in original space by calling :meth:`InnerPoints.get_min_inner_point`""" return self.inner_points.get_min_inner_point(block_id, direction)
[docs] def get_min_column(self, block_id, direction): """Gets column with respect to the minimum value computed with some direction in image space by calling :meth:`InnerPoints.get_min_column`""" return self.inner_points.get_min_column(block_id, direction)
[docs] def get_inner_points_size(self, block_id): """Gets inner points size by calling :meth:`InnerPoints.get_size`""" return self.inner_points.get_size(block_id)
[docs] def add_compact_cut(self, block_id, lhs, relation, rhs): """Add valid compact cut by calling :meth:`CompactCuts.add`""" self.compact_cuts.add(block_id, lhs, relation, rhs)
[docs] def get_compact_cuts_size(self): """Gets compact cut size by calling :meth:`CompactCuts.get_length`""" return self.compact_cuts.get_length()
[docs]class InnerPoints: """This class stores inner points and corresponding columns :param block_model: Block model :type block_model: BlockModel """
[docs] def __init__(self, block_model): """Constructor method""" self.block_model = block_model # how we store the points: # we create the dictionary (k -> [list of tuples (hat_y_k, w_k)]) # where k is block_id, hat_y_k is a point and w_k is a column defined # by w_k = (c_k*hat_y_k, A_k*hat_y_k) self.points = {} # Points stored block-wise as dictionary of lists for k in range(self.block_model.num_blocks): self.points[k] = [] # container for indexes of atomic block in aggregated block self.KT = {} # initialize KT for atomic blocks for k in range(self.block_model.num_blocks): self.KT[k] = (k,)
[docs] def __getitem__(self, item): """Index operator :param item: Given index as tuple (block_id, index) :type item: tuple :return: tuple (point, column), i.e point and corresponding column :rtype: tuple """ k, j = item return self.points[k][j]
[docs] def is_new_point(self, block_id, column, min_inner_point_distance=None): """Computes minimum distance (based on the infinity norm) of column to all columns in one block :param block_id: Block identifier :type block_id: int :param column: Given possibly new column :type column: ndarray :param min_inner_point_distance: Threshold for identifying if the \ column is new :type min_inner_point_distance: float or None :return: Result if the column is new :rtype: bool """ min_dist = float('inf') if len(self.KT[block_id]) == 1: # find minimum distance to columns for p, col in self.points[block_id]: dist = np.linalg.norm(column - col, ord=np.inf) if dist < min_dist: min_dist = dist new_point = False if min_dist >= 1e-10: # makes sure that completely identical column is not added # to the inner points if min_inner_point_distance is not None: if min_dist > min_inner_point_distance: new_point = True else: new_point = False else: new_point = True elif len(self.KT[block_id]) > 1: # find minimum distance to long columns for p, col in self.points[block_id]: dist_sum = 0 for i in self.KT[block_id]: dist = np.linalg.norm(column[i] - col[i], ord=np.inf) dist_sum += dist if dist_sum < min_dist: min_dist = dist_sum new_point = False if min_dist >= 1e-10: # makes sure that completely identical column is not added # to the inner points if min_inner_point_distance is not None: if min_dist > min_inner_point_distance: new_point = True else: new_point = False else: new_point = True return new_point
[docs] def add_point(self, block_id, point, min_inner_point_distance=None): """Adds the new point and column. If ``min_inner_point_distance`` is ``None``, the column is always added (based on the reduced cost computation) :param block_id: Block identifier :type block_id: int :param point: Given new point :type point: ndarray :param min_inner_point_distance: Threshold for identifying if the \ column is new :type min_inner_point_distance: float or None :return: Tuple which containes the flag if the column is new, \ point and corresponding column :rtype: tuple """ if len(self.KT[block_id]) == 1: if self.block_model.sub_models[block_id].linear is False: column = self.block_model.trans_into_im_space(block_id, point) new_point = self.is_new_point(block_id, column, min_inner_point_distance) else: # add no column to linear atomic block new_point = False column = self.block_model.trans_into_im_space(block_id, point) if new_point is True: self.points[block_id].append((point, column)) return new_point, point, column elif len(self.KT[block_id]) > 1: # sub-problem for hyper block generate list of points for # atomic blocks and list of columns are transformed from the points # K_t = [k1, k2] # column_t_j = { k1: column_k1_j, k2:column_k2_j} column_dict = {} point_dict = {} for i in self.KT[block_id]: point_dict[i] = point.get_block(i) column = self.block_model.trans_into_im_space(i, point_dict[i]) column_dict[i] = column new_point = self.is_new_point(block_id, column_dict, min_inner_point_distance) if new_point is True: self.points[block_id].append((point_dict, column_dict)) return new_point, point_dict, column_dict
[docs] def get_min_inner_point(self, block_id, dir_orig_space): """Get the inner point based on the minimum value regarding the direction in original space, i.e. :math:`y = {\\mathrm{argmin\ }} d^Tx, x \\in S`, where :math:`S` is the set of inner points :param block_id: Block identifier :type block_id: int :param dir_orig_space: Given direction :type dir_orig_space: ndarray :return: Point and corresponding minimum value :rtype: tuple """ min_point = 0 min_obj_val = float('inf') if len(self.KT[block_id]) == 1: for point, column in self.points[block_id]: obj_val = np.dot(dir_orig_space, point) if obj_val < min_obj_val: min_obj_val = obj_val min_point = point elif len(self.KT[block_id]) > 1: for point, column in self.points[block_id]: obj_val = 0 for k in self.KT[block_id]: obj_val += np.dot(dir_orig_space.get_block(k), point[k]) if obj_val < min_obj_val: min_obj_val = obj_val min_point = point return min_point, min_obj_val
[docs] def get_min_column(self, block_id, dir_im_space): """Get the column based on the minimum value regarding the direction in image space, i.e. :math:`y = {\\mathrm{argmin\ }} d^Tx, x \\in S`, where :math:`S` is the set of inner points :param block_id: Block identifier :type block_id: int :param dir_im_space: Given direction :type dir_im_space: ndarray :return: Column and corresponding minimum value :rtype: tuple """ min_column = 0 min_obj_val = float('inf') if len(self.KT[block_id]) == 1: for point, column in self.points[block_id]: obj_val = np.dot(dir_im_space, column) if obj_val < min_obj_val: min_obj_val = obj_val min_column = column elif len(self.KT[block_id]) > 1: for _, column_dict in self.points[block_id]: obj_val = 0 for (k, column) in column_dict.items(): obj_val += np.dot(dir_im_space, column) if obj_val < min_obj_val: min_obj_val = obj_val min_column_dict = column_dict # transform to 2-D array min_column = np.array([[]]) for (_, column) in min_column_dict.items(): if min_column.shape[1] == 0: min_column = \ np.concatenate((min_column, np.array([column])), axis=1) else: min_column = \ np.concatenate((min_column, np.array([column])), axis=0) return min_column, min_obj_val
[docs] def get_size(self, block_id): """Gets the size of inner points of the given block :param block_id: Block identifier :type block_id: int :return: Size of the inner points :rtype: int """ return len(self.points[block_id])
[docs] def get_estimated_nadir_point(self, block_id): """Gets maximum value for each component of the existing columns :param block_id: Block identifier :type block_id: int :return: Vector with maximum components :rtype: ndarray """ w = np.full(shape=self.block_model.cuts.num_of_global_cuts + 1, fill_value=-np.inf) for _, column in self.points[block_id]: w = np.maximum(w, column) return w
[docs] def get_estimated_ideal_point(self, block_id): """Gets minimum value for each component of the existing columns :param block_id: Block identifier :type block_id: int :return: Vector with minimum components :rtype: ndarray """ w = np.full(shape=self.block_model.cuts.num_of_global_cuts + 1, fill_value=np.inf) for _, column in self.points[block_id]: w = np.minimum(w, column) return w
[docs] def check_block(self, kt): # check if kt exists in KT, if not, add kt to KT and return index t # if yes, return index of kt in KT # is_new_hyper_block = False # if kt in self.KT.values(): # block_id = list(self.KT.keys())[list(self.KT.values()).index(kt)] # else: # block_id = max(self.points.keys()) + 1 # self.points[block_id] = [] # self.KT[block_id] = kt # is_new_hyper_block = True is_new_hyper_block = True block_id = None if kt in self.KT.values(): is_new_hyper_block = False block_id = list(self.KT.keys())[list(self.KT.values()).index(kt)] else: # avoid overlapping between hyper-blocks hyper_k = [item[k] for item in self.KT.values() if len(item) > 1 for k in range(len(item))] for k in kt: if k in hyper_k: is_new_hyper_block = False if is_new_hyper_block: block_id = max(self.points.keys()) + 1 self.points[block_id] = [] self.KT[block_id] = kt return block_id, is_new_hyper_block
[docs] def get_num_blocks(self): return len(self.KT)
[docs]class LinearizationCuts: """This class computes and stores the linearization cuts which are used for OA solver :param block_model: Block model :type block_model: BlockModel """
[docs] def __init__(self, block_model): """Constructor method""" self.block_model = block_model self.cuts = {k: [] for k in range(self.block_model.num_blocks)} # List
# of linearization cuts
[docs] def add_linearization_cuts(self, y, eps, x=None, block_id=None): """This method decides for which cases to compute linearization cuts. The cuts are computed with formula :math:`g(y) + \\nabla g(y)^T(x-y) \\leq 0`, where :math:`y` is a linearization point. Depending on the input values the cuts are added in the following cases: * if ``x`` is ``None``, then the cuts are added to active constraint \ at point y. * if ``x`` is not ``None``, then the cuts are added only for \ constraint which are violated at point x and are active at point y * if ``block_id`` is ``None``, then linearization cuts are added to \ the all blocks :param y: Linearization point :type y: BlockVector or ndarray :param eps: Accuracy for checking if the constraint is active :type eps: float :param x: point of OA solution :type x: BlockVector or ndarray or None :param block_id: Block identifier :type block_id: int or None :return: Number of new added cuts :rtype: int """ # linearization cuts at point y are the following: g(y) + # grad g(y)*(x-y) <= 0 number_of_cuts = [0] * self.block_model.num_blocks if x is None: if block_id is None: for k, sub_model in enumerate(self.block_model.sub_models): for j, nonlin_con in enumerate(sub_model.nonlin_constr): viol, val = nonlin_con.eval(y.get_block(k)) if abs(val) < eps: number_of_cuts[k] += \ self.compute_and_add_linearization_cut( k, j, y.get_block(k)) else: for j, nonlin_con in enumerate( self.block_model.sub_models[block_id].nonlin_constr): viol, val = nonlin_con.eval(y) if abs(val) < eps: number_of_cuts[block_id] += \ self.compute_and_add_linearization_cut(block_id, j, y) else: if block_id is None: for k, sub_model in enumerate(self.block_model.sub_models): for j, nonlin_con in enumerate(sub_model.nonlin_constr): viol, val = nonlin_con.eval(x.get_block(k)) if viol > 0: viol, val = nonlin_con.eval(y.get_block(k)) if abs(val) < eps: number_of_cuts[k] += \ self.compute_and_add_linearization_cut( k, j, y.get_block(k)) else: for j, nonlin_con in enumerate( self.block_model.sub_models[block_id].nonlin_constr): viol, val = nonlin_con.eval(x) if viol > 0: viol, val = nonlin_con.eval(y) if abs(val) < eps: number_of_cuts[block_id] += \ self.compute_and_add_linearization_cut( block_id, j, y) return number_of_cuts
[docs] def compute_and_add_linearization_cut(self, block_id, j, y): """Method which computes the linearization cut for nonlinear constraint of the k-th block regarding a reference point y :param block_id: Block identifier :type block_id: int :param j: Nonlinear constraint index :type j: int :param y: Reference point :type y: ndarray :return: ``positive integer``, which correspond to the number of \ added cuts, if all gradient values are ok (and the cut was possible \ to compute), i.e. no :math:`\\infty` numbers, ``0`` - otherwise :rtype: int """ constraint = self.block_model.sub_models[block_id].nonlin_constr[j] gradient = constraint.compute_gradient_at_point(y) # local linear constraint creation g(y) + grad g(y)*(x-y) <= 0 # reformulated grad g(y)*x <= grad(y)*y - g(y), y is given point, # x is variable lhs = [] rhs = 0 for var_name in gradient.keys(): i = constraint.orig_var_names_in_block.index(var_name) if not np.isnan(gradient[var_name]): grad = gradient[var_name] lhs.append((block_id, i, grad)) rhs += y[i] * grad else: return 0 # eval the function g at point y viol, val_g = constraint.eval(y) rhs -= val_g if constraint.relation == '>=': self.cuts[block_id].append( LinearConstraint(lhs, ">=", rhs, self.block_model.block_sizes)) else: self.cuts[block_id].append( LinearConstraint(lhs, "<=", rhs, self.block_model.block_sizes)) return 1
[docs] def num_cuts(self, block_id): """Get number of cuts :param block_id: Block identifier :type block_id: int :return: Number of the cuts :rtype: int """ return len(self.cuts[block_id])
[docs] def __getitem__(self, item): """Index operator :param item: Given index :type item: tuple :return: The cuts associated with the index :rtype: LinearConstraint """ k, i = item return self.cuts[k][i]
[docs]class CompactCuts: """A class that stores valid compact linear cuts got after solving the subproblems. They are defined with :math:`a^Tw \\leq b`, where :math:`a, w \\in \\mathbb{R}^{m+1}, m` is the number of global linear constraints :param block_model: Block model :type block_model: BlockModel """
[docs] def __init__(self, block_model): """Constructor method""" self.block_model = block_model self.cuts = {k: [] for k in range(self.block_model.num_blocks)} # Cuts # stored blockwise as a dictionary with list as values self.rhs = {k: [] for k in range(self.block_model.num_blocks)}
[docs] def add(self, block_id, lhs, relation, rhs): """Add new compact linear cut :param block_id: Block identifier :type block_id: int :param lhs: Left hand side of the constraint :type lhs: ndarray :param relation: Relation of the constraint :type relation: str :param rhs: Right hand side of the constraint :type rhs: float """ self.cuts[block_id].append((copy.deepcopy(lhs), relation)) self.rhs[block_id].append(rhs)
[docs] def __getitem__(self, key): """Index operator :param key: Given index as tuple: (block_id, index) :type key: tuple :return: Cut associated with index :rtype: tuple """ block_id, i = key (lhs, relation) = self.cuts[block_id][i] rhs = self.rhs[block_id][i] value = (lhs, relation, rhs) return value
[docs] def __setitem__(self, key, value): """Index operator :param item: Given index as tuple: (block_id, index) :type item: tuple :param value: Left hand side, relation, right hand side :type value: tuple :return: Cut associated with index :rtype: tuple """ block_id, i = key (lhs, relation, rhs) = value self.cuts[block_id][i] = (lhs, relation) self.rhs[block_id][i] = rhs
[docs] def get_length(self): """ Get number of the cuts for each block :return: List of the cuts number blockwise :rtype: list """ return [len(self.cuts[k]) for k in range(self.block_model.num_blocks)]