Source code for sknetwork.hierarchy.metrics

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on March 2019
@author: Thomas Bonald <bonald@enst.fr>
"""
import numpy as np
from scipy import sparse
from sknetwork.hierarchy.paris import AggregateGraph

from sknetwork.utils.check import check_format, get_probs, check_square
from sknetwork.utils.check import check_min_size, check_min_nnz
from sknetwork.utils.format import directed2undirected


def _instantiate_vars(adjacency: sparse.csr_matrix, weights: str = 'uniform'):
    """Initialize standard variables for metrics."""
    weights_row = get_probs(weights, adjacency)
    weights_col = get_probs(weights, adjacency.T)
    sym_adjacency = directed2undirected(adjacency)
    aggregate_graph = AggregateGraph(weights_row, weights_col, sym_adjacency.data.astype(float),
                                     sym_adjacency.indices, sym_adjacency.indptr)
    return aggregate_graph, weights_row, weights_col


def get_sampling_distributions(adjacency: sparse.csr_matrix, dendrogram: np.ndarray, weights: str = 'uniform'):
    """Get sampling distributions over each internal node of the tree.
    Parameters
    ----------
    adjacency :
        Adjacency matrix of the graph.
    dendrogram :
        Dendrogram.
    weights :
        Weights of nodes.
        ``'degree'`` or ``'uniform'`` (default).
    Returns
    -------
    edge_sampling: np.ndarray
        Edge sampling distribution.
    node_sampling: np.ndarray
        Node sampling distribution.
    cluster_weights: np.ndarray
        Cluster weights.
    """
    n = adjacency.shape[0]
    aggregate_graph, weights_row, weights_col = _instantiate_vars(adjacency, weights)
    cluster_weight = np.zeros(n-1)
    edge_sampling = np.zeros(n-1)
    node_sampling = np.zeros(n-1)

    for t in range(n - 1):
        i = int(dendrogram[t][0])
        j = int(dendrogram[t][1])
        if j in aggregate_graph.neighbors[i]:
            edge_sampling[t] += 2 * aggregate_graph.neighbors[i][j]
        node_sampling[t] += aggregate_graph.cluster_out_weights[i] * aggregate_graph.cluster_in_weights[j] + \
            aggregate_graph.cluster_out_weights[j] * aggregate_graph.cluster_in_weights[i]
        cluster_weight[t] = aggregate_graph.cluster_out_weights[i] + aggregate_graph.cluster_out_weights[j] + \
            aggregate_graph.cluster_in_weights[i] + aggregate_graph.cluster_in_weights[j]
        for node in {i, j}:
            if node < n:
                # self-loop
                node_sampling[t] += aggregate_graph.cluster_out_weights[node] * aggregate_graph.cluster_in_weights[node]
                if node in aggregate_graph.neighbors[node]:
                    edge_sampling[t] += aggregate_graph.neighbors[node][node]
        aggregate_graph.merge(i, j)
    return edge_sampling, node_sampling, cluster_weight / 2


[docs]def dasgupta_cost(adjacency: sparse.csr_matrix, dendrogram: np.ndarray, weights: str = 'uniform', normalized: bool = False) -> float: """Dasgupta's cost of a hierarchy. Expected size (weights = ``'uniform'``) or expected volume (weights = ``'degree'``) of the cluster induced by random edge sampling (closest ancestor of the two nodes in the hierarchy). Parameters ---------- adjacency : Adjacency matrix of the graph. dendrogram : Dendrogram. weights : Weights of nodes. ``'degree'`` or ``'uniform'`` (default). normalized : If ``True``, normalized cost (between 0 and 1). Returns ------- cost : float Cost. Example ------- >>> from sknetwork.hierarchy import dasgupta_score, Paris >>> from sknetwork.data import house >>> paris = Paris() >>> adjacency = house() >>> dendrogram = paris.fit_transform(adjacency) >>> cost = dasgupta_cost(adjacency, dendrogram) >>> np.round(cost, 2) 3.33 References ---------- Dasgupta, S. (2016). A cost function for similarity-based hierarchical clustering. Proceedings of ACM symposium on Theory of Computing. """ adjacency = check_format(adjacency) check_square(adjacency) n = adjacency.shape[0] check_min_size(n, 2) edge_sampling, _, cluster_weight = get_sampling_distributions(adjacency, dendrogram, weights) cost = edge_sampling.dot(cluster_weight) if not normalized: if weights == 'degree': cost *= adjacency.data.sum() else: cost *= n return cost
[docs]def dasgupta_score(adjacency: sparse.csr_matrix, dendrogram: np.ndarray, weights: str = 'uniform') -> float: """Dasgupta's score of a hierarchy (quality metric, between 0 and 1). Defined as 1 - normalized Dasgupta's cost. Parameters ---------- adjacency : Adjacency matrix of the graph. dendrogram : Dendrogram. weights : Weights of nodes. ``'degree'`` or ``'uniform'`` (default). Returns ------- score : float Score. Example ------- >>> from sknetwork.hierarchy import dasgupta_score, Paris >>> from sknetwork.data import house >>> paris = Paris() >>> adjacency = house() >>> dendrogram = paris.fit_transform(adjacency) >>> score = dasgupta_score(adjacency, dendrogram) >>> np.round(score, 2) 0.33 References ---------- Dasgupta, S. (2016). A cost function for similarity-based hierarchical clustering. Proceedings of ACM symposium on Theory of Computing. """ return 1 - dasgupta_cost(adjacency, dendrogram, weights, normalized=True)
[docs]def tree_sampling_divergence(adjacency: sparse.csr_matrix, dendrogram: np.ndarray, weights: str = 'degree', normalized: bool = True) -> float: """Tree sampling divergence of a hierarchy (quality metric). Parameters ---------- adjacency : Adjacency matrix of the graph. dendrogram : Dendrogram. weights : Weights of nodes. ``'degree'`` (default) or ``'uniform'``. normalized : If ``True``, normalized score (between 0 and 1). Returns ------- score : float Score. Example ------- >>> from sknetwork.hierarchy import tree_sampling_divergence, Paris >>> from sknetwork.data import house >>> paris = Paris() >>> adjacency = house() >>> dendrogram = paris.fit_transform(adjacency) >>> score = tree_sampling_divergence(adjacency, dendrogram) >>> np.round(score, 2) 0.05 References ---------- Charpentier, B. & Bonald, T. (2019). `Tree Sampling Divergence: An Information-Theoretic Metric for Hierarchical Graph Clustering. <https://hal.telecom-paristech.fr/hal-02144394/document>`_ Proceedings of IJCAI. """ adjacency = check_format(adjacency) check_square(adjacency) check_min_nnz(adjacency.nnz, 1) adjacency = adjacency.astype(float) n = adjacency.shape[0] check_min_size(n, 2) adjacency.data /= adjacency.data.sum() edge_sampling, node_sampling, _ = get_sampling_distributions(adjacency, dendrogram, weights) index = np.where(edge_sampling)[0] score = edge_sampling[index].dot(np.log(edge_sampling[index] / node_sampling[index])) if normalized: weights_row = get_probs(weights, adjacency) weights_col = get_probs(weights, adjacency.T) inv_out_weights = sparse.diags(weights_row, shape=(n, n), format='csr') inv_out_weights.data = 1 / inv_out_weights.data inv_in_weights = sparse.diags(weights_col, shape=(n, n), format='csr') inv_in_weights.data = 1 / inv_in_weights.data sampling_ratio = inv_out_weights.dot(adjacency.dot(inv_in_weights)) inv_out_weights.data = np.ones(len(inv_out_weights.data)) inv_in_weights.data = np.ones(len(inv_in_weights.data)) edge_sampling = inv_out_weights.dot(adjacency.dot(inv_in_weights)) mutual_information = edge_sampling.data.dot(np.log(sampling_ratio.data)) if mutual_information > 0: score /= mutual_information return score