#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created in July 2019
@author: Nathan de Lara <nathan.delara@polytechnique.org>
@author: Thomas Bonald <thomas.bonald@telecom-paris.fr>
"""
from typing import Union, Optional, Tuple
import numpy as np
from scipy import sparse
from sknetwork.linalg.normalizer import normalize
from sknetwork.regression.base import BaseRegressor
from sknetwork.utils import get_adjacency_values, get_degrees
def init_temperatures(seeds: np.ndarray, init: Optional[float]) -> Tuple[np.ndarray, np.ndarray]:
"""Init temperatures."""
n = len(seeds)
border = (seeds >= 0)
if init is None:
temperatures = seeds[border].mean() * np.ones(n)
else:
temperatures = init * np.ones(n)
temperatures[border] = seeds[border]
return temperatures, border
[docs]
class Diffusion(BaseRegressor):
"""Regression by diffusion along the edges, given the temperatures of some seed nodes (heat equation).
The row vector of tempreatures :math:`T` evolves like:
:math:`T \\gets (1-\\alpha) T + \\alpha PT`
where :math:`\\alpha` is the damping factor and :math:`P` is the transition matrix of the random walk in the graph.
All values are updated, including those of seed nodes (free diffusion).
See ``Dirichlet`` for diffusion with boundary constraints.
Parameters
----------
n_iter : int
Number of iterations of the diffusion (must be positive).
damping_factor : float
Damping factor.
Attributes
----------
values_ : np.ndarray
Value of each node (= temperature).
values_row_: np.ndarray
Values of rows, for bipartite graphs.
values_col_: np.ndarray
Values of columns, for bipartite graphs.
Example
-------
>>> from sknetwork.data import house
>>> diffusion = Diffusion(n_iter=1)
>>> adjacency = house()
>>> values = {0: 1, 2: 0}
>>> values_pred = diffusion.fit_predict(adjacency, values)
>>> np.round(values_pred, 1)
array([0.8, 0.5, 0.2, 0.4, 0.6])
References
----------
Chung, F. (2007). The heat kernel as the pagerank of a graph. Proceedings of the National Academy of Sciences.
"""
def __init__(self, n_iter: int = 3, damping_factor: float = 0.5):
super(Diffusion, self).__init__()
if n_iter <= 0:
raise ValueError('The number of iterations must be positive.')
else:
self.n_iter = n_iter
self.damping_factor = damping_factor
self.bipartite = None
[docs]
def fit(self, input_matrix: Union[sparse.csr_matrix, np.ndarray],
values: Optional[Union[dict, list, np.ndarray]] = None,
values_row: Optional[Union[dict, list, np.ndarray]] = None,
values_col: Optional[Union[dict, list, np.ndarray]] = None, init: Optional[float] = None,
force_bipartite: bool = False) -> 'Diffusion':
"""Compute the diffusion (temperatures at equilibrium).
Parameters
----------
input_matrix :
Adjacency matrix or biadjacency matrix of the graph.
values :
Temperatures of nodes in initial state (dictionary or vector). Negative temperatures ignored.
values_row, values_col :
Temperatures of rows and columns for bipartite graphs. Negative temperatures ignored.
init :
Temperature of nodes in initial state.
If ``None``, use the average temperature of seed nodes (default).
force_bipartite :
If ``True``, consider the input matrix as a biadjacency matrix (default = ``False``).
Returns
-------
self: :class:`Diffusion`
"""
adjacency, values, self.bipartite = get_adjacency_values(input_matrix, force_bipartite=force_bipartite,
values=values,
values_row=values_row,
values_col=values_col)
values, _ = init_temperatures(values, init)
diffusion = normalize(adjacency.T.tocsr())
degrees = get_degrees(diffusion)
diag = sparse.diags((degrees == 0).astype(int)).tocsr()
diffusion += diag
diffusion = (1 - self.damping_factor) * sparse.identity(len(degrees)).tocsr() + self.damping_factor * diffusion
for i in range(self.n_iter):
values = diffusion.dot(values)
self.values_ = values
if self.bipartite:
self._split_vars(input_matrix.shape)
return self
[docs]
class Dirichlet(BaseRegressor):
"""Regression by the Dirichlet problem (heat diffusion with boundary constraints).
The temperatures of some seed nodes are fixed. The temperatures of other nodes are computed.
Parameters
----------
n_iter : int
Number of iterations of the diffusion (must be positive).
Attributes
----------
values_ : np.ndarray
Value of each node (= temperature).
values_row_: np.ndarray
Values of rows, for bipartite graphs.
values_col_: np.ndarray
Values of columns, for bipartite graphs.
Example
-------
>>> from sknetwork.regression import Dirichlet
>>> from sknetwork.data import house
>>> dirichlet = Dirichlet()
>>> adjacency = house()
>>> values = {0: 1, 2: 0}
>>> values_pred = dirichlet.fit_predict(adjacency, values)
>>> np.round(values_pred, 2)
array([1. , 0.54, 0. , 0.31, 0.62])
References
----------
Chung, F. (2007). The heat kernel as the pagerank of a graph. Proceedings of the National Academy of Sciences.
"""
def __init__(self, n_iter: int = 10):
super(Dirichlet, self).__init__()
if n_iter <= 0:
raise ValueError('The number of iterations must be positive.')
else:
self.n_iter = n_iter
self.bipartite = None
[docs]
def fit(self, input_matrix: Union[sparse.csr_matrix, np.ndarray],
values: Optional[Union[dict, list, np.ndarray]] = None,
values_row: Optional[Union[dict, list, np.ndarray]] = None,
values_col: Optional[Union[dict, list, np.ndarray]] = None, init: Optional[float] = None,
force_bipartite: bool = False) -> 'Dirichlet':
"""Compute the solution to the Dirichlet problem (temperatures at equilibrium).
Parameters
----------
input_matrix :
Adjacency matrix or biadjacency matrix of the graph.
values :
Temperatures of nodes (dictionary or vector). Negative temperatures ignored.
values_row, values_col :
Temperatures of rows and columns for bipartite graphs. Negative temperatures ignored.
init :
Temperature of nodes in initial state.
If ``None``, use the average temperature of seed nodes (default).
force_bipartite :
If ``True``, consider the input matrix as a biadjacency matrix (default = ``False``).
Returns
-------
self: :class:`Dirichlet`
"""
adjacency, values, self.bipartite = get_adjacency_values(input_matrix, force_bipartite=force_bipartite,
values=values,
values_row=values_row,
values_col=values_col)
temperatures, border = init_temperatures(values, init)
values = temperatures.copy()
diffusion = normalize(adjacency)
for i in range(self.n_iter):
values = diffusion.dot(values)
values[border] = temperatures[border]
self.values_ = values
if self.bipartite:
self._split_vars(input_matrix.shape)
return self