"""Stores the results obtained by the solver"""
import logging
import time
import numpy as np
import os
logger = logging.getLogger('decogo')
[docs]class Results:
"""A class which stores and makes some manipulations with the results.
The results are: primal/dual bound, primal solution point, timings, gap"""
[docs] def __init__(self):
"""Constructor method"""
self.strategy = None # solver strategy
self.sense = None # sense of the original problem, i.e. min or max
# general timing info
self.start_clock_time = 0 # stores time of CPU at start of the solution process
self.total_time = 0 # total time spent for running the solver
self.decomp_time = 0 # time spent for decomposition
self.containers_time = 0 # time spent for containers creation
# region OA summary info
self.sub_problem_time = 0 # time spent for solving sub problems
self.init_time = 0 # time spent for oa_start
self.main_alg_time = 0 # time spent for oa main algorithm
self.num_iter = 0 # number of iterations
self.primal_bound = float('inf') # primal bound
self.primal_sol_point = None # solution to primal bound of optimal value
self.dual_bound = float('-inf') # dual bound
# endregion
# region DIOR summary info
self.main_iterations = 0 # number of main iterations
self.optimal_subproblems = True # indicates if all subproblems were solved to optimality
self.num_cg_iterations = 0 # number of CG iterations
self.cg_num_minlp_problems = 0 # number of MINLP subproblems during CG
self.cg_num_unfixed_nlp_problems = 0 # number of unfixed NLP subproblems during CG
self.cg_num_fixed_nlp_problems = 0 # number of fixed NLP subproblems during CG
self.cg_relaxation = float('inf') # objective value of CG relaxation
self.num_outer_iterations = 0 # number inner outer MIP iterations
self.compact_oa_obj_val = float('-inf') # objective value of compact OA
self.compact_ia_obj_val = float('-inf') # objective value of compact MIP IA
self.num_of_columns_after_cg = 0 # number of columns after CG
self.sub_problem_number_after_cg = 0 # number of solved subproblems after CG
self.total_number_columns = 0 # total number of columns
self.total_sub_problem_number = 0 # total number of solved subproblems
self.c_tilde_y = float('inf') # objective value in the NLP projected point
self.total_number_columns_blockwise = {} # total number of columns
self.current_used_time = float('inf') # used time during the running
# endregion
self.sub_problem_data = SubProblemData()
[docs] def set_new_primal_bound(self, primal_bound, point):
"""Sets new primal bound and primal point.
:param primal_bound: Primal bound
:type primal_bound: float
:param point: Point which corresponds to the primal bound
:type point: BlockVector
"""
improved = False
if primal_bound < self.primal_bound:
improved = True
self.primal_bound = primal_bound
self.primal_sol_point = point
return improved
[docs] def get_gap(self, primal_bound, dual_bound):
"""Computes the gap in percentage between primal and dual bound
:param primal_bound: Primal bound
:type primal_bound: float
:param dual_bound: Dual bound
:type dual_bound: float
:return: Gap
:rtype: float
"""
diff = abs(dual_bound - primal_bound)
return round(
100 * diff / (max(abs(primal_bound), abs(dual_bound)) + 1e-5),
10) # or min?
[docs] def print_results(self):
"""Prints summary of results at the end of the logging process
depending on the strategy"""
self.calculate_total_time()
self.print_decomp_time_and_total_time()
if self.strategy == 'OA':
self.print_oa_stats()
elif self.strategy == 'CG':
self.print_col_gen_stats()
elif self.strategy == 'DBCG':
self.print_col_gen_stats()
[docs] def print_decomp_time_and_total_time(self):
"""Prints total time and decomposition time"""
logger.info('\n{0: <50}{1: <30}'.format('Total time:', self.total_time))
logger.info('\n{0: <50}{1: <30}'.format('Reformulation time:',
self.decomp_time + self.containers_time))
logger.info(
'{0: <50}{1: <30}'.format('Decomposition time:', self.decomp_time))
logger.info(
'{0: <50}{1: <30}'.format('Containers time:', self.containers_time))
[docs] def print_oa_stats(self):
"""Prints OA stats"""
logger.info(
'\n{0: <50}{1: <30}'.format('Number of iterations:', self.num_iter))
logger.info('{0: <50}{1: <30}'.format('Dual bound:',
self.sense * self.dual_bound))
logger.info(
'{0: <50}{1: <30}'.format('Primal bound:',
round(self.sense * self.primal_bound, 10)))
logger.info('{0: <50}{1: <30}'.format('Gap:',
self.get_gap(self.primal_bound,
self.dual_bound)))
logger.info('\n{0: <50}{1: <30}'.format('Oa start time:',
self.init_time))
logger.info('{0: <50}{1: <30}'.format('Oa main algorithm time:',
self.main_alg_time))
logger.info('{0: <50}{1: <30}'.format('Algorithm time:',
self.init_time + self.main_alg_time))
logger.info('{0: <50}{1: <30}'.format('Sub problems time:',
self.sub_problem_time))
[docs] def print_col_gen_stats(self):
"""Print Column Generation stats"""
logger.info(
'\n{0: <50}{1: <30}'.format('Primal bound:',
round(self.sense * self.primal_bound,
10)))
logger.info('\n{0: <50}{1: <30}'.format('Main iterations:', self.main_iterations))
logger.info('{0: <50}{1: <30}'.format('Number of CG iterations:',
self.num_cg_iterations))
logger.info('{0: <50}{1: <30}'.format('CG relaxation obj. value:',
self.sense * self.cg_relaxation))
logger.info('{0: <50}{1: <30}'.format('Number of MINLP subproblems:',
self.cg_num_minlp_problems))
logger.info(
'{0: <50}{1: <30}'.format('Number of unfixed NLP subproblems:',
self.cg_num_unfixed_nlp_problems))
logger.info(
'{0: <50}{1: <30}'.format('Number of fixed NLP subproblems:',
self.cg_num_fixed_nlp_problems))
logger.info(
'{0: <50}{1: <30}'.format('Number of solved sub-problems after CG:',
self.sub_problem_number_after_cg))
logger.info('{0: <50}{1: <30}'.format('Number of columns after CG:',
self.num_of_columns_after_cg))
# logger.info(
# '{0: <50}{1: <30}'.format('CG Gap (CG relaxation and primal bound):',
# self.get_gap(self.primal_bound,
# self.cg_relaxation)))
logger.info('{0: <50}{1: <30}'.format('Total number of columns:',
self.total_number_columns))
[docs] def add_sub_problem_time(self, time):
"""Accumulates sub-problem time
:param time: Time to add
:type time: float
"""
self.sub_problem_time += time
[docs] def calculate_total_time(self):
"""Calculates total time"""
self.total_time = time.time() - self.start_clock_time
[docs] def add_sub_problem_data(self, block_id, dir_im_space, dir_orig_space,
column, point, ia_sol):
""""Add sub-problem data by calling :meth:`SubProblemData.add_data`"""
self.sub_problem_data.add_data_im_space(block_id, dir_im_space, column)
self.sub_problem_data.add_data_orig_space(block_id,
dir_orig_space,
point,
ia_sol)
[docs]class SubProblemData:
"""A class which stores direction and columns
"""
[docs] def __init__(self):
"""Constructor method"""
self.data_orig_space = {}
self.data_im_space = {}
[docs] def __getitem__(self, item):
"""Index operator
:param item: Given index as tuple (block_id, index)
:type item: tuple
:return: tuple (point, column), i.e point and corresponding column
:rtype: tuple
"""
k, j = item
return self.data[k][j]
[docs] def add_data_im_space(self, block_id, dir_im_space, column):
"""Add sub-problem data
:param block_id: Block identifier
:type block_id: int
:param dir_im_space: Given direction
:type dir_im_space: ndarray
:param column: Given column
:type column: ndarray
:return: Tuple which containes direction, column
:rtype: tuple
"""
if block_id not in self.data_im_space.keys():
self.data_im_space[block_id] = {}
if self.data_im_space[block_id]:
self.data_im_space[block_id]['input'] = \
np.vstack((self.data_im_space[block_id]['input'],
dir_im_space[np.newaxis]))
self.data_im_space[block_id]['output'] = \
np.vstack((self.data_im_space[block_id]['output'],
column[np.newaxis]))
else:
# empty dataset, dir_im_space, column, 1 dim -> 2 dim array
self.data_im_space[block_id]['input'] = \
dir_im_space[np.newaxis]
self.data_im_space[block_id]['output'] = column[np.newaxis]
return dir_im_space, column
[docs] def add_data_orig_space(self, block_id, dir_orig_space, point, ia_sol=None):
"""Add sub-problem data
:param block_id: Block identifier
:type block_id: int
:param dir_orig_space: Given direction
:type dir_orig_space: ndarray
:param point: Given point
:type point: ndarray
:return: Tuple which containes direction, point
:rtype: tuple
"""
if block_id not in self.data_orig_space.keys():
self.data_orig_space[block_id] = {}
if self.data_orig_space[block_id]:
self.data_orig_space[block_id]['input'] = \
np.vstack((self.data_orig_space[block_id]['input'],
dir_orig_space[np.newaxis]))
self.data_orig_space[block_id]['output'] = \
np.vstack((self.data_orig_space[block_id]['output'],
point[np.newaxis]))
if ia_sol is not None:
self.data_orig_space[block_id]['ia_sol'] = \
np.vstack((self.data_orig_space[block_id]['ia_sol'],
ia_sol[np.newaxis]))
else:
ia_sol = np.zeros(point.shape)
ia_sol[0] = float('inf')
self.data_orig_space[block_id]['ia_sol'] = \
np.vstack((self.data_orig_space[block_id]['ia_sol'],
ia_sol[np.newaxis]))
logger.info('- no ia sol for sub-problem solving')
else:
# empty dataset, dir_im_space, column, 1 dim -> 2 dim array
self.data_orig_space[block_id]['input'] = \
dir_orig_space[np.newaxis]
self.data_orig_space[block_id]['output'] = point[np.newaxis]
if ia_sol is not None:
self.data_orig_space[block_id]['ia_sol'] = ia_sol[np.newaxis]
else:
ia_sol = np.zeros(point.shape)
ia_sol[0] = float('inf')
self.data_orig_space[block_id]['ia_sol'] = ia_sol[np.newaxis]
logger.info('- no ia sol for sub-problem solving')
return dir_orig_space, point
[docs] def export_data(self, folder_name='data', exact_solver=False):
wd = os.getcwd()
path = '{0}\\{1}'.format(wd, folder_name)
try:
os.mkdir(path)
if exact_solver is True:
path = '{0}\\{1}\\exact_solver'.format(wd, folder_name)
os.mkdir(path)
except OSError:
pass
if exact_solver is True:
try:
path = '{0}\\{1}\\exact_solver'.format(wd, folder_name)
os.mkdir(path)
except OSError:
pass
for block_id, data_np in self.data_orig_space.items():
if exact_solver is True:
input_txt = \
'{0}\\{1}\\exact_solver\\block_{2}_input_orig_space.csv'.\
format(wd, folder_name, block_id)
output_txt = \
'{0}\\{1}\\exact_solver\\block_{2}_output_orig_space.csv'\
.format(wd, folder_name, block_id)
ia_sol_txt = \
'{0}\\{1}\\exact_solver\\block_{2}_ia_orig_space.csv'\
.format(wd, folder_name, block_id)
else:
input_txt = \
'{0}\\{1}\\block_{2}_input_orig_space.csv'.\
format(wd, folder_name, block_id)
output_txt = \
'{0}\\{1}\\block_{2}_output_orig_space.csv'\
.format(wd, folder_name, block_id)
if 'input' in data_np.keys():
np.savetxt(input_txt, data_np['input'], delimiter=',')
if 'output' in data_np.keys():
np.savetxt(output_txt, data_np['output'], delimiter=',')
if 'ia_sol' in data_np.keys():
np.savetxt(ia_sol_txt, data_np['ia_sol'], delimiter=',')
for block_id, data_np in self.data_im_space.items():
if exact_solver is True:
input_txt = \
'{0}\\{1}\\exact_solver\\block_{2}_input_im_space.csv'. \
format(wd, folder_name, block_id)
output_txt = \
'{0}\\{1}\\exact_solver\\block_{2}_output_im_space.csv' \
.format(wd, folder_name, block_id)
else:
input_txt = \
'{0}\\{1}\\block_{2}_input_im_space.csv'. \
format(wd, folder_name, block_id)
output_txt = \
'{0}\\{1}\\block_{2}_output_im_space.csv' \
.format(wd, folder_name, block_id)
# np.savetxt(input_txt, data_np['input'], delimiter=',')
np.savetxt(output_txt, data_np['output'], delimiter=',')