diff --git a/bigquery_magics/bigquery.py b/bigquery_magics/bigquery.py index 33a01d0..af0e93e 100644 --- a/bigquery_magics/bigquery.py +++ b/bigquery_magics/bigquery.py @@ -596,19 +596,31 @@ def _handle_result(result, args): return result -def _is_colab() -> bool: - """Check if code is running in Google Colab""" - try: - import google.colab # noqa: F401 +def _colab_query_callback(query: str, params: str): + return IPython.core.display.JSON( + graph_server.convert_graph_data(query_results=json.loads(params)) + ) - return True - except ImportError: - return False +def _colab_node_expansion_callback(request: dict, params_str: str): + """Handle node expansion requests in Google Colab environment + + Args: + request: A dictionary containing node expansion details including: + - uid: str - Unique identifier of the node to expand + - node_labels: List[str] - Labels of the node + - node_properties: List[Dict] - Properties of the node with key, value, and type + - direction: str - Direction of expansion ("INCOMING" or "OUTGOING") + - edge_label: Optional[str] - Label of edges to filter by + params_str: A JSON string containing connection parameters -def _colab_callback(query: str, params: str): + Returns: + JSON: A JSON-serialized response containing either: + - The query results with nodes and edges + - An error message if the request failed + """ return IPython.core.display.JSON( - graph_server.convert_graph_data(query_results=json.loads(params)) + graph_server.execute_node_expansion(params_str, request) ) @@ -628,20 +640,30 @@ def _add_graph_widget(query_result): # visualizer widget. In colab, we are not able to create an http server on a # background thread, so we use a special colab-specific api to register a callback, # to be invoked from Javascript. - if _is_colab(): + port = None + try: from google.colab import output - output.register_callback("graph_visualization.Query", _colab_callback) - else: + output.register_callback("graph_visualization.Query", _colab_query_callback) + output.register_callback( + "graph_visualization.NodeExpansion", _colab_node_expansion_callback + ) + + # In colab mode, the Javascript doesn't use the port value we pass in, as there is no + # graph server, but it still has to be set to avoid triggering an exception. + # TODO: Clean this up when the Javascript is fixed on the spanner-graph-notebook side. + port = 0 + except ImportError: global singleton_server_thread alive = singleton_server_thread and singleton_server_thread.is_alive() if not alive: singleton_server_thread = graph_server.graph_server.init() + port = graph_server.graph_server.port # Create html to invoke the graph server html_content = generate_visualization_html( query="placeholder query", - port=graph_server.graph_server.port, + port=port, params=query_result.to_json().replace("\\", "\\\\").replace('"', '\\"'), ) IPython.display.display(IPython.core.display.HTML(html_content)) @@ -656,11 +678,13 @@ def _is_valid_json(s: str): def _supports_graph_widget(query_result: pandas.DataFrame): - num_rows, num_columns = query_result.shape + # Visualization is supported if we have any json items to display. + # (Non-json items are excluded from visualization, but we still want to bring up + # the visualizer for the json items.) for column in query_result.columns: - if not query_result[column].apply(_is_valid_json).all(): - return False - return True + if query_result[column].apply(_is_valid_json).any(): + return True + return False def _make_bq_query( diff --git a/bigquery_magics/graph_server.py b/bigquery_magics/graph_server.py index 6104328..263a027 100644 --- a/bigquery_magics/graph_server.py +++ b/bigquery_magics/graph_server.py @@ -20,6 +20,10 @@ from typing import Dict, List +def execute_node_expansion(params, request): + return {"error": "Node expansion not yet implemented"} + + def convert_graph_data(query_results: Dict[str, Dict[str, str]]): """ Converts graph data to the form expected by the visualization framework. @@ -49,16 +53,12 @@ def convert_graph_data(query_results: Dict[str, Dict[str, str]]): # does not even get called unless spanner_graphs has already been confirmed # to exist upstream. from google.cloud.spanner_v1.types import StructType, Type, TypeCode - import networkx - from spanner_graphs.conversion import ( - columns_to_native_numpy, - prepare_data_for_graphing, - ) + from spanner_graphs.conversion import get_nodes_edges try: fields: List[StructType.Field] = [] data = {} - rows = [] + tabular_data = {} for key, value in query_results.items(): column_name = None column_value = None @@ -73,45 +73,39 @@ def convert_graph_data(query_results: Dict[str, Dict[str, str]]): StructType.Field(name=column_name, type=Type(code=TypeCode.JSON)) ) data[column_name] = [] + tabular_data[column_name] = [] for value_key, value_value in column_value.items(): - if not isinstance(value_key, str): - raise ValueError( - f"Expected inner key to be str, got {type(value_key)}" - ) - if not isinstance(value_value, str): - raise ValueError( - f"Expected inner value to be str, got {type(value_value)}" - ) - row_json = json.loads(value_value) - - if row_json is not None: + try: + row_json = json.loads(value_value) data[column_name].append(row_json) - rows.append([row_json]) - - d, ignored_columns = columns_to_native_numpy(data, fields) - - graph: networkx.classes.DiGraph = prepare_data_for_graphing( - incoming=d, schema_json=None - ) - - nodes = [] - for node_id, node in graph.nodes(data=True): - nodes.append(node) - - edges = [] - for from_id, to_id, edge in graph.edges(data=True): - edges.append(edge) + tabular_data[column_name].append(row_json) + except (ValueError, TypeError): + # Non-JSON columns cannot be visualized, but we still want them + # in the tabular view. + tabular_data[column_name].append(str(value_value)) + + nodes, edges = get_nodes_edges(data, fields, schema_json=None) + + # Convert nodes and edges to json objects. + # (Unfortunately, the code coverage tooling does not allow this + # to be expressed as list comprehension). + nodes_json = [] + for node in nodes: + nodes_json.append(node.to_json()) + edges_json = [] + for edge in edges: + edges_json.append(edge.to_json()) return { "response": { # These fields populate the graph result view. - "nodes": nodes, - "edges": edges, + "nodes": nodes_json, + "edges": edges_json, # This populates the visualizer's schema view, but not yet implemented on the # BigQuery side. "schema": None, # This field is used to populate the visualizer's tabular view. - "query_result": data, + "query_result": tabular_data, } } except Exception as e: @@ -133,6 +127,7 @@ class GraphServer: endpoints = { "get_ping": "/get_ping", "post_ping": "/post_ping", + "post_node_expansion": "/post_node_expansion", "post_query": "/post_query", } @@ -228,6 +223,24 @@ def handle_post_query(self): response = convert_graph_data(query_results=json.loads(data["params"])) self.do_data_response(response) + def handle_post_node_expansion(self): + """Handle POST requests for node expansion. + + Expects a JSON payload with: + - params: A JSON string containing connection parameters (project, instance, database, graph) + - request: A dictionary with node details (uid, node_labels, node_properties, direction, edge_label) + """ + data = self.parse_post_data() + + # Execute node expansion with: + # - params_str: JSON string with connection parameters (project, instance, database, graph) + # - request: Dict with node details (uid, node_labels, node_properties, direction, edge_label) + self.do_data_response( + execute_node_expansion( + params=data.get("params"), request=data.get("request") + ) + ) + def do_GET(self): assert self.path == GraphServer.endpoints["get_ping"] self.handle_get_ping() @@ -235,6 +248,8 @@ def do_GET(self): def do_POST(self): if self.path == GraphServer.endpoints["post_ping"]: self.handle_post_ping() + elif self.path == GraphServer.endpoints["post_node_expansion"]: + self.handle_post_node_expansion() else: assert self.path == GraphServer.endpoints["post_query"] self.handle_post_query() diff --git a/noxfile.py b/noxfile.py index 562dd6b..efae5e8 100644 --- a/noxfile.py +++ b/noxfile.py @@ -481,7 +481,7 @@ def prerelease_deps(session, protobuf_implementation): ] for dep in prerel_deps: - session.install("--pre", "--no-deps", "--upgrade", dep) + session.install("--pre", "--no-deps", "--upgrade", dep) # Remaining dependencies other_deps = [ @@ -489,6 +489,14 @@ def prerelease_deps(session, protobuf_implementation): ] session.install(*other_deps) + # Install spanner-graph-notebook, python-bigquery, and python-bigquery-storage + # from main to detect any potential breaking changes. For context, see: + # https://github.com/googleapis/python-bigquery-pandas/issues/854 + session.install( + "https://github.com/cloudspannerecosystem/spanner-graph-notebook/archive/refs/heads/main.zip", + "https://github.com/googleapis/python-bigquery/archive/main.zip", + "https://github.com/googleapis/python-bigquery-storage/archive/main.zip", + ) # Print out prerelease package versions session.run( "python", "-c", "import google.protobuf; print(google.protobuf.__version__)" diff --git a/owlbot.py b/owlbot.py index 4347527..904bd9f 100644 --- a/owlbot.py +++ b/owlbot.py @@ -59,6 +59,7 @@ # Multi-processing note isn't relevant, as bigquery-magics is responsible for # creating clients, not the end user. "docs/multiprocessing.rst", + "noxfile.py", "README.rst", ".github/workflows/unittest.yml", ], diff --git a/setup.py b/setup.py index 8311429..1d7c595 100644 --- a/setup.py +++ b/setup.py @@ -57,8 +57,7 @@ "bigframes": ["bigframes >= 1.17.0"], "geopandas": ["geopandas >= 1.0.1"], "spanner-graph-notebook": [ - "spanner-graph-notebook >= 1.1.1, <=1.1.1", - "networkx", + "spanner-graph-notebook >= 1.1.3", "portpicker", ], } diff --git a/tests/unit/test_bigquery.py b/tests/unit/test_bigquery.py index 33db7bc..83a4356 100644 --- a/tests/unit/test_bigquery.py +++ b/tests/unit/test_bigquery.py @@ -891,8 +891,8 @@ def test_bigquery_graph_colab(monkeypatch): graph_visualization is None or bigquery_storage is None, reason="Requires `spanner-graph-notebook` and `google-cloud-bigquery-storage`", ) -def test_colab_callback(): - result = bigquery_magics.bigquery._colab_callback( +def test_colab_query_callback(): + result = bigquery_magics.bigquery._colab_query_callback( "query", json.dumps({"result": {}}) ) assert result.data == { @@ -905,6 +905,26 @@ def test_colab_callback(): } +@pytest.mark.usefixtures("ipython_interactive") +@pytest.mark.skipif( + graph_visualization is None or bigquery_storage is None, + reason="Requires `spanner-graph-notebook` and `google-cloud-bigquery-storage`", +) +def test_colab_node_expansion_callback(): + result = bigquery_magics.bigquery._colab_node_expansion_callback( + request={ + "uid": "test_uid", + "node_labels": ["label1, label2"], + "node_properites": {}, + "direction": "INCOMING", + "edge_label": None, + }, + params_str="{}", + ) + + assert result.data == {"error": "Node expansion not yet implemented"} + + @pytest.mark.usefixtures("ipython_interactive") @pytest.mark.skipif( graph_visualization is not None or bigquery_storage is None, @@ -932,7 +952,18 @@ def test_bigquery_graph_missing_spanner_deps(monkeypatch): "google.cloud.bigquery_storage.BigQueryReadClient", bqstorage_mock ) sql = "SELECT graph_json FROM t" - result = pandas.DataFrame([], columns=["graph_json"]) + graph_json_rows = [ + """ + [{"identifier":"mUZpbkdyYXBoLlBlcnNvbgB4kQI=","kind":"node","labels":["Person"],"properties":{"birthday":"1991-12-21T08:00:00Z","city":"Adelaide","country":"Australia","id":1,"name":"Alex"}},{"destination_node_identifier":"mUZpbkdyYXBoLkFjY291bnQAeJEO","identifier":"mUZpbkdyYXBoLlBlcnNvbk93bkFjY291bnQAeJECkQ6ZRmluR3JhcGguUGVyc29uAHiRAplGaW5HcmFwaC5BY2NvdW50AHiRDg==","kind":"edge","labels":["Owns"],"properties":{"account_id":7,"create_time":"2020-01-10T14:22:20.222Z","id":1},"source_node_identifier":"mUZpbkdyYXBoLlBlcnNvbgB4kQI="},{"identifier":"mUZpbkdyYXBoLkFjY291bnQAeJEO","kind":"node","labels":["Account"],"properties":{"create_time":"2020-01-10T14:22:20.222Z","id":7,"is_blocked":false,"nick_name":"Vacation Fund"}}] + """, + """ + [{"identifier":"mUZpbkdyYXBoLlBlcnNvbgB4kQY=","kind":"node","labels":["Person"],"properties":{"birthday":"1986-12-07T08:00:00Z","city":"Kollam","country":"India","id":3,"name":"Lee"}},{"destination_node_identifier":"mUZpbkdyYXBoLkFjY291bnQAeJEg","identifier":"mUZpbkdyYXBoLlBlcnNvbk93bkFjY291bnQAeJEGkSCZRmluR3JhcGguUGVyc29uAHiRBplGaW5HcmFwaC5BY2NvdW50AHiRIA==","kind":"edge","labels":["Owns"],"properties":{"account_id":16,"create_time":"2020-02-18T13:44:20.655Z","id":3},"source_node_identifier":"mUZpbkdyYXBoLlBlcnNvbgB4kQY="},{"identifier":"mUZpbkdyYXBoLkFjY291bnQAeJEg","kind":"node","labels":["Account"],"properties":{"create_time":"2020-01-28T01:55:09.206Z","id":16,"is_blocked":true,"nick_name":"Vacation Fund"}}] + """, + """ + [{"identifier":"mUZpbkdyYXBoLlBlcnNvbgB4kQQ=","kind":"node","labels":["Person"],"properties":{"birthday":"1980-10-31T08:00:00Z","city":"Moravia","country":"Czech_Republic","id":2,"name":"Dana"}},{"destination_node_identifier":"mUZpbkdyYXBoLkFjY291bnQAeJEo","identifier":"mUZpbkdyYXBoLlBlcnNvbk93bkFjY291bnQAeJEEkSiZRmluR3JhcGguUGVyc29uAHiRBJlGaW5HcmFwaC5BY2NvdW50AHiRKA==","kind":"edge","labels":["Owns"],"properties":{"account_id":20,"create_time":"2020-01-28T01:55:09.206Z","id":2},"source_node_identifier":"mUZpbkdyYXBoLlBlcnNvbgB4kQQ="},{"identifier":"mUZpbkdyYXBoLkFjY291bnQAeJEo","kind":"node","labels":["Account"],"properties":{"create_time":"2020-02-18T13:44:20.655Z","id":20,"is_blocked":false,"nick_name":"Rainy Day Fund"}}] + """, + ] + result = pandas.DataFrame(graph_json_rows, columns=["graph_json"]) run_query_patch = mock.patch("bigquery_magics.bigquery._run_query", autospec=True) display_patch = mock.patch("IPython.display.display", autospec=True) query_job_mock = mock.create_autospec( diff --git a/tests/unit/test_graph_server.py b/tests/unit/test_graph_server.py index 434b722..b56e0c6 100644 --- a/tests/unit/test_graph_server.py +++ b/tests/unit/test_graph_server.py @@ -116,18 +116,17 @@ def _validate_nodes_and_edges(result): for edge in result["response"]["edges"]: - assert "id" in edge - assert edge["label"] == "Owns" - assert "source" in edge - assert "target" in edge + assert "source_node_identifier" in edge + assert "destination_node_identifier" in edge + assert "identifier" in edge + assert "Owns" in edge["labels"] assert "properties" in edge + print(result["response"]["nodes"]) for node in result["response"]["nodes"]: - assert "id" in node - assert "key_property_names" in node - assert node["label"] in ("Account", "Person") + assert "identifier" in node + assert "Account" in node["labels"] or "Person" in node["labels"] assert "properties" in node - assert "value" in node @pytest.mark.skipif( @@ -169,23 +168,26 @@ def test_convert_one_column_one_row_one_column(): @pytest.mark.skipif( graph_visualization is None, reason="Requires `spanner-graph-notebook`" ) -def test_convert_one_column_one_row_one_column_null_json(): +def test_convert_one_column_two_rows_one_column_null_json(): result = graph_server.convert_graph_data( { "result": { "0": json.dumps(None), + "1": json.dumps(row_alex_owns_account), } } ) - assert result == { - "response": { - "edges": [], - "nodes": [], - "query_result": {"result": []}, - "schema": None, - }, + # Null JSON element should be ignored in visualization, but should still be present in tabular view. + assert len(result["response"]["nodes"]) == 2 + assert len(result["response"]["edges"]) == 1 + + _validate_nodes_and_edges(result) + + assert result["response"]["query_result"] == { + "result": [None, row_alex_owns_account] } + assert result["response"]["schema"] is None _validate_nodes_and_edges(result) @@ -228,7 +230,6 @@ def test_convert_one_row_two_columns(): }, } ) - print(json.dumps(result)) assert len(result["response"]["nodes"]) == 4 assert len(result["response"]["edges"]) == 2 @@ -288,29 +289,29 @@ def test_convert_outer_value_not_dict(): @pytest.mark.skipif( graph_visualization is None, reason="Requires `spanner-graph-notebook`" ) -def test_convert_inner_key_not_string(): +def test_convert_inner_value_not_string(): result = graph_server.convert_graph_data( { - "result": { - 0: json.dumps({"foo": 1, "bar": 2}), - } + "col1": { + "0": json.dumps(row_alex_owns_account), + }, + "col2": { + "0": 12345, + }, } ) - assert result == {"error": "Expected inner key to be str, got "} + # Non-JSON column should be ignored in visualizer view, but still appear in tabular view. + assert len(result["response"]["nodes"]) == 2 + assert len(result["response"]["edges"]) == 1 -@pytest.mark.skipif( - graph_visualization is None, reason="Requires `spanner-graph-notebook`" -) -def test_convert_inner_value_not_string(): - result = graph_server.convert_graph_data( - { - "result": { - "0": 1, - } - } - ) - assert result == {"error": "Expected inner value to be str, got "} + _validate_nodes_and_edges(result) + + assert result["response"]["query_result"] == { + "col1": [row_alex_owns_account], + "col2": ["12345"], + } + assert result["response"]["schema"] is None @pytest.mark.skipif( @@ -415,6 +416,45 @@ def test_post_query(self): ) self.assertIsNone(response_data["schema"]) + @pytest.mark.skipif( + graph_visualization is None, reason="Requires `spanner-graph-notebook`" + ) + def test_post_node_expansion(self): + self.assertTrue(self.server_thread.is_alive()) + route = graph_server.graph_server.build_route( + graph_server.GraphServer.endpoints["post_node_expansion"] + ) + request = { + "request": { + "uid": "test_uid", + "node_labels": ["label1, label2"], + "node_properites": {}, + "direction": "INCOMING", + "edge_label": None, + }, + "params": "{}", + } + response = requests.post(route, json={"params": json.dumps(request)}) + self.assertEqual(response.status_code, 200) + self.assertEqual( + response.json(), {"error": "Node expansion not yet implemented"} + ) + + @pytest.mark.skipif( + graph_visualization is None, reason="Requires `spanner-graph-notebook`" + ) + def test_post_node_expansion_invalid_request(self): + self.assertTrue(self.server_thread.is_alive()) + route = graph_server.graph_server.build_route( + graph_server.GraphServer.endpoints["post_node_expansion"] + ) + request = {} + response = requests.post(route, json={"params": json.dumps(request)}) + self.assertEqual(response.status_code, 200) + self.assertEqual( + response.json(), {"error": "Node expansion not yet implemented"} + ) + def test_stop_server_never_started(): graph_server.graph_server.stop_server()