Source code for AnalyticsGraph

from typing import List, Tuple, Union

from networkx import nx

try:
    from Creator import Creator
    from Analytics import Analytics
except ImportError:
    from .Creator import Creator
    from .Analytics import Analytics


class AnalyticsGraph:
    """
    Wraps a networkx graph and keeps cached analytics about it (adjacency
    matrix, total edge cost, convergence rate, connectivity) in sync while
    edges are added, removed or moved.

    Every mutating call (:meth:`add_edge`, :meth:`remove_edge`,
    :meth:`move_edge`) first stages the state it is about to overwrite, so
    that the most recent mutation can be undone with :meth:`revert`.
    Only one level of undo is kept: staging is reset at the start of every
    successful mutation.

    Laplacian matrix tracking is currently disabled; the related helpers
    raise ``NotImplementedError``.
    """

    # Annotations referring to networkx are written as strings so that they
    # are not evaluated when the class body is executed.
    _graph: "nx.Graph"
    # Staged graph actions: (origin, destination, edge existed before the action).
    _old_graph: List[Tuple[int, int, bool]]
    _adjacency_matrix_sa: List[List[int]]
    # Staged adjacency cells: (row, column, previous value).
    _old_adjacency_matrix_sa: List[Tuple[int, int, int]]
    #_laplacian_matrix: List[List[int]]
    #_old_laplacian_matrix: List[Tuple[int, int, int]]
    _convergence_rate: Union[float, None]
    _old_convergence_rate: Union[float, None]
    _convergence_rate_dirty: bool
    _old_convergence_rate_dirty: bool
    _is_connected: Union[bool, None]
    _old_is_connected: Union[bool, None]
    _is_connected_dirty: bool
    _old_is_connected_dirty: bool
    _connectivity_nodes: Union[Tuple[int, int], None]
    _old_connectivity_nodes: Union[Tuple[int, int], None]
    _edge_cost: int
    _old_edge_cost: int
    _dimension: int

    def __init__(self, nxg: "nx.Graph"):
        """
        Builds the cached analytics for the given graph.

        :param nxg: The networkx graph to work on. It is stored by reference
            and mutated in place by the edge operations.
        """
        self._graph = nxg
        # The second positional argument is passed as True; judging by the
        # "_sa" suffix this presumably enables self-assignment (a populated
        # diagonal) -- confirm against Analytics.get_adjacency_matrix.
        self._adjacency_matrix_sa = Analytics.get_adjacency_matrix(self._graph, True)
        #self._laplacian_matrix = Analytics.get_laplacian_matrix(self._graph)
        self._dimension = len(self._adjacency_matrix_sa)

        # Start with empty staging lists so that revert() is a harmless no-op
        # when it is called before any mutation has been staged.
        self._old_graph = []
        self._old_adjacency_matrix_sa = []

        # Cached values are computed lazily; "dirty" means "recompute on next read".
        self._convergence_rate = None
        self._old_convergence_rate = None
        self._convergence_rate_dirty = True
        self._old_convergence_rate_dirty = True

        self._is_connected = None
        self._old_is_connected = None
        self._is_connected_dirty = True
        self._old_is_connected_dirty = True
        self._connectivity_nodes = None
        self._old_connectivity_nodes = None

        self._edge_cost = Analytics.total_edge_cost(self._graph)
        self._old_edge_cost = self._edge_cost

    def graph(self) -> "nx.Graph":
        """
        Returns the graph instance that the class has been working on.

        :return: The current networkx graph instance.
        """
        return self._graph

    def get_convergence_rate(self) -> float:
        """
        Returns the convergence rate for the current graph, recomputing it
        only when an edge operation has invalidated the cached value.

        :return: The value produced by ``Analytics.convergence_rate``.
        """
        if self._convergence_rate_dirty:
            # Derive the stochastic neighbour matrix from the adjacency matrix
            stochastic_neighbour_matrix = Analytics.get_stochastic_neighbour_matrix(
                adjacency_matrix=self._adjacency_matrix_sa
            )

            # Get the convergence rate
            self._convergence_rate = Analytics.convergence_rate(
                stochastic_neighbour_matrix=stochastic_neighbour_matrix
            )
            self._convergence_rate_dirty = False

        return self._convergence_rate

    def is_connected(self) -> bool:
        """
        Checks whether the graph is connected or not.

        When the last invalidating operation recorded the endpoints of a
        removed edge, only the connectivity between those two nodes is
        tested; otherwise the whole graph is checked with networkx.

        :return: The cached or freshly computed connectivity flag.
        """
        if self._is_connected_dirty:
            if self._connectivity_nodes is None:
                self._is_connected = nx.is_connected(self._graph)
            else:
                # NOTE(review): the pairwise check is only equivalent to full
                # connectivity if the graph was connected before the edge was
                # removed -- confirm callers guarantee this.
                # NOTE(review): nx.to_numpy_matrix is deprecated in recent
                # networkx releases -- confirm the pinned version.
                self._is_connected = Analytics.is_nodes_connected_cuda(
                    mx=nx.to_numpy_matrix(self.graph(), weight=None),
                    origin=self._connectivity_nodes[0],
                    destination=self._connectivity_nodes[1]
                )
            self._is_connected_dirty = False

        return self._is_connected

    def get_edge_cost(self) -> float:
        """
        Returns the total edge cost of the current graph. The value is
        maintained incrementally by the edge operations.

        :return: The running sum of the edge weights.
        """
        return self._edge_cost

    def add_edge(self, origin, destination) -> bool:
        """
        Adds an edge between two nodes and updates the cached analytics.

        :param origin: Index of the first node.
        :param destination: Index of the second node.
        :return: False if the edge already exists (nothing is changed or
            staged), True otherwise.
        """
        if self.has_edge(origin, destination):
            return False

        # Snapshot everything this operation is about to change
        self.reset_stage_actions()
        self._stage_graph(origin, destination, False)
        self._stage_is_connected()
        self._stage_convergence_rate()
        self._stage_edge_cost()

        Creator.add_weighted_edge(self._graph, origin, destination, ignore_validity=True)
        self._edge_cost += self._graph[origin][destination]['weight']
        self._set_adjacency_matrix_sa(origin, destination, 1)
        #self._laplacian_added_edge(origin, destination)

        self._convergence_rate_dirty = True
        # Is connected won't be dirty in this case

        return True

    def remove_edge(self, origin, destination) -> bool:
        """
        Removes the edge between two nodes and updates the cached analytics.

        :param origin: Index of the first node.
        :param destination: Index of the second node.
        :return: False if there is no such edge (nothing is changed or
            staged), True otherwise.
        """
        if not self.has_edge(origin, destination):
            return False

        # Snapshot everything this operation is about to change
        self.reset_stage_actions()
        self._stage_graph(origin, destination, True)
        self._stage_is_connected()
        self._stage_convergence_rate()
        self._stage_edge_cost()

        # The weight has to be read before the edge disappears from the graph
        self._edge_cost -= self._graph[origin][destination]['weight']
        self._graph.remove_edge(origin, destination)
        self._set_adjacency_matrix_sa(origin, destination, 0)
        #self._laplacian_removed_edge(origin, destination)

        self._convergence_rate_dirty = True
        self._is_connected_dirty = True
        # Remember the endpoints so is_connected() can do a pairwise check
        self._connectivity_nodes = (origin, destination)

        return True

    def move_edge(self, origin, old_destination, new_destination) -> bool:
        """
        Replaces the edge ``origin - old_destination`` with the edge
        ``origin - new_destination`` and updates the cached analytics.

        :param origin: Index of the node that keeps its end of the edge.
        :param old_destination: Index of the node the edge currently leads to.
        :param new_destination: Index of the node the edge should lead to.
        :return: False if both destinations are equal, the new edge already
            exists or the old edge does not exist (nothing is changed or
            staged), True otherwise.
        """
        if old_destination == new_destination:
            return False

        if self.has_edge(origin, new_destination) or not self.has_edge(origin, old_destination):
            return False

        # Snapshot everything this operation is about to change
        self.reset_stage_actions()
        self._stage_graph(origin, old_destination, True)
        self._stage_graph(origin, new_destination, False)
        self._stage_is_connected()
        self._stage_convergence_rate()
        self._stage_edge_cost()

        # Remove the old edge from the graph
        self._edge_cost -= self._graph[origin][old_destination]['weight']
        self._graph.remove_edge(origin, old_destination)
        self._set_adjacency_matrix_sa(origin, old_destination, 0)
        #self._laplacian_removed_edge(origin, old_destination)

        # Add the new edge to the graph
        Creator.add_weighted_edge(self._graph, origin, new_destination, ignore_validity=True)
        self._edge_cost += self._graph[origin][new_destination]['weight']
        self._set_adjacency_matrix_sa(origin, new_destination, 1)
        #self._laplacian_added_edge(origin, new_destination)

        self._convergence_rate_dirty = True
        self._is_connected_dirty = True
        # Only the removed edge is recorded for the pairwise connectivity check
        self._connectivity_nodes = (origin, old_destination)

        return True

    def has_edge(self, origin, destination):
        """
        Checks whether the graph has an edge by looking up directly in the
        adjacency matrix.

        :param origin: Index of the first node.
        :param destination: Index of the second node.
        :return: False when both indices are equal, otherwise whether the
            matrix cell for the pair equals 1.
        """
        if origin == destination:
            return False

        return self._adjacency_matrix_sa[origin][destination] == 1

    def _set_adjacency_matrix_sa(self, origin, destination, val):
        """
        Sets a mirrored value for the _adjacency_matrix_sa matrix, staging
        the previous values of both cells first.

        :param origin: Row (and mirrored column) index.
        :param destination: Column (and mirrored row) index.
        :param val: The value to write into both cells.
        """
        self._stage_adjacency_matrix_sa(origin, destination)
        self._adjacency_matrix_sa[origin][destination] = val
        self._adjacency_matrix_sa[destination][origin] = val

    def _laplacian_added_edge(self, origin, destination):
        """
        Updates the laplacian matrix based on adding an edge.

        Disabled: always raises. The intended update sets both off-diagonal
        cells to -1 and increments both diagonal cells.

        :param origin: Index of the first node.
        :param destination: Index of the second node.
        :raises NotImplementedError: Always.
        """
        raise NotImplementedError

    def _laplacian_removed_edge(self, origin, destination):
        """
        Updates the laplacian matrix based on removing an edge.

        Disabled: always raises. The intended update sets both off-diagonal
        cells to 0 and decrements both diagonal cells.

        :param origin: Index of the first node.
        :param destination: Index of the second node.
        :raises NotImplementedError: Always.
        """
        raise NotImplementedError

    def _stage_laplacian(self, origin, destination):
        """
        Stages the values for the laplacian matrix.

        Not called while laplacian tracking is disabled; the attributes it
        reads are not initialised by ``__init__``.

        :param origin: Index of the first node.
        :param destination: Index of the second node.
        """
        self._old_laplacian_matrix += [
            (origin, destination, self._laplacian_matrix[origin][destination]),
            (destination, origin, self._laplacian_matrix[destination][origin]),
            (origin, origin, self._laplacian_matrix[origin][origin]),
            (destination, destination, self._laplacian_matrix[destination][destination]),
        ]

    def _stage_adjacency_matrix_sa(self, origin, destination):
        """
        Stages the values for the adjacency matrix with self-assignment.

        :param origin: Row (and mirrored column) index.
        :param destination: Column (and mirrored row) index.
        """
        self._old_adjacency_matrix_sa += [
            (origin, destination, self._adjacency_matrix_sa[origin][destination]),
            (destination, origin, self._adjacency_matrix_sa[destination][origin])
        ]

    def _stage_graph(self, origin, destination, edge):
        """
        Records a graph action so that it can be undone.

        :param origin: Index of the first node.
        :param destination: Index of the second node.
        :param edge: True if the edge existed before the action (revert has
            to add it back), False if it did not (revert has to remove it).
        """
        self._old_graph += [
            (origin, destination, edge)
        ]

    def _stage_convergence_rate(self):
        """
        Snapshots the cached convergence rate together with its dirty flag.
        """
        self._old_convergence_rate = self._convergence_rate
        # Snapshot the *current* flag; copying the old flag onto itself would
        # make revert() always restore a stale value.
        self._old_convergence_rate_dirty = self._convergence_rate_dirty

    def _stage_is_connected(self):
        """
        Snapshots the cached connectivity state.
        """
        self._old_is_connected = self._is_connected
        self._old_is_connected_dirty = self._is_connected_dirty
        self._old_connectivity_nodes = self._connectivity_nodes

    def _stage_edge_cost(self):
        """
        Snapshots the running edge cost.
        """
        self._old_edge_cost = self._edge_cost

    def revert(self):
        """
        Undoes the most recent successful edge operation by restoring the
        graph, the adjacency matrix and all cached values from the staged
        state.

        The staged graph and matrix actions are consumed, so calling this
        again without a new edge operation does not replay them.
        """
        # Revert the graph
        for origin, destination, existed in self._old_graph:
            if existed is True:
                Creator.add_weighted_edge(self._graph, origin, destination, True)
            else:
                self._graph.remove_edge(origin, destination)

        # Revert the laplacian matrix
        #for action in self._old_laplacian_matrix:
        #    self._laplacian_matrix[action[0]][action[1]] = action[2]

        # Revert the adjacency matrix
        for row, column, previous in self._old_adjacency_matrix_sa:
            self._adjacency_matrix_sa[row][column] = previous

        # The replayable actions have been applied; drop them so a repeated
        # revert() cannot try to remove an edge that is already gone.
        self._old_graph = []
        self._old_adjacency_matrix_sa = []

        # Revert the convergence rate
        self._convergence_rate = self._old_convergence_rate
        self._convergence_rate_dirty = self._old_convergence_rate_dirty

        # Revert the is connected state
        self._is_connected = self._old_is_connected
        self._is_connected_dirty = self._old_is_connected_dirty
        self._connectivity_nodes = self._old_connectivity_nodes

        # Revert the edge cost
        self._edge_cost = self._old_edge_cost

    def reset_stage_actions(self):
        """
        Clears the staged graph and matrix actions and the staged
        connectivity nodes, starting a fresh undo step.
        """
        #self._old_laplacian_matrix = []
        self._old_adjacency_matrix_sa = []
        self._old_graph = []
        self._old_connectivity_nodes = None

    def get_adjacency_matrix_sa(self):
        """
        Returns the internal adjacency matrix (not a copy).

        :return: The adjacency matrix maintained by this instance.
        """
        return self._adjacency_matrix_sa

    def get_laplacian_matrix(self):
        """
        Laplacian tracking is disabled.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError

    def get_dimension(self):
        """
        Returns the size of the adjacency matrix captured at construction.

        :return: The number of rows of the adjacency matrix.
        """
        return self._dimension