Source code for sknetwork.classification.knn

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created in November 2019
@author: Nathan de Lara <nathan.delara@polytechnique.org>
@author: Thomas Bonald <tbonald@enst.fr>
"""
from typing import Optional, Union

import numpy as np
from scipy import sparse

from sknetwork.classification.base import BaseClassifier
from sknetwork.embedding.base import BaseEmbedding
from sknetwork.linalg.normalization import get_norms, normalize
from sknetwork.utils.check import check_n_neighbors
from sknetwork.utils.format import get_adjacency_values


[docs]class NNClassifier(BaseClassifier): """Node classification by K-nearest neighbors in the embedding space. Parameters ---------- n_neighbors : Number of nearest neighbors . embedding_method : Embedding method used to represent nodes in vector space. If ``None`` (default), use identity. normalize : If ``True``, apply normalization so that all vectors have norm 1 in the embedding space. Attributes ---------- labels_ : np.ndarray, shape (n_labels,) Labels of nodes. probs_ : sparse.csr_matrix, shape (n_row, n_labels) Probability distribution over labels. labels_row_, labels_col_ : np.ndarray Labels of rows and columns, for bipartite graphs. probs_row_, probs_col_ : sparse.csr_matrix, shape (n_row, n_labels) Probability distributions over labels for rows and columns (for bipartite graphs). Example ------- >>> from sknetwork.classification import NNClassifier >>> from sknetwork.data import karate_club >>> classifier = NNClassifier(n_neighbors=1) >>> graph = karate_club(metadata=True) >>> adjacency = graph.adjacency >>> labels_true = graph.labels >>> labels = {0: labels_true[0], 33: labels_true[33]} >>> labels_pred = classifier.fit_predict(adjacency, labels) >>> np.round(np.mean(labels_pred == labels_true), 2) 0.82 """ def __init__(self, n_neighbors: int = 3, embedding_method: Optional[BaseEmbedding] = None, normalize: bool = True): super(NNClassifier, self).__init__() self.n_neighbors = n_neighbors self.embedding_method = embedding_method self.normalize = normalize @staticmethod def _instantiate_vars(labels: np.ndarray): index_train = np.flatnonzero(labels >= 0) index_test = np.flatnonzero(labels < 0) return index_train, index_test def _fit_core(self, embedding, labels, index_train, index_test): n_neighbors = check_n_neighbors(self.n_neighbors, len(index_train)) norms_train = get_norms(embedding[index_train], p=2) neighbors = [] for i in index_test: vector = embedding[i] if sparse.issparse(vector): vector = vector.toarray().ravel() distances = norms_train**2 - 2 * embedding[index_train].dot(vector) + np.sum(vector**2) neighbors += list(index_train[np.argpartition(distances, n_neighbors)[:n_neighbors]]) labels_neighbor = labels[neighbors] # membership matrix row = list(np.repeat(index_test, n_neighbors)) col = list(labels_neighbor) data = list(np.ones_like(labels_neighbor)) row += list(index_train) col += list(labels[index_train]) data += list(np.ones_like(index_train)) probs = normalize(sparse.csr_matrix((data, (row, col)), shape=(len(labels), np.max(labels) + 1))) labels = np.argmax(probs.toarray(), axis=1) return probs, labels
[docs] def fit(self, input_matrix: Union[sparse.csr_matrix, np.ndarray], labels: Union[np.ndarray, dict] = None, labels_row: Union[np.ndarray, dict] = None, labels_col: Union[np.ndarray, dict] = None) -> 'NNClassifier': """Node classification by k-nearest neighbors in the embedding space. Parameters ---------- input_matrix : Adjacency matrix or biadjacency matrix of the graph. labels : Known labels (dictionary or array). Negative values ignored. labels_row, labels_col : Known labels of rows and columns (for bipartite graphs). Returns ------- self: :class:`KNN` """ adjacency, labels, self.bipartite = get_adjacency_values(input_matrix, values=labels, values_row=labels_row, values_col=labels_col) labels = labels.astype(int) index_seed, index_remain = self._instantiate_vars(labels) if self.embedding_method is None: embedding = adjacency else: embedding = self.embedding_method.fit_transform(adjacency) if self.normalize: embedding = normalize(embedding, p=2) probs, labels = self._fit_core(embedding, labels, index_seed, index_remain) self.labels_ = labels self.probs_ = probs self._split_vars(input_matrix.shape) return self