Source code for sknetwork.embedding.spring

#!/usr/bin/env python3
# coding: utf-8
"""
Created on Apr 2020
@author: Nathan de Lara <nathan.delara@polytechnique.org>
"""
from typing import Optional, Union

import numpy as np
from scipy import sparse
from scipy.spatial import cKDTree

from sknetwork.embedding.base import BaseEmbedding
from sknetwork.embedding.spectral import Spectral
from sknetwork.linalg import normalize
from sknetwork.utils.check import check_adjacency_vector, check_format, check_square, is_symmetric
from sknetwork.utils.format import directed2undirected


[docs] class Spring(BaseEmbedding): """Spring layout for displaying small graphs. Parameters ---------- n_components : int Dimension of the graph layout. strength : float Intensity of the force that moves the nodes. n_iter : int Number of iterations to update positions. tol : float Minimum relative change in positions to continue updating. approx_radius : float If a positive value is provided, only the nodes within this distance a given node are used to compute the repulsive force. position_init : str How to initialize the layout. If 'spectral', use Spectral embedding in dimension 2, otherwise, use random initialization. Attributes ---------- embedding_ : np.ndarray Layout. Example ------- >>> from sknetwork.embedding import Spring >>> from sknetwork.data import karate_club >>> spring = Spring() >>> adjacency = karate_club() >>> embedding = spring.fit_transform(adjacency) >>> embedding.shape (34, 2) Notes ----- Simple implementation designed to display small graphs. References ---------- Fruchterman, T. M. J., Reingold, E. M. (1991). `Graph Drawing by Force-Directed Placement. <https://onlinelibrary.wiley.com/doi/pdf/10.1002/spe.4380211102>`_ Software – Practice & Experience. """ def __init__(self, n_components: int = 2, strength: float = None, n_iter: int = 50, tol: float = 1e-4, approx_radius: float = -1, position_init: str = 'random'): super(Spring, self).__init__() self.n_components = n_components self.strength = strength self.n_iter = n_iter self.tol = tol self.approx_radius = approx_radius if position_init not in ['random', 'spectral']: raise ValueError('Unknown initial position, try "spectral" or "random".') else: self.position_init = position_init
[docs] def fit(self, adjacency: Union[sparse.csr_matrix, np.ndarray], position_init: Optional[np.ndarray] = None, n_iter: Optional[int] = None) -> 'Spring': """Compute layout. Parameters ---------- adjacency : Adjacency matrix of the graph, treated as undirected. position_init : np.ndarray Custom initial positions of the nodes. Shape must be (n, 2). If ``None``, use the value of self.pos_init. n_iter : int Number of iterations to update positions. If ``None``, use the value of self.n_iter. Returns ------- self: :class:`Spring` """ adjacency = check_format(adjacency) check_square(adjacency) if not is_symmetric(adjacency): adjacency = directed2undirected(adjacency) n = adjacency.shape[0] position = np.zeros((n, self.n_components)) if position_init is None: if self.position_init == 'random': position = np.random.randn(n, self.n_components) elif self.position_init == 'spectral': position = Spectral(n_components=self.n_components).fit_transform(adjacency) elif isinstance(position_init, np.ndarray): if position_init.shape == (n, self.n_components): position = position_init.copy() else: raise ValueError('Initial position has invalid shape.') else: raise TypeError('Initial position must be a numpy array.') if n_iter is None: n_iter = self.n_iter if self.strength is None: strength = np.sqrt((1 / n)) else: strength = self.strength pos_max = position.max(axis=0) pos_min = position.min(axis=0) step_max: float = 0.1 * (pos_max - pos_min).max() step: float = step_max / (n_iter + 1) tree = None delta = np.zeros((n, self.n_components)) for iteration in range(n_iter): delta *= 0 if self.approx_radius > 0: tree = cKDTree(position) for i in range(n): # attraction indices = adjacency.indices[adjacency.indptr[i]:adjacency.indptr[i+1]] attraction = adjacency.data[adjacency.indptr[i]:adjacency.indptr[i+1]] / strength grad = position[i] - position[indices] attraction *= np.linalg.norm(grad, axis=1) attraction = (grad * attraction[:, np.newaxis]).sum(axis=0) # repulsion if tree is None: grad: np.ndarray = (position[i] - position) # shape (n, n_components) distance: np.ndarray = np.linalg.norm(grad, axis=1) # shape (n,) else: neighbors = tree.query_ball_point(position[i], self.approx_radius) grad: np.ndarray = (position[i] - position[neighbors]) # shape (n_neigh, n_components) distance: np.ndarray = np.linalg.norm(grad, axis=1) # shape (n_neigh,) distance = np.where(distance < 0.01, 0.01, distance) repulsion = (grad * (strength / distance)[:, np.newaxis] ** 2).sum(axis=0) # total force delta[i]: np.ndarray = repulsion - attraction length = np.linalg.norm(delta, axis=0) length = np.where(length < 0.01, 0.1, length) delta = delta * step_max / length position += delta step_max -= step err: float = np.linalg.norm(delta) / n if err < self.tol: break self.embedding_ = position return self
[docs] def predict(self, adjacency_vectors: Union[sparse.csr_matrix, np.ndarray]) -> np.ndarray: """Predict the embedding of new rows, defined by their adjacency vectors. Parameters ---------- adjacency_vectors : Adjacency vectors of nodes. Array of shape (n_col,) (single vector) or (n_vectors, n_col) Returns ------- embedding_vectors : np.ndarray Embedding of the nodes. """ self._check_fitted() embedding = self.embedding_ n = embedding.shape[0] adjacency_vectors = check_adjacency_vector(adjacency_vectors, n) embedding_vectors = normalize(adjacency_vectors).dot(embedding) if embedding_vectors.shape[0] == 1: embedding_vectors = embedding_vectors.ravel() return embedding_vectors