# Source code for sknetwork.data.parse

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created in December 2018
@author: Quentin Lutz <qlutz@enst.fr>
@author: Nathan de Lara <nathan.delara@polytechnique.org>
@author: Thomas Bonald <bonald@enst.fr>
"""

from typing import Dict, List, Tuple, Union
from xml.etree import ElementTree

import numpy as np
from scipy import sparse

from sknetwork.data.base import Bunch
from sknetwork.utils.format import directed2undirected

def from_edge_list(edge_list: Union[np.ndarray, List[Tuple]], directed: bool = False,
                   bipartite: bool = False, weighted: bool = True, reindex: bool = True,
                   sum_duplicates: bool = True, matrix_only: bool = None) -> Union[Bunch, sparse.csr_matrix]:
    """Load a graph from an edge list.

    Parameters
    ----------
    edge_list : Union[np.ndarray, List[Tuple]]
        The edge list to convert, given as a NumPy array of size (n, 2) or (n, 3) or a list of tuples of
        length 2 or 3.
    directed : bool
        If ``True``, considers the graph as directed.
    bipartite : bool
        If ``True``, returns a biadjacency matrix.
    weighted : bool
        If ``True``, returns a weighted graph.
    reindex : bool
        If ``True``, reindex nodes and returns the original node indices as names.
        Reindexing is enforced if nodes are not integers.
    sum_duplicates : bool
        If ``True`` (default), sums weights of duplicate edges.
        Otherwise, the weight of each edge is that of the first occurrence of this edge.
    matrix_only : bool
        If ``True``, returns only the (bi)adjacency matrix.
        Otherwise, returns a ``Bunch`` object with graph attributes (e.g., node names).
        If not specified (default), selects the most appropriate format.

    Returns
    -------
    graph : :class:`Bunch` (including node names) or sparse matrix

    Examples
    --------
    >>> edges = [(0, 1), (1, 2), (2, 0)]
    >>> adjacency = from_edge_list(edges)
    >>> adjacency.shape
    (3, 3)
    >>> edges = [('Alice', 'Bob'), ('Bob', 'Carol'), ('Carol', 'Alice')]
    >>> graph = from_edge_list(edges)
    >>> graph.adjacency.shape
    (3, 3)
    >>> print(graph.names)
    ['Alice' 'Bob' 'Carol']
    """
    edge_array = np.array([])
    weights = None
    if isinstance(edge_list, list):
        try:
            edge_array = np.array([[edge[0], edge[1]] for edge in edge_list])
            if len(edge_list) and len(edge_list[0]) == 3:
                weights = np.array([edge[2] for edge in edge_list])
            elif len(edge_list) and len(edge_list[0]) != 2:
                # tuples of length other than 2 or 3 are invalid
                raise ValueError()
        except (ValueError, IndexError, TypeError):
            # re-raise with an explicit message (the original code built the
            # exception without raising it, silently swallowing the error)
            raise ValueError('Edges must be given as tuples of fixed size (2 or 3).')
    elif isinstance(edge_list, np.ndarray):
        if edge_list.ndim != 2 or edge_list.shape[1] not in [2, 3]:
            raise ValueError('The edge list must be given as an array of shape (n_edges, 2) or '
                             '(n_edges, 3).')
        edge_array = edge_list[:, :2]
        if edge_list.shape[1] == 3:
            weights = edge_list[:, 2]
    else:
        raise TypeError('The edge list must be given as a NumPy array or a list of tuples.')
    return from_edge_array(edge_array=edge_array, weights=weights, directed=directed, bipartite=bipartite,
                           weighted=weighted, reindex=reindex, sum_duplicates=sum_duplicates,
                           matrix_only=matrix_only)

def from_adjacency_list(adjacency_list: Union[List[List], Dict], directed: bool = False,
                        bipartite: bool = False, weighted: bool = True, reindex: bool = True,
                        sum_duplicates: bool = True, matrix_only: bool = None) -> Union[Bunch, sparse.csr_matrix]:
    """Load a graph from an adjacency list.

    NOTE(review): the ``def`` header and the loop headers of this function were
    lost in extraction; they are reconstructed here — confirm against upstream.

    Parameters
    ----------
    adjacency_list : Union[List[List], Dict]
        Adjacency list (neighbors of each node) or dictionary (node: neighbors).
    directed : bool
        If ``True``, considers the graph as directed.
    bipartite : bool
        If ``True``, returns a biadjacency matrix.
    weighted : bool
        If ``True``, returns a weighted graph.
    reindex : bool
        If ``True``, reindex nodes and returns the original node indices as names.
        Reindexing is enforced if nodes are not integers.
    sum_duplicates : bool
        If ``True`` (default), sums weights of duplicate edges.
        Otherwise, the weight of each edge is that of the first occurrence of this edge.
    matrix_only : bool
        If ``True``, returns only the (bi)adjacency matrix.
        Otherwise, returns a ``Bunch`` object with graph attributes (e.g., node names).
        If not specified (default), selects the most appropriate format.

    Returns
    -------
    graph : :class:`Bunch` or sparse matrix

    Example
    -------
    >>> adjacency_list = [[1, 2], [0, 2, 3], [0, 1]]
    >>> adjacency = from_adjacency_list(adjacency_list)
    >>> adjacency.shape
    (4, 4)
    """
    edge_list = []
    if isinstance(adjacency_list, list):
        # list form: node i has neighbors adjacency_list[i]
        for i, neighbors in enumerate(adjacency_list):
            for j in neighbors:
                edge_list.append((i, j))
    elif isinstance(adjacency_list, dict):
        # dict form: node -> list of neighbors
        for i, neighbors in adjacency_list.items():
            for j in neighbors:
                edge_list.append((i, j))
    else:
        raise TypeError('The adjacency list must be given as a list of lists or a dict of lists.')
    return from_edge_list(edge_list=edge_list, directed=directed, bipartite=bipartite, weighted=weighted,
                          reindex=reindex, sum_duplicates=sum_duplicates, matrix_only=matrix_only)

def from_edge_array(edge_array: np.ndarray, weights: np.ndarray = None, directed: bool = False,
                    bipartite: bool = False, weighted: bool = True, reindex: bool = True,
                    sum_duplicates: bool = True, matrix_only: bool = None) -> Union[Bunch, sparse.csr_matrix]:
    """Load a graph from an edge array of shape (n_edges, 2) and weights (optional).

    Parameters
    ----------
    edge_array : np.ndarray
        Array of edges.
    weights : np.ndarray
        Array of weights.
    directed : bool
        If ``True``, considers the graph as directed.
    bipartite : bool
        If ``True``, returns a biadjacency matrix.
    weighted : bool
        If ``True``, returns a weighted graph.
    reindex : bool
        If ``True``, reindex nodes and returns the original node indices as names.
        Reindexing is enforced if nodes are not integers.
    sum_duplicates : bool
        If ``True`` (default), sums weights of duplicate edges.
        Otherwise, the weight of each edge is that of the first occurrence of this edge.
    matrix_only : bool
        If ``True``, returns only the (bi)adjacency matrix.
        Otherwise, returns a ``Bunch`` object with graph attributes (e.g., node names).
        If not specified (default), selects the most appropriate format.

    Returns
    -------
    graph : :class:`Bunch` or sparse matrix
    """
    # normalize the dtype of node identifiers: use int when all values are integral
    try:
        edge_array = edge_array.astype(float)
    except ValueError:
        pass  # non-numeric node names (e.g., strings) are handled by reindexing below
    if edge_array.dtype == float and (edge_array == edge_array.astype(int)).all():
        edge_array = edge_array.astype(int)
    if weights is None:
        weights = np.ones(len(edge_array))
    if weights.dtype not in [bool, int, float]:
        try:
            weights = weights.astype(float)
        except ValueError:
            raise ValueError('Weights must be numeric.')
    if all(weights == weights.astype(int)):
        weights = weights.astype(int)
    if not weighted:
        weights = weights.astype(bool)

    if not sum_duplicates:
        # keep the first occurrence of each edge (np.unique returns first indices)
        _, index = np.unique(edge_array, axis=0, return_index=True)
        edge_array = edge_array[index]
        weights = weights[index]
    graph = Bunch()
    if bipartite:
        row = edge_array[:, 0]
        col = edge_array[:, 1]
        # reindex rows and columns independently for a biadjacency matrix
        if row.dtype != int or (reindex and len(set(row)) < max(row) + 1):
            names_row, row = np.unique(row, return_inverse=True)
            graph.names_row = names_row
            graph.names = names_row
            n_row = len(names_row)
        else:
            n_row = max(row) + 1
        if col.dtype != int or (reindex and len(set(col)) < max(col) + 1):
            names_col, col = np.unique(col, return_inverse=True)
            graph.names_col = names_col
            n_col = len(names_col)
        else:
            n_col = max(col) + 1
        matrix = sparse.csr_matrix((weights, (row, col)), shape=(n_row, n_col))
        # attach the matrix to the dataset (line lost in the scraped version)
        graph.biadjacency = matrix
    else:
        nodes = edge_array.ravel()
        if nodes.dtype != int or (reindex and len(set(nodes)) < max(nodes) + 1):
            names, nodes = np.unique(nodes, return_inverse=True)
            graph.names = names
            n = len(names)
            edge_array = nodes.reshape(-1, 2)
        else:
            n = max(nodes) + 1
        row = edge_array[:, 0]
        col = edge_array[:, 1]
        matrix = sparse.csr_matrix((weights, (row, col)), shape=(n, n))
        if not directed:
            # symmetrization only makes sense for square adjacency matrices,
            # so it lives inside this branch (not at top level as in the scrape)
            matrix = directed2undirected(matrix)
        graph.adjacency = matrix
    # return the bare matrix when requested, or when the dataset holds nothing
    # but the matrix itself (Bunch is dict-like, so len counts attributes)
    if matrix_only or (matrix_only is None and len(graph) == 1):
        return matrix
    return graph

def from_csv(file_path: str, delimiter: str = None, sep: str = None, comments: str = '#%',
             data_structure: str = None, directed: bool = False, bipartite: bool = False, weighted: bool = True,
             reindex: bool = True, sum_duplicates: bool = True, matrix_only: bool = None) \
        -> Union[Bunch, sparse.csr_matrix]:
    """Load a graph from a CSV or TSV file.
    The delimiter can be specified (e.g., ' ' for space-separated values).

    NOTE(review): several lines of this function were lost in extraction
    (header scan, NumPy fast path setup, csv reader, adjacency branches);
    they are reconstructed here — confirm against upstream.

    Parameters
    ----------
    file_path : str
        Path to the CSV file.
    delimiter : str
        Delimiter used in the file. Guessed if not specified.
    sep : str
        Alias for delimiter.
    comments : str
        Characters for comment lines.
    data_structure : str
        If 'edge_list', considers each row of the file as an edge (tuple of size 2 or 3).
        If 'adjacency_list', considers each row of the file as an adjacency list (list of neighbors).
        If 'adjacency_dict', considers each row of the file as an adjacency dictionary with key
        given by the first column (node: list of neighbors).
        If ``None`` (default), data_structure is guessed from the first rows of the file.
    directed : bool
        If ``True``, considers the graph as directed.
    bipartite : bool
        If ``True``, returns a biadjacency matrix of shape (n1, n2).
    weighted : bool
        If ``True``, returns a weighted graph (e.g., counts the number of occurrences of each edge).
    reindex : bool
        If ``True``, reindex nodes and returns the original node indices as names.
        Reindexing is enforced if nodes are not integers.
    sum_duplicates : bool
        If ``True`` (default), sums weights of duplicate edges.
        Otherwise, the weight of each edge is that of the first occurrence of this edge.
    matrix_only : bool
        If ``True``, returns only the (bi)adjacency matrix.
        Otherwise, returns a ``Bunch`` object with graph attributes (e.g., node names).
        If not specified (default), selects the most appropriate format.

    Returns
    -------
    graph : :class:`Bunch` or sparse matrix
    """
    from csv import reader  # local import keeps this reconstructed block self-contained
    # guess delimiter, comment character and data structure from the first rows
    delimiter_guess, comment_guess, data_structure_guess = scan_header(file_path, comments=comments)
    if delimiter is None:
        if sep is not None:
            delimiter = sep
        else:
            delimiter = delimiter_guess
    if data_structure is None:
        data_structure = data_structure_guess
    if data_structure == 'edge_list':
        # fast path: fully numeric edge list parsed directly by NumPy
        try:
            array = np.genfromtxt(file_path, delimiter=delimiter, comments=comment_guess)
            if np.isnan(array).any():
                # non-numeric entries: fall back to the generic csv parsing below
                raise TypeError()
            edge_array = array[:, :2].astype(int)
            if array.shape[1] == 3:
                weights = array[:, 2]
            else:
                weights = None
            return from_edge_array(edge_array=edge_array, weights=weights, directed=directed, bipartite=bipartite,
                                   weighted=weighted, reindex=reindex, sum_duplicates=sum_duplicates,
                                   matrix_only=matrix_only)
        except TypeError:
            pass
    with open(file_path, 'r', encoding='utf-8') as f:
        # skip comment lines and blank lines before handing rows to the csv reader
        csv_reader = reader((line for line in f if line.strip() != '' and not line.startswith(tuple(comments))),
                            delimiter=delimiter)
        if data_structure == 'edge_list':
            edge_list = [tuple(row) for row in csv_reader]
            return from_edge_list(edge_list=edge_list, directed=directed, bipartite=bipartite,
                                  weighted=weighted, reindex=reindex, sum_duplicates=sum_duplicates,
                                  matrix_only=matrix_only)
        elif data_structure == 'adjacency_list':
            adjacency_list = [row for row in csv_reader]
            return from_adjacency_list(adjacency_list, directed=directed, bipartite=bipartite,
                                       weighted=weighted, reindex=reindex, sum_duplicates=sum_duplicates,
                                       matrix_only=matrix_only)
        elif data_structure == 'adjacency_dict':
            adjacency_dict = {row[0]: row[1:] for row in csv_reader}
            return from_adjacency_list(adjacency_dict, directed=directed, bipartite=bipartite,
                                       weighted=weighted, reindex=reindex, sum_duplicates=sum_duplicates,
                                       matrix_only=matrix_only)
        else:
            raise ValueError('Unknown data structure.')

def scan_header(file_path: str, delimiters: str = None, comments: str = '#%', n_scan: int = 100):
    """Infer some properties of the graph from the first lines of a CSV file.

    NOTE(review): the file-reading loop and the final fallback were lost in
    extraction and are reconstructed here — confirm against upstream.

    Parameters
    ----------
    file_path : str
        Path to the CSV file.
    delimiters : str
        Possible delimiters.
    comments : str
        Possible comment characters.
    n_scan : int
        Number of rows scanned for inference.

    Returns
    -------
    delimiter_guess : str
        Guessed delimiter.
    comment_guess : str
        Guessed comment character.
    data_structure_guess : str
        Guessed data structure ('edge_list' or 'adjacency_list').
    """
    if delimiters is None:
        delimiters = '\t,; '
    # default comment character; overwritten by the first comment line found
    comment_guess = comments[0] if len(comments) else '#'
    count = {delimiter: [] for delimiter in delimiters}
    rows = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for row in f:
            row = row.strip()
            if row.startswith(tuple(comments)) or row == '':
                if len(row):
                    comment_guess = row[0]
            else:
                rows.append(row.rstrip())
                for delimiter in delimiters:
                    count[delimiter].append(row.count(delimiter))
            if len(rows) == n_scan:
                break
    # a good delimiter appears on every row (mean > 0) a constant number of
    # times (std == 0); otherwise fall back to the most frequent candidate
    means = [np.mean(count[delimiter]) for delimiter in delimiters]
    stds = [np.std(count[delimiter]) for delimiter in delimiters]
    index = np.argwhere((np.array(means) > 0) * (np.array(stds) == 0)).ravel()
    if len(index) == 1:
        delimiter_guess = delimiters[int(index)]
    else:
        delimiter_guess = delimiters[int(np.argmax(means))]
    # rows of constant length 2 or 3 look like an edge list
    length = {len(row.split(delimiter_guess)) for row in rows}
    if length == {2} or length == {3}:
        data_structure_guess = 'edge_list'
    else:
        data_structure_guess = 'adjacency_list'
    return delimiter_guess, comment_guess, data_structure_guess

def load_labels(file: str) -> np.ndarray:
    """Parser for files with a single entry on each row.

    NOTE(review): the ``def`` header was lost in extraction; name and
    signature are reconstructed — confirm against upstream.

    Parameters
    ----------
    file : str
        The path to the dataset.

    Returns
    -------
    labels : np.ndarray
        Labels (one per row, whitespace-stripped).
    """
    rows = []
    with open(file, 'r', encoding='utf-8') as f:
        for row in f:
            rows.append(row.strip())
    return np.array(rows)

def load_header(file: str):
    """Check if the graph is directed, bipartite, weighted.

    The check looks for the keywords 'asym', 'bip' and 'unweighted' in the
    header line of the file (e.g., a Konect-style metadata header).

    NOTE(review): the ``def`` header and read statement were lost in
    extraction; only the first line is scanned here — confirm against upstream.
    """
    directed, bipartite, weighted = False, False, True
    with open(file, 'r', encoding='utf-8') as f:
        row = f.readline()
        if 'bip' in row:
            bipartite = True
        if 'unweighted' in row:
            weighted = False
        if 'asym' in row:
            directed = True
    return directed, bipartite, weighted

def load_metadata(file: str, delimiter: str = ': ') -> Bunch:
    """Extract metadata (key: value pairs) from a file, one pair per row.

    NOTE(review): the ``def`` header, accumulator and return statement were
    lost in extraction and are reconstructed — confirm against upstream.
    """
    metadata = Bunch()
    with open(file, 'r', encoding='utf-8') as f:
        for row in f:
            parts = row.split(delimiter)
            # re-join in case the value itself contains the delimiter
            key, value = parts[0], ': '.join(parts[1:]).strip('\n')
            metadata[key] = value
    return metadata

def from_graphml(file_path: str, weight_key: str = 'weight', max_string_size: int = 512) -> Bunch:
    """Load a graph from a GraphML file.

    Hyperedges and nested graphs are not supported.

    Parameters
    ----------
    file_path : str
        Path to the GraphML file.
    weight_key : str
        The key to be used as a value for edge weights.
    max_string_size : int
        The maximum size for string features of the data.

    Returns
    -------
    data : :class:`Bunch`
        The dataset in a Bunch with the adjacency as a CSR matrix.
    """
    # see http://graphml.graphdrawing.org/primer/graphml-primer.html
    # and http://graphml.graphdrawing.org/specification/dtd.html#top
    tree = ElementTree.parse(file_path)
    n_nodes = 0
    n_edges = 0
    symmetrize = None
    naming_nodes = True
    default_weight = 1
    weight_type = bool
    weight_id = None
    # indices in the graph tree
    node_indices = []
    edge_indices = []
    data = Bunch()
    graph = None
    file_description = None
    attribute_descriptions = Bunch()
    attribute_descriptions.node = Bunch()
    attribute_descriptions.edge = Bunch()
    keys = {}
    # first pass: locate the graph element, count nodes and edges
    for file_element in tree.getroot():
        if file_element.tag.endswith('graph'):
            graph = file_element
            symmetrize = (graph.attrib['edgedefault'] == 'undirected')
            for index, element in enumerate(graph):
                if element.tag.endswith('node'):
                    node_indices.append(index)
                    n_nodes += 1
                elif element.tag.endswith('edge'):
                    edge_indices.append(index)
                    # an undirected edge is stored twice (both directions)
                    if 'directed' in element.attrib:
                        if element.attrib['directed'] == 'true':
                            n_edges += 1
                        else:
                            n_edges += 2
                    elif symmetrize:
                        n_edges += 2
                    else:
                        n_edges += 1
            if 'parse.nodeids' in graph.attrib:
                naming_nodes = not (graph.attrib['parse.nodeids'] == 'canonical')
    # second pass: parse attribute keys and the file description
    for file_element in tree.getroot():
        if file_element.tag.endswith('key'):
            attribute_name = file_element.attrib['attr.name']
            attribute_type = java_type_to_python_type(file_element.attrib['attr.type'])
            if attribute_name == weight_key:
                weight_type = java_type_to_python_type(file_element.attrib['attr.type'])
                weight_id = file_element.attrib['id']
                for key_element in file_element:
                    if key_element.tag == 'default':
                        default_weight = attribute_type(key_element.text)
            else:
                default_value = None
                if file_element.attrib['for'] == 'node':
                    size = n_nodes
                    if 'node_attribute' not in data:
                        data.node_attribute = Bunch()
                    for key_element in file_element:
                        if key_element.tag.endswith('desc'):
                            attribute_descriptions.node[attribute_name] = key_element.text
                        elif key_element.tag.endswith('default'):
                            default_value = attribute_type(key_element.text)
                    if attribute_type == str:
                        # fixed-size unicode dtype for string attributes
                        local_type = '<U' + str(max_string_size)
                    else:
                        local_type = attribute_type
                    if default_value:
                        data.node_attribute[attribute_name] = np.full(size, default_value, dtype=local_type)
                    else:
                        data.node_attribute[attribute_name] = np.zeros(size, dtype=local_type)
                elif file_element.attrib['for'] == 'edge':
                    size = n_edges
                    if 'edge_attribute' not in data:
                        data.edge_attribute = Bunch()
                    for key_element in file_element:
                        if key_element.tag.endswith('desc'):
                            attribute_descriptions.edge[attribute_name] = key_element.text
                        elif key_element.tag.endswith('default'):
                            default_value = attribute_type(key_element.text)
                    if attribute_type == str:
                        local_type = '<U' + str(max_string_size)
                    else:
                        local_type = attribute_type
                    if default_value:
                        data.edge_attribute[attribute_name] = np.full(size, default_value, dtype=local_type)
                    else:
                        data.edge_attribute[attribute_name] = np.zeros(size, dtype=local_type)
                keys[file_element.attrib['id']] = [attribute_name, attribute_type]
        elif file_element.tag.endswith('desc'):
            file_description = file_element.text
    if file_description or attribute_descriptions.node or attribute_descriptions.edge:
        data.meta = Bunch()
        if file_description:
            data.meta['description'] = file_description
        if attribute_descriptions.node or attribute_descriptions.edge:
            data.meta['attributes'] = attribute_descriptions
    if graph is not None:
        row = np.zeros(n_edges, dtype=int)
        col = np.zeros(n_edges, dtype=int)
        dat = np.full(n_edges, default_weight, dtype=weight_type)
        data.names = None
        if naming_nodes:
            data.names = np.zeros(n_nodes, dtype='<U512')

        node_map = {}
        # deal with nodes first
        for number, index in enumerate(node_indices):
            node = graph[index]
            if naming_nodes:
                name = node.attrib['id']
                data.names[number] = name
                node_map[name] = number
            for node_attribute in node:
                if node_attribute.tag.endswith('data'):
                    data.node_attribute[keys[node_attribute.attrib['key']][0]][number] = \
                        keys[node_attribute.attrib['key']][1](node_attribute.text)
        # deal with edges
        edge_index = -1
        for index in edge_indices:
            edge_index += 1
            duplicate = False
            edge = graph[index]
            if naming_nodes:
                node1 = node_map[edge.attrib['source']]
                node2 = node_map[edge.attrib['target']]
            else:
                # canonical ids are 'n0', 'n1', ...: strip the leading letter
                node1 = int(edge.attrib['source'][1:])
                node2 = int(edge.attrib['target'][1:])
            row[edge_index] = node1
            col[edge_index] = node2
            for edge_attribute in edge:
                if edge_attribute.tag.endswith('data'):
                    if edge_attribute.attrib['key'] == weight_id:
                        dat[edge_index] = weight_type(edge_attribute.text)
                    else:
                        data.edge_attribute[keys[edge_attribute.attrib['key']][0]][edge_index] = \
                            keys[edge_attribute.attrib['key']][1](edge_attribute.text)
            if 'directed' in edge.attrib:
                if edge.attrib['directed'] != 'true':
                    duplicate = True
            elif symmetrize:
                duplicate = True
            if duplicate:
                # store the reverse edge with the same weight and attributes
                edge_index += 1
                row[edge_index] = node2
                col[edge_index] = node1
                for edge_attribute in edge:
                    if edge_attribute.tag.endswith('data'):
                        if edge_attribute.attrib['key'] == weight_id:
                            dat[edge_index] = weight_type(edge_attribute.text)
                        else:
                            data.edge_attribute[keys[edge_attribute.attrib['key']][0]][edge_index] = \
                                keys[edge_attribute.attrib['key']][1](edge_attribute.text)
        data.adjacency = sparse.csr_matrix((dat, (row, col)), shape=(n_nodes, n_nodes))
        if data.names is None:
            data.pop('names')
        return data
    else:
        raise ValueError(f'No graph defined in {file_path}.')

def java_type_to_python_type(value: str) -> type:
    """Map a GraphML (Java) attribute type name to the corresponding Python type.

    Returns ``None`` for unrecognized type names.
    """
    type_map = {
        'boolean': bool,
        'int': int,
        'string': str,
        'long': float,
        'float': float,
        'double': float,
    }
    return type_map.get(value)

def is_number(s):
    """Return ``True`` if ``s`` can be parsed as a float, ``False`` otherwise."""
    try:
        float(s)
    except ValueError:
        return False
    else:
        return True