Source code for sknetwork.topology.cycles

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created in February 2024
@author: Thomas Bonald <thomas.bonald@telecom-paris.fr>
@author: Yiwen Peng <yiwen.peng@telecom-paris.fr>
"""
from typing import Tuple, Optional, Union, List

import numpy as np
from scipy import sparse

from sknetwork.utils.check import is_symmetric, check_format
from sknetwork.utils.format import get_adjacency
from sknetwork.path import get_distances


[docs] def is_acyclic(adjacency: sparse.csr_matrix, directed: Optional[bool] = None) -> bool: """Check whether a graph has no cycle. Parameters ---------- adjacency: Adjacency matrix of the graph. directed: Whether to consider the graph as directed (inferred if not specified). Returns ------- is_acyclic : bool A boolean with value True if the graph has no cycle and False otherwise. Example ------- >>> from sknetwork.topology import is_acyclic >>> from sknetwork.data import star, grid >>> is_acyclic(star()) True >>> is_acyclic(grid()) False """ if directed is False: # the graph must be undirected if not is_symmetric(adjacency): raise ValueError("The adjacency matrix is not symmetric. The parameter 'directed' must be True.") elif directed is None: # if not specified, infer from the graph directed = not is_symmetric(adjacency) has_loops = (adjacency.diagonal() > 0).any() if has_loops: return False else: n_cc = sparse.csgraph.connected_components(adjacency, directed, connection='strong', return_labels=False) n_nodes = adjacency.shape[0] if directed: return n_cc == n_nodes else: n_edges = adjacency.nnz // 2 return n_cc == n_nodes - n_edges
[docs] def get_cycles(adjacency: sparse.csr_matrix, directed: Optional[bool] = None) -> List[List[int]]: """Get all possible cycles of a graph. Parameters ---------- adjacency : Adjacency matrix of the graph. directed : Whether to consider the graph as directed (inferred if not specified). Returns ------- cycles : list List of cycles, each cycle represented as a list of nodes. Example ------- >>> from sknetwork.topology import get_cycles >>> from sknetwork.data import cyclic_digraph >>> graph = cyclic_digraph(4, metadata=True) >>> get_cycles(graph.adjacency, directed=True) [[np.int64(0), np.int64(1), np.int64(2), np.int64(3)]] """ if directed is False: # the graph must be undirected if not is_symmetric(adjacency): raise ValueError("The adjacency matrix is not symmetric. The parameter 'directed' must be True.") elif directed is None: # if not specified, infer from the graph directed = not is_symmetric(adjacency) cycles = [] n_nodes = adjacency.shape[0] self_loops = np.argwhere(adjacency.diagonal() > 0).ravel() if len(self_loops) > 0: # add self-loops as cycles for node in self_loops: cycles.append([node]) # check if the graph is acyclic n_cc, cc_labels = sparse.csgraph.connected_components(adjacency, directed, connection='strong', return_labels=True) if directed and n_cc == n_nodes: return cycles elif not directed: # acyclic undirected graph n_edges = adjacency.nnz // 2 if n_cc == n_nodes - n_edges: return cycles # locate possible cycles labels, counts = np.unique(cc_labels, return_counts=True) if directed: labels = labels[counts > 1] cycle_starts = [np.argwhere(cc_labels == label).ravel()[0] for label in labels] # find cycles for indicated nodes (depth-first traversal) for start_node in cycle_starts: stack = [(start_node, [start_node])] while stack: current_node, path = stack.pop() for neighbor in adjacency.indices[adjacency.indptr[current_node]:adjacency.indptr[current_node+1]]: if not directed and len(path) > 1 and neighbor == path[-2]: # ignore the inverse edge (back move) in undirected graph continue if neighbor in path: cycles.append(path[path.index(neighbor):]) else: stack.append((neighbor, path + [neighbor])) # remove duplicates visited_cycles, unique_cycles = set(), [] for cycle in cycles: candidate = np.roll(cycle, -cycle.index(min(cycle))) current_cycle = tuple(candidate) if not directed: current_cycle = tuple(np.sort(candidate)) if current_cycle not in visited_cycles: unique_cycles.append(list(candidate)) visited_cycles.add(current_cycle) return unique_cycles
[docs] def break_cycles(adjacency: sparse.csr_matrix, root: Union[int, List[int]], directed: Optional[bool] = None) -> sparse.csr_matrix: """Break cycles of a graph from given roots. Parameters ---------- adjacency : Adjacency matrix of the graph. root : The root node or list of root nodes to break cycles from. directed : Whether to consider the graph as directed (inferred if not specified). Returns ------- adjacency : sparse.csr_matrix Adjacency matrix of the acyclic graph. Example ------- >>> from sknetwork.topology import break_cycles, is_acyclic >>> from sknetwork.data import cyclic_digraph >>> adjacency = cyclic_digraph(4) >>> dag = break_cycles(adjacency, root=0, directed=True) >>> is_acyclic(dag, directed=True) True """ if is_acyclic(adjacency, directed): return adjacency if root is None: raise ValueError("The parameter root must be specified.") else: out_degree = len(adjacency[root].indices) if out_degree == 0: raise ValueError("Invalid root node. The root must have at least one outgoing edge.") if isinstance(root, int): root = [root] if directed is False: # the graph must be undirected if not is_symmetric(adjacency): raise ValueError("The adjacency matrix is not symmetric. The parameter 'directed' must be True.") elif directed is None: # if not specified, infer from the graph directed = not is_symmetric(adjacency) # break self-loops adjacency = (adjacency - sparse.diags(adjacency.diagonal().astype(int), format='csr')).astype(int) if directed: # break cycles from the cycle node closest to the root _, cc_labels = sparse.csgraph.connected_components(adjacency, directed, connection='strong', return_labels=True) labels, counts = np.unique(cc_labels, return_counts=True) cycle_labels = labels[counts > 1] distances = get_distances(adjacency, source=root) for label in cycle_labels: cycle_nodes = np.argwhere(cc_labels == label).ravel() roots_ix = np.argwhere(distances[cycle_nodes] == min(distances[cycle_nodes])).ravel() subroots = set(cycle_nodes[roots_ix]) stack = [(subroot, [subroot]) for subroot in subroots] # break cycles using depth-first traversal while stack: current_node, path = stack.pop() # check if the edge still exists if len(path) > 1 and adjacency[path[-2], path[-1]] <= 0: continue neighbors = adjacency.indices[adjacency.indptr[current_node]:adjacency.indptr[current_node+1]] cycle_neighbors = set(neighbors) & set(cycle_nodes) for neighbor in cycle_neighbors: if neighbor in path: adjacency[current_node, neighbor] = 0 adjacency.eliminate_zeros() else: stack.append((neighbor, path + [neighbor])) else: # break cycles from given roots for undirected graphs for start_node in root: stack = [(start_node, [start_node])] # break cycles using depth-first traversal while stack: current_node, path = stack.pop() # check if the edge still exists if len(path) > 1 and adjacency[path[-2], path[-1]] <= 0: continue neighbors = list(adjacency.indices[adjacency.indptr[current_node]:adjacency.indptr[current_node+1]]) for neighbor in neighbors: if len(path) > 1 and neighbor == path[-2]: continue if neighbor in path: adjacency[current_node, neighbor] = 0 adjacency[neighbor, current_node] = 0 adjacency.eliminate_zeros() else: stack.append((neighbor, path + [neighbor])) return adjacency