Source code for sknetwork.classification.diffusion

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created in July 2022
@author: Thomas Bonald <thomas.bonald@telecom-paris.fr>
"""
from typing import Optional, Union

import numpy as np
from scipy import sparse

from sknetwork.classification.base import BaseClassifier
from sknetwork.path.distances import get_distances
from sknetwork.linalg.normalization import normalize
from sknetwork.utils.format import get_adjacency_values
from sknetwork.utils.membership import get_membership
from sknetwork.utils.neighbors import get_degrees


[docs]class DiffusionClassifier(BaseClassifier): """Node classification by heat diffusion. For each label, the temperature of a node corresponds to its probability to have this label. Parameters ---------- n_iter : int Number of iterations of the diffusion (discrete time). centering : bool If ``True``, center the temperature of each label to its mean before classification (default). scale : float Multiplicative factor applied to tempreatures before softmax (default = 5). Used only when centering is ``True``. Attributes ---------- labels_ : np.ndarray, shape (n_labels,) Labels of nodes. probs_ : sparse.csr_matrix, shape (n_row, n_labels) Probability distribution over labels. labels_row_, labels_col_ : np.ndarray Labels of rows and columns, for bipartite graphs. probs_row_, probs_col_ : sparse.csr_matrix, shape (n_row, n_labels) Probability distributions over labels for rows and columns (for bipartite graphs). Example ------- >>> from sknetwork.data import karate_club >>> diffusion = DiffusionClassifier() >>> graph = karate_club(metadata=True) >>> adjacency = graph.adjacency >>> labels_true = graph.labels >>> labels = {0: labels_true[0], 33: labels_true[33]} >>> labels_pred = diffusion.fit_predict(adjacency, labels) >>> np.round(np.mean(labels_pred == labels_true), 2) 0.97 References ---------- Zhu, X., Lafferty, J., & Rosenfeld, R. (2005). `Semi-supervised learning with graphs` (Doctoral dissertation, Carnegie Mellon University, language technologies institute, school of computer science). """ def __init__(self, n_iter: int = 10, centering: bool = True, scale: float = 5): super(DiffusionClassifier, self).__init__() if n_iter <= 0: raise ValueError('The number of iterations must be positive.') else: self.n_iter = n_iter self.centering = centering self.scale = scale
[docs] def fit(self, input_matrix: Union[sparse.csr_matrix, np.ndarray], labels: Optional[Union[dict, np.ndarray]] = None, labels_row: Optional[Union[dict, np.ndarray]] = None, labels_col: Optional[Union[dict, np.ndarray]] = None, force_bipartite: bool = False) \ -> 'DiffusionClassifier': """Compute the solution to the Dirichlet problem (temperatures at equilibrium). Parameters ---------- input_matrix : Adjacency matrix or biadjacency matrix of the graph. labels : Known labels (dictionary or vector of int). Negative values ignored. labels_row, labels_col : Labels of rows and columns for bipartite graphs. Negative values ignored. force_bipartite : If ``True``, consider the input matrix as a biadjacency matrix (default = ``False``). Returns ------- self: :class:`DiffusionClassifier` """ adjacency, values, self.bipartite = get_adjacency_values(input_matrix, force_bipartite=force_bipartite, values=labels, values_row=labels_row, values_col=labels_col) labels = values.astype(int) if (labels < 0).all(): raise ValueError('At least one node must be given a non-negative label.') temperatures = get_membership(labels).toarray() temperatures_seeds = temperatures[labels >= 0] temperatures[labels < 0] = 0.5 diffusion = normalize(adjacency) for i in range(self.n_iter): temperatures = diffusion.dot(temperatures) temperatures[labels >= 0] = temperatures_seeds if self.centering: temperatures -= temperatures.mean(axis=0) labels_ = temperatures.argmax(axis=1) # softmax if self.centering: temperatures = np.exp(self.scale * temperatures) # set label -1 to nodes not reached by diffusion distances = get_distances(adjacency, source=np.flatnonzero(labels >= 0)) labels_[distances < 0] = -1 temperatures[distances < 0] = 0 self.labels_ = labels_ self.probs_ = sparse.csr_matrix(normalize(temperatures)) self._split_vars(input_matrix.shape) return self