ot_markov_distances.utils module
This module contains a variety of miscellaneous functions.
It contains:
Functions on Markov chains: weighted_transition_matrix(), degree_markov()
Display functions: draw_markov()
Support for sparse Markov chains: densify(), dummy_densify(), cost_matrix_index(), etc.
It would benefit from being split into submodules and merged with misc.py (as utils.markov, utils.display, utils.misc, utils.sparse). This is a TODO for a future version.
- ot_markov_distances.utils.weighted_transition_matrix(G: Graph, q: float)
Create an LMMC from a graph.
This function is largely inspired by its counterpart at https://github.com/chens5/WL-distance/blob/main/utils/utils.py
See Definition 5 in Chen et al. [CLM+22].
It returns the transition matrix of the random walk on the graph G in which the walker stays at the current node with probability q and otherwise moves to one of the neighboring nodes, each neighbor being equally likely.
- Parameters:
G – the graph
q – the q parameter for the q-markov chain
- Returns:
Array – weighted transition matrix associated to q-random walks on G
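The q-random walk described above can be illustrated with a minimal NumPy sketch. It is not the library's implementation: the helper name `lazy_random_walk_matrix` is hypothetical, it takes a plain adjacency array instead of a Graph object, it ignores edge weights, and the handling of isolated nodes is an assumption.

```python
import numpy as np

def lazy_random_walk_matrix(adjacency, q):
    """Hypothetical helper: q-lazy random walk on an unweighted graph."""
    A = np.asarray(adjacency, dtype=float)
    n = A.shape[0]
    degrees = A.sum(axis=1)
    P = np.zeros((n, n))
    for i in range(n):
        if degrees[i] == 0:
            # Assumption: an isolated node stays in place with probability 1.
            P[i, i] = 1.0
        else:
            # Move to each neighbor with probability (1 - q) / degree ...
            P[i] = (1.0 - q) * A[i] / degrees[i]
            # ... and stay at the current node with probability q.
            P[i, i] += q
    return P

# Path graph 0 - 1 - 2
A = np.array([[0, 1, 0],
              [1, 0, 1],
              [0, 1, 0]])
P = lazy_random_walk_matrix(A, q=0.5)
```

Every row of `P` sums to 1, and the middle node, which has two neighbors, moves to each of them with probability (1 - q) / 2.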
- ot_markov_distances.utils.markov_measure(M: Tensor) → Tensor
Takes a (batched) Markov transition matrix and outputs its stationary distribution.
- Parameters:
M – (*b, n, n) the markov transition matrix
- Returns:
Tensor – m (*b, n) so that m @ M = m
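One way to picture a batched stationary distribution is power iteration, sketched below in NumPy. This is an illustration only: the source does not say which method markov_measure uses, the helper name `stationary_distribution` is hypothetical, and power iteration is only guaranteed to converge for irreducible, aperiodic chains.

```python
import numpy as np

def stationary_distribution(M, n_iter=200):
    """Hypothetical helper: approximate m with m @ M = m by power iteration."""
    n = M.shape[-1]
    # Start from the uniform distribution, shape (*b, n).
    m = np.full(M.shape[:-1], 1.0 / n)
    for _ in range(n_iter):
        # Batched row-vector times matrix product.
        m = np.einsum("...i,...ij->...j", m, M)
    return m

M = np.array([[0.5, 0.5, 0.0],
              [0.25, 0.5, 0.25],
              [0.0, 0.5, 0.5]])
uniform = np.full((3, 3), 1.0 / 3.0)
batch = np.stack([M, uniform])          # shape (2, 3, 3)
m = stationary_distribution(batch)      # shape (2, 3)
```

For the first chain the result is proportional to the node degrees of the underlying path graph; for the second it is uniform.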
- ot_markov_distances.utils.double_last_dimension(t: Tensor) → Tensor
Doubles the last dimension of a tensor by turning it into a diagonal matrix.
- Parameters:
t – (…, d)
- Returns:
t’ (…, d, d) – t’[…, i, i] = t[…, i] and t’[…, i, k] = 0 if i != k
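The relation between t and t’ can be checked on a small array. The sketch below uses NumPy broadcasting and a hypothetical helper name; in PyTorch, `torch.diag_embed` computes the same diagonal embedding.

```python
import numpy as np

def double_last_dimension_sketch(t):
    """Hypothetical helper: (..., d) -> (..., d, d) with t on the diagonal."""
    d = t.shape[-1]
    # t[..., i, None] * eye[i, k] keeps t[..., i] when i == k and gives 0 otherwise.
    return t[..., :, None] * np.eye(d, dtype=t.dtype)

t = np.array([[1.0, 2.0, 3.0],
              [4.0, 5.0, 6.0]])              # shape (2, 3)
t_diag = double_last_dimension_sketch(t)     # shape (2, 3, 3)
```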
- ot_markov_distances.utils.draw_markov(M: Tensor, pos=None, node_color=None, ax=None)
- ot_markov_distances.utils.degree_markov(m: Tensor) → int
- ot_markov_distances.utils.densify(m: FloatTensor) → tuple[torch.Tensor, torch.Tensor, torch.Tensor]
Make a sparse Markov matrix into a dense one.
Takes a (batched) Markov matrix m in which every transition distribution \(m_i\) is very sparse.
Let \(d := \max_i \deg(m_i) = \max_i \#\{j : m_{ij} \neq 0\}\) (maximized over the batch).
Assume d is small (so every one of the \(m_i\) is sparse).
For \(0 \leq i < n\), call \(\text{index}_{i, 0}, \ldots, \text{index}_{i, d-1}\) the indices where \(m_i\) is nonzero, in increasing order (0-padded if there are fewer than d).
This gives an index array of size (*batch, n, d).
Also call index_mask the mask of size (*batch, n, d) corresponding to the 0-padding applied to index (so index_mask[*b, i, j] = True if index[*b, i, j] is an actual index, and False if it is just padding).
Finally, define m_dense so that
\[\begin{split}\text{m\_dense}[*b, i, j] = \begin{cases} m[*b, i, \text{index}[*b, i, j]] & \text{if } \text{index\_mask}[*b, i, j] \\ 0 & \text{otherwise} \end{cases}\end{split}\]
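The definitions of index, index_mask and m_dense can be reproduced with a short NumPy sketch. It follows the description above but is not the library's torch code; the helper name `densify_sketch` and the stable-sort trick are assumptions made for illustration.

```python
import numpy as np

def densify_sketch(m):
    """Hypothetical helper returning (index, index_mask, m_dense)."""
    nonzero = m != 0                                  # (*batch, n, n)
    d = int(nonzero.sum(axis=-1).max())               # max degree over rows and batch
    # A stable sort on ~nonzero lists the nonzero columns first,
    # in increasing column order, followed by the zero columns.
    order = np.argsort(~nonzero, axis=-1, kind="stable")
    index = order[..., :d]                            # (*batch, n, d)
    index_mask = np.take_along_axis(nonzero, index, axis=-1)
    index = np.where(index_mask, index, 0)            # 0-pad the unused slots
    gathered = np.take_along_axis(m, index, axis=-1)
    m_dense = np.where(index_mask, gathered, 0.0)
    return index, index_mask, m_dense

# Random walk on the path 0 - 1 - 2 - 3: every row has at most 2 nonzero entries.
m = np.array([[0.0, 1.0, 0.0, 0.0],
              [0.5, 0.0, 0.5, 0.0],
              [0.0, 0.5, 0.0, 0.5],
              [0.0, 0.0, 1.0, 0.0]])
index, index_mask, m_dense = densify_sketch(m)
```

Here d = 2, so the 4 × 4 matrix is stored as a 4 × 2 array of values together with the matching column indices and a mask marking the padded slots of rows 0 and 3.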
- ot_markov_distances.utils.dummy_densify(m: FloatTensor) → tuple[torch.LongTensor, torch.BoolTensor, torch.FloatTensor]
Same as densify, but returns dummy results: the densified matrix is the full matrix, the index is simply (0, 1, …, n - 1), and the mask is all True.
- ot_markov_distances.utils.cost_matrix_index(C: Tensor, mx_index: Tensor, my_index: Tensor, mx_mask: Tensor, my_mask: Tensor)
Reindex the cost matrix to go with densified distributions.
Takes the indices mx_index and my_index output by densify, as well as a cost matrix, and reindexes the cost matrix to go with the densified distributions.
More precisely, if

    mx_index, mx_mask, mx_dense = densify(mx)
    my_index, my_mask, my_dense = densify(my)
    C_index, C_mask = cost_matrix_index(C, mx_index, my_index, mx_mask, my_mask)
    C_dense = C.view(-1)[C_index]

then

    sinkhorn(mx_dense[:, :, None, :], my_dense[:, None, :, :], C_dense, epsilon)
        == sinkhorn(mx[:, :, None, :], my[:, None, :, :], C[:, None, None, :, :], epsilon)
- Parameters:
mx_index – (*batch, n, dx), LongTensor
my_index – (*batch, m, dy), LongTensor
- Returns:
C_index
C_mask
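The reindexing can be sketched in NumPy as follows. This is an interpretation inferred from the usage shown above (C of shape (b, n, m), flat indexing through `C.view(-1)`), not the library's code; the helper name and the exact layout of C_index are assumptions.

```python
import numpy as np

def cost_matrix_index_sketch(C, mx_index, my_index, mx_mask, my_mask):
    """Hypothetical helper: flat indices into C.reshape(-1), plus a validity mask."""
    b, n, m = C.shape
    batch = np.arange(b)[:, None, None, None, None]
    rows = mx_index[:, :, None, :, None]              # (b, n, 1, dx, 1)
    cols = my_index[:, None, :, None, :]              # (b, 1, m, 1, dy)
    # Flat position of C[batch, rows, cols] in a row-major (b, n, m) array.
    C_index = (batch * n + rows) * m + cols           # (b, n, m, dx, dy)
    # An entry is valid only if both of its indices are real (not padding).
    C_mask = mx_mask[:, :, None, :, None] & my_mask[:, None, :, None, :]
    return C_index, C_mask

C = np.arange(4.0).reshape(1, 2, 2)                   # b = 1, n = 2, m = 2
mx_index = np.array([[[1], [0]]])                     # (1, 2, dx = 1)
mx_mask = np.array([[[True], [True]]])
my_index = np.array([[[0, 1], [1, 0]]])               # (1, 2, dy = 2)
my_mask = np.array([[[True, True], [True, False]]])

C_index, C_mask = cost_matrix_index_sketch(C, mx_index, my_index, mx_mask, my_mask)
C_dense = C.reshape(-1)[C_index]                      # (1, 2, 2, 1, 2)
```

With this layout, `C_dense[b, i, j, k, l]` equals `C[b, mx_index[b, i, k], my_index[b, j, l]]`, which is the cost between the k-th successor of state i and the l-th successor of state j.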
- ot_markov_distances.utils.reindex_cost_matrix(C, C_index, C_mask)
- ot_markov_distances.utils.re_project_C(C_dense: FloatTensor, C_index: LongTensor, C_mask: BoolTensor | None = None)
Inverse of reindex_cost_matrix.
Takes a (b, n, m, dx, dy) reindexed cost matrix (or matching matrix) and outputs a (b, n, m, n, m) cost matrix C_sparse such that
C_sparse[b, i, j, mx_index[b, i, k], my_index[b, j, l]] = C_dense[b, i, j, k, l]
and all remaining entries are zero.
- Parameters:
C_dense – (b, n, m, dx, dy)
C_index – (b, n, m, dx, dy)
C_mask – (b, n, m, dx, dy)
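The scatter performed by the re-projection can be illustrated with a NumPy sketch. To stay close to the formula, this hypothetical helper takes mx_index of shape (b, n, dx) and my_index of shape (b, m, dy) directly, together with their masks, instead of the flattened C_index that the real function receives; that substitution is an assumption made for readability.

```python
import numpy as np

def re_project_sketch(C_dense, mx_index, my_index, mx_mask, my_mask):
    """Hypothetical helper: scatter (b, n, m, dx, dy) back to (b, n, m, n, m)."""
    b, n, m, dx, dy = C_dense.shape
    mask = mx_mask[:, :, None, :, None] & my_mask[:, None, :, None, :]
    # Padded slots contribute 0, so they cannot overwrite real values.
    values = np.where(mask, C_dense, 0.0)
    idx = np.broadcast_arrays(
        np.arange(b)[:, None, None, None, None],
        np.arange(n)[None, :, None, None, None],
        np.arange(m)[None, None, :, None, None],
        mx_index[:, :, None, :, None],
        my_index[:, None, :, None, :],
    )
    C_sparse = np.zeros((b, n, m, n, m))
    # Unbuffered add: repeated (padding) targets accumulate instead of clashing.
    np.add.at(C_sparse, tuple(idx), values)
    return C_sparse

mx_index = np.array([[[1], [0]]])
mx_mask = np.array([[[True], [True]]])
my_index = np.array([[[0, 1], [1, 0]]])
my_mask = np.array([[[True, True], [True, False]]])
C_dense = np.arange(1.0, 9.0).reshape(1, 2, 2, 1, 2)
C_sparse = re_project_sketch(C_dense, mx_index, my_index, mx_mask, my_mask)
```

Accumulating masked values with `np.add.at` rather than assigning them is a deliberate choice: padded slots all point at index 0, and a plain assignment could let a padded zero overwrite a genuine entry.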
- ot_markov_distances.utils.pad_one_markov_to(M: Tensor, mu: Tensor, n: int)
Pad a Markov chain to n states.
Pads a Markov chain to n states by adding (n - m) states that carry zero distribution mass and transition to themselves with probability 1.
- Parameters:
M – (m, m) transition matrix
mu – distribution
n – the number of states to pad to
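The padding rule can be sketched in a few lines of NumPy. The helper name is hypothetical and the library's torch implementation may differ in its details, for example in dtype or device handling.

```python
import numpy as np

def pad_one_markov_sketch(M, mu, n):
    """Hypothetical helper: pad an (m, m) chain and its distribution to n states."""
    m = M.shape[0]
    # Start from the identity so each added state loops on itself with probability 1.
    M_padded = np.eye(n)
    M_padded[:m, :m] = M
    # Added states carry zero mass.
    mu_padded = np.zeros(n)
    mu_padded[:m] = mu
    return M_padded, mu_padded

M = np.array([[0.5, 0.5],
              [0.25, 0.75]])
mu = np.array([0.4, 0.6])
M_padded, mu_padded = pad_one_markov_sketch(M, mu, n=4)
```

The padded matrix is still row-stochastic, and because the added states have zero mass and are unreachable from the original ones, they do not change the behavior of the original chain.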
- ot_markov_distances.utils.pad_markovs(transition_matrices: list[torch.Tensor], distributions: list[torch.Tensor])
Pad Markov chains to the same number of states.
Pads Markov chains to the same number of states by adding states that carry zero distribution mass and transition to themselves with probability 1.
- Parameters:
transition_matrices – list of transition matrices (of shape (m_i, m_i))
distributions – list of distributions (of shape (m_i))