Source code for sknetwork.path.distances

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created in May 2023
@author: Thomas Bonald <bonald@enst.fr>
"""
from typing import Iterable, Optional, Union, Tuple

import numpy as np
from scipy import sparse

from sknetwork.utils.format import get_adjacency


[docs]def get_distances(input_matrix: sparse.csr_matrix, source: Optional[Union[int, Iterable]] = None, source_row: Optional[Union[int, Iterable]] = None, source_col: Optional[Union[int, Iterable]] = None, transpose: bool = False, force_bipartite: bool = False) \ -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]]: """Get the distances from a source (or a set of sources) in number of hops. Parameters ---------- input_matrix : Adjacency matrix or biadjacency matrix of the graph. source : If an integer, index of the source node. If a list or array, indices of source nodes (the shortest distances to one of these nodes is returned). source_row, source_col : For bipartite graphs, index of source nodes on rows and columns. The parameter source_row is an alias for source (at least one of them must be ``None``). transpose : If ``True``, transpose the input matrix. force_bipartite : If ``True``, consider the input matrix as the biadjacency matrix of a bipartite graph. Set to ``True`` is the parameters source_row or source_col re specified. Returns ------- distances : np.ndarray of shape (n_nodes,) Vector of distances from source (distance = -1 if no path exists from the source). For a bipartite graph, two vectors are returned, one for the rows and one for the columns. Examples -------- >>> from sknetwork.data import cyclic_digraph >>> adjacency = cyclic_digraph(3) >>> get_distances(adjacency, source=0) array([0, 1, 2]) >>> get_distances(adjacency, source=[0, 2]) array([0, 1, 0]) """ if transpose: matrix = sparse.csr_matrix(input_matrix.T) else: matrix = input_matrix if source_row is not None or source_col is not None: force_bipartite = True adjacency, bipartite = get_adjacency(matrix, force_bipartite=force_bipartite, allow_empty=True) adjacency_transpose = adjacency.astype(bool).T.tocsr() n_row, n_col = matrix.shape n_nodes = adjacency.shape[0] mask = np.zeros(n_nodes, dtype=bool) if bipartite: if source is not None: if source_row is not None: raise ValueError('Only one of the parameters source and source_row can be specified.') source_row = source if source_row is None and source_col is None: raise ValueError('At least one of the parameters source_row or source_col must be specified.') if source_row is not None: mask[source_row] = 1 if source_col is not None: mask[n_row + np.array(source_col)] = 1 else: if source is None: raise ValueError('The parameter source must be specified.') mask[source] = 1 distances = -np.ones(n_nodes, dtype=int) distances[mask] = 0 distance = 0 reach = mask while 1: distance += 1 mask = adjacency_transpose.dot(reach).astype(bool) & ~reach if np.sum(mask) == 0: break distances[mask] = distance reach |= mask if bipartite: return distances[:n_row], distances[n_row:] else: return distances