"""Implements sparse and dense block vector data structures"""
import copy
from collections import Sequence
from collections import defaultdict
import numpy as np
import math
from scipy.sparse import coo_matrix
[docs]class SparseBlockVector:
"""Class for storing the values in sparse format blockwise.
It consists of k blocks. Each block uses the standard Python sparse
matrix with dimension :math:`1 \\times n_k`, where :math:`n_k`
is the block size
:param vectors: Sparse matrices stored blockwise
:type vectors: dict
:param blocks_size: Size of the blocks
:type blocks_size: list
:param blocks: Block ids which contain nonzero values
:type blocks: list
"""
[docs] def __init__(self, values, blocks_size):
"""Constructor method
:param values: List of tuples (block_id, index, value)
:type values: list
:param blocks_size: Number of variables per block
:type blocks_size: list
"""
self.vectors = {}
self.blocks_size = blocks_size
# dictionary: block -> data, col
data, col = defaultdict(list), defaultdict(list)
for (item_block, item_column, item_data) in values:
data[item_block].append(item_data)
col[item_block].append(item_column)
for block_id in data.keys():
row = [0] * len(col[block_id])
column_size = blocks_size[block_id]
self.vectors[block_id] = coo_matrix(
(data[block_id], (row, col[block_id])),
shape=(1, column_size)).tocsr()
self.blocks = list(self.vectors.keys())
[docs] def __getitem__(self, item):
"""Index operator
:param item: Input index as tuple: (block_id, index)
:type item: tuple
:return: Value associated with the given index
:rtype: float
"""
block_id, column = item
try:
elem = self.vectors[block_id][0, column]
except KeyError:
elem = 0
return elem
[docs] def scalar_prod(self, x):
"""Scalar product computation (or dot product) computed blockwise,
:math:`v=\\sum_{k \\in K}a_k^Tx_k`
:param x: Input vector
:type x: SparseBlockVector or BlockVector
:return: Value for scalar product :math:`v`
:rtype: float
"""
val = 0
for block_id in x.vectors.keys():
try:
val += self.vectors[block_id].dot(x.vectors[block_id])
except KeyError:
val += np.array([0.0])
return val[0]
[docs] def maxabs(self, block_id=None):
"""Finds maximum element by absolute value
:param block_id: Block id where to look for the maximum value
:type block_id: int, optional
:return: Max element by absolute value
:rtype: float or None
"""
try:
if block_id is not None:
max_elem = abs(max(self.vectors[block_id].min(),
self.vectors[block_id].max(), key=abs))
else:
max_elem = float('-inf')
for k in self.blocks:
elem = abs(max(self.vectors[k].min(), self.vectors[k].max(),
key=abs))
max_elem = max(elem, max_elem)
except KeyError:
max_elem = None
return max_elem
@property
def num_blocks(self):
"""Return number of blocks
:return: Return number of blocks
:rtype: int
"""
return len(self.blocks)
[docs] def get_block(self, block_id):
"""Gets vector associated with the given block id
:param block_id: Given block identifier
:type block_id: int
:return: Sparse vector (matrix)
:rtype: coo_matrix or None
"""
if block_id in self.vectors:
return self.vectors[block_id]
else:
return None
[docs] def get_block_size(self, k):
"""Get block size associated with the given block id
:param k: Given block identifier
:type k: int
:return: Block size
:rtype: int
"""
return self.blocks_size[k]
[docs] def get_norm(self):
norm_value = 0
for k in self.blocks:
norm_value += sum(i**2 for i in self.vectors[k].data)
norm_value = math.sqrt(norm_value)
return norm_value
[docs]class BlockVector:
"""This class stores dense vector with the respect to blocks
:param vectors: Dense vectors stored blockwise
:type vectors: dict
:param blocks_size: Size of the blocks
:type blocks_size: list
:param blocks: Block identifiers
:type blocks: list
"""
[docs] def __init__(self, blocks_size=None):
"""Constructor method"""
self.vectors = {}
self.blocks_size = blocks_size
if blocks_size is not None:
for block_id in range(len(blocks_size)):
self.vectors[block_id] = np.zeros(shape=blocks_size[block_id],
dtype=float)
self.blocks = list(self.vectors.keys())
[docs] def __getitem__(self, item):
"""Index operator
:param item: Input index in tuple (block_id, index)
:type item: tuple
:return: Value associated to index
:rtype: float
"""
block_id, index = item
return self.vectors[block_id][index]
[docs] def get_block(self, block_id):
"""Get vector associated to the blocks
:param block_id: Block identifier
:return: Vector
:rtype: ndarray
"""
return self.vectors[block_id]
[docs] def set_block(self, block_id, array):
"""Sets vector associated to block
:param block_id: Block identifier
:type block_id: int
:param array: Array to set
:type array: ndarray
"""
self.vectors[block_id] = array
self.blocks = list(self.vectors.keys())
[docs] def __setitem__(self, key, value):
"""Index operator
:param key: Index as tuple: (block_id, index)
:type key: tuple
:param value: Value to set
:type value: float
"""
block_id, index = key
self.vectors[block_id][index] = value
[docs] def __str__(self):
"""String operator
:return: Nicer way of string representation
:rtype: str
"""
val = \
'\n'.join(np.array2string(self.vectors[key],
formatter={'float': lambda x: "%.3f" % x})
for key in self.vectors.keys())
return val
@property
def num_blocks(self):
"""Return number of blocks
:return: Number of blocks
:rtype: int
"""
return len(self.vectors.keys())
[docs] def __add__(self, other):
"""Operator +
:param other: Other vector
:type other: BlockVector
:return: Sum of two vectors
:rtype: BlockVector
"""
res = copy.deepcopy(self)
for k in range(self.num_blocks):
for n in range(len(self.vectors[k])):
res[k, n] = self[k, n] + other[k, n]
return res
[docs] def __sub__(self, other):
"""Operator -
:param other: Other vector
:type other: BlockVector
:return: Difference of two vectors
:rtype: BlockVector
"""
res = copy.deepcopy(self)
for k in range(self.num_blocks):
for n in range(len(self.vectors[k])):
res[k, n] = self[k, n] - other[k, n]
return res
[docs] def __eq__(self, other):
"""Operator ==, checks if two vectors are equal
:param other: Other vector
:type other: BlockVector
:return: True or False
:rtype: bool
"""
equal = True
for k in range(self.num_blocks):
for n in range(len(self.vectors[k])):
if abs(self[k, n] - other[k, n]) < 0.0001:
equal = True
else:
return False
return equal
[docs] def __mul__(self, other):
"""Multiplication operator. Currently supports only scalar types for
multiplication
:param other: Scalar value
:type other: float
:return: Vector multiplied by scalar value
:rtype: BlockVector
:raise ValueError: If :attr:`other` is not a sequence
"""
# supports only scalar types for multiplication
# check if 'other' is scalar
if isinstance(other, Sequence) is False:
res = BlockVector(self.blocks_size)
for k in range(self.num_blocks):
res.vectors[k] = other * self.vectors[k]
return res
else:
raise ValueError('Multiplication for sequences is not supported')
[docs] def __truediv__(self, other):
"""True division operator. Currently supports only scalar types for
division
:param other: Scalar value
:type other: float
:return: Vector divided by scalar value
:rtype: BlockVector
:raise ValueError: If :attr:`other` is not a sequence
"""
# supports only scalar types for division
# check if 'other' is scalar
if isinstance(other, Sequence) is False:
res = BlockVector(self.blocks_size)
for k in range(self.num_blocks):
res.vectors[k] = self.vectors[k] / other
return res
else:
raise ValueError('Division for sequences is not supported')
[docs] def __abs__(self):
"""Absolute value operator
:return: New vector with absolute values
:rtype: BlockVector
"""
res = copy.deepcopy(self)
for k in range(self.num_blocks):
res.set_block(k, np.abs(res.get_block(k)))
return res
[docs]def infinity_norm(x):
"""Computes infinity norm, i.e.
:math:`||x||_\\infty = \\max\\limits_{k \\in K, i \\in n_k}|x_{ki}|`
:param x: Input vector
:type x: BlockVector or SparseBlockVector
:return: Value of infinity norm
:rtype: float
"""
res = 0
for k in x.blocks:
res = max(res, np.linalg.norm(x.get_block(k), ord=np.inf))
return res