from typing import Optional, Union

import numpy as np
from scipy import sparse
from scipy.spatial import cKDTree

from sknetwork.classification.base import BaseClassifier
from sknetwork.embedding.base import BaseEmbedding
from sknetwork.embedding.svd import GSVD
from sknetwork.linalg.normalization import normalize
from sknetwork.utils.check import check_n_neighbors, check_n_jobs
from sknetwork.utils.format import get_adjacency_seeds

class KNN(BaseClassifier):
    """Node classification by K-nearest neighbors in the embedding space.

    For bigraphs, classify rows only (see ``BiKNN`` for joint classification of rows and columns).

    Parameters
    ----------
    embedding_method :
        Which algorithm to use to project the nodes in vector space.
        Default is ``GSVD(10)``, instantiated separately for each classifier.
    n_neighbors :
        Number of nearest neighbors to consider.
    factor_distance :
        Power weighting factor :math:`\\alpha` applied to the distance to each neighbor.
        Neighbor at distance :math:`d` has weight :math:`1 / d^\\alpha`. Default is 2.
    leaf_size :
        Leaf size passed to KDTree.
    p :
        Which Minkowski p-norm to use. Default is 2 (Euclidean distance).
    tol_nn :
        Tolerance in nearest neighbors search; the k-th returned value is guaranteed to be no further
        than ``1 + tol_nn`` times the distance to the actual k-th nearest neighbor.
    n_jobs :
        Number of jobs to schedule for parallel processing. If -1 is given all processors are used.

    Attributes
    ----------
    labels_ : np.ndarray, shape (n_labels,)
        Label of each node.
    membership_ : sparse.csr_matrix, shape (n_row, n_labels)
        Membership matrix.
    labels_row_ : np.ndarray
        Labels of rows, for bipartite graphs.
    labels_col_ : np.ndarray
        Labels of columns, for bipartite graphs.
    membership_row_ : sparse.csr_matrix, shape (n_row, n_labels)
        Membership matrix of rows, for bipartite graphs.
    membership_col_ : sparse.csr_matrix, shape (n_col, n_labels)
        Membership matrix of columns, for bipartite graphs.

    Example
    -------
    >>> from sknetwork.classification import KNN
    >>> from sknetwork.embedding import GSVD
    >>> from sknetwork.data import karate_club
    >>> knn = KNN(GSVD(3), n_neighbors=1)
    >>> graph = karate_club(metadata=True)
    >>> adjacency = graph.adjacency
    >>> labels_true = graph.labels
    >>> seeds = {0: labels_true[0], 33: labels_true[33]}
    >>> labels_pred = knn.fit_transform(adjacency, seeds)
    >>> np.round(np.mean(labels_pred == labels_true), 2)
    0.97
    """

    def __init__(self, embedding_method: Optional[BaseEmbedding] = None, n_neighbors: int = 5,
                 factor_distance: float = 2, leaf_size: int = 16, p: float = 2, tol_nn: float = 0.01,
                 n_jobs: Optional[int] = None):
        super().__init__()

        # The default embedding is created here rather than in the signature:
        # a default evaluated at definition time would be one single object
        # shared (and refitted) by every classifier relying on the default.
        self.embedding_method = GSVD(10) if embedding_method is None else embedding_method
        self.n_neighbors = n_neighbors
        self.factor_distance = factor_distance
        self.leaf_size = leaf_size
        self.p = p
        self.tol_nn = tol_nn

        # The KD-tree query expects an integer; -1 requests all processors.
        self.n_jobs = check_n_jobs(n_jobs)
        if self.n_jobs is None:
            self.n_jobs = -1

        self.bipartite = None

    def _instantiate_vars(self, seeds: np.ndarray):
        """Split the nodes into seeds and nodes to classify.

        Parameters
        ----------
        seeds :
            Array of labels where negative entries mean "no label". It must support ``astype``
            (a dict is not accepted here).

        Returns
        -------
        index_seed : np.ndarray
            Indices of entries with a non-negative label.
        index_remain : np.ndarray
            Indices of entries with a negative label.
        labels_seed : np.ndarray
            Integer labels of the seeds, aligned with ``index_seed``.
        """
        labels = seeds.astype(int)
        index_seed = np.argwhere(labels >= 0).ravel()
        index_remain = np.argwhere(labels < 0).ravel()
        labels_seed = labels[index_seed]
        return index_seed, index_remain, labels_seed

    def _query_tree(self, tree: cKDTree, points: np.ndarray, n_neighbors: int):
        """Query the KD-tree of seeds for the nearest neighbors of ``points``.

        The parallelism keyword of ``cKDTree.query`` is ``workers`` in current SciPy; the legacy
        ``n_jobs`` keyword is used only when the installed SciPy rejects ``workers``.

        Returns
        -------
        The ``(distances, neighbors)`` pair returned by ``cKDTree.query``.
        """
        try:
            return tree.query(points, n_neighbors, self.tol_nn, self.p, workers=self.n_jobs)
        except TypeError as error:
            # Only fall back when the keyword itself is the problem, so that
            # unrelated type errors are reported as they are.
            if 'workers' not in str(error):
                raise
            return tree.query(points, n_neighbors, self.tol_nn, self.p, n_jobs=self.n_jobs)

    def _fit_core(self, n, labels_seed, embedding, index_seed, index_remain):
        """Classify the non-seed nodes from their nearest seeds in the embedding space.

        Parameters
        ----------
        n :
            Total number of nodes (number of rows of the membership matrix).
        labels_seed :
            Labels of the seeds.
        embedding :
            Embedding of the nodes, indexed by node.
        index_seed :
            Indices of the seeds.
        index_remain :
            Indices of the nodes to classify.

        Returns
        -------
        membership : sparse.csr_matrix
            Normalized membership matrix with ``n`` rows and ``max(labels_seed) + 1`` columns.
        labels : np.ndarray
            Column index of the largest membership of each node.
        """
        n_seeds = len(labels_seed)
        embedding_seed = embedding[index_seed]
        embedding_remain = embedding[index_remain]
        n_neighbors = check_n_neighbors(self.n_neighbors, n_seeds)

        tree = cKDTree(embedding_seed, self.leaf_size)
        distances, neighbors = self._query_tree(tree, embedding_remain, n_neighbors)

        # With a single neighbor, the query returns 1-D arrays: restore the neighbor axis.
        if n_neighbors == 1:
            distances = distances[:, np.newaxis]
            neighbors = neighbors[:, np.newaxis]

        labels_neighbor = labels_seed[neighbors]

        # Nodes located exactly on at least one seed cannot use inverse-distance weights.
        on_seed = np.min(distances, axis=1) == 0
        weights_neighbor = np.zeros(distances.shape, dtype=float)

        # take all seeds at distance zero, if any
        weights_neighbor[on_seed] = (distances[on_seed] == 0).astype(float)

        # assign weights with respect to distances for other nodes
        weights_neighbor[~on_seed] = 1 / np.power(distances[~on_seed], self.factor_distance)

        # form the corresponding matrix: weighted votes for the nodes to classify,
        # followed by a unit vote of each seed for its own label
        row = np.concatenate((np.repeat(index_remain, n_neighbors), index_seed))
        col = np.concatenate((labels_neighbor.ravel(), labels_seed))
        data = np.concatenate((weights_neighbor.ravel(), np.ones(len(index_seed), dtype=float)))

        # Entries sharing the same (row, col) pair are summed by the constructor.
        membership = normalize(sparse.csr_matrix((data, (row, col)), shape=(n, np.max(labels_seed) + 1)))

        labels = np.argmax(membership.toarray(), axis=1)

        return membership, labels

    def fit(self, input_matrix: Union[sparse.csr_matrix, np.ndarray],
            seeds: Optional[Union[np.ndarray, dict]] = None,
            seeds_row: Optional[Union[np.ndarray, dict]] = None,
            seeds_col: Optional[Union[np.ndarray, dict]] = None) -> 'KNN':
        """Node classification by k-nearest neighbors in the embedding space.

        Parameters
        ----------
        input_matrix :
            Adjacency matrix or biadjacency matrix of the graph.
        seeds :
            Seed nodes. Can be a dict {node: label} or an array where "-1" means no label.
        seeds_row, seeds_col :
            Seeds of rows and columns (for bipartite graphs).

        Returns
        -------
        self: :class:`KNN`
        """
        adjacency, seeds, self.bipartite = get_adjacency_seeds(input_matrix, seeds=seeds, seeds_row=seeds_row,
                                                               seeds_col=seeds_col)
        index_seed, index_remain, labels_seed = self._instantiate_vars(seeds)
        embedding = self.embedding_method.fit_transform(adjacency)
        membership, labels = self._fit_core(adjacency.shape[0], labels_seed, embedding, index_seed, index_remain)

        self.membership_ = membership
        self.labels_ = labels

        if self.bipartite:
            self._split_vars(input_matrix.shape)

        return self