Source code for sknetwork.classification.diffusion

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created in July 2022
@author: Thomas Bonald <thomas.bonald@telecom-paris.fr>
"""
from typing import Optional, Union

import numpy as np
from scipy import sparse

from sknetwork.classification.base import BaseClassifier
from sknetwork.path.distances import get_distances
from sknetwork.linalg.normalizer import normalize
from sknetwork.utils.format import get_adjacency_values
from sknetwork.utils.membership import get_membership
from sknetwork.utils.neighbors import get_degrees


class DiffusionClassifier(BaseClassifier):
    """Node classification by heat diffusion.

    Each label is diffused separately; the temperature of a node for a given
    label is read as its probability to carry that label.

    Parameters
    ----------
    n_iter : int
        Number of iterations of the diffusion (discrete time). Must be positive.
    centering : bool
        If ``True``, center the temperature of each label to its mean before
        classification (default).
    scale : float
        Multiplicative factor applied to temperatures before softmax
        (default = 5). Used only when centering is ``True``.

    Attributes
    ----------
    labels_ : np.ndarray
        Predicted label of each node (-1 for nodes not reached by diffusion).
    probs_ : sparse.csr_matrix, shape (n_row, n_labels)
        Probability distribution over labels.
    labels_row_ : np.ndarray
        Labels of rows, for bipartite graphs.
    labels_col_ : np.ndarray
        Labels of columns, for bipartite graphs.
    probs_row_ : sparse.csr_matrix, shape (n_row, n_labels)
        Probability distributions over labels of rows, for bipartite graphs.
    probs_col_ : sparse.csr_matrix, shape (n_col, n_labels)
        Probability distributions over labels of columns, for bipartite graphs.

    Example
    -------
    >>> from sknetwork.data import karate_club
    >>> diffusion = DiffusionClassifier()
    >>> graph = karate_club(metadata=True)
    >>> adjacency = graph.adjacency
    >>> labels_true = graph.labels
    >>> labels = {0: labels_true[0], 33: labels_true[33]}
    >>> labels_pred = diffusion.fit_predict(adjacency, labels)
    >>> float(round(np.mean(labels_pred == labels_true), 2))
    0.97

    References
    ----------
    Zhu, X., Lafferty, J., & Rosenfeld, R. (2005). `Semi-supervised learning with graphs`
    (Doctoral dissertation, Carnegie Mellon University, language technologies institute,
    school of computer science).
    """

    def __init__(self, n_iter: int = 10, centering: bool = True, scale: float = 5):
        super().__init__()
        # Guard clause: reject a non-positive iteration count before storing anything.
        if n_iter <= 0:
            raise ValueError('The number of iterations must be positive.')
        self.n_iter = n_iter
        self.centering = centering
        self.scale = scale

    def fit(self, input_matrix: Union[sparse.csr_matrix, np.ndarray],
            labels: Optional[Union[dict, list, np.ndarray]] = None,
            labels_row: Optional[Union[dict, list, np.ndarray]] = None,
            labels_col: Optional[Union[dict, list, np.ndarray]] = None,
            force_bipartite: bool = False) -> 'DiffusionClassifier':
        """Diffuse the known labels over the graph and classify every node.

        Runs ``n_iter`` diffusion steps in which the temperatures of the
        labelled nodes (seeds) are reset to their initial values after each
        step, then assigns to each node the label of highest temperature.

        Parameters
        ----------
        input_matrix : sparse.csr_matrix, np.ndarray
            Adjacency matrix or biadjacency matrix of the graph.
        labels : dict, np.ndarray
            Known labels (dictionary or vector of int). Negative values ignored.
        labels_row : dict, np.ndarray
            Labels of rows for bipartite graphs. Negative values ignored.
        labels_col : dict, np.ndarray
            Labels of columns for bipartite graphs. Negative values ignored.
        force_bipartite : bool
            If ``True``, consider the input matrix as a biadjacency matrix
            (default = ``False``).

        Returns
        -------
        self: :class:`DiffusionClassifier`

        Raises
        ------
        ValueError
            If no node has a non-negative label.
        """
        adjacency, values, self.bipartite = get_adjacency_values(
            input_matrix,
            force_bipartite=force_bipartite,
            values=labels,
            values_row=labels_row,
            values_col=labels_col,
        )
        node_labels = values.astype(int)

        # Seeds are the nodes carrying a known (non-negative) label.
        is_seed = node_labels >= 0
        if not is_seed.any():
            raise ValueError('At least one node must be given a non-negative label.')

        # Re-index the seed labels as consecutive integers 0, 1, ..., so that
        # each distinct label gets its own temperature column.
        labels_unique, seed_index = np.unique(node_labels[is_seed], return_inverse=True)
        labels_reindex = node_labels.copy()
        labels_reindex[is_seed] = seed_index

        # Initial temperatures: membership rows for seeds, 0.5 everywhere else.
        temperatures = get_membership(labels_reindex).toarray()
        temperatures_seeds = temperatures[is_seed]
        temperatures[~is_seed] = 0.5

        # Diffusion with fixed boundary: seeds are restored after every step.
        transition = normalize(adjacency)
        for _ in range(self.n_iter):
            temperatures = transition.dot(temperatures)
            temperatures[is_seed] = temperatures_seeds

        if self.centering:
            temperatures -= temperatures.mean(axis=0)

        # The argmax is taken after centering but before exponentiation;
        # this order is significant and must be kept.
        predicted = labels_unique[temperatures.argmax(axis=1)]

        # softmax (exponentiation here, normalization when building probs_)
        if self.centering:
            temperatures = np.exp(self.scale * temperatures)

        # set label -1 to nodes not reached by diffusion
        distances = get_distances(adjacency, source=np.flatnonzero(is_seed))
        unreached = distances < 0
        predicted[unreached] = -1
        temperatures[unreached] = 0

        self.labels_ = predicted
        self.probs_ = sparse.csr_matrix(normalize(temperatures))
        self._split_vars(input_matrix.shape)

        return self