diff --git a/nx_arangodb/classes/dict/adj.py b/nx_arangodb/classes/dict/adj.py index 90931d26..dab0825b 100644 --- a/nx_arangodb/classes/dict/adj.py +++ b/nx_arangodb/classes/dict/adj.py @@ -775,19 +775,20 @@ def __init__( self.__getitem_helper_db: Callable[[str, str], EdgeAttrDict | EdgeKeyDict] self.__setitem_helper: Callable[[EdgeAttrDict | EdgeKeyDict, str, str], None] + self.__delitem_helper: Callable[[str | list[str]], None] if self.is_multigraph: self.__contains_helper = self.__contains__multigraph self.__getitem_helper_db = self.__getitem__multigraph_db self.__getitem_helper_cache = self.__getitem__multigraph_cache - self.__setitem_helper = self.__setitem__multigraph # type: ignore[assignment] # noqa - self.__delitem_helper = self.__delitem__multigraph + self.__setitem_helper = self.__setitem__multigraph # type: ignore[assignment] # noqa + self.__delitem_helper = self.__delitem__multigraph # type: ignore[assignment] # noqa self.__fetch_all_helper = self.__fetch_all_multigraph else: self.__contains_helper = self.__contains__graph self.__getitem_helper_db = self.__getitem__graph_db self.__getitem_helper_cache = self.__getitem__graph_cache - self.__setitem_helper = self.__setitem__graph # type: ignore[assignment] # noqa - self.__delitem_helper = self.__delitem__graph + self.__setitem_helper = self.__setitem__graph # type: ignore[assignment] + self.__delitem_helper = self.__delitem__graph # type: ignore[assignment] self.__fetch_all_helper = self.__fetch_all_graph @property @@ -1104,6 +1105,7 @@ def __delitem__(self, key: str) -> None: dst_node_id, self.graph.name, direction=self.traversal_direction.name, + can_return_multiple=self.is_multigraph, ) if not result: @@ -1112,12 +1114,14 @@ def __delitem__(self, key: str) -> None: self.__delitem_helper(result) - @key_is_string def __delitem__graph(self, edge_id: str) -> None: """Helper function for __delitem__ in Graphs.""" - self.graph.delete_edge(edge_id) + try: + self.graph.delete_edge(edge_id) + except DocumentDeleteError as e: + m = f"Failed to delete edge '{edge_id}' from Graph: {e}." + raise KeyError(m) - @key_is_string def __delitem__multigraph(self, edge_ids: list[str]) -> None: """Helper function for __delitem__ in MultiGraphs.""" # TODO: Consider separating **edge_ids** by edge collection, @@ -1641,7 +1645,6 @@ def propagate_edge_directed( dst_node_id: str, edge_key_or_attr_dict: EdgeKeyDict | EdgeAttrDict, ) -> None: - self.__set_adj_inner_dict(self.mirror, dst_node_id) self.mirror.data[dst_node_id].data[src_node_id] = edge_key_or_attr_dict def propagate_edge_directed_symmetric( @@ -1651,7 +1654,6 @@ def propagate_edge_directed_symmetric( ) -> None: propagate_edge_directed(src_node_id, dst_node_id, edge_key_or_attr_dict) propagate_edge_undirected(src_node_id, dst_node_id, edge_key_or_attr_dict) - self.__set_adj_inner_dict(self.mirror, src_node_id) self.mirror.data[src_node_id].data[dst_node_id] = edge_key_or_attr_dict propagate_edge_func = ( @@ -1664,37 +1666,46 @@ def propagate_edge_directed_symmetric( ) ) + set_adj_inner_dict_mirror = ( + self.mirror.__set_adj_inner_dict if self.is_directed else lambda *args: None + ) + if node_dict is not None: for node_id in node_dict.keys(): - self.__set_adj_inner_dict(self, node_id) + self.__set_adj_inner_dict(node_id) + set_adj_inner_dict_mirror(node_id) for src_node_id, inner_dict in adj_dict.items(): for dst_node_id, edge_or_edges in inner_dict.items(): - self.__set_adj_inner_dict(self, src_node_id) - self.__set_adj_inner_dict(self, dst_node_id) + self.__set_adj_inner_dict(src_node_id) + self.__set_adj_inner_dict(dst_node_id) + + set_adj_inner_dict_mirror(src_node_id) + set_adj_inner_dict_mirror(dst_node_id) + edge_attr_or_key_dict = set_edge_func( # type: ignore[operator] src_node_id, dst_node_id, edge_or_edges ) propagate_edge_func(src_node_id, dst_node_id, edge_attr_or_key_dict) - def __set_adj_inner_dict( - self, adj_outer_dict: AdjListOuterDict, node_id: str - ) -> AdjListInnerDict: - if node_id in adj_outer_dict.data: - return adj_outer_dict.data[node_id] + def __set_adj_inner_dict(self, node_id: str) -> AdjListInnerDict: + if node_id in self.data: + return self.data[node_id] adj_inner_dict = self.adjlist_inner_dict_factory() adj_inner_dict.src_node_id = node_id adj_inner_dict.FETCHED_ALL_DATA = True adj_inner_dict.FETCHED_ALL_IDS = True - adj_outer_dict.data[node_id] = adj_inner_dict + self.data[node_id] = adj_inner_dict return adj_inner_dict def _fetch_all(self) -> None: self.clear() + if self.is_directed: + self.mirror.clear() ( node_dict, diff --git a/nx_arangodb/classes/digraph.py b/nx_arangodb/classes/digraph.py index 4d313bed..e2bea65c 100644 --- a/nx_arangodb/classes/digraph.py +++ b/nx_arangodb/classes/digraph.py @@ -64,6 +64,7 @@ def __init__( self.clear_edges = self.clear_edges_override self.add_node = self.add_node_override self.remove_node = self.remove_node_override + self.reverse = self.reverse_override if ( not self.is_multigraph() @@ -86,6 +87,12 @@ def __init__( # def out_edges(self): # pass + def reverse_override(self, copy: bool = True) -> Any: + if copy is False: + raise NotImplementedError("In-place reverse is not supported yet.") + + return super().reverse(copy=True) + def clear_edges_override(self): logger.info("Note that clearing edges ony erases the edges in the local cache") for predecessor_dict in self._pred.data.values(): diff --git a/nx_arangodb/classes/multidigraph.py b/nx_arangodb/classes/multidigraph.py index db10c11e..d9a57f9e 100644 --- a/nx_arangodb/classes/multidigraph.py +++ b/nx_arangodb/classes/multidigraph.py @@ -1,3 +1,4 @@ +from copy import deepcopy from typing import Any, Callable, ClassVar import networkx as nx @@ -59,6 +60,10 @@ def __init__( **kwargs, ) + if self.graph_exists_in_db: + self.reverse = self.reverse_override + self.to_undirected = self.to_undirected_override + ####################### # Init helper methods # ####################### @@ -66,3 +71,54 @@ def __init__( ########################## # nx.MultiGraph Overides # ########################## + + def reverse_override(self, copy: bool = True) -> Any: + if copy is False: + raise NotImplementedError("In-place reverse is not supported yet.") + + return super().reverse(copy=True) + + def to_undirected_override(self, reciprocal=False, as_view=False): + if reciprocal is False: + return super().to_undirected(reciprocal=False, as_view=as_view) + + graph_class = self.to_undirected_class() + if as_view is True: + return nx.graphviews.generic_graph_view(self, graph_class) + + # deepcopy when not a view + G = graph_class() + G.graph.update(deepcopy(self.graph)) + G.add_nodes_from((n, deepcopy(d)) for n, d in self._node.items()) + + ###################### + # NOTE: Monkey patch # + ###################### + + # Old + # G.add_edges_from( + # (u, v, key, deepcopy(data)) + # for u, nbrs in self._adj.items() + # for v, keydict in nbrs.items() + # for key, data in keydict.items() + # if v in self._pred[u] and key in self._pred[u][v] + # ) + + # New: + G.add_edges_from( + (u, v, key, deepcopy(data)) + for u, nbrs in self._adj.items() + for v, keydict in nbrs.items() + for key, data in keydict.items() + if v in self._pred[u] # and key in self._pred[u][v] + ) + + # Reason: MultiGraphs in `nxadb` don't use integer-based keys for edges. + # They use ArangoDB Edge IDs. Therefore, the statement `key in self._pred[u][v]` + # will always be False in the context of MultiDiGraphs. For more details on why + # this adjustment is needed, see the `test_to_undirected_reciprocal` + # in `test_multidigraph.py`. + + ########################### + + return G diff --git a/tests/test_digraph.py b/tests/test_digraph.py index 5a7ea06a..28fe7b47 100644 --- a/tests/test_digraph.py +++ b/tests/test_digraph.py @@ -18,12 +18,10 @@ from .conftest import db # from .test_graph import TestEdgeSubgraph as _TestGraphEdgeSubgraph -from .test_graph import BaseAttrGraphTester, BaseGraphTester +from .test_graph import GRAPH_NAME, BaseAttrGraphTester, BaseGraphTester from .test_graph import TestGraph as _TestGraph from .test_graph import get_doc -GRAPH_NAME = "test_graph" - class BaseDiGraphTester(BaseGraphTester): def test_has_successor(self): @@ -216,14 +214,36 @@ def test_size(self): def test_to_undirected_reciprocal(self): G = self.EmptyGraph() + assert G.number_of_edges() == 0 G.add_edge(1, 2) - assert G.to_undirected().has_edge("test_graph_node/1", "test_graph_node/2") - assert not G.to_undirected(reciprocal=True).has_edge(1, 2) - G.add_edge(2, 1) - assert G.to_undirected(reciprocal=True).has_edge( + assert G.number_of_edges() == 1 + + G_undirected = G.to_undirected() + assert G_undirected.number_of_edges() == 1 + assert G_undirected.has_edge("test_graph_node/1", "test_graph_node/2") + assert G_undirected.has_edge("test_graph_node/2", "test_graph_node/1") + + G_undirected_reciprocal = G.to_undirected(reciprocal=True) + assert G_undirected_reciprocal.number_of_edges() == 0 + assert not G_undirected_reciprocal.has_edge( "test_graph_node/1", "test_graph_node/2" ) + G.add_edge("test_graph_node/2", "test_graph_node/1", foo="bar") + assert G.number_of_edges() == 2 + G_undirected_reciprocal = G.to_undirected(reciprocal=True) + assert G_undirected_reciprocal.number_of_edges() == 1 + assert G_undirected_reciprocal.has_edge( + "test_graph_node/1", "test_graph_node/2" + ) + assert G_undirected_reciprocal.has_edge( + "test_graph_node/2", "test_graph_node/1" + ) + edge_1_2 = G_undirected_reciprocal["test_graph_node/1"]["test_graph_node/2"] + edge_2_1 = G_undirected_reciprocal["test_graph_node/2"]["test_graph_node/1"] + assert edge_1_2 == edge_2_1 + assert edge_1_2["foo"] == "bar" + def test_reverse_copy(self): G = self.EmptyGraph(incoming_graph_data=[(0, 1), (1, 2)]) R = G.reverse() @@ -240,21 +260,9 @@ def test_reverse_copy(self): def test_reverse_nocopy(self): G = self.EmptyGraph(incoming_graph_data=[(0, 1), (1, 2)]) - R = G.reverse(copy=False) - assert R[1][0] - assert R[2][1] - assert R._pred[0][1] - assert R._pred[1][2] - with pytest.raises(KeyError): - R[0][1] - with pytest.raises(KeyError): - R[1][2] - with pytest.raises(KeyError): - R._pred[1][0] - with pytest.raises(KeyError): - R._pred[2][1] - with pytest.raises(nx.NetworkXError): - R.remove_edge(1, 0) + with pytest.raises(NotImplementedError): + G.reverse(copy=False) + pytest.skip("NotImplementedError: In-place reverse is not supported yet.") def test_reverse_hashable(self): pytest.skip("Class-based nodes are not supported in ArangoDB.") diff --git a/tests/test_graph.py b/tests/test_graph.py index 2f0cd8a8..2eb084d9 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -1044,9 +1044,10 @@ def test_clear_edges(self): G = self.K3Graph() G.graph["name"] = "K3" nodes = list(G.nodes) + assert G.number_of_edges() > 0 G.clear_edges() # clearing only removes local cache! assert list(G.nodes) == nodes - assert G.number_of_edges() == 3 + assert G.number_of_edges() > 0 assert G.graph["name"] == "K3" def test_edges_data(self): diff --git a/tests/test_multidigraph.py b/tests/test_multidigraph.py new file mode 100644 index 00000000..8034228c --- /dev/null +++ b/tests/test_multidigraph.py @@ -0,0 +1,788 @@ +# type: ignore + +import time +from collections import UserDict + +import networkx as nx +import pytest +from networkx.utils import edges_equal + +import nx_arangodb as nxadb + +from .conftest import db +from .test_graph import GRAPH_NAME, get_doc + +# from .test_multigraph import TestEdgeSubgraph as _TestMultiGraphEdgeSubgraph +from .test_multigraph import BaseMultiGraphTester +from .test_multigraph import TestMultiGraph as _TestMultiGraph + + +class BaseMultiDiGraphTester(BaseMultiGraphTester): + def test_adjacency(self): + G = self.K3Graph() + + edge_0_1_id = list(G[0][1])[0] + edge_0_1 = get_doc(edge_0_1_id) + edge_0_2_id = list(G[0][2])[0] + edge_0_2 = get_doc(edge_0_2_id) + edge_1_0_id = list(G[1][0])[0] + edge_1_0 = get_doc(edge_1_0_id) + edge_2_0_id = list(G[2][0])[0] + edge_2_0 = get_doc(edge_2_0_id) + edge_1_2_id = list(G[1][2])[0] + edge_1_2 = get_doc(edge_1_2_id) + edge_2_1_id = list(G[2][1])[0] + edge_2_1 = get_doc(edge_2_1_id) + + assert dict(G.adjacency()) == { + "test_graph_node/0": { + "test_graph_node/1": {edge_0_1_id: edge_0_1}, + "test_graph_node/2": {edge_0_2_id: edge_0_2}, + }, + "test_graph_node/1": { + "test_graph_node/0": {edge_1_0_id: edge_1_0}, + "test_graph_node/2": {edge_1_2_id: edge_1_2}, + }, + "test_graph_node/2": { + "test_graph_node/0": {edge_2_0_id: edge_2_0}, + "test_graph_node/1": {edge_2_1_id: edge_2_1}, + }, + } + + def get_edges_data(self, G): + edges_data = [] + for src, dst, _ in G.edges: + edge_id = list(G[src][dst])[0] + edges_data.append((src, dst, get_doc(edge_id))) + + return sorted(edges_data) + + def test_edges(self): + G = self.K3Graph() + assert edges_equal(G.edges(), self.edges_all) + assert edges_equal(G.edges(0), self.edges_0) + assert edges_equal(G.edges([0, 1]), self.edges_0_1) + pytest.raises((KeyError, nx.NetworkXError), G.edges, -1) + + def test_edges_data(self): + G = self.K3Graph() + edges_data = self.get_edges_data(G) + edges_data_0 = edges_data[0:2] + assert sorted(G.edges(data=True)) == edges_data + assert sorted(G.edges(0, data=True)) == edges_data_0 + pytest.raises((KeyError, nx.NetworkXError), G.neighbors, -1) + + def test_edges_multi(self): + G = self.K3Graph() + assert sorted(G.edges()) == sorted(self.edges_all) + assert sorted(G.edges(0)) == sorted(self.edges_0) + assert G.number_of_edges() == 6 + edge_id = G.add_edge(0, 1) + assert db.has_document(edge_id) + assert G.number_of_edges() == 7 + assert sorted(G.edges()) == sorted( + self.edges_all + [("test_graph_node/0", "test_graph_node/1")] + ) + + def test_out_edges(self): + G = self.K3Graph() + assert sorted(G.out_edges()) == sorted(self.edges_all) + assert sorted(G.out_edges(0)) == sorted(self.edges_0) + pytest.raises((KeyError, nx.NetworkXError), G.out_edges, -1) + edges_0_with_keys = [ + (src, dst, G[src][dst][0]["_id"]) for src, dst in self.edges_0 + ] + assert sorted(G.out_edges(0, keys=True)) == edges_0_with_keys + + def test_out_edges_multi(self): + G = self.K3Graph() + assert sorted(G.out_edges()) == sorted(self.edges_all) + assert sorted(G.out_edges(0)) == sorted(self.edges_0) + edge_id = G.add_edge(0, 1, 2) + assert edge_id != 2 + assert db.has_document(edge_id) + assert sorted(G.edges()) == sorted( + self.edges_all + [("test_graph_node/0", "test_graph_node/1")] + ) + + def test_out_edges_data(self): + G = self.K3Graph() + + edges_data = self.get_edges_data(G) + edges_data_0 = edges_data[0:2] + + assert sorted(G.edges(0, data=True)) == edges_data_0 + G.remove_edge(0, 1) + edge_0_1_new_id = G.add_edge(0, 1, data=1) + edge_0_1_new = get_doc(edge_0_1_new_id) + edge_0_data_new = [ + ("test_graph_node/0", "test_graph_node/1", edge_0_1_new), + edges_data_0[1], + ] + assert sorted(G.edges(0, data=True)) == edge_0_data_new + assert sorted(G.edges(0, data="data")) == [ + ("test_graph_node/0", "test_graph_node/1", 1), + ("test_graph_node/0", "test_graph_node/2", None), + ] + assert sorted(G.edges(0, data="data", default=-1)) == [ + ("test_graph_node/0", "test_graph_node/1", 1), + ("test_graph_node/0", "test_graph_node/2", -1), + ] + + def test_in_edges(self): + G = self.K3Graph() + + edges_0_in = [ + ("test_graph_node/1", "test_graph_node/0"), + ("test_graph_node/2", "test_graph_node/0"), + ] + + assert sorted(G.in_edges()) == sorted(self.edges_all) + assert sorted(G.in_edges(0)) == edges_0_in + pytest.raises((KeyError, nx.NetworkXError), G.in_edges, -1) + G.add_edge(0, 1, 2) + assert sorted(G.in_edges()) == sorted( + self.edges_all + [("test_graph_node/0", "test_graph_node/1")] + ) + assert sorted(G.in_edges(0, keys=True)) == [ + ( + "test_graph_node/1", + "test_graph_node/0", + "test_graph_node_to_test_graph_node/0", + ), + ( + "test_graph_node/2", + "test_graph_node/0", + "test_graph_node_to_test_graph_node/1", + ), + ] + + def test_in_edges_no_keys(self): + G = self.K3Graph() + edges_0_in = [ + ("test_graph_node/1", "test_graph_node/0"), + ("test_graph_node/2", "test_graph_node/0"), + ] + + assert sorted(G.in_edges()) == sorted(self.edges_all) + assert sorted(G.in_edges(0)) == edges_0_in + G.add_edge(0, 1, 2) + assert sorted(G.in_edges()) == sorted( + self.edges_all + [("test_graph_node/0", "test_graph_node/1")] + ) + + edges_data = self.get_edges_data(G) + in_edges_data = G.in_edges(data=True, keys=False) + assert len(in_edges_data) == len(edges_data) + assert len(list(in_edges_data)[0]) == 3 + + def test_in_edges_data(self): + G = self.K3Graph() + list(G[2][0]) + list(G[1][0]) + edge_2_0 = get_doc(G[2][0][0]["_id"]) + edge_1_0 = get_doc(G[1][0][0]["_id"]) + assert sorted(G.in_edges(0, data=True)) == [ + ("test_graph_node/1", "test_graph_node/0", edge_1_0), + ("test_graph_node/2", "test_graph_node/0", edge_2_0), + ] + G.remove_edge(1, 0) + G.add_edge(1, 0, data=1) + edge_1_0 = get_doc(G[1][0][0]["_id"]) + assert sorted(G.in_edges(0, data=True)) == [ + ("test_graph_node/1", "test_graph_node/0", edge_1_0), + ("test_graph_node/2", "test_graph_node/0", edge_2_0), + ] + assert sorted(G.in_edges(0, data="data")) == [ + ("test_graph_node/1", "test_graph_node/0", 1), + ("test_graph_node/2", "test_graph_node/0", None), + ] + assert sorted(G.in_edges(0, data="data", default=-1)) == [ + ("test_graph_node/1", "test_graph_node/0", 1), + ("test_graph_node/2", "test_graph_node/0", -1), + ] + + def is_shallow(self, H, G): + # graph + assert G.graph["foo"] == H.graph["foo"] + G.graph["foo"].append(1) + assert G.graph["foo"] == H.graph["foo"] + # node + assert G.nodes[0]["foo"] == H.nodes[0]["foo"] + G.nodes[0]["foo"].append(1) + assert G.nodes[0]["foo"] == H.nodes[0]["foo"] + # edge + assert G[1][2][0]["foo"] == H[1][2][0]["foo"] + G[1][2][0]["foo"].append(1) + assert G[1][2][0]["foo"] == H[1][2][0]["foo"] + + def is_deep(self, H, G): + # graph + assert G.graph["foo"] == H.graph["foo"] + G.graph["foo"].append(1) + assert G.graph["foo"] != H.graph["foo"] + # node + assert ( + G.nodes["test_graph_node/0"]["foo"] == H.nodes["test_graph_node/0"]["foo"] + ) + G.nodes["test_graph_node/0"]["foo"].append(1) + assert ( + G.nodes["test_graph_node/0"]["foo"] != H.nodes["test_graph_node/0"]["foo"] + ) + # edge + edge_id = G[1][2][0]["_id"] + assert ( + G[1][2][0]["foo"] + == H["test_graph_node/1"]["test_graph_node/2"][edge_id]["foo"] + ) + G[1][2][edge_id]["foo"].append(1) + assert ( + G[1][2][0]["foo"] + != H["test_graph_node/1"]["test_graph_node/2"][edge_id]["foo"] + ) + + def test_to_undirected(self): + # MultiDiGraph -> MultiGraph changes number of edges so it is + # not a copy operation... use is_shallow, not is_shallow_copy + G = self.K3Graph() + self.add_attributes(G) + H = nxadb.MultiGraph(G) + # self.is_shallow(H,G) + # the result is traversal order dependent so we + # can't use the is_shallow() test here. + try: + assert edges_equal( + H.edges(), + [ + ("test_graph_node/0", "test_graph_node/2"), + ("test_graph_node/0", "test_graph_node/1"), + ("test_graph_node/1", "test_graph_node/2"), + ], + ) + except AssertionError: + assert edges_equal( + H.edges(), + [ + ("test_graph_node/0", "test_graph_node/1"), + ("test_graph_node/1", "test_graph_node/2"), + ("test_graph_node/1", "test_graph_node/2"), + ("test_graph_node/2", "test_graph_node/0"), + ], + ) + H = G.to_undirected() + self.is_deep(H, G) + + def test_has_successor(self): + G = self.K3Graph() + assert G.has_successor(0, 1) + assert not G.has_successor(0, -1) + + def test_successors(self): + G = self.K3Graph() + assert sorted(G.successors(0)) == ["test_graph_node/1", "test_graph_node/2"] + pytest.raises((KeyError, nx.NetworkXError), G.successors, -1) + + def test_has_predecessor(self): + G = self.K3Graph() + assert G.has_predecessor(0, 1) + assert not G.has_predecessor(0, -1) + + def test_predecessors(self): + G = self.K3Graph() + assert sorted(G.predecessors(0)) == ["test_graph_node/1", "test_graph_node/2"] + pytest.raises((KeyError, nx.NetworkXError), G.predecessors, -1) + + def test_degree(self): + G = self.K3Graph() + assert sorted(G.degree()) == [ + ("test_graph_node/0", 4), + ("test_graph_node/1", 4), + ("test_graph_node/2", 4), + ] + assert dict(G.degree()) == { + "test_graph_node/0": 4, + "test_graph_node/1": 4, + "test_graph_node/2": 4, + } + assert G.degree(0) == 4 + assert list(G.degree(iter([0]))) == [("test_graph_node/0", 4)] + edge_id = G.add_edge(0, 1, weight=0.3, other=1.2) + doc = db.document(edge_id) + assert doc["weight"] == 0.3 + assert doc["other"] == 1.2 + assert sorted(G.degree(weight="weight")) == [ + ("test_graph_node/0", 4.3), + ("test_graph_node/1", 4.3), + ("test_graph_node/2", 4), + ] + assert sorted(G.degree(weight="other")) == [ + ("test_graph_node/0", 5.2), + ("test_graph_node/1", 5.2), + ("test_graph_node/2", 4), + ] + + def test_in_degree(self): + G = self.K3Graph() + assert sorted(G.in_degree()) == [ + ("test_graph_node/0", 2), + ("test_graph_node/1", 2), + ("test_graph_node/2", 2), + ] + assert dict(G.in_degree()) == { + "test_graph_node/0": 2, + "test_graph_node/1": 2, + "test_graph_node/2": 2, + } + assert G.in_degree(0) == 2 + assert list(G.in_degree(iter([0]))) == [("test_graph_node/0", 2)] + assert G.in_degree(0, weight="weight") == 2 + + def test_out_degree(self): + G = self.K3Graph() + assert sorted(G.out_degree()) == [ + ("test_graph_node/0", 2), + ("test_graph_node/1", 2), + ("test_graph_node/2", 2), + ] + assert dict(G.out_degree()) == { + "test_graph_node/0": 2, + "test_graph_node/1": 2, + "test_graph_node/2": 2, + } + assert G.out_degree(0) == 2 + assert list(G.out_degree(iter([0]))) == [("test_graph_node/0", 2)] + assert G.out_degree(0, weight="weight") == 2 + + def test_size(self): + G = self.K3Graph() + assert G.size() == 6 + assert G.number_of_edges() == 6 + G.add_edge(0, 1, weight=0.3, other=1.2) + assert G.number_of_edges() == 7 + assert round(G.size(weight="weight"), 2) == 6.3 + assert round(G.size(weight="other"), 2) == 7.2 + + def test_to_undirected_reciprocal(self): + G = self.EmptyGraph() + assert G.number_of_edges() == 0 + G.add_edge(1, 2) + assert G.number_of_edges() == 1 + + G_undirected = G.to_undirected() + assert G_undirected.number_of_edges() == 1 + assert G_undirected.has_edge("test_graph_node/1", "test_graph_node/2") + assert G_undirected.has_edge("test_graph_node/2", "test_graph_node/1") + + G_undirected_reciprocal = G.to_undirected(reciprocal=True) + assert G_undirected_reciprocal.number_of_edges() == 0 + assert not G_undirected_reciprocal.has_edge( + "test_graph_node/1", "test_graph_node/2" + ) + + edge_2_1_id = G.add_edge("test_graph_node/2", "test_graph_node/1", foo="bar") + assert G.number_of_edges() == 2 + G_undirected_reciprocal = G.to_undirected(reciprocal=True) + assert G_undirected_reciprocal.number_of_edges() == 2 + assert G_undirected_reciprocal.has_edge( + "test_graph_node/1", "test_graph_node/2" + ) + assert G_undirected_reciprocal.has_edge( + "test_graph_node/2", "test_graph_node/1" + ) + # notice how edge_1_2 now has the same data as edge_2_1 (+ the same _id) + edge_1_2 = G_undirected_reciprocal["test_graph_node/1"]["test_graph_node/2"][ + edge_2_1_id + ] + edge_2_1 = G_undirected_reciprocal["test_graph_node/2"]["test_graph_node/1"][ + edge_2_1_id + ] + assert edge_1_2 == edge_2_1 + assert edge_1_2["foo"] == "bar" + + def test_reverse_copy(self): + G = self.EmptyGraph([(0, 1), (0, 1)]) + R = G.reverse() + assert sorted(R.edges()) == [ + ("test_graph_node/1", "test_graph_node/0"), + ("test_graph_node/1", "test_graph_node/0"), + ] + R.remove_edge("test_graph_node/1", "test_graph_node/0") + assert sorted(R.edges()) == [("test_graph_node/1", "test_graph_node/0")] + assert sorted(G.edges()) == [ + ("test_graph_node/0", "test_graph_node/1"), + ("test_graph_node/0", "test_graph_node/1"), + ] + + def test_reverse_nocopy(self): + G = self.EmptyGraph([(0, 1), (0, 1)]) + with pytest.raises(NotImplementedError): + G.reverse(copy=False) # nocopy not supported yet + pytest.skip("NotImplementedError: In-place reverse is not supported yet.") + # assert sorted(R.edges()) == [ + # ("test_graph_node/1", "test_graph_node/0"), + # ("test_graph_node/1", "test_graph_node/0"), + # ] + # pytest.raises(nx.NetworkXError, R.remove_edge, 1, 0) + + def test_di_attributes_cached(self): + G = self.K3Graph().copy() + assert id(G.in_edges) == id(G.in_edges) + assert id(G.out_edges) == id(G.out_edges) + assert id(G.in_degree) == id(G.in_degree) + assert id(G.out_degree) == id(G.out_degree) + assert id(G.succ) == id(G.succ) + assert id(G.pred) == id(G.pred) + + +class TestMultiDiGraph(BaseMultiDiGraphTester, _TestMultiGraph): + def setup_method(self): + self.Graph = nx.MultiDiGraph + # build K3 + self.k3edges = [(0, 1), (0, 2), (1, 2)] + self.k3nodes = [0, 1, 2] + self.K3 = self.Graph() + self.K3._succ = {0: {}, 1: {}, 2: {}} + # K3._adj is synced with K3._succ + self.K3._pred = {0: {}, 1: {}, 2: {}} + for u in self.k3nodes: + for v in self.k3nodes: + if u == v: + continue + d = {0: {}} + self.K3._succ[u][v] = d + self.K3._pred[v][u] = d + self.K3._node = {} + self.K3._node[0] = {} + self.K3._node[1] = {} + self.K3._node[2] = {} + + def nxadb_graph_constructor(*args, **kwargs) -> nxadb.MultiDiGraph: + db.delete_graph(GRAPH_NAME, drop_collections=True, ignore_missing=True) + G = nxadb.MultiDiGraph(*args, **kwargs, name=GRAPH_NAME) + # Experimenting with a delay to see if it helps with CircleCI... + time.sleep(0.10) + return G + + self.K3Graph = lambda *args, **kwargs: nxadb_graph_constructor( + *args, **kwargs, incoming_graph_data=self.K3 + ) + self.EmptyGraph = lambda *args, **kwargs: nxadb_graph_constructor( + *args, **kwargs + ) + self.k3nodes = ["test_graph_node/0", "test_graph_node/1", "test_graph_node/2"] + + self.edges_all = [ + ("test_graph_node/0", "test_graph_node/1"), + ("test_graph_node/1", "test_graph_node/0"), + ("test_graph_node/0", "test_graph_node/2"), + ("test_graph_node/2", "test_graph_node/0"), + ("test_graph_node/1", "test_graph_node/2"), + ("test_graph_node/2", "test_graph_node/1"), + ] + self.edges_0 = [ + ("test_graph_node/0", "test_graph_node/1"), + ("test_graph_node/0", "test_graph_node/2"), + ] + self.edges_0_1 = [ + ("test_graph_node/0", "test_graph_node/2"), + ("test_graph_node/0", "test_graph_node/1"), + ("test_graph_node/1", "test_graph_node/2"), + ("test_graph_node/1", "test_graph_node/0"), + ] + + def test_add_edge(self): + G = self.EmptyGraph() + edge_id = G.add_edge(0, 1) + edge_doc = get_doc(edge_id) + assert G._adj == { + "test_graph_node/0": {"test_graph_node/1": {edge_id: edge_doc}}, + "test_graph_node/1": {}, + } + assert G._succ == { + "test_graph_node/0": {"test_graph_node/1": {edge_id: edge_doc}}, + "test_graph_node/1": {}, + } + assert G._pred == { + "test_graph_node/0": {}, + "test_graph_node/1": {"test_graph_node/0": {edge_id: edge_doc}}, + } + G = self.EmptyGraph() + edge_id = G.add_edge(*(0, 1)) + edge_doc = get_doc(edge_id) + assert G._adj == { + "test_graph_node/0": {"test_graph_node/1": {edge_id: edge_doc}}, + "test_graph_node/1": {}, + } + assert G._succ == { + "test_graph_node/0": {"test_graph_node/1": {edge_id: edge_doc}}, + "test_graph_node/1": {}, + } + assert G._pred == { + "test_graph_node/0": {}, + "test_graph_node/1": {"test_graph_node/0": {edge_id: edge_doc}}, + } + with pytest.raises(ValueError, match="Key cannot be None"): + G.add_edge(None, 3) + + def test_add_edges_from(self): + G = self.EmptyGraph() + G.add_edges_from([(0, 1), (0, 1, {"weight": 3})]) + edge_0_1_0_id = G[0][1][0]["_id"] + edge_0_1_0 = get_doc(edge_0_1_0_id) + edge_0_1_1_id = G[0][1][1]["_id"] + edge_0_1_1 = get_doc(edge_0_1_1_id) + assert edge_0_1_1["weight"] == 3 + assert G._adj == { + "test_graph_node/0": { + "test_graph_node/1": { + edge_0_1_0_id: edge_0_1_0, + edge_0_1_1_id: edge_0_1_1, + } + }, + "test_graph_node/1": {}, + } + + assert G._succ == { + "test_graph_node/0": { + "test_graph_node/1": { + edge_0_1_0_id: edge_0_1_0, + edge_0_1_1_id: edge_0_1_1, + } + }, + "test_graph_node/1": {}, + } + + assert G._pred == { + "test_graph_node/1": { + "test_graph_node/0": { + edge_0_1_0_id: edge_0_1_0, + edge_0_1_1_id: edge_0_1_1, + } + }, + "test_graph_node/0": {}, + } + + G.add_edges_from([(0, 1), (0, 1, {"weight": 3})], weight=2) + edge_0_1_2_id = G[0][1][2]["_id"] + edge_0_1_2 = get_doc(edge_0_1_2_id) + assert edge_0_1_2["weight"] == 2 + + edge_0_1_3_id = G[0][1][3]["_id"] + edge_0_1_3 = get_doc(edge_0_1_3_id) + assert edge_0_1_3["weight"] == 3 + + assert G._adj == { + "test_graph_node/0": { + "test_graph_node/1": { + edge_0_1_0_id: edge_0_1_0, + edge_0_1_1_id: edge_0_1_1, + edge_0_1_2_id: edge_0_1_2, + edge_0_1_3_id: edge_0_1_3, + } + }, + "test_graph_node/1": {}, + } + + assert G._succ == { + "test_graph_node/0": { + "test_graph_node/1": { + edge_0_1_0_id: edge_0_1_0, + edge_0_1_1_id: edge_0_1_1, + edge_0_1_2_id: edge_0_1_2, + edge_0_1_3_id: edge_0_1_3, + } + }, + "test_graph_node/1": {}, + } + + assert G._pred == { + "test_graph_node/1": { + "test_graph_node/0": { + edge_0_1_0_id: edge_0_1_0, + edge_0_1_1_id: edge_0_1_1, + edge_0_1_2_id: edge_0_1_2, + edge_0_1_3_id: edge_0_1_3, + } + }, + "test_graph_node/0": {}, + } + + G = self.EmptyGraph() + edges = [ + (0, 1, {"weight": 3}), + (0, 1, (("weight", 2),)), + (0, 1, 5), + (0, 1, "s"), + ] + G.add_edges_from(edges) + + edge_0_1_0_id = G[0][1][0]["_id"] + edge_0_1_0 = get_doc(edge_0_1_0_id) + assert edge_0_1_0["weight"] == 3 + + edge_0_1_1_id = G[0][1][1]["_id"] + edge_0_1_1 = get_doc(edge_0_1_1_id) + assert edge_0_1_1["weight"] == 2 + + edge_0_1_2_id = G[0][1][2]["_id"] + edge_0_1_2 = get_doc(edge_0_1_2_id) + assert edge_0_1_2_id != 5 + assert "weight" not in edge_0_1_2 + + edge_0_1_3_id = G[0][1][3]["_id"] + edge_0_1_3 = get_doc(edge_0_1_3_id) + assert edge_0_1_3_id != "s" + assert "weight" not in edge_0_1_3 + + assert G._succ == { + "test_graph_node/0": { + "test_graph_node/1": { + edge_0_1_0_id: edge_0_1_0, + edge_0_1_1_id: edge_0_1_1, + edge_0_1_2_id: edge_0_1_2, + edge_0_1_3_id: edge_0_1_3, + } + }, + "test_graph_node/1": {}, + } + + assert G._pred == { + "test_graph_node/1": { + "test_graph_node/0": { + edge_0_1_0_id: edge_0_1_0, + edge_0_1_1_id: edge_0_1_1, + edge_0_1_2_id: edge_0_1_2, + edge_0_1_3_id: edge_0_1_3, + } + }, + "test_graph_node/0": {}, + } + + # too few in tuple + pytest.raises(nx.NetworkXError, G.add_edges_from, [(0,)]) + # too many in tuple + pytest.raises(nx.NetworkXError, G.add_edges_from, [(0, 1, 2, 3, 4)]) + # not a tuple + pytest.raises(TypeError, G.add_edges_from, [0]) + with pytest.raises(ValueError, match="Key cannot be None"): + G.add_edges_from([(None, 3), (3, 2)]) + + def test_remove_edge(self): + G = self.K3Graph() + assert db.has_document(list(G[0][1])[0]) + G.remove_edge(0, 1) + with pytest.raises(KeyError): + G[0][1] + + edge_1_0_id = list(G[1][0])[0] + edge_1_0 = get_doc(edge_1_0_id) + + edge_0_2_id = list(G[0][2])[0] + edge_0_2 = get_doc(edge_0_2_id) + + edge_2_0_id = list(G[2][0])[0] + edge_2_0 = get_doc(edge_2_0_id) + + edge_1_2_id = list(G[1][2])[0] + edge_1_2 = get_doc(edge_1_2_id) + + edge_2_1_id = list(G[2][1])[0] + edge_2_1 = get_doc(edge_2_1_id) + + assert G._succ == { + "test_graph_node/0": {"test_graph_node/2": {edge_0_2_id: edge_0_2}}, + "test_graph_node/1": { + "test_graph_node/0": {edge_1_0_id: edge_1_0}, + "test_graph_node/2": {edge_1_2_id: edge_1_2}, + }, + "test_graph_node/2": { + "test_graph_node/0": {edge_2_0_id: edge_2_0}, + "test_graph_node/1": {edge_2_1_id: edge_2_1}, + }, + } + + assert G._pred == { + "test_graph_node/0": { + "test_graph_node/1": {edge_1_0_id: edge_1_0}, + "test_graph_node/2": {edge_2_0_id: edge_2_0}, + }, + "test_graph_node/1": { + "test_graph_node/2": {edge_2_1_id: edge_2_1}, + }, + "test_graph_node/2": { + "test_graph_node/0": {edge_0_2_id: edge_0_2}, + "test_graph_node/1": {edge_1_2_id: edge_1_2}, + }, + } + + pytest.raises((KeyError, nx.NetworkXError), G.remove_edge, -1, 0) + pytest.raises((KeyError, nx.NetworkXError), G.remove_edge, 0, 2, key=1) + + +# TODO: Revisit +# Subgraphing not implemented yet +# class TestEdgeSubgraph(_TestMultiGraphEdgeSubgraph): +# """Unit tests for the :meth:`MultiDiGraph.edge_subgraph` method.""" + +# def setup_method(self): +# # Create a quadruply-linked path graph on five nodes. +# G = nx.MultiDiGraph() +# nx.add_path(G, range(5)) +# nx.add_path(G, range(5)) +# nx.add_path(G, reversed(range(5))) +# nx.add_path(G, reversed(range(5))) +# # Add some node, edge, and graph attributes. +# for i in range(5): +# G.nodes[i]["name"] = f"node{i}" +# G.adj[0][1][0]["name"] = "edge010" +# G.adj[0][1][1]["name"] = "edge011" +# G.adj[3][4][0]["name"] = "edge340" +# G.adj[3][4][1]["name"] = "edge341" +# G.graph["name"] = "graph" +# # Get the subgraph induced by one of the first edges and one of +# # the last edges. +# self.G = G +# self.H = G.edge_subgraph([(0, 1, 0), (3, 4, 1)]) + + +# class CustomDictClass(UserDict): +# pass + + +# class MultiDiGraphSubClass(nx.MultiDiGraph): +# node_dict_factory = CustomDictClass # type: ignore[assignment] +# node_attr_dict_factory = CustomDictClass # type: ignore[assignment] +# adjlist_outer_dict_factory = CustomDictClass # type: ignore[assignment] +# adjlist_inner_dict_factory = CustomDictClass # type: ignore[assignment] +# edge_key_dict_factory = CustomDictClass # type: ignore[assignment] +# edge_attr_dict_factory = CustomDictClass # type: ignore[assignment] +# graph_attr_dict_factory = CustomDictClass # type: ignore[assignment] + + +# class TestMultiDiGraphSubclass(TestMultiDiGraph): +# def setup_method(self): +# self.Graph = MultiDiGraphSubClass +# # build K3 +# self.k3edges = [(0, 1), (0, 2), (1, 2)] +# self.k3nodes = [0, 1, 2] +# self.K3 = self.Graph() +# self.K3._succ = self.K3.adjlist_outer_dict_factory( +# { +# 0: self.K3.adjlist_inner_dict_factory(), +# 1: self.K3.adjlist_inner_dict_factory(), +# 2: self.K3.adjlist_inner_dict_factory(), +# } +# ) +# # K3._adj is synced with K3._succ +# self.K3._pred = {0: {}, 1: {}, 2: {}} +# for u in self.k3nodes: +# for v in self.k3nodes: +# if u == v: +# continue +# d = {0: {}} +# self.K3._succ[u][v] = d +# self.K3._pred[v][u] = d +# self.K3._node = self.K3.node_dict_factory() +# self.K3._node[0] = self.K3.node_attr_dict_factory() +# self.K3._node[1] = self.K3.node_attr_dict_factory() +# self.K3._node[2] = self.K3.node_attr_dict_factory() diff --git a/tests/test_multigraph.py b/tests/test_multigraph.py index 1380999a..7a8dfd30 100644 --- a/tests/test_multigraph.py +++ b/tests/test_multigraph.py @@ -11,12 +11,10 @@ from nx_arangodb.classes.dict.adj import EdgeAttrDict, EdgeKeyDict from .conftest import db -from .test_graph import BaseAttrGraphTester +from .test_graph import GRAPH_NAME, BaseAttrGraphTester from .test_graph import TestGraph as _TestGraph from .test_graph import get_doc -GRAPH_NAME = "test_graph" - class BaseMultiGraphTester(BaseAttrGraphTester): def test_has_edge(self): @@ -276,7 +274,10 @@ def nxadb_graph_constructor(*args, **kwargs) -> nxadb.MultiGraph: def test_data_input(self): G = self.EmptyGraph({1: [2], 2: [1]}) - assert G.number_of_edges() == 1 + if G.is_directed(): + assert G.number_of_edges() == 2 + else: + assert G.number_of_edges() == 1 assert G.number_of_nodes() == 2 assert sorted(G.adj.items()) == [ ("test_graph_node/1", G.adj[1]), @@ -481,13 +482,24 @@ def test_remove_node(self): G.remove_node(0) assert 0 not in G.nodes assert G.number_of_nodes() == 2 - assert G.number_of_edges() == 1 assert len(G[1][2]) == 1 + edge_1_2 = get_doc(list(G[1][2])[0]) - assert G.adj == { - "test_graph_node/1": {"test_graph_node/2": {edge_1_2["_id"]: edge_1_2}}, - "test_graph_node/2": {"test_graph_node/1": {edge_1_2["_id"]: edge_1_2}}, - } + if G.is_directed(): + edge_2_1 = get_doc(list(G[2][1])[0]) + assert edge_2_1["_id"] != edge_1_2["_id"] + assert G.number_of_edges() == 2 + assert G.adj == { + "test_graph_node/1": {"test_graph_node/2": {edge_1_2["_id"]: edge_1_2}}, + "test_graph_node/2": {"test_graph_node/1": {edge_2_1["_id"]: edge_2_1}}, + } + else: + assert G.adj == { + "test_graph_node/1": {"test_graph_node/2": {edge_1_2["_id"]: edge_1_2}}, + "test_graph_node/2": {"test_graph_node/1": {edge_1_2["_id"]: edge_1_2}}, + } + assert G.number_of_edges() == 1 + with pytest.raises(nx.NetworkXError): G.remove_node(-1)